Commit bcb30ac2 authored by Marc Vef's avatar Marc Vef Committed by Alberto Miranda
Browse files

Refactoring daemon I/O logic and fixing truncate etc.

The `ChunkStorage` backend class on the daemon was throwing `system_errors` without being caught, crashing the server in the process. `ChunkStorage` now uses a designated error class for errors that might occur. In addition the dependency to Argobots was removed which was used to trigger `ABT_eventuals`, laying ground work for future non-Argobots IO implementations. Further, the whole class was refactored for consistency and failure resistance.

A new class `ChunkOperation` is introduced which wraps Argobots' IO task operations which allows the removal of IO queue specific code within RPC handlers, i.e., read and write handlers. The idea is to separate eventuals, tasks and their arguments from handler logic into a designated class. Therefore, an object of an inherited class of `ChunkOperation` is instantiated within the handlers that drives all IO tasks. The corresponding code was added to the read and write RPC handlers. Note, `ChunkOperation` is not thread-safe and is supposed to be called by a single thread.

In addition, truncate was reworked for error handling (it crashed the server on error) and that it uses the IO queue as well since truncate causes a write operation and should not overtake IO tasks in the queue.

The chunk stat rpc handler was refactored for error handling and to use error codes as well. 

Further minor changes:
- dead chunk stat code has been removed
- some namespaces were missing: `gkfs::rpc`
- more flexible handler cleanup and response code
- fixed a bug where the chunk dir wasn't removed when the metadata didn't exist on the same node
parent 2a236e33
Loading
Loading
Loading
Loading
+0 −2
Original line number Diff line number Diff line
@@ -47,8 +47,6 @@ int gkfs_statx(int dirfd, const std::string& path, int flags, unsigned int mask,

int gkfs_statfs(struct statfs* buf);

int gkfs_statvfs(struct statvfs* buf);

off64_t gkfs_lseek(unsigned int fd, off64_t offset, unsigned int whence);

off64_t gkfs_lseek(std::shared_ptr<gkfs::filemap::OpenFile> gkfs_fd, off64_t offset, unsigned int whence);
+10 −1
Original line number Diff line number Diff line
@@ -2092,11 +2092,13 @@ struct chunk_stat {

    public:
        output() :
                m_err(),
                m_chunk_size(),
                m_chunk_total(),
                m_chunk_free() {}

        output(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
        output(int32_t err, uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
                m_err(err),
                m_chunk_size(chunk_size),
                m_chunk_total(chunk_total),
                m_chunk_free(chunk_free) {}
@@ -2111,11 +2113,17 @@ struct chunk_stat {

        explicit
        output(const rpc_chunk_stat_out_t& out) {
            m_err = out.err;
            m_chunk_size = out.chunk_size;
            m_chunk_total = out.chunk_total;
            m_chunk_free = out.chunk_free;
        }

        int32_t
        err() const {
            return m_err;
        }

        uint64_t
        chunk_size() const {
            return m_chunk_size;
@@ -2132,6 +2140,7 @@ struct chunk_stat {
        }

    private:
        int32_t m_err;
        uint64_t m_chunk_size;
        uint64_t m_chunk_total;
        uint64_t m_chunk_free;
+22 −21
Original line number Diff line number Diff line
@@ -14,13 +14,12 @@
#ifndef GEKKOFS_CHUNK_STORAGE_HPP
#define GEKKOFS_CHUNK_STORAGE_HPP

extern "C" {
#include <abt.h>
}
#include <global/global_defs.hpp>

#include <limits>
#include <string>
#include <memory>
#include <system_error>

/* Forward declarations */
namespace spdlog {
@@ -36,42 +35,44 @@ struct ChunkStat {
    unsigned long chunk_free;
};

class ChunkStorageException : public std::system_error {
public:
    ChunkStorageException(const int err_code, const std::string& s) : std::system_error(err_code,
                                                                                        std::system_category(), s) {};
};

class ChunkStorage {
private:
    static constexpr const char* LOGGER_NAME = "ChunkStorage";

    std::shared_ptr<spdlog::logger> log;
    std::shared_ptr<spdlog::logger> log_;

    std::string root_path;
    size_t chunksize;
    std::string root_path_;
    size_t chunksize_;

    inline std::string absolute(const std::string& internal_path) const;

    static inline std::string get_chunks_dir(const std::string& file_path);

    static inline std::string get_chunk_path(const std::string& file_path, unsigned int chunk_id);
    static inline std::string get_chunk_path(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id);

    void init_chunk_space(const std::string& file_path) const;

public:
    ChunkStorage(const std::string& path, size_t chunksize);
    ChunkStorage(std::string path, size_t chunksize);

    void write_chunk(const std::string& file_path, unsigned int chunk_id,
                     const char* buff, size_t size, off64_t offset,
                     ABT_eventual& eventual) const;

    void read_chunk(const std::string& file_path, unsigned int chunk_id,
                    char* buff, size_t size, off64_t offset,
                    ABT_eventual& eventual) const;
    void destroy_chunk_space(const std::string& file_path) const;

    void trim_chunk_space(const std::string& file_path, unsigned int chunk_start,
                          unsigned int chunk_end = std::numeric_limits<unsigned int>::max());
    ssize_t
    write_chunk(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, const char* buff, size_t size,
                off64_t offset) const;

    void delete_chunk(const std::string& file_path, unsigned int chunk_id);
    ssize_t read_chunk(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, char* buf, size_t size,
                       off64_t offset) const;

    void truncate_chunk(const std::string& file_path, unsigned int chunk_id, off_t length);
    void trim_chunk_space(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_start);

    void destroy_chunk_space(const std::string& file_path) const;
    void truncate_chunk_file(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, off_t length);

    ChunkStat chunk_stat() const;
};
+8 −2
Original line number Diff line number Diff line
@@ -17,14 +17,20 @@
#include <string>
#include <stdexcept>

namespace gkfs {
namespace metadata {

class DBException : public std::runtime_error {
public:
    DBException(const std::string& s) : std::runtime_error(s) {};
    explicit DBException(const std::string& s) : std::runtime_error(s) {};
};

class NotFoundException : public DBException {
public:
    NotFoundException(const std::string& s) : DBException(s) {};
    explicit NotFoundException(const std::string& s) : DBException(s) {};
};

} // namespace metadata
} // namespace gkfs

#endif //GEKKOFS_DB_EXCEPTIONS_HPP
+27 −2
Original line number Diff line number Diff line
@@ -51,16 +51,41 @@ inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t*
    return ret;
}

template<typename I, typename O>
inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) {
template<typename O>
inline hg_return_t respond(hg_handle_t* handle, O* output) {
    auto ret = HG_SUCCESS;
    if (output && handle) {
        ret = margo_respond(*handle, output);
        if (ret != HG_SUCCESS)
            return ret;
    }
    return ret;
}

template<typename I, typename O>
inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) {
    auto ret = respond(handle, output);
    if (ret != HG_SUCCESS)
        return ret;
    return cleanup(handle, input, static_cast<O*>(nullptr), bulk_handle);
}

template<typename I, typename O>
inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output) {
    return cleanup_respond(handle, input, output, nullptr);
}

template<typename O>
inline hg_return_t cleanup_respond(hg_handle_t* handle, O* output) {
    auto ret = respond(handle, output);
    if (ret != HG_SUCCESS)
        return ret;
    if (handle) {
        ret = margo_destroy(*handle);
        if (ret != HG_SUCCESS)
            return ret;
    }
    return ret;
}

} // namespace rpc
Loading