From 89523413b4520789ef787bbb14f761807412dbbf Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Tue, 25 Feb 2020 12:08:00 +0100 Subject: [PATCH 01/10] Refactoring daemon I/O logic and fixing truncate etc. The `ChunkStorage` backend class on the daemon was throwing `system_error`s without them being caught, crashing the server in the process. `ChunkStorage` now uses a designated error class for errors that might occur. In addition, the dependency on Argobots, which was used to trigger `ABT_eventual`s, was removed, laying the groundwork for future non-Argobots IO implementations. Further, the whole class was refactored for consistency and failure resistance. A new class `ChunkOperation` is introduced that wraps Argobots' IO task operations and allows the removal of IO-queue-specific code from the RPC handlers, i.e., the read and write handlers. The idea is to separate eventuals, tasks, and their arguments from the handler logic into a designated class. Therefore, an object of a class derived from `ChunkOperation` is instantiated within the handlers and drives all IO tasks. The corresponding code was added to the read and write RPC handlers. Note that `ChunkOperation` is not thread-safe and is supposed to be used by a single thread. In addition, truncate was reworked to handle errors (previously it crashed the server on error) and to use the IO queue as well, since truncate causes a write operation and must not overtake IO tasks in the queue. The chunk stat RPC handler was refactored to handle errors and to use error codes as well. Further minor changes: - dead chunk stat code has been removed - missing namespace qualifiers were added: `gkfs::rpc` - more flexible handler cleanup and response code - fixed a bug where the chunk directory wasn't removed when the metadata didn't exist on the same node --- include/client/gkfs_functions.hpp | 2 - include/client/rpc/rpc_types.hpp | 11 +- include/daemon/backend/data/chunk_storage.hpp | 43 +- include/daemon/backend/exceptions.hpp | 10 +- include/daemon/handler/rpc_util.hpp | 29 +- include/daemon/ops/data.hpp | 180 ++++++++ include/daemon/ops/metadentry.hpp | 2 +- include/global/rpc/rpc_types.hpp | 147 +++--- include/global/rpc/rpc_util.hpp | 6 + src/client/gkfs_functions.cpp | 26 +- src/client/preload_util.cpp | 2 +- src/client/rpc/forward_data.cpp | 25 +- src/daemon/CMakeLists.txt | 4 +- src/daemon/backend/data/CMakeLists.txt | 9 +- src/daemon/backend/data/chunk_storage.cpp | 286 +++++++----- src/daemon/daemon.cpp | 2 +- src/daemon/handler/srv_data.cpp | 355 ++++----------- src/daemon/handler/srv_metadata.cpp | 54 ++- src/daemon/ops/data.cpp | 426 ++++++++++++++++++ src/daemon/ops/metadentry.cpp | 11 +- src/daemon/util.cpp | 2 +- src/global/rpc/rpc_util.cpp | 8 +- 22 files changed, 1118 insertions(+), 522 deletions(-) create mode 100644 include/daemon/ops/data.hpp create mode 100644 src/daemon/ops/data.cpp diff --git a/include/client/gkfs_functions.hpp b/include/client/gkfs_functions.hpp index 7ae71ce99..15e5a215f 100644 --- a/include/client/gkfs_functions.hpp +++ b/include/client/gkfs_functions.hpp @@ -37,8 +37,6 @@ int gkfs_stat(const std::string& path, struct stat* buf, bool follow_links = tru int gkfs_statfs(struct statfs* buf); -int gkfs_statvfs(struct statvfs* buf); - off64_t gkfs_lseek(unsigned int fd, off64_t offset, unsigned int whence); off64_t gkfs_lseek(std::shared_ptr gkfs_fd, off64_t offset, unsigned int whence); diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp index 95babcb0c..ab1f575a8 100644 --- a/include/client/rpc/rpc_types.hpp +++
b/include/client/rpc/rpc_types.hpp @@ -2092,11 +2092,13 @@ struct chunk_stat { public: output() : + m_err(), m_chunk_size(), m_chunk_total(), m_chunk_free() {} - output(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) : + output(int32_t err, uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) : + m_err(err), m_chunk_size(chunk_size), m_chunk_total(chunk_total), m_chunk_free(chunk_free) {} @@ -2111,11 +2113,17 @@ struct chunk_stat { explicit output(const rpc_chunk_stat_out_t& out) { + m_err = out.err; m_chunk_size = out.chunk_size; m_chunk_total = out.chunk_total; m_chunk_free = out.chunk_free; } + int32_t + err() const { + return m_err; + } + uint64_t chunk_size() const { return m_chunk_size; @@ -2132,6 +2140,7 @@ struct chunk_stat { } private: + int32_t m_err; uint64_t m_chunk_size; uint64_t m_chunk_total; uint64_t m_chunk_free; diff --git a/include/daemon/backend/data/chunk_storage.hpp b/include/daemon/backend/data/chunk_storage.hpp index 8a6db6784..08da9d56c 100644 --- a/include/daemon/backend/data/chunk_storage.hpp +++ b/include/daemon/backend/data/chunk_storage.hpp @@ -14,17 +14,16 @@ #ifndef GEKKOFS_CHUNK_STORAGE_HPP #define GEKKOFS_CHUNK_STORAGE_HPP -extern "C" { -#include -} +#include #include #include #include +#include /* Forward declarations */ namespace spdlog { - class logger; +class logger; } namespace gkfs { @@ -36,42 +35,44 @@ struct ChunkStat { unsigned long chunk_free; }; +class ChunkStorageException : public std::system_error { +public: + ChunkStorageException(const int err_code, const std::string& s) : std::system_error(err_code, + std::system_category(), s) {}; +}; + class ChunkStorage { private: static constexpr const char* LOGGER_NAME = "ChunkStorage"; - std::shared_ptr log; + std::shared_ptr log_; - std::string root_path; - size_t chunksize; + std::string root_path_; + size_t chunksize_; inline std::string absolute(const std::string& internal_path) const; static inline std::string get_chunks_dir(const std::string& file_path); - static inline std::string get_chunk_path(const std::string& file_path, unsigned int chunk_id); + static inline std::string get_chunk_path(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id); void init_chunk_space(const std::string& file_path) const; public: - ChunkStorage(const std::string& path, size_t chunksize); + ChunkStorage(std::string path, size_t chunksize); - void write_chunk(const std::string& file_path, unsigned int chunk_id, - const char* buff, size_t size, off64_t offset, - ABT_eventual& eventual) const; - - void read_chunk(const std::string& file_path, unsigned int chunk_id, - char* buff, size_t size, off64_t offset, - ABT_eventual& eventual) const; + void destroy_chunk_space(const std::string& file_path) const; - void trim_chunk_space(const std::string& file_path, unsigned int chunk_start, - unsigned int chunk_end = std::numeric_limits::max()); + ssize_t + write_chunk(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, const char* buff, size_t size, + off64_t offset) const; - void delete_chunk(const std::string& file_path, unsigned int chunk_id); + ssize_t read_chunk(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, char* buf, size_t size, + off64_t offset) const; - void truncate_chunk(const std::string& file_path, unsigned int chunk_id, off_t length); + void trim_chunk_space(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_start); - void destroy_chunk_space(const std::string& file_path) const; + void truncate_chunk_file(const std::string& 
file_path, gkfs::types::rpc_chnk_id_t chunk_id, off_t length); ChunkStat chunk_stat() const; }; diff --git a/include/daemon/backend/exceptions.hpp b/include/daemon/backend/exceptions.hpp index 36e80cfc1..b5cc11d53 100644 --- a/include/daemon/backend/exceptions.hpp +++ b/include/daemon/backend/exceptions.hpp @@ -17,14 +17,20 @@ #include #include +namespace gkfs { +namespace metadata { + class DBException : public std::runtime_error { public: - DBException(const std::string& s) : std::runtime_error(s) {}; + explicit DBException(const std::string& s) : std::runtime_error(s) {}; }; class NotFoundException : public DBException { public: - NotFoundException(const std::string& s) : DBException(s) {}; + explicit NotFoundException(const std::string& s) : DBException(s) {}; }; +} // namespace metadata +} // namespace gkfs + #endif //GEKKOFS_DB_EXCEPTIONS_HPP diff --git a/include/daemon/handler/rpc_util.hpp b/include/daemon/handler/rpc_util.hpp index 38ef10941..ea90df5de 100644 --- a/include/daemon/handler/rpc_util.hpp +++ b/include/daemon/handler/rpc_util.hpp @@ -51,16 +51,41 @@ inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* return ret; } -template -inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { +template +inline hg_return_t respond(hg_handle_t* handle, O* output) { auto ret = HG_SUCCESS; if (output && handle) { ret = margo_respond(*handle, output); if (ret != HG_SUCCESS) return ret; } + return ret; +} + +template +inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { + auto ret = respond(handle, output); + if (ret != HG_SUCCESS) + return ret; return cleanup(handle, input, static_cast(nullptr), bulk_handle); +} +template +inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output) { + return cleanup_respond(handle, input, output, nullptr); +} + +template +inline hg_return_t cleanup_respond(hg_handle_t* handle, O* output) { + auto ret = respond(handle, output); + if (ret != HG_SUCCESS) + return ret; + if (handle) { + ret = margo_destroy(*handle); + if (ret != HG_SUCCESS) + return ret; + } + return ret; } } // namespace rpc diff --git a/include/daemon/ops/data.hpp b/include/daemon/ops/data.hpp new file mode 100644 index 000000000..be4177e2e --- /dev/null +++ b/include/daemon/ops/data.hpp @@ -0,0 +1,180 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. 
+ + SPDX-License-Identifier: MIT +*/ + +#ifndef GEKKOFS_DAEMON_DATA_HPP +#define GEKKOFS_DAEMON_DATA_HPP + +#include +#include + +#include +#include + +extern "C" { +#include +#include +} + +namespace gkfs { +namespace data { + +class ChunkOpException : public std::runtime_error { +public: + explicit ChunkOpException(const std::string& s) : std::runtime_error(s) {}; +}; + +class ChunkWriteOpException : public ChunkOpException { +public: + explicit ChunkWriteOpException(const std::string& s) : ChunkOpException(s) {}; +}; + +class ChunkReadOpException : public ChunkOpException { +public: + explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s) {}; +}; + +class ChunkMetaOpException : public ChunkOpException { +public: + explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s) {}; +}; + +/** + * Classes to encapsulate asynchronous chunk operations. + * All operations on chunk files must go through the Argobots' task queues. + * Otherwise operations may overtake operations in the queues. + * This applies to write, read, and truncate which may modify the middle of a chunk, essentially a write operation. + * + * Note: This class is not thread-safe. + * + * In the future, this class may be used to provide failure tolerance for IO tasks + * + * Abstract base class without public constructor: + */ +class ChunkOperation { + +protected: + + std::string path_; + + std::vector abt_tasks_; + std::vector task_eventuals_; + + virtual void cancel_all_tasks(); + + explicit ChunkOperation(std::string path); + + ChunkOperation(std::string path, size_t n); + + ~ChunkOperation(); +}; + + +class ChunkTruncateOperation : public ChunkOperation { + +private: + struct chunk_truncate_args { + const std::string* path; + size_t size; + ABT_eventual eventual; + }; + + std::vector task_args_; + + static void truncate_abt(void* _arg); + +public: + + explicit ChunkTruncateOperation(std::string path); + + ChunkTruncateOperation(std::string path, size_t n); + + void cancel_all_tasks() override; + + void truncate(size_t idx, size_t size); + + int wait_for_tasks(); +}; + +class ChunkWriteOperation : public ChunkOperation { +private: + + struct chunk_write_args { + const std::string* path; + const char* buf; + gkfs::types::rpc_chnk_id_t chnk_id; + size_t size; + off64_t off; + ABT_eventual eventual; + }; + + std::vector task_args_; + + static void write_file_abt(void* _arg); + +public: + + ChunkWriteOperation(std::string path, size_t n); + + void cancel_all_tasks() override; + + void write_async(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset); + + std::pair wait_for_tasks(); + +}; + + +class ChunkReadOperation : public ChunkOperation { + +private: + + struct chunk_read_args { + const std::string* path; + char* buf; + gkfs::types::rpc_chnk_id_t chnk_id; + size_t size; + off64_t off; + ABT_eventual eventual; + }; + + std::vector task_args_; + + static void read_file_abt(void* _arg); + +public: + + struct bulk_args { + margo_instance_id mid; + hg_addr_t origin_addr; + hg_bulk_t origin_bulk_handle; + std::vector* origin_offsets; + hg_bulk_t local_bulk_handle; + std::vector* local_offsets; + std::vector* chunk_ids; + }; + + ChunkReadOperation(std::string path, size_t n); + + void cancel_all_tasks() override; + + void read_async(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset); + + std::pair + wait_for_tasks_and_push_back(const bulk_args& args); + +}; + +} // namespace data +} // namespace gkfs + +#endif //GEKKOFS_DAEMON_DATA_HPP 
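To illustrate the intended use of these classes, here is a minimal sketch of how the write handler drives them (condensed from the srv_data.cpp changes further below; the handler-local names in, chnk_ids_host, bulk_buf_ptrs, and chnk_sizes are taken from that diff, and error handling is elided):

    // One ChunkWriteOperation instance drives all chunk tasklets of a single write RPC.
    gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
    for (uint64_t idx = 0; idx < in.chunk_n; idx++) {
        // Enqueue one Argobots tasklet per chunk into the daemon's IO queue.
        // Only the first chunk carries the client-provided offset; chunks arrive sorted.
        chunk_op.write_async(idx, chnk_ids_host[idx], bulk_buf_ptrs[idx],
                             chnk_sizes[idx], idx == 0 ? in.offset : 0);
    }
    // Blocks on all eventuals; yields a pair of {errno-style error code, accumulated io_size}.
    auto write_result = chunk_op.wait_for_tasks();
    out.err = write_result.first;
    out.io_size = write_result.second;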
diff --git a/include/daemon/ops/metadentry.hpp b/include/daemon/ops/metadentry.hpp index 1c903a3ad..fd41aa712 100644 --- a/include/daemon/ops/metadentry.hpp +++ b/include/daemon/ops/metadentry.hpp @@ -35,7 +35,7 @@ void update(const std::string& path, Metadata& md); void update_size(const std::string& path, size_t io_size, off_t offset, bool append); -void remove_node(const std::string& path); +void remove(const std::string& path); } // namespace metadata } // namespace gkfs diff --git a/include/global/rpc/rpc_types.hpp b/include/global/rpc/rpc_types.hpp index 5e6d57f12..65b086ad2 100644 --- a/include/global/rpc/rpc_types.hpp +++ b/include/global/rpc/rpc_types.hpp @@ -23,88 +23,95 @@ extern "C" { /* visible API for RPC data types used in RPCS */ // misc generic rpc types -MERCURY_GEN_PROC(rpc_err_out_t, ((hg_int32_t) (err))) +MERCURY_GEN_PROC(rpc_err_out_t, + ((hg_int32_t) (err))) // Metadentry MERCURY_GEN_PROC(rpc_mk_node_in_t, - ((hg_const_string_t) (path))\ -((uint32_t) (mode))) + ((hg_const_string_t) (path)) + ((uint32_t) (mode))) -MERCURY_GEN_PROC(rpc_path_only_in_t, ((hg_const_string_t) (path))) +MERCURY_GEN_PROC(rpc_path_only_in_t, + ((hg_const_string_t) (path))) -MERCURY_GEN_PROC(rpc_stat_out_t, ((hg_int32_t) (err)) - ((hg_const_string_t) (db_val))) +MERCURY_GEN_PROC(rpc_stat_out_t, + ((hg_int32_t) (err)) + ((hg_const_string_t) (db_val))) -MERCURY_GEN_PROC(rpc_rm_node_in_t, ((hg_const_string_t) (path))) +MERCURY_GEN_PROC(rpc_rm_node_in_t, + ((hg_const_string_t) (path))) MERCURY_GEN_PROC(rpc_trunc_in_t, - ((hg_const_string_t) (path)) \ -((hg_uint64_t) (length))) + ((hg_const_string_t) (path)) + ((hg_uint64_t) (length))) MERCURY_GEN_PROC(rpc_update_metadentry_in_t, - ((hg_const_string_t) (path))\ -((uint64_t) (nlink))\ -((hg_uint32_t) (mode))\ -((hg_uint32_t) (uid))\ -((hg_uint32_t) (gid))\ -((hg_int64_t) (size))\ -((hg_int64_t) (blocks))\ -((hg_int64_t) (atime))\ -((hg_int64_t) (mtime))\ -((hg_int64_t) (ctime))\ -((hg_bool_t) (nlink_flag))\ -((hg_bool_t) (mode_flag))\ -((hg_bool_t) (size_flag))\ -((hg_bool_t) (block_flag))\ -((hg_bool_t) (atime_flag))\ -((hg_bool_t) (mtime_flag))\ -((hg_bool_t) (ctime_flag))) - -MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, ((hg_const_string_t) (path)) - ((hg_uint64_t) (size)) - ((hg_int64_t) (offset)) - ((hg_bool_t) (append))) - -MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, ((hg_int32_t) (err)) - ((hg_int64_t) (ret_size))) - -MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, ((hg_int32_t) (err)) - ((hg_int64_t) (ret_size))) + ((hg_const_string_t) (path)) + ((uint64_t) (nlink)) + ((hg_uint32_t) (mode)) + ((hg_uint32_t) (uid)) + ((hg_uint32_t) (gid)) + ((hg_int64_t) (size)) + ((hg_int64_t) (blocks)) + ((hg_int64_t) (atime)) + ((hg_int64_t) (mtime)) + ((hg_int64_t) (ctime)) + ((hg_bool_t) (nlink_flag)) + ((hg_bool_t) (mode_flag)) + ((hg_bool_t) (size_flag)) + ((hg_bool_t) (block_flag)) + ((hg_bool_t) (atime_flag)) + ((hg_bool_t) (mtime_flag)) + ((hg_bool_t) (ctime_flag))) + +MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, + ((hg_const_string_t) (path)) + ((hg_uint64_t) (size)) + ((hg_int64_t) (offset)) + ((hg_bool_t) (append))) + +MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, + ((hg_int32_t) (err)) + ((hg_int64_t) (ret_size))) + +MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, + ((hg_int32_t) (err)) + ((hg_int64_t) (ret_size))) #ifdef HAS_SYMLINKS MERCURY_GEN_PROC(rpc_mk_symlink_in_t, - ((hg_const_string_t) (path))\ -((hg_const_string_t) (target_path)) + ((hg_const_string_t) (path)) + ((hg_const_string_t) (target_path)) ) #endif // data 
MERCURY_GEN_PROC(rpc_read_data_in_t, - ((hg_const_string_t) (path))\ -((int64_t) (offset))\ -((hg_uint64_t) (host_id))\ -((hg_uint64_t) (host_size))\ -((hg_uint64_t) (chunk_n))\ -((hg_uint64_t) (chunk_start))\ -((hg_uint64_t) (chunk_end))\ -((hg_uint64_t) (total_chunk_size))\ -((hg_bulk_t) (bulk_handle))) + ((hg_const_string_t) (path)) + ((int64_t) (offset)) + ((hg_uint64_t) (host_id)) + ((hg_uint64_t) (host_size)) + ((hg_uint64_t) (chunk_n)) + ((hg_uint64_t) (chunk_start)) + ((hg_uint64_t) (chunk_end)) + ((hg_uint64_t) (total_chunk_size)) + ((hg_bulk_t) (bulk_handle))) MERCURY_GEN_PROC(rpc_data_out_t, - ((int32_t) (err))\ -((hg_size_t) (io_size))) + ((int32_t) (err)) + ((hg_size_t) (io_size))) MERCURY_GEN_PROC(rpc_write_data_in_t, - ((hg_const_string_t) (path))\ -((int64_t) (offset))\ -((hg_uint64_t) (host_id))\ -((hg_uint64_t) (host_size))\ -((hg_uint64_t) (chunk_n))\ -((hg_uint64_t) (chunk_start))\ -((hg_uint64_t) (chunk_end))\ -((hg_uint64_t) (total_chunk_size))\ -((hg_bulk_t) (bulk_handle))) + ((hg_const_string_t) (path)) + ((int64_t) (offset)) + ((hg_uint64_t) (host_id)) + ((hg_uint64_t) (host_size)) + ((hg_uint64_t) (chunk_n)) + ((hg_uint64_t) (chunk_start)) + ((hg_uint64_t) (chunk_end)) + ((hg_uint64_t) (total_chunk_size)) + ((hg_bulk_t) (bulk_handle))) MERCURY_GEN_PROC(rpc_get_dirents_in_t, ((hg_const_string_t) (path)) @@ -117,15 +124,16 @@ MERCURY_GEN_PROC(rpc_get_dirents_out_t, ) -MERCURY_GEN_PROC(rpc_config_out_t, ((hg_const_string_t) (mountdir)) - ((hg_const_string_t) (rootdir)) \ -((hg_bool_t) (atime_state)) \ -((hg_bool_t) (mtime_state)) \ -((hg_bool_t) (ctime_state)) \ -((hg_bool_t) (link_cnt_state)) \ -((hg_bool_t) (blocks_state)) \ -((hg_uint32_t) (uid)) \ -((hg_uint32_t) (gid)) \ +MERCURY_GEN_PROC(rpc_config_out_t, + ((hg_const_string_t) (mountdir)) + ((hg_const_string_t) (rootdir)) + ((hg_bool_t) (atime_state)) + ((hg_bool_t) (mtime_state)) + ((hg_bool_t) (ctime_state)) + ((hg_bool_t) (link_cnt_state)) + ((hg_bool_t) (blocks_state)) + ((hg_uint32_t) (uid)) + ((hg_uint32_t) (gid)) ) @@ -134,7 +142,8 @@ MERCURY_GEN_PROC(rpc_chunk_stat_in_t, ) MERCURY_GEN_PROC(rpc_chunk_stat_out_t, - ((hg_uint64_t) (chunk_size)) + ((hg_int32_t) (err)) + ((hg_uint64_t) (chunk_size)) ((hg_uint64_t) (chunk_total)) ((hg_uint64_t) (chunk_free)) ) diff --git a/include/global/rpc/rpc_util.hpp b/include/global/rpc/rpc_util.hpp index 2822c67c0..cdac93a2b 100644 --- a/include/global/rpc/rpc_util.hpp +++ b/include/global/rpc/rpc_util.hpp @@ -22,10 +22,16 @@ extern "C" { #include +namespace gkfs { +namespace rpc { + hg_bool_t bool_to_merc_bool(bool state); std::string get_my_hostname(bool short_hostname = false); std::string get_host_by_name(const std::string& hostname); +} // namespace rpc +} // namespace gkfs + #endif //GEKKOFS_GLOBAL_RPC_UTILS_HPP diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 029304a77..037dab5eb 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -234,7 +234,13 @@ int gkfs_stat(const string& path, struct stat* buf, bool follow_links) { } int gkfs_statfs(struct statfs* buf) { - auto blk_stat = gkfs::rpc::forward_get_chunk_stat(); + gkfs::rpc::ChunkStat blk_stat{}; + try { + blk_stat = gkfs::rpc::forward_get_chunk_stat(); + } catch (const std::exception& e) { + LOG(ERROR, "{}() Failure with error: '{}'", __func__, e.what()); + return -1; + } buf->f_type = 0; buf->f_bsize = blk_stat.chunk_size; buf->f_blocks = blk_stat.chunk_total; @@ -250,24 +256,6 @@ int gkfs_statfs(struct statfs* buf) { return 0; } -int gkfs_statvfs(struct
statvfs* buf) { - gkfs::preload::init_ld_env_if_needed(); - auto blk_stat = gkfs::rpc::forward_get_chunk_stat(); - buf->f_bsize = blk_stat.chunk_size; - buf->f_blocks = blk_stat.chunk_total; - buf->f_bfree = blk_stat.chunk_free; - buf->f_bavail = blk_stat.chunk_free; - buf->f_files = 0; - buf->f_ffree = 0; - buf->f_favail = 0; - buf->f_fsid = 0; - buf->f_namemax = path::max_length; - buf->f_frsize = 0; - buf->f_flag = - ST_NOATIME | ST_NODIRATIME | ST_NOSUID | ST_NODEV | ST_SYNCHRONOUS; - return 0; -} - off_t gkfs_lseek(unsigned int fd, off_t offset, unsigned int whence) { return gkfs_lseek(CTX->file_map()->get(fd), offset, whence); } diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 4db07f20e..e9663604a 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -195,7 +195,7 @@ void load_hosts() { LOG(INFO, "Hosts pool size: {}", hosts.size()); - auto local_hostname = get_my_hostname(true); + auto local_hostname = gkfs::rpc::get_my_hostname(true); bool local_host_found = false; std::vector addrs; diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index f68f65e48..3c850a218 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -415,6 +415,11 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si return error ? -1 : 0; } +/** + * Performs a chunk stat RPC to all hosts + * @return rpc::ChunkStat + * @throws std::runtime_error + */ ChunkStat forward_get_chunk_stat() { std::vector> handles; @@ -444,25 +449,37 @@ ChunkStat forward_get_chunk_stat() { unsigned long chunk_total = 0; unsigned long chunk_free = 0; + int error = 0; + // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { - gkfs::rpc::chunk_stat::output out; + gkfs::rpc::chunk_stat::output out{}; try { // XXX We might need a timeout here to not wait forever for an // output that never comes? 
out = handles[i].get().at(0); + if (out.err() != 0) { + error = out.err(); + LOG(ERROR, "Host '{}' reported err code '{}' during stat chunk.", CTX->hosts().at(i).to_string(), + error); + // we don't break here to ensure all responses are processed + continue; + } assert(out.chunk_size() == chunk_size); chunk_total += out.chunk_total(); chunk_free += out.chunk_free(); - } catch (const std::exception& ex) { - throw std::runtime_error( - fmt::format("Failed to get rpc output for target host: {}]", i)); + errno = EBUSY; + throw std::runtime_error(fmt::format("Failed to get RPC output from host: {}", i)); } } + if (error != 0) { + errno = error; + throw std::runtime_error("chunk stat failed on one host"); + } return {chunk_size, chunk_total, chunk_free}; } diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt index e4ba89177..945a21983 100644 --- a/src/daemon/CMakeLists.txt +++ b/src/daemon/CMakeLists.txt @@ -7,10 +7,11 @@ set(DAEMON_SRC daemon.cpp util.cpp ops/metadentry.cpp + ops/data.cpp classes/fs_data.cpp classes/rpc_data.cpp - handler/srv_metadata.cpp handler/srv_data.cpp + handler/srv_metadata.cpp handler/srv_management.cpp ) set(DAEMON_HEADERS @@ -23,6 +24,7 @@ set(DAEMON_HEADERS ../../include/global/path_util.hpp ../../include/daemon/daemon.hpp ../../include/daemon/util.hpp + ../../include/daemon/ops/data.hpp ../../include/daemon/ops/metadentry.hpp ../../include/daemon/classes/fs_data.hpp ../../include/daemon/classes/rpc_data.hpp diff --git a/src/daemon/backend/data/CMakeLists.txt b/src/daemon/backend/data/CMakeLists.txt index 41f1ec0f9..1a7e2a59a 100644 --- a/src/daemon/backend/data/CMakeLists.txt +++ b/src/daemon/backend/data/CMakeLists.txt @@ -5,6 +5,7 @@ target_sources(storage ${INCLUDE_DIR}/daemon/backend/data/chunk_storage.hpp PRIVATE ${INCLUDE_DIR}/global/path_util.hpp + ${INCLUDE_DIR}/global/global_defs.hpp ${CMAKE_CURRENT_LIST_DIR}/chunk_storage.cpp ) @@ -12,10 +13,8 @@ target_link_libraries(storage PRIVATE spdlog Boost::filesystem - ${ABT_LIBRARIES} ) -target_include_directories(storage - PRIVATE - ${ABT_INCLUDE_DIRS} - ) +#target_include_directories(storage +# PRIVATE +# ) diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index 3d9b69fec..dae4da983 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -15,6 +15,8 @@ #include #include +#include + #include #include @@ -28,22 +30,11 @@ using namespace std; namespace gkfs { namespace data { +// private functions + string ChunkStorage::absolute(const string& internal_path) const { assert(gkfs::path::is_relative(internal_path)); - return root_path + '/' + internal_path; -} - -ChunkStorage::ChunkStorage(const string& path, const size_t chunksize) : - root_path(path), - chunksize(chunksize) { - //TODO check path: absolute, exists, permission to write etc... 
- assert(gkfs::path::is_absolute(root_path)); - - /* Initialize logger */ - log = spdlog::get(LOGGER_NAME); - assert(log); - - log->debug("Chunk storage initialized with path: '{}'", root_path); + return fmt::format("{}/{}", root_path_, internal_path); } string ChunkStorage::get_chunks_dir(const string& file_path) { @@ -53,169 +44,256 @@ string ChunkStorage::get_chunks_dir(const string& file_path) { return chunk_dir; } -string ChunkStorage::get_chunk_path(const string& file_path, unsigned int chunk_id) { - return get_chunks_dir(file_path) + '/' + ::to_string(chunk_id); -} - -void ChunkStorage::destroy_chunk_space(const string& file_path) const { - auto chunk_dir = absolute(get_chunks_dir(file_path)); - try { - bfs::remove_all(chunk_dir); - } catch (const bfs::filesystem_error& e) { - log->error("Failed to remove chunk directory. Path: '{}', Error: '{}'", chunk_dir, e.what()); - } +string ChunkStorage::get_chunk_path(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id) { + return fmt::format("{}/{}", get_chunks_dir(file_path), chunk_id); } +/** + * Creates the chunk directory in which all chunk files of a file are placed. + * The path of the real file is used as the directory name. + * @param file_path + * @throws ChunkStorageException on failure + */ void ChunkStorage::init_chunk_space(const string& file_path) const { auto chunk_dir = absolute(get_chunks_dir(file_path)); auto err = mkdir(chunk_dir.c_str(), 0750); if (err == -1 && errno != EEXIST) { - log->error("Failed to create chunk dir. Path: '{}', Error: '{}'", chunk_dir, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to create chunk directory"); + auto err_str = fmt::format("{}() Failed to create chunk directory. File: '{}', Error: '{}'", __func__, + file_path, errno); + throw ChunkStorageException(errno, err_str); } } -/* Delete all chunks stored on this node that falls in the gap [chunk_start, chunk_end] +// public functions + +/** * - * This is pretty slow method because it cycle over all the chunks sapce for this file. + * @param path + * @param chunksize + * @throws ChunkStorageException */ -void ChunkStorage::trim_chunk_space(const string& file_path, - unsigned int chunk_start, unsigned int chunk_end) { - - auto chunk_dir = absolute(get_chunks_dir(file_path)); - const bfs::directory_iterator end; - - for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) { - auto chunk_path = chunk_file->path(); - auto chunk_id = ::stoul(chunk_path.filename().c_str()); - if (chunk_id >= chunk_start && chunk_id <= chunk_end) { - int ret = unlink(chunk_path.c_str()); - if (ret == -1) { - log->error("Failed to remove chunk file. File: '{}', Error: '{}'", chunk_path.native(), - ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to remove chunk file"); - } - } - } -} - -void ChunkStorage::delete_chunk(const string& file_path, unsigned int chunk_id) { - auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); - int ret = unlink(chunk_path.c_str()); - if (ret == -1) { - log->error("Failed to remove chunk file.
File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to remove chunk file"); - } +ChunkStorage::ChunkStorage(string path, const size_t chunksize) : + root_path_(std::move(path)), + chunksize_(chunksize) { + /* Initialize logger */ + log_ = spdlog::get(LOGGER_NAME); + assert(log_); + assert(gkfs::path::is_absolute(root_path_)); + // Verify that we have sufficient write access + // This will throw on error, canceling daemon initialization + auto test_file_path = "/.__chunk_dir_test"s; + init_chunk_space(test_file_path); + destroy_chunk_space(test_file_path); + log_->debug("{}() Chunk storage initialized with path: '{}'", __func__, root_path_); } -void ChunkStorage::truncate_chunk(const string& file_path, unsigned int chunk_id, off_t length) { - auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); - assert(length > 0 && (unsigned int) length <= chunksize); - int ret = truncate(chunk_path.c_str(), length); - if (ret == -1) { - log->error("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to truncate chunk file"); +/** + * Removes chunk directory with all its files + * @param file_path + * @throws ChunkStorageException + */ +void ChunkStorage::destroy_chunk_space(const string& file_path) const { + auto chunk_dir = absolute(get_chunks_dir(file_path)); + try { + // Note: remove_all does not throw an error when path doesn't exist. + auto n = bfs::remove_all(chunk_dir); + log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); + } catch (const bfs::filesystem_error& e) { + auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, + chunk_dir, e.what()); + throw ChunkStorageException(e.code().value(), err_str); } } -void ChunkStorage::write_chunk(const string& file_path, unsigned int chunk_id, - const char* buff, size_t size, off64_t offset, ABT_eventual& eventual) const { - - assert((offset + size) <= chunksize); +/** + * Writes a chunk file. + * On failure returns a negative error code corresponding to `-errno`. + * + * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pwrite behavior + * + * @param file_path + * @param chunk_id + * @param buff + * @param size + * @param offset + * @param eventual + * @throws ChunkStorageException (caller will handle eventual signalling) + */ +ssize_t +ChunkStorage::write_chunk(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, const char* buff, size_t size, + off64_t offset) const { + assert((offset + size) <= chunksize_); + // may throw ChunkStorageException on failure init_chunk_space(file_path); auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); int fd = open(chunk_path.c_str(), O_WRONLY | O_CREAT, 0640); if (fd < 0) { - log->error("Failed to open chunk file for write. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to open chunk file for write"); + auto err_str = fmt::format("Failed to open chunk file for write. File: '{}', Error: '{}'", chunk_path, + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } auto wrote = pwrite(fd, buff, size, offset); if (wrote < 0) { - log->error("Failed to write chunk file. 
File: '{}', size: '{}', offset: '{}', Error: '{}'", - chunk_path, size, offset, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to write chunk file"); + auto err_str = fmt::format("Failed to write chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", + chunk_path, size, offset, ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } - ABT_eventual_set(eventual, &wrote, sizeof(size_t)); - - auto err = close(fd); - if (err < 0) { - log->error("Failed to close chunk file after write. File: '{}', Error: '{}'", + // if close fails we just write an entry into the log erroring out + if (close(fd) < 0) { + log_->warn("Failed to close chunk file after write. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - //throw ::system_error(errno, ::system_category(), "Failed to close chunk file"); } + return wrote; } -void ChunkStorage::read_chunk(const string& file_path, unsigned int chunk_id, - char* buff, size_t size, off64_t offset, ABT_eventual& eventual) const { - assert((offset + size) <= chunksize); +/** + * Reads from a chunk file and returns the number of bytes read. + * + * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pread behavior + * @param file_path + * @param chunk_id + * @param buf + * @param size + * @param offset + * @return the number of bytes read + * @throws ChunkStorageException on error + */ +ssize_t +ChunkStorage::read_chunk(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, char* buf, size_t size, + off64_t offset) const { + assert((offset + size) <= chunksize_); auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); int fd = open(chunk_path.c_str(), O_RDONLY); if (fd < 0) { - log->error("Failed to open chunk file for read. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to open chunk file for read"); + auto err_str = fmt::format("Failed to open chunk file for read. File: '{}', Error: '{}'", chunk_path, + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } size_t tot_read = 0; ssize_t read = 0; do { read = pread64(fd, - buff + tot_read, + buf + tot_read, size - tot_read, offset + tot_read); if (read == 0) { + /* + * A value of zero indicates end-of-file (except if the value of the size argument is also zero). + * This is not considered an error. If you keep calling read while at end-of-file, + * it will keep returning zero and doing nothing else. + * Hence, we break here. + */ break; } if (read < 0) { - log->error("Failed to read chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", - chunk_path, size, offset, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to read chunk file"); + auto err_str = fmt::format("Failed to read chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", + chunk_path, size, offset, ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } #ifndef NDEBUG if (tot_read + read < size) { - log->warn("Read less bytes than requested: '{}'/{}. Total read was '{}'", read, size - tot_read, size); + log_->debug("Read fewer bytes than requested: '{}'/{}. Total read was '{}'. This is not an error!", read, + size - tot_read, size); } #endif assert(read > 0); tot_read += read; - - } while (tot_read != size); - ABT_eventual_set(eventual, &tot_read, sizeof(size_t)); - auto err = close(fd); - if (err < 0) { - log->error("Failed to close chunk file after read.
File: '{}', Error: '{}'", + // if close fails we just write an entry into the log erroring out + if (close(fd) < 0) { + log_->warn("Failed to close chunk file after read. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - //throw ::system_error(errno, ::system_category(), "Failed to close chunk file"); } + return tot_read; +} + +/** + * Delete all chunks starting with chunk a chunk id. + * Note eventual consistency here: While chunks are removed, there is no lock that prevents + * other processes from modifying anything in that directory. + * It is the application's responsibility to stop modifying the file while truncate is executed + * + * If an error is encountered when removing a chunk file, the function will still remove all files and + * report the error afterwards with ChunkStorageException. + * + * @param file_path + * @param chunk_start + * @param chunk_end + * @throws ChunkStorageException + */ +void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_start) { + + auto chunk_dir = absolute(get_chunks_dir(file_path)); + const bfs::directory_iterator end; + auto err_flag = false; + for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) { + auto chunk_path = chunk_file->path(); + auto chunk_id = ::stoul(chunk_path.filename().c_str()); + if (chunk_id >= chunk_start) { + auto err = unlink(chunk_path.c_str()); + if (err == -1 && errno != ENOENT) { + err_flag = true; + log_->warn("{}() Failed to remove chunk file. File: '{}', Error: '{}'", __func__, chunk_path.native(), + ::strerror(errno)); + } + } + } + if (err_flag) + throw ChunkStorageException(EIO, fmt::format("{}() One or more errors occurred when truncating '{}'", __func__, + file_path)); } +/** + * Truncates a single chunk file to a given length + * @param file_path + * @param chunk_id + * @param length + * @throws ChunkStorageException + */ +void ChunkStorage::truncate_chunk_file(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, off_t length) { + auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); + assert(length > 0 && static_cast(length) <= chunksize_); + int ret = truncate64(chunk_path.c_str(), length); + if (ret == -1) { + auto err_str = fmt::format("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); + } +} + +/** + * Calls statfs on the chunk directory to get statistic on its used and free size left + * @return ChunkStat + * @throws ChunkStorageException + */ ChunkStat ChunkStorage::chunk_stat() const { struct statfs sfs{}; - if (statfs(root_path.c_str(), &sfs) != 0) { - log->error("Failed to get filesystem statistic for chunk directory." - " Error: '{}'", ::strerror(errno)); - throw ::system_error(errno, ::system_category(), - "statfs() failed on chunk directory"); + + if (statfs(root_path_.c_str(), &sfs) != 0) { + auto err_str = fmt::format("Failed to get filesystem statistic for chunk directory. 
Error: '{}'", + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } - log->debug("Chunksize '{}', total '{}', free '{}'", sfs.f_bsize, sfs.f_blocks, sfs.f_bavail); + log_->debug("Chunksize '{}', total '{}', free '{}'", sfs.f_bsize, sfs.f_blocks, sfs.f_bavail); auto bytes_total = static_cast(sfs.f_bsize) * static_cast(sfs.f_blocks); auto bytes_free = static_cast(sfs.f_bsize) * static_cast(sfs.f_bavail); - return {chunksize, - bytes_total / chunksize, - bytes_free / chunksize}; + return {chunksize_, + bytes_total / chunksize_, + bytes_free / chunksize_}; } } // namespace data diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 9fbf1d3f7..81968bb7f 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -329,7 +329,7 @@ int main(int argc, const char* argv[]) { if (vm.count("listen")) { addr = vm["listen"].as(); } else { - addr = get_my_hostname(true); + addr = gkfs::rpc::get_my_hostname(true); } GKFS_DATA->bind_addr(fmt::format("{}://{}", RPC_PROTOCOL, addr)); diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 553a9eaa5..289a294b8 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -16,111 +16,19 @@ #include #include #include +#include #include #include #include - using namespace std; -struct write_chunk_args { - const std::string* path; - const char* buf; - gkfs::types::rpc_chnk_id_t chnk_id; - size_t size; - off64_t off; - ABT_eventual eventual; -}; - -/** - * Used by an argobots threads. Argument args has the following fields: - * const std::string* path; - const char* buf; - const gkfs::types::rpc_chnk_id_t* chnk_id; - size_t size; - off64_t off; - ABT_eventual* eventual; - * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon. - * This function is called by tasklets, as this function cannot be allowed to block. - * @return written_size is put into eventual and returned that way - */ -void write_file_abt(void* _arg) { - // Unpack args - auto* arg = static_cast(_arg); - const std::string& path = *(arg->path); - - try { - GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, - arg->buf, arg->size, arg->off, arg->eventual); - } catch (const std::system_error& serr) { - GKFS_DATA->spdlogger()->error("{}() Error writing chunk {} of file {}", __func__, arg->chnk_id, path); - ssize_t wrote = -(serr.code().value()); - ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); - } - -} - -struct read_chunk_args { - const std::string* path; - char* buf; - gkfs::types::rpc_chnk_id_t chnk_id; - size_t size; - off64_t off; - ABT_eventual eventual; -}; - /** - * Used by an argobots threads. Argument args has the following fields: - * const std::string* path; - char* buf; - const gkfs::types::rpc_chnk_id_t* chnk_id; - size_t size; - off64_t off; - ABT_eventual* eventual; - * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon. - * This function is called by tasklets, as this function cannot be allowed to block. 
- * @return read_size is put into eventual and returned that way - */ -void read_file_abt(void* _arg) { - //unpack args - auto* arg = static_cast(_arg); - const std::string& path = *(arg->path); - - try { - GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, - arg->buf, arg->size, arg->off, arg->eventual); - } catch (const std::system_error& serr) { - GKFS_DATA->spdlogger()->error("{}() Error reading chunk {} of file {}", __func__, arg->chnk_id, path); - ssize_t read = -(serr.code().value()); - ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); - } -} - -/** - * Free Argobots tasks and eventual constructs in a given vector until max_idx. - * Nothing is done for a vector if nullptr is given - * @param abt_tasks - * @param abt_eventuals - * @param max_idx + * RPC handler for an incoming write RPC + * @param handle * @return */ -void cancel_abt_io(vector* abt_tasks, vector* abt_eventuals, uint64_t max_idx) { - if (abt_tasks != nullptr) { - for (uint64_t i = 0; i < max_idx; i++) { - ABT_task_cancel(abt_tasks->at(i)); - ABT_task_free(&abt_tasks->at(i)); - } - } - if (abt_eventuals != nullptr) { - for (uint64_t i = 0; i < max_idx; i++) { - ABT_eventual_reset(abt_eventuals->at(i)); - ABT_eventual_free(&abt_eventuals->at(i)); - } - } -} - - static hg_return_t rpc_srv_write(hg_handle_t handle) { /* * 1. Setup @@ -165,7 +73,6 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); - auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -189,10 +96,8 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size : gkfs::config::rpc::chunksize; uint64_t origin_offset; uint64_t local_offset; - // task structures for async writing - vector abt_tasks(in.chunk_n); - vector task_eventuals(in.chunk_n); - vector task_args(in.chunk_n); + // object for asynchronous disk IO + gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n}; /* * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk */ @@ -205,16 +110,18 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { // if only 1 destination and 1 chunk (small write) the transfer_size == bulk_size - auto offset_transfer_size = (in.offset + bulk_size <= gkfs::config::rpc::chunksize) ? 
bulk_size - : static_cast( - gkfs::config::rpc::chunksize - in.offset); + size_t offset_transfer_size = 0; + if (in.offset + bulk_size <= gkfs::config::rpc::chunksize) + offset_transfer_size = bulk_size; + else + offset_transfer_size = static_cast(gkfs::config::rpc::chunksize - in.offset); ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, offset_transfer_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__, chnk_id_file, in.chunk_start, in.chunk_end - 1); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } bulk_buf_ptrs[chnk_id_curr] = chnk_ptr; @@ -233,7 +140,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { if (chnk_id_curr == in.chunk_n - 1) transfer_size = chnk_size_left_host; GKFS_DATA->spdlogger()->trace( - "{}() BULK_TRANSFER hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}", + "{}() BULK_TRANSFER_PULL hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}", __func__, host_id, in.path, chnk_id_file, in.total_chunk_size, chnk_size_left_host, origin_offset, local_offset, transfer_size); // RDMA the data to here @@ -242,8 +149,8 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})", __func__, - *path, chnk_id_file, in.chunk_start, (in.chunk_end - 1)); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr); + in.path, chnk_id_file, in.chunk_start, (in.chunk_end - 1)); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } bulk_buf_ptrs[chnk_id_curr] = chnk_ptr; @@ -251,22 +158,13 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; } - // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool - // Starting tasklets for parallel I/O - ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value - auto& task_arg = task_args[chnk_id_curr]; - task_arg.path = path.get(); - task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; - task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; - task_arg.size = chnk_sizes[chnk_id_curr]; - // only the first chunk gets the offset. the chunks are sorted on the client side - task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; - task_arg.eventual = task_eventuals[chnk_id_curr]; - auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args[chnk_id_curr], - &abt_tasks[chnk_id_curr]); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() task create failed", __func__); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1); + try { + // start tasklet for writing chunk + chunk_op.write_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], + chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); + } catch (const gkfs::data::ChunkWriteOpException& e) { + // This exception is caused by setup of Argobots variables. 
If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while write_async err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } // next chunk @@ -280,30 +178,9 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { /* * 4. Read task results and accumulate in out.io_size */ - out.err = 0; - out.io_size = 0; - for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) { - ssize_t* task_written_size = nullptr; - // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler - auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_written_size); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed to wait for write task for chunk {}", - __func__, chnk_id_curr); - out.err = EIO; - break; - } - assert(task_written_size != nullptr); - if (*task_written_size < 0) { - GKFS_DATA->spdlogger()->error("{}() Write task failed for chunk {}", - __func__, chnk_id_curr); - out.err = -(*task_written_size); - break; - } - - out.io_size += *task_written_size; // add task written size to output size - ABT_eventual_free(&task_eventuals[chnk_id_curr]); - } + auto write_result = chunk_op.wait_for_tasks(); + out.err = write_result.first; + out.io_size = write_result.second; // Sanity check to see if all data has been written if (in.total_chunk_size != out.io_size) { @@ -315,17 +192,16 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { * 5. Respond and cleanup */ GKFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.err); - ret = gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); - // free tasks after responding - for (auto&& task : abt_tasks) { - ABT_task_join(task); - ABT_task_free(&task); - } - return ret; + return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_write) +/** + * RPC handler for an incoming read RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_read(hg_handle_t handle) { /* * 1. Setup @@ -371,7 +247,6 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); - auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -387,10 +262,8 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto chnk_ptr = static_cast(bulk_buf); // temporary variables auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size : gkfs::config::rpc::chunksize; - // tasks structures - vector abt_tasks(in.chunk_n); - vector task_eventuals(in.chunk_n); - vector task_args(in.chunk_n); + // object for asynchronous disk IO + gkfs::data::ChunkReadOperation chunk_read_op{in.path, in.chunk_n}; /* * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk */ @@ -403,9 +276,11 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { // Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { // if only 1 destination and 1 chunk (small read) the transfer_size == bulk_size - auto offset_transfer_size = (in.offset + bulk_size <= gkfs::config::rpc::chunksize) ? 
bulk_size - : static_cast( - gkfs::config::rpc::chunksize - in.offset); + size_t offset_transfer_size = 0; + if (in.offset + bulk_size <= gkfs::config::rpc::chunksize) + offset_transfer_size = bulk_size; + else + offset_transfer_size = static_cast(gkfs::config::rpc::chunksize - in.offset); // Setting later transfer offsets local_offsets[chnk_id_curr] = 0; origin_offsets[chnk_id_curr] = 0; @@ -432,22 +307,12 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; } - // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool - // Starting tasklets for parallel I/O - ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value - auto& task_arg = task_args[chnk_id_curr]; - task_arg.path = path.get(); - task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; - task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; - task_arg.size = chnk_sizes[chnk_id_curr]; - // only the first chunk gets the offset. the chunks are sorted on the client side - task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; - task_arg.eventual = task_eventuals[chnk_id_curr]; - auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args[chnk_id_curr], - &abt_tasks[chnk_id_curr]); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() task create failed", __func__); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1); + try { + // start tasklet for read operation + chunk_read_op.read_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); + } catch (const gkfs::data::ChunkReadOpException& e) { + // This exception is caused by setup of Argobots variables. If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } chnk_id_curr++; @@ -459,113 +324,89 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { /* * 4. Read task results and accumulate in out.io_size */ - out.err = 0; - out.io_size = 0; - for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) { - ssize_t* task_read_size = nullptr; - // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler - auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed to wait for read task for chunk {}", - __func__, chnk_id_curr); - out.err = EIO; - break; - } - assert(task_read_size != nullptr); - if (*task_read_size < 0) { - if (-(*task_read_size) == ENOENT) { - continue; - } - GKFS_DATA->spdlogger()->warn( - "{}() Read task failed for chunk {}", - __func__, chnk_id_curr); - out.err = -(*task_read_size); - break; - } - - if (*task_read_size == 0) { - continue; - } - - ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id_curr], - bulk_handle, local_offsets[chnk_id_curr], *task_read_size); - if (ret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed push chnkid {} on path {} to client. 
origin offset {} local offset {} chunk size {}", - __func__, chnk_id_curr, in.path, origin_offsets[chnk_id_curr], local_offsets[chnk_id_curr], - chnk_sizes[chnk_id_curr]); - out.err = EIO; - break; - } - out.io_size += *task_read_size; // add task read size to output size - } + gkfs::data::ChunkReadOperation::bulk_args bulk_args{}; + bulk_args.mid = mid; + bulk_args.origin_addr = hgi->addr; + bulk_args.origin_bulk_handle = in.bulk_handle; + bulk_args.origin_offsets = &origin_offsets; + bulk_args.local_bulk_handle = bulk_handle; + bulk_args.local_offsets = &local_offsets; + bulk_args.chunk_ids = &chnk_ids_host; + // wait for all tasklets and push read data back to client + auto read_result = chunk_read_op.wait_for_tasks_and_push_back(bulk_args); + out.err = read_result.first; + out.io_size = read_result.second; /* * 5. Respond and cleanup */ GKFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}", __func__, out.err); - ret = gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); - // free tasks after responding - cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n); - return ret; + return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_read) +/** + * RPC handler for an incoming truncate RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_truncate(hg_handle_t handle) { rpc_trunc_in_t in{}; rpc_err_out_t out{}; - + out.err = EIO; + // Getting some information from margo auto ret = margo_get_input(handle, &in); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Could not get RPC input data with err {}", __func__, ret); - throw runtime_error("Failed to get RPC input data"); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: {}", __func__, in.path, in.length); + GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); - unsigned int chunk_start = gkfs::util::chnk_id_for_offset(in.length, gkfs::config::rpc::chunksize); - - // If we trunc in the the middle of a chunk, do not delete that chunk - auto left_pad = gkfs::util::chnk_lpad(in.length, gkfs::config::rpc::chunksize); - if (left_pad != 0) { - GKFS_DATA->storage()->truncate_chunk(in.path, chunk_start, left_pad); - ++chunk_start; + gkfs::data::ChunkTruncateOperation chunk_op{in.path}; + try { + // start tasklet for truncate operation + chunk_op.truncate(0, in.length); + } catch (const gkfs::data::ChunkMetaOpException& e) { + // This exception is caused by setup of Argobots variables. 
If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__, e.what()); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->storage()->trim_chunk_space(in.path, chunk_start); + // wait and get output + out.err = chunk_op.wait_for_tasks(); - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); - auto hret = margo_respond(handle, &out); - if (hret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to respond"); - } - // Destroy handle when finished - margo_free_input(handle, &in); - margo_destroy(handle); - return HG_SUCCESS; + GKFS_DATA->spdlogger()->debug("{}() Sending output response '{}'", __func__, out.err); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate) +/** + * RPC handler for an incoming chunk stat RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { - GKFS_DATA->spdlogger()->trace("{}() called", __func__); - + GKFS_DATA->spdlogger()->debug("{}() enter", __func__); rpc_chunk_stat_out_t out{}; - // Get input - auto chk_stat = GKFS_DATA->storage()->chunk_stat(); - // Create output and send it - out.chunk_size = chk_stat.chunk_size; - out.chunk_total = chk_stat.chunk_total; - out.chunk_free = chk_stat.chunk_free; - auto hret = margo_respond(handle, &out); - if (hret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); + out.err = EIO; + try { + auto chk_stat = GKFS_DATA->storage()->chunk_stat(); + out.chunk_size = chk_stat.chunk_size; + out.chunk_total = chk_stat.chunk_total; + out.chunk_free = chk_stat.chunk_free; + out.err = 0; + } catch (const gkfs::data::ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + out.err = err.code().value(); + } catch (const std::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error during chunk stat '{}'", __func__, err.what()); + out.err = EAGAIN; } - // Destroy handle when finished - margo_destroy(handle); - return hret; + // Create output and send it back + return gkfs::rpc::cleanup_respond(&handle, &out); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat) diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index ce6de804f..2e19ac3d1 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -18,6 +18,7 @@ #include #include +#include using namespace std; @@ -69,7 +70,7 @@ static hg_return_t rpc_srv_stat(hg_handle_t handle) { out.db_val = val.c_str(); out.err = 0; GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -100,17 +101,17 @@ static hg_return_t rpc_srv_decr_size(hg_handle_t handle) { throw runtime_error("Failed to retrieve input from handle"); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: {}", __func__, in.path, in.length); + GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); try { GKFS_DATA->mdb()->decrease_size(in.path, in.length); out.err = 0; } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__,
e.what()); out.err = EIO; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -134,26 +135,23 @@ static hg_return_t rpc_srv_remove(hg_handle_t handle) { assert(ret == HG_SUCCESS); GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); + // Remove metadentry if exists on the node and remove all chunks for that file try { - // Remove metadentry if exists on the node - // and remove all chunks for that file - gkfs::metadata::remove_node(in.path); - out.err = 0; - } catch (const NotFoundException& e) { - /* The metadentry was not found on this node, - * this is not an error. At least one node involved in this - * broadcast operation will find and delete the entry on its local - * MetadataDB. - * TODO: send the metadentry remove only to the node that actually - * has it. - */ + gkfs::metadata::remove(in.path); out.err = 0; + } catch (const gkfs::metadata::DBException& e) { + GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); + out.err = EIO; + } catch (const gkfs::data::ChunkStorageException& e) { + GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), + e.what()); + out.err = e.code().value(); } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to remove node: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); out.err = EBUSY; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -201,7 +199,7 @@ static hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { out.err = 1; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -224,7 +222,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { if (ret != HG_SUCCESS) GKFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); - GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}, append: {}", __func__, in.path, in.size, + GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, in.offset, in.append); try { @@ -233,7 +231,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { //TODO the actual size of the file could be different after the size update // do to concurrency on size out.ret_size = in.size + in.offset; - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -241,7 +239,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { out.err = EBUSY; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + 
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -270,7 +268,7 @@ static hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { try { out.ret_size = gkfs::metadata::get_size(in.path); out.err = 0; - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -301,7 +299,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { auto ret = margo_get_input(handle, &in); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Could not get RPC input data with err {}", __func__, ret); + "{}() Could not get RPC input data with err '{}'", __func__, ret); return ret; } @@ -309,7 +307,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); GKFS_DATA->spdlogger()->debug( - "{}() Got dirents RPC with path {}", __func__, in.path); + "{}() Got dirents RPC with path '{}'", __func__, in.path); auto bulk_size = margo_bulk_get_size(in.bulk_handle); //Get directory entries from local DB @@ -366,7 +364,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { out_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Failed push dirents on path {} to client", + "{}() Failed push dirents on path '{}' to client", __func__, in.path ); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); @@ -399,10 +397,10 @@ static hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { gkfs::metadata::create(in.path, md); out.err = 0; } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; } - GKFS_DATA->spdlogger()->debug("{}() Sending output err {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); diff --git a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp new file mode 100644 index 000000000..1681b1559 --- /dev/null +++ b/src/daemon/ops/data.cpp @@ -0,0 +1,426 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. 
+ + SPDX-License-Identifier: MIT +*/ + +#include +#include +#include +#include + +extern "C" { +#include +} + +using namespace std; + +namespace gkfs { +namespace data { + +/** + * Cleans up and cancels all tasks in flight + */ +void ChunkOperation::cancel_all_tasks() { + GKFS_DATA->spdlogger()->trace("{}() enter", __func__); + for (auto& task : abt_tasks_) { + if (task) { + ABT_task_cancel(task); + ABT_task_free(&task); + } + } + for (auto& eventual : task_eventuals_) { + if (eventual) { + ABT_eventual_reset(eventual); + ABT_eventual_free(&eventual); + } + } + abt_tasks_.clear(); + task_eventuals_.clear(); +} + +ChunkOperation::ChunkOperation(string path) : ChunkOperation(move(path), 1) {} + +ChunkOperation::ChunkOperation(string path, size_t n) : path_(::move(path)) { + // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults + abt_tasks_.resize(n); + task_eventuals_.resize(n); +} + +ChunkOperation::~ChunkOperation() { + cancel_all_tasks(); +} + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + size_t size; + ABT_eventual* eventual; + * This function is driven by the IO pool, so there is a maximum allowed number of concurrent operations per daemon. + * @return error is put into eventual to signal that it finished + */ +void ChunkTruncateOperation::truncate_abt(void* _arg) { + // Unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + const size_t size = arg->size; + int err_response = 0; + try { + // get chunk from where to cut off + auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); + // do not delete the last chunk if the truncate length falls in the middle of it + auto left_pad = gkfs::util::chnk_lpad(size, gkfs::config::rpc::chunksize); + if (left_pad != 0) { + GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start, left_pad); + chunk_id_start++; + } + GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + err_response = err.code().value(); + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, + size); + err_response = EIO; + } + ABT_eventual_set(arg->eventual, &err_response, sizeof(int)); +} + +ChunkTruncateOperation::ChunkTruncateOperation(string path) : ChunkTruncateOperation{move(path), 1} {} + +ChunkTruncateOperation::ChunkTruncateOperation(string path, size_t n) : ChunkOperation{move(path), n} { + task_args_.resize(n); +} + +void ChunkTruncateOperation::cancel_all_tasks() { + ChunkOperation::cancel_all_tasks(); + task_args_.clear(); +} + +/** + * Starts a tasklet for the requested truncate. In essence, all chunk files after the given offset are removed. + */
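// [Editor's note] Illustrative arithmetic for truncate_abt() above; an editorial sketch, not part of
// the patch. Assuming chnk_id_for_offset() is an integer division by the chunk size and chnk_lpad()
// is the remainder (which is what the surrounding logic implies), truncating to a length that falls
// inside a chunk shortens that chunk's file and removes every chunk after it:
//
//     constexpr size_t chunksize = 524288;         // e.g. 512 KiB chunks
//     size_t length = 3 * chunksize + 100;         // truncate target: 1,572,964 bytes
//     size_t chunk_id_start = length / chunksize;  // == 3, the chunk holding the new last byte
//     size_t left_pad = length % chunksize;        // == 100, so chunk file 3 is truncated to 100 bytes
//     // chunk files with id >= 4 are then unlinked by trim_chunk_space()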
+void ChunkTruncateOperation::truncate(size_t idx, size_t size) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: idx '{}' path '{}' size '{}'", __func__, idx, path_, + size); + + auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[idx]); // truncate file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkMetaOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.size = size; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkMetaOpException(err_str); + } +} + + +int ChunkTruncateOperation::wait_for_tasks() { + GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_); + int trunc_err = 0; + /* + * gather all Eventual's information. do not throw here to properly clean up all eventuals + * On error, clean up the remaining eventuals and report the error code + */ + for (auto& e : task_eventuals_) { + int* task_err = nullptr; + auto abt_err = ABT_eventual_wait(e, (void**) &task_err); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkTruncateOperation::{}() Error when waiting on ABT eventual", __func__); + trunc_err = EIO; + ABT_eventual_free(&e); + continue; + } + assert(task_err != nullptr); + if (*task_err != 0) { + trunc_err = *task_err; + } + ABT_eventual_free(&e); + } + return trunc_err; +} + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + const char* buf; + const gkfs::types::rpc_chnk_id_t* chnk_id; + size_t size; + off64_t off; + ABT_eventual* eventual; + * This function is driven by the IO pool, so there is a maximum allowed number of concurrent IO operations per daemon. + * This function is called by tasklets, as this function cannot be allowed to block. + * @return written_size is put into eventual to signal that it finished + */ +void ChunkWriteOperation::write_file_abt(void* _arg) { + // Unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + ssize_t wrote{0}; + try { + wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + wrote = -(err.code().value());; + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, + path); + wrote = -EIO; + } + ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); +} + +ChunkWriteOperation::ChunkWriteOperation(string path, size_t n) : ChunkOperation{move(path), n} { + task_args_.resize(n); +} + +void ChunkWriteOperation::cancel_all_tasks() { + ChunkOperation::cancel_all_tasks(); + task_args_.clear(); +} + +/** + * Write a buffer into a single chunk, referenced by its ID. Put task into IO queue. + * On failure the write operation is aborted, throwing an error, and cleaned up. + * The caller may repeat a failed call. + * @param chunk_id + * @param bulk_buf_ptr + * @param size + * @param offset + * @throws ChunkWriteOpException + */
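// [Editor's note] Editorial usage sketch, not part of the patch: this mirrors how the write RPC
// handler in src/daemon/handler/srv_data.cpp drives the class -- one ChunkWriteOperation per RPC,
// one write_async() per chunk that hashes to this daemon, then a single wait_for_tasks(). The
// variable names below are illustrative only:
//
//     gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
//     for (size_t idx = 0; idx < in.chunk_n; idx++) {
//         // buffer pointers, chunk ids, and sizes were prepared while pulling the client's bulk
//         // data; only the first chunk of the request carries an intra-chunk offset
//         chunk_op.write_async(idx, chnk_ids_host[idx], bulk_buf_ptrs[idx], chnk_sizes[idx],
//                              (idx == 0) ? in.offset : 0);
//     }
//     auto write_result = chunk_op.wait_for_tasks();   // pair of (error code, total bytes written)
//     out.err = write_result.first;
//     out.io_size = write_result.second;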
+void +ChunkWriteOperation::write_async(size_t idx, const uint64_t chunk_id, const char* bulk_buf_ptr, + const size_t size, + const off64_t offset) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'", __func__, + idx, path_, size, offset); + + auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // written file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkWriteOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.buf = bulk_buf_ptr; // write_file_abt will treat buf as const and will not modify it + task_arg.chnk_id = chunk_id; + task_arg.size = size; + task_arg.off = offset; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkWriteOpException(err_str); + } +} + +/** + * Waits for all Argobots tasklets to finish and report back the write error code and the size written. + * @return + */ +pair ChunkWriteOperation::wait_for_tasks() { + GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'", __func__, path_); + size_t total_written = 0; + int io_err = 0; + /* + * gather all Eventual's information. do not throw here to properly clean up all eventuals + * On error, clean up eventuals and set written data to 0 as written data is corrupted + */ + for (auto& e : task_eventuals_) { + ssize_t* task_size = nullptr; + auto abt_err = ABT_eventual_wait(e, (void**) &task_size); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkWriteOperation::{}() Error when waiting on ABT eventual", __func__); + io_err = EIO; + ABT_eventual_free(&e); + continue; + } + if (io_err != 0) { + ABT_eventual_free(&e); + continue; + } + assert(task_size != nullptr); + if (*task_size < 0) { + io_err = -(*task_size); + } else { + total_written += *task_size; + } + ABT_eventual_free(&e); + } + // in case of error set written size to zero as data would be corrupted + if (io_err != 0) + total_written = 0; + return make_pair(io_err, total_written); +}
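// [Editor's note] The eventual handshake used by all three operation classes, reduced to its core
// (editorial sketch, not part of the patch). A tasklet in the I/O pool publishes its result through
// an ABT_eventual; the handler thread blocks on it and frees it afterwards:
//
//     ABT_eventual ev;
//     ABT_eventual_create(sizeof(ssize_t), &ev);   // result slot sized for the tasklet's return value
//     // ABT_task_create(pool, task_fn, &arg, &task); the tasklet finishes with
//     //     ABT_eventual_set(ev, &result, sizeof(ssize_t));
//     ssize_t* result = nullptr;
//     ABT_eventual_wait(ev, (void**) &result);     // blocks until the tasklet calls set()
//     // ... consume *result ...
//     ABT_eventual_free(&ev);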
+ +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + char* buf; + const gkfs::types::rpc_chnk_id_t* chnk_id; + size_t size; + off64_t off; + ABT_eventual* eventual; + * This function is driven by the IO pool, so there is a maximum allowed number of concurrent IO operations per daemon. + * This function is called by tasklets, as this function cannot be allowed to block. + * @return read_size is put into eventual to signal that it finished + */ +void ChunkReadOperation::read_file_abt(void* _arg) { + //unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + ssize_t read = 0; + try { + // read_chunk returns the read size; on error it throws and a negative errno is put into the eventual below + read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + read = -(err.code().value());; + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, + path); + read = -EIO; + } + ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); +} + +ChunkReadOperation::ChunkReadOperation(string path, size_t n) : ChunkOperation{move(path), n} { + task_args_.resize(n); +} + +void ChunkReadOperation::cancel_all_tasks() { + ChunkOperation::cancel_all_tasks(); + task_args_.clear(); +} + +/** + * Read a single chunk, referenced by its ID, into the given buffer. Put task into IO queue. + * On failure the read operation is aborted, throwing an error, and cleaned up. + * The caller may repeat a failed call. + * @param chunk_id + * @param bulk_buf_ptr + * @param size + * @param offset + */ +void +ChunkReadOperation::read_async(size_t idx, const uint64_t chunk_id, char* bulk_buf_ptr, const size_t size, + const off64_t offset) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'", __func__, + idx, path_, size, offset); + + auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // read file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkReadOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.buf = bulk_buf_ptr; + task_arg.chnk_id = chunk_id; + task_arg.size = size; + task_arg.off = offset; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkReadOpException(err_str); + } + +} + +pair ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { + GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'", __func__, path_); + assert(args.chunk_ids->size() == task_args_.size()); + size_t total_read = 0; + int io_err = 0; + + /* + * gather all Eventual's information. do not throw here to properly clean up all eventuals + * As soon as an error is encountered, bulk_transfers will no longer be executed as the data would be corrupted + * The loop continues until all eventuals have been cleaned and freed. + */ + for (uint64_t idx = 0; idx < task_args_.size(); idx++) { + ssize_t* task_size = nullptr; + auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkReadOperation::{}() Error when waiting on ABT eventual", __func__); + io_err = EIO; + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + // error occurred.
stop processing but clean up + if (io_err != 0) { + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + assert(task_size != nullptr); + if (*task_size < 0) { + // sparse regions do not have chunk files and are therefore skipped + if (-(*task_size) == ENOENT) { + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + io_err = -(*task_size); // make error code > 0 + } else if (*task_size == 0) { + // read size of 0 is not an error and can happen because reading the end-of-file + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } else { + // successful case, push read data back to client + GKFS_DATA->spdlogger()->trace( + "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", + __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), + *task_size); + // TODO try, repeat do-while + assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); + auto margo_err = margo_bulk_transfer(args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, + args.origin_offsets->at(idx), args.local_bulk_handle, + args.local_offsets->at(idx), *task_size); + if (margo_err != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'", __func__, + margo_err); + io_err = EBUSY; + continue; + } + total_read += *task_size; + } + ABT_eventual_free(&task_eventuals_[idx]); + } + // in case of error set read size to zero as data would be corrupted + if (io_err != 0) + total_read = 0; + return make_pair(io_err, total_read); +} + +} // namespace data +} // namespace gkfs diff --git a/src/daemon/ops/metadentry.cpp b/src/daemon/ops/metadentry.cpp index 169c7ac36..d241b03bd 100644 --- a/src/daemon/ops/metadentry.cpp +++ b/src/daemon/ops/metadentry.cpp @@ -104,9 +104,16 @@ void update_size(const string& path, size_t io_size, off64_t offset, bool append * Remove metadentry if exists and try to remove all chunks for path * @param path * @return + * @throws gkfs::metadata::DBException, gkfs::data::ChunkStorageException */ -void remove_node(const string& path) { - GKFS_DATA->mdb()->remove(path); // remove metadentry +void remove(const string& path) { + /* + * try to remove metadata from kv store but catch NotFoundException which is not an error in this case + * because removes can be broadcast to catch all data chunks but only one node will hold the kv store entry. 
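 * [Editor's note] Editorial aside, not part of the patch: a remove is broadcast to the daemons
 * that may hold chunks of the path, so every daemon except the one owning the path's metadata is
 * expected to see NotFoundException here. Swallowing it lets destroy_chunk_space() below still
 * run on those daemons, so chunk directories are cleaned up even where no KV entry exists.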
+ */ + try { + GKFS_DATA->mdb()->remove(path); // remove metadata from KV store + } catch (const NotFoundException& e) {} GKFS_DATA->storage()->destroy_chunk_space(path); // destroys all chunks for the path on this node } diff --git a/src/daemon/util.cpp b/src/daemon/util.cpp index 92fa06221..8202cde1d 100644 --- a/src/daemon/util.cpp +++ b/src/daemon/util.cpp @@ -31,7 +31,7 @@ void populate_hosts_file() { throw runtime_error( fmt::format("Failed to open hosts file '{}': {}", hosts_file, strerror(errno))); } - lfstream << fmt::format("{} {}", get_my_hostname(true), RPC_DATA->self_addr_str()) << std::endl; + lfstream << fmt::format("{} {}", gkfs::rpc::get_my_hostname(true), RPC_DATA->self_addr_str()) << std::endl; if (!lfstream) { throw runtime_error( fmt::format("Failed to write on hosts file '{}': {}", hosts_file, strerror(errno))); diff --git a/src/global/rpc/rpc_util.cpp b/src/global/rpc/rpc_util.cpp index 8a38fa597..e5607fd33 100644 --- a/src/global/rpc/rpc_util.cpp +++ b/src/global/rpc/rpc_util.cpp @@ -24,6 +24,9 @@ extern "C" { using namespace std; +namespace gkfs { +namespace rpc { + /** * converts std bool to mercury bool * @param state @@ -89,4 +92,7 @@ string get_host_by_name(const string& hostname) { } freeaddrinfo(addr); return addr_str; -} \ No newline at end of file +} + +} // namespace rpc +} // namespace gkfs \ No newline at end of file -- GitLab From 321b4a42fa127d817a0099693cbbaeb4a20f446a Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 4 Mar 2020 11:54:29 +0100 Subject: [PATCH 02/10] Utils and distributor for guided mapping --- include/global/rpc/distributor.hpp | 21 ++++++++++++ src/global/rpc/distributor.cpp | 51 ++++++++++++++++++++++++++++++ utils/generate.py | 15 +++++++++ 3 files changed, 87 insertions(+) create mode 100644 utils/generate.py diff --git a/include/global/rpc/distributor.hpp b/include/global/rpc/distributor.hpp index 2a0b79ff3..36cbe08b9 100644 --- a/include/global/rpc/distributor.hpp +++ b/include/global/rpc/distributor.hpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include namespace gkfs { namespace rpc { @@ -69,6 +71,25 @@ public: std::vector locate_directory_metadata(const std::string& path) const override; }; +class GuidedDistributor : public Distributor { +private: + host_t localhost_; + unsigned int hosts_size_; + std::vector all_hosts_; + std::hash str_hash; + std::map< std::pair, host_t > mapping; +public: + GuidedDistributor(host_t localhost, unsigned int hosts_size); + + host_t localhost() const override; + + host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const override; + + host_t locate_file_metadata(const std::string& path) const override; + + std::vector locate_directory_metadata(const std::string& path) const override; +}; + } // namespace rpc } // namespace gkfs diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp index ab0597853..2fe27e0df 100644 --- a/src/global/rpc/distributor.cpp +++ b/src/global/rpc/distributor.cpp @@ -68,5 +68,56 @@ locate_directory_metadata(const string& path) const { return {localhost_}; } + +GuidedDistributor:: +GuidedDistributor(host_t localhost, unsigned int hosts_size) : + localhost_(localhost), + hosts_size_(hosts_size), + all_hosts_(hosts_size) { + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + + // Fill map + // header read 4 /fluidda-XFIEL.00000001.00000158.mpio.bin 1 8 136 3 + + + std::string op, path; + unsigned int original_host, destination_host; + unsigned int chunk_id; + size_t size, offset; + std::ifstream mapfile; + 
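// [Editor's note] For readers (editorial comment, not part of the patch): per the header comment
// above, each mapping line is expected to have the form
// "<op> <original_host> <path> <chunk_id> <size> <offset> <destination_host>", e.g.
// "read 4 /fluidda-XFIEL.00000001.00000158.mpio.bin 1 8 136 3", which matches the extraction
// order of the stream read below. Also note that std::ifstream performs no shell tilde expansion,
// so "~/mapping.txt" only resolves if a directory literally named '~' exists; an absolute path
// (as used later in PATCH 08) is needed in practice.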
mapfile.open("~/mapping.txt"); + while (mapfile >> op >> original_host >> path >> chunk_id >> size >> offset >> destination_host) + { + mapping[ std::make_pair (path, chunk_id) ] = destination_host; + } + mapfile.close(); + +} + +host_t GuidedDistributor:: +localhost() const { + return localhost_; +} + +host_t GuidedDistributor:: +locate_data(const string& path, const chunkid_t& chnk_id) const { + auto it = mapping.find( std::make_pair ( path, chnk_id) ); + + if (it != mapping.end()) + return it->second; + else + return str_hash(path + ::to_string(chnk_id)) % hosts_size_; +} + +host_t GuidedDistributor:: +locate_file_metadata(const string& path) const { + return str_hash(path) % hosts_size_; +} + +::vector GuidedDistributor:: +locate_directory_metadata(const string& path) const { + return all_hosts_; +} + } // namespace rpc } // namespace gkfs \ No newline at end of file diff --git a/utils/generate.py b/utils/generate.py new file mode 100644 index 000000000..47b7db3d0 --- /dev/null +++ b/utils/generate.py @@ -0,0 +1,15 @@ +import re +import sys +#[2020-03-03 10:50:45.513741 CET] [151530] [debug] host: 9, path: /fluidda-XFIEL.00000001.00000119.mpio.bin, chunks: 1, size: 524288, offset: 295200 +#(read).+(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+) +# Original host # path # chunkid # size # offset +destination = sys.argv[1] +file = sys.argv[2] +pattern = re.compile(r".+(read).+(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+)") + +with open(file) as f: + for line in f: + result = pattern.match(line) + if result: + #split = re.split(":\,",line) + print (result[1], result[3], result[5], result[7], result[9], result[11], destination) \ No newline at end of file -- GitLab From c44852cde1b76ce090ffe8fad517298977af18b7 Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Thu, 5 Mar 2020 11:36:13 +0100 Subject: [PATCH 03/10] More descriptive typenames in margo respond wrappers --- include/daemon/handler/rpc_util.hpp | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/include/daemon/handler/rpc_util.hpp b/include/daemon/handler/rpc_util.hpp index ea90df5de..dd29b4d87 100644 --- a/include/daemon/handler/rpc_util.hpp +++ b/include/daemon/handler/rpc_util.hpp @@ -25,8 +25,8 @@ extern "C" { namespace gkfs { namespace rpc { -template <typename I, typename O> -inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { +template <typename InputType, typename OutputType> +inline hg_return_t cleanup(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if (bulk_handle) { ret = margo_bulk_free(*bulk_handle); @@ -51,8 +51,8 @@ inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* return ret; } -template <typename O> -inline hg_return_t respond(hg_handle_t* handle, O* output) { +template <typename OutputType> +inline hg_return_t respond(hg_handle_t* handle, OutputType* output) { auto ret = HG_SUCCESS; if (output && handle) { ret = margo_respond(*handle, output); @@ -62,21 +62,21 @@ inline hg_return_t respond(hg_handle_t* handle, O* output) { return ret; } -template <typename I, typename O> -inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { +template <typename InputType, typename OutputType> +inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) { auto ret = respond(handle, output); if (ret != HG_SUCCESS) return ret; - return cleanup(handle, input, static_cast<O*>(nullptr), bulk_handle); + return cleanup(handle, input, static_cast<OutputType*>(nullptr), bulk_handle); }
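// [Editor's note] Editorial usage sketch for these wrappers (not part of the patch): a daemon RPC
// handler can end every exit path with a single call instead of hand-rolling
// margo_respond()/margo_free_input()/margo_destroy(). The handler name below is hypothetical:
//
//     static hg_return_t rpc_srv_example(hg_handle_t handle) {
//         rpc_trunc_in_t in{};
//         rpc_err_out_t out{};
//         out.err = EIO;
//         if (margo_get_input(handle, &in) != HG_SUCCESS)
//             return gkfs::rpc::cleanup_respond(&handle, &in, &out);  // respond + free input + destroy
//         // ... do work, set out.err ...
//         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
//     }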
-template <typename I, typename O> -inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output) { +template <typename InputType, typename OutputType> +inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output) { return cleanup_respond(handle, input, output, nullptr); } -template <typename O> -inline hg_return_t cleanup_respond(hg_handle_t* handle, O* output) { +template <typename OutputType> +inline hg_return_t cleanup_respond(hg_handle_t* handle, OutputType* output) { auto ret = respond(handle, output); if (ret != HG_SUCCESS) return ret; -- GitLab From ee9ac08e56048a76aeccad7ada5b7b9b7f5bc207 Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Thu, 5 Mar 2020 12:38:14 +0100 Subject: [PATCH 04/10] gkfs::types::rpc_chnk_id_t -> gkfs::rpc::chnk_id_t. Drop typedef for using --- include/daemon/backend/data/chunk_storage.hpp | 10 +++++----- include/daemon/ops/data.hpp | 4 ++-- include/global/global_defs.hpp | 7 +++---- src/daemon/backend/data/chunk_storage.cpp | 12 ++++++------ src/daemon/ops/data.cpp | 4 ++-- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/include/daemon/backend/data/chunk_storage.hpp b/include/daemon/backend/data/chunk_storage.hpp index 08da9d56c..8b0f28d37 100644 --- a/include/daemon/backend/data/chunk_storage.hpp +++ b/include/daemon/backend/data/chunk_storage.hpp @@ -54,7 +54,7 @@ private: static inline std::string get_chunks_dir(const std::string& file_path); - static inline std::string get_chunk_path(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id); + static inline std::string get_chunk_path(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id); void init_chunk_space(const std::string& file_path) const; @@ -64,15 +64,15 @@ public: void destroy_chunk_space(const std::string& file_path) const; ssize_t - write_chunk(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, const char* buff, size_t size, + write_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buff, size_t size, off64_t offset) const; - ssize_t read_chunk(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, char* buf, size_t size, + ssize_t read_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size, off64_t offset) const; - void trim_chunk_space(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_start); + void trim_chunk_space(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_start); - void truncate_chunk_file(const std::string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, off_t length); + void truncate_chunk_file(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length); ChunkStat chunk_stat() const; }; diff --git a/include/daemon/ops/data.hpp b/include/daemon/ops/data.hpp index be4177e2e..66d3b6bcc 100644 --- a/include/daemon/ops/data.hpp +++ b/include/daemon/ops/data.hpp @@ -111,7 +111,7 @@ private: struct chunk_write_args { const std::string* path; const char* buf; - gkfs::types::rpc_chnk_id_t chnk_id; + gkfs::rpc::chnk_id_t chnk_id; size_t size; off64_t off; ABT_eventual eventual; @@ -141,7 +141,7 @@ private: struct chunk_read_args { const std::string* path; char* buf; - gkfs::types::rpc_chnk_id_t chnk_id; + gkfs::rpc::chnk_id_t chnk_id; size_t size; off64_t off; ABT_eventual eventual; diff --git a/include/global/global_defs.hpp b/include/global/global_defs.hpp index 8bd958615..4675c8e8a 100644 --- a/include/global/global_defs.hpp +++ b/include/global/global_defs.hpp @@ -18,6 +18,9 @@ namespace gkfs { // These constexpr set the RPC's identity and which handler the receiver end should use
namespace rpc { + +using chnk_id_t = unsigned long; + namespace tag { constexpr auto fs_config = "rpc_srv_fs_config"; @@ -45,10 +48,6 @@ constexpr auto ofi_tcp = "ofi+tcp"; } // namespace protocol } // namespace rpc -namespace types { -// typedefs -typedef unsigned long rpc_chnk_id_t; -} // namespace types } // namespace gkfs #endif //GEKKOFS_GLOBAL_DEFS_HPP diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index dae4da983..164bf7bcf 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -44,7 +44,7 @@ string ChunkStorage::get_chunks_dir(const string& file_path) { return chunk_dir; } -string ChunkStorage::get_chunk_path(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id) { +string ChunkStorage::get_chunk_path(const string& file_path, gkfs::rpc::chnk_id_t chunk_id) { return fmt::format("{}/{}", get_chunks_dir(file_path), chunk_id); } @@ -120,7 +120,7 @@ void ChunkStorage::destroy_chunk_space(const string& file_path) const { * @throws ChunkStorageException (caller will handle eventual signalling) */ ssize_t -ChunkStorage::write_chunk(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, const char* buff, size_t size, +ChunkStorage::write_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buff, size_t size, off64_t offset) const { assert((offset + size) <= chunksize_); @@ -163,7 +163,7 @@ ChunkStorage::write_chunk(const string& file_path, gkfs::types::rpc_chnk_id_t ch * @param eventual */ ssize_t -ChunkStorage::read_chunk(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, char* buf, size_t size, +ChunkStorage::read_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size, off64_t offset) const { assert((offset + size) <= chunksize_); auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); @@ -230,7 +230,7 @@ ChunkStorage::read_chunk(const string& file_path, gkfs::types::rpc_chnk_id_t chu * @param chunk_end * @throws ChunkStorageException */ -void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_start) { +void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_t chunk_start) { auto chunk_dir = absolute(get_chunks_dir(file_path)); const bfs::directory_iterator end; @@ -259,9 +259,9 @@ void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::types::rpc_ch * @param length * @throws ChunkStorageException */ -void ChunkStorage::truncate_chunk_file(const string& file_path, gkfs::types::rpc_chnk_id_t chunk_id, off_t length) { +void ChunkStorage::truncate_chunk_file(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length) { auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); - assert(length > 0 && static_cast(length) <= chunksize_); + assert(length > 0 && static_cast(length) <= chunksize_); int ret = truncate64(chunk_path.c_str(), length); if (ret == -1) { auto err_str = fmt::format("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, diff --git a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp index 1681b1559..27524f77f 100644 --- a/src/daemon/ops/data.cpp +++ b/src/daemon/ops/data.cpp @@ -162,7 +162,7 @@ int ChunkTruncateOperation::wait_for_tasks() { * Used by an argobots tasklet. 
Argument args has the following fields: * const string* path; const char* buf; - const gkfs::types::rpc_chnk_id_t* chnk_id; + const gkfs::rpc::chnk_id_t* chnk_id; size_t size; off64_t off; ABT_eventual* eventual; @@ -281,7 +281,7 @@ pair ChunkWriteOperation::wait_for_tasks() { * Used by an argobots tasklet. Argument args has the following fields: * const string* path; char* buf; - const gkfs::types::rpc_chnk_id_t* chnk_id; + const gkfs::rpc::chnk_id_t* chnk_id; size_t size; off64_t off; ABT_eventual* eventual; -- GitLab From 936a377d6f350a54c77c38e11f98b4695a827923 Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Thu, 5 Mar 2020 15:29:20 +0100 Subject: [PATCH 05/10] Adding asserts, fixing comments, removing pass-by-value --- include/daemon/backend/data/chunk_storage.hpp | 2 +- include/daemon/ops/data.hpp | 12 +++---- src/daemon/backend/data/chunk_storage.cpp | 33 +++++++++---------- src/daemon/ops/data.cpp | 21 +++++++----- 4 files changed, 35 insertions(+), 33 deletions(-) diff --git a/include/daemon/backend/data/chunk_storage.hpp b/include/daemon/backend/data/chunk_storage.hpp index 8b0f28d37..1bf94d797 100644 --- a/include/daemon/backend/data/chunk_storage.hpp +++ b/include/daemon/backend/data/chunk_storage.hpp @@ -59,7 +59,7 @@ private: void init_chunk_space(const std::string& file_path) const; public: - ChunkStorage(std::string path, size_t chunksize); + ChunkStorage(std::string& path, size_t chunksize); void destroy_chunk_space(const std::string& file_path) const; diff --git a/include/daemon/ops/data.hpp b/include/daemon/ops/data.hpp index 66d3b6bcc..1df6c8f03 100644 --- a/include/daemon/ops/data.hpp +++ b/include/daemon/ops/data.hpp @@ -64,14 +64,14 @@ class ChunkOperation { protected: - std::string path_; + const std::string path_; std::vector abt_tasks_; std::vector task_eventuals_; virtual void cancel_all_tasks(); - explicit ChunkOperation(std::string path); + explicit ChunkOperation(const std::string& path); ChunkOperation(std::string path, size_t n); @@ -94,9 +94,9 @@ private: public: - explicit ChunkTruncateOperation(std::string path); + explicit ChunkTruncateOperation(const std::string& path); - ChunkTruncateOperation(std::string path, size_t n); + ChunkTruncateOperation(const std::string& path, size_t n); void cancel_all_tasks() override; @@ -123,7 +123,7 @@ private: public: - ChunkWriteOperation(std::string path, size_t n); + ChunkWriteOperation(const std::string& path, size_t n); void cancel_all_tasks() override; @@ -163,7 +163,7 @@ public: std::vector* chunk_ids; }; - ChunkReadOperation(std::string path, size_t n); + ChunkReadOperation(const std::string& path, size_t n); void cancel_all_tasks() override; diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index 164bf7bcf..7ee130021 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -49,10 +49,10 @@ string ChunkStorage::get_chunk_path(const string& file_path, gkfs::rpc::chnk_id_ } /** - * Creates a chunk directories are all chunk files are placed in. + * Creates a chunk directory that all chunk files are placed in. 
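 * [Editor's note] Illustration (editorial, not part of the patch): chunks are laid out one
 * directory per file and one file per chunk id, i.e. roughly
 *
 *     <root_path>/<encoded file path>/0
 *     <root_path>/<encoded file path>/1
 *     ...
 *
 * get_chunk_path() above appends the chunk id to that directory via fmt::format("{}/{}", ...);
 * how the '/' characters of the file path are encoded into a single directory name is an
 * implementation detail of get_chunks_dir().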
* The path to the real file will be used as the directory name * @param file_path - * @returns 0 on success or errno on failure + * @throws ChunkStorageException on error */ void ChunkStorage::init_chunk_space(const string& file_path) const { auto chunk_dir = absolute(get_chunks_dir(file_path)); @@ -72,8 +72,8 @@ void ChunkStorage::init_chunk_space(const string& file_path) const { * @param chunksize * @throws ChunkStorageException */ -ChunkStorage::ChunkStorage(string path, const size_t chunksize) : - root_path_(std::move(path)), +ChunkStorage::ChunkStorage(string& path, const size_t chunksize) : + root_path_(path), chunksize_(chunksize) { /* Initialize logger */ log_ = spdlog::get(LOGGER_NAME); @@ -107,7 +107,7 @@ void ChunkStorage::destroy_chunk_space(const string& file_path) const { /** * Writes a chunk file. - * On failure returns a negative error code corresponding to `-errno`. + * On failure throws ChunkStorageException with encapsulated error code * * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pwrite behavior * @@ -152,7 +152,7 @@ ChunkStorage::write_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id /** * Read from a chunk file. - * On failure returns a negative error code corresponding to `-errno`. + * On failure throws ChunkStorageException with encapsulated error code * * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pread behavior * @param file_path @@ -161,6 +161,7 @@ ChunkStorage::write_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id * @param size * @param offset * @param eventual + * @throws ChunkStorageException (caller will handle eventual signalling) */ ssize_t ChunkStorage::read_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size, @@ -217,17 +218,15 @@ ChunkStorage::read_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, } /** - * Delete all chunks starting with chunk a chunk id. - * Note eventual consistency here: While chunks are removed, there is no lock that prevents - * other processes from modifying anything in that directory. - * It is the application's responsibility to stop modifying the file while truncate is executed - * - * If an error is encountered when removing a chunk file, the function will still remove all files and - * report the error afterwards with ChunkStorageException. - * +* Delete all chunks starting with chunk a chunk id. +* Note eventual consistency here: While chunks are removed, there is no lock that prevents +* other processes from modifying anything in that directory. +* It is the application's responsibility to stop modifying the file while truncate is executed +* +* If an error is encountered when removing a chunk file, the function will still remove all files and +* report the error afterwards with ChunkStorageException. 
* @param file_path * @param chunk_start - * @param chunk_end * @throws ChunkStorageException */ void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_t chunk_start) { @@ -237,7 +236,7 @@ void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_ auto err_flag = false; for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) { auto chunk_path = chunk_file->path(); - auto chunk_id = ::stoul(chunk_path.filename().c_str()); + auto chunk_id = std::stoul(chunk_path.filename().c_str()); if (chunk_id >= chunk_start) { auto err = unlink(chunk_path.c_str()); if (err == -1 && errno != ENOENT) { @@ -262,7 +261,7 @@ void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_ void ChunkStorage::truncate_chunk_file(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length) { auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); assert(length > 0 && static_cast(length) <= chunksize_); - int ret = truncate64(chunk_path.c_str(), length); + int ret = truncate(chunk_path.c_str(), length); if (ret == -1) { auto err_str = fmt::format("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); diff --git a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp index 27524f77f..49bb31d2d 100644 --- a/src/daemon/ops/data.cpp +++ b/src/daemon/ops/data.cpp @@ -46,9 +46,9 @@ void ChunkOperation::cancel_all_tasks() { task_eventuals_.clear(); } -ChunkOperation::ChunkOperation(string path) : ChunkOperation(move(path), 1) {} +ChunkOperation::ChunkOperation(const string& path) : ChunkOperation(path, 1) {} -ChunkOperation::ChunkOperation(string path, size_t n) : path_(::move(path)) { +ChunkOperation::ChunkOperation(string path, size_t n) : path_(std::move(path)) { // Knowing n beforehand is important and cannot be dynamic. 
Otherwise eventuals cause seg faults abt_tasks_.resize(n); task_eventuals_.resize(n); @@ -67,6 +67,7 @@ ChunkOperation::~ChunkOperation() { * @return error is put into eventual to signal that it finished */ void ChunkTruncateOperation::truncate_abt(void* _arg) { + assert(_arg); // Unpack args auto* arg = static_cast(_arg); const string& path = *(arg->path); @@ -93,9 +94,9 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { ABT_eventual_set(arg->eventual, &err_response, sizeof(int)); } -ChunkTruncateOperation::ChunkTruncateOperation(string path) : ChunkTruncateOperation{move(path), 1} {} +ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} -ChunkTruncateOperation::ChunkTruncateOperation(string path, size_t n) : ChunkOperation{move(path), n} { +ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } @@ -171,6 +172,7 @@ int ChunkTruncateOperation::wait_for_tasks() { * @return written_size is put into eventual to signal that it finished */ void ChunkWriteOperation::write_file_abt(void* _arg) { + assert(_arg); // Unpack args auto* arg = static_cast(_arg); const string& path = *(arg->path); @@ -179,7 +181,7 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); } catch (const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); - wrote = -(err.code().value());; + wrote = -(err.code().value()); } catch (const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, path); @@ -188,7 +190,7 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); } -ChunkWriteOperation::ChunkWriteOperation(string path, size_t n) : ChunkOperation{move(path), n} { +ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } @@ -224,7 +226,7 @@ ChunkWriteOperation::write_async(size_t idx, const uint64_t chunk_id, const char auto& task_arg = task_args_[idx]; task_arg.path = &path_; - task_arg.buf = bulk_buf_ptr; // write_file_abt will treat buf as const and will not modify it + task_arg.buf = bulk_buf_ptr; task_arg.chnk_id = chunk_id; task_arg.size = size; task_arg.off = offset; @@ -290,6 +292,7 @@ pair ChunkWriteOperation::wait_for_tasks() { * @return read_size is put into eventual to signal that it finished */ void ChunkReadOperation::read_file_abt(void* _arg) { + assert(_arg); //unpack args auto* arg = static_cast(_arg); const string& path = *(arg->path); @@ -299,7 +302,7 @@ void ChunkReadOperation::read_file_abt(void* _arg) { read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); } catch (const ChunkStorageException& err) { GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); - read = -(err.code().value());; + read = -(err.code().value()); } catch (const ::exception& err) { GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, path); @@ -308,7 +311,7 @@ void ChunkReadOperation::read_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); } -ChunkReadOperation::ChunkReadOperation(string path, size_t n) : ChunkOperation{move(path), n} { +ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { 
task_args_.resize(n); } -- GitLab From 8fb5c023183b7257cbbea222bfe303c5e6b1e39d Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Thu, 5 Mar 2020 18:06:07 +0100 Subject: [PATCH 06/10] ChunkOperation use CRTP instead of runtime polymorphism --- include/daemon/ops/data.hpp | 61 ++++++++++++++++++++++++++++++------- src/daemon/ops/data.cpp | 61 ++++++++----------------------- 2 files changed, 62 insertions(+), 60 deletions(-) diff --git a/include/daemon/ops/data.hpp b/include/daemon/ops/data.hpp index 1df6c8f03..dcfb41b23 100644 --- a/include/daemon/ops/data.hpp +++ b/include/daemon/ops/data.hpp @@ -58,8 +58,9 @@ public: * * In the future, this class may be used to provide failure tolerance for IO tasks * - * Abstract base class without public constructor: + * Base class using the CRTP idiom */ +template <class OperationType> class ChunkOperation { protected: @@ -69,17 +70,46 @@ protected: std::vector abt_tasks_; std::vector task_eventuals_; - virtual void cancel_all_tasks(); +public: - explicit ChunkOperation(const std::string& path); + explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {}; - ChunkOperation(std::string path, size_t n); + ChunkOperation(std::string path, size_t n) : path_(std::move(path)) { + // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults + abt_tasks_.resize(n); + task_eventuals_.resize(n); + }; - ~ChunkOperation(); + ~ChunkOperation() { + cancel_all_tasks(); + } + + /** + * Cleans up and cancels all tasks in flight + */ + void cancel_all_tasks() { + GKFS_DATA->spdlogger()->trace("{}() enter", __func__); + for (auto& task : abt_tasks_) { + if (task) { + ABT_task_cancel(task); + ABT_task_free(&task); + } + } + for (auto& eventual : task_eventuals_) { + if (eventual) { + ABT_eventual_reset(eventual); + ABT_eventual_free(&eventual); + } + } + abt_tasks_.clear(); + task_eventuals_.clear(); + static_cast<OperationType*>(this)->clear_task_args(); + } }; -class ChunkTruncateOperation : public ChunkOperation { +class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> { + friend class ChunkOperation<ChunkTruncateOperation>; private: struct chunk_truncate_args { @@ -92,20 +122,24 @@ private: static void truncate_abt(void* _arg); + void clear_task_args(); + public: explicit ChunkTruncateOperation(const std::string& path); ChunkTruncateOperation(const std::string& path, size_t n); - void cancel_all_tasks() override; + ~ChunkTruncateOperation() = default; void truncate(size_t idx, size_t size); int wait_for_tasks(); }; -class ChunkWriteOperation : public ChunkOperation { +class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> { + friend class ChunkOperation<ChunkWriteOperation>; + private: struct chunk_write_args { @@ -121,11 +155,13 @@ private: static void write_file_abt(void* _arg); + void clear_task_args(); + public: ChunkWriteOperation(const std::string& path, size_t n); - void cancel_all_tasks() override; + ~ChunkWriteOperation() = default; void write_async(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset); @@ -134,7 +170,8 @@ public: }; -class ChunkReadOperation : public ChunkOperation { +class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> { + friend class ChunkOperation<ChunkReadOperation>; private: @@ -151,6 +188,8 @@ private: static void read_file_abt(void* _arg); + void clear_task_args(); + public: struct bulk_args { @@ -165,7 +204,7 @@ public: ChunkReadOperation(const std::string& path, size_t n); - void cancel_all_tasks() override; + ~ChunkReadOperation() = default; void read_async(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset); diff --git
a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp index 49bb31d2d..52315e8c3 100644 --- a/src/daemon/ops/data.cpp +++ b/src/daemon/ops/data.cpp @@ -25,39 +25,6 @@ using namespace std; namespace gkfs { namespace data { -/** - * Cleans up and cancels all tasks in flight - */ -void ChunkOperation::cancel_all_tasks() { - GKFS_DATA->spdlogger()->trace("{}() enter", __func__); - for (auto& task : abt_tasks_) { - if (task) { - ABT_task_cancel(task); - ABT_task_free(&task); - } - } - for (auto& eventual : task_eventuals_) { - if (eventual) { - ABT_eventual_reset(eventual); - ABT_eventual_free(&eventual); - } - } - abt_tasks_.clear(); - task_eventuals_.clear(); -} - -ChunkOperation::ChunkOperation(const string& path) : ChunkOperation(path, 1) {} - -ChunkOperation::ChunkOperation(string path, size_t n) : path_(std::move(path)) { - // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults - abt_tasks_.resize(n); - task_eventuals_.resize(n); -} - -ChunkOperation::~ChunkOperation() { - cancel_all_tasks(); -} - /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; @@ -94,17 +61,16 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { ABT_eventual_set(arg->eventual, &err_response, sizeof(int)); } +void ChunkTruncateOperation::clear_task_args() { + task_args_.clear(); +} + ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } -void ChunkTruncateOperation::cancel_all_tasks() { - ChunkOperation::cancel_all_tasks(); - task_args_.clear(); -} - /** * Starts a tasklet for requested truncate. In essence all chunk files after the given offset is removed */ @@ -190,13 +156,12 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); } -ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { - task_args_.resize(n); +void ChunkWriteOperation::clear_task_args() { + task_args_.clear(); } -void ChunkWriteOperation::cancel_all_tasks() { - ChunkOperation::cancel_all_tasks(); - task_args_.clear(); +ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); } /** @@ -311,13 +276,12 @@ void ChunkReadOperation::read_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); } -ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { - task_args_.resize(n); +void ChunkReadOperation::clear_task_args() { + task_args_.clear(); } -void ChunkReadOperation::cancel_all_tasks() { - ChunkOperation::cancel_all_tasks(); - task_args_.clear(); +ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); } /** @@ -403,7 +367,6 @@ pair ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_ar "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); - // TODO try, repeat do-while assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer(args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, args.origin_offsets->at(idx), args.local_bulk_handle, -- GitLab From 
3c412e10e2990b637123257337801704c6417554 Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Fri, 13 Mar 2020 14:49:54 +0100 Subject: [PATCH 07/10] Readding code that was removed by accident --- include/client/gkfs_functions.hpp | 2 ++ src/client/gkfs_functions.cpp | 23 +++++++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/include/client/gkfs_functions.hpp b/include/client/gkfs_functions.hpp index 15e5a215f..7ae71ce99 100644 --- a/include/client/gkfs_functions.hpp +++ b/include/client/gkfs_functions.hpp @@ -37,6 +37,8 @@ int gkfs_stat(const std::string& path, struct stat* buf, bool follow_links = tru int gkfs_statfs(struct statfs* buf); +int gkfs_statvfs(struct statvfs* buf); + off64_t gkfs_lseek(unsigned int fd, off64_t offset, unsigned int whence); off64_t gkfs_lseek(std::shared_ptr gkfs_fd, off64_t offset, unsigned int whence); diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 037dab5eb..0d9bf7476 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -256,6 +256,29 @@ int gkfs_statfs(struct statfs* buf) { return 0; } +int gkfs_statvfs(struct statvfs* buf) { + gkfs::rpc::ChunkStat blk_stat{}; + try { + blk_stat = gkfs::rpc::forward_get_chunk_stat(); + } catch (const std::exception& e) { + LOG(ERROR, "{}() Failure with error: '{}'", e.what()); + return -1; + } + buf->f_bsize = blk_stat.chunk_size; + buf->f_blocks = blk_stat.chunk_total; + buf->f_bfree = blk_stat.chunk_free; + buf->f_bavail = blk_stat.chunk_free; + buf->f_files = 0; + buf->f_ffree = 0; + buf->f_favail = 0; + buf->f_fsid = 0; + buf->f_namemax = path::max_length; + buf->f_frsize = 0; + buf->f_flag = + ST_NOATIME | ST_NODIRATIME | ST_NOSUID | ST_NODEV | ST_SYNCHRONOUS; + return 0; +} + off_t gkfs_lseek(unsigned int fd, off_t offset, unsigned int whence) { return gkfs_lseek(CTX->file_map()->get(fd), offset, whence); } -- GitLab From 93b4fe65aadc857b902359f283e3f33a1eca04a6 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 16 Mar 2020 14:20:23 +0100 Subject: [PATCH 08/10] Last update --- src/client/preload.cpp | 8 ++++++-- src/global/rpc/distributor.cpp | 30 ++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/client/preload.cpp b/src/client/preload.cpp index ad09cff32..e078e6e23 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -98,9 +98,13 @@ void init_ld_environment_() { } /* Setup distributor */ - auto simple_hash_dist = std::make_shared(CTX->local_host_id(), + // auto simple_hash_dist = std::make_shared(CTX->local_host_id(), + // CTX->hosts().size()); + // CTX->distributor(simple_hash_dist); + + auto guided_dist = std::make_shared(CTX->local_host_id(), CTX->hosts().size()); - CTX->distributor(simple_hash_dist); + CTX->distributor(guided_dist); LOG(INFO, "Retrieving file system configuration..."); diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp index 2fe27e0df..f48626591 100644 --- a/src/global/rpc/distributor.cpp +++ b/src/global/rpc/distributor.cpp @@ -80,18 +80,25 @@ GuidedDistributor(host_t localhost, unsigned int hosts_size) : // header read 4 /fluidda-XFIEL.00000001.00000158.mpio.bin 1 8 136 3 - std::string op, path; + /* std::string op, path; unsigned int original_host, destination_host; unsigned int chunk_id; size_t size, offset; std::ifstream mapfile; - mapfile.open("~/mapping.txt"); + std::ofstream mapout; + mapfile.open("/home/bsc25/bsc25015/mapping.txt"); while (mapfile >> op >> original_host >> path >> chunk_id >> size >> offset >> destination_host) 
From 93b4fe65aadc857b902359f283e3f33a1eca04a6 Mon Sep 17 00:00:00 2001
From: Ramon Nou
Date: Mon, 16 Mar 2020 14:20:23 +0100
Subject: [PATCH 08/10] Last update

---
 src/client/preload.cpp         |  8 ++++++--
 src/global/rpc/distributor.cpp | 30 ++++++++++++++++++++--------
 2 files changed, 28 insertions(+), 10 deletions(-)

diff --git a/src/client/preload.cpp b/src/client/preload.cpp
index ad09cff32..e078e6e23 100644
--- a/src/client/preload.cpp
+++ b/src/client/preload.cpp
@@ -98,9 +98,13 @@ void init_ld_environment_() {
     }
 
     /* Setup distributor */
-    auto simple_hash_dist = std::make_shared(CTX->local_host_id(),
+    // auto simple_hash_dist = std::make_shared(CTX->local_host_id(),
+    //                                          CTX->hosts().size());
+    // CTX->distributor(simple_hash_dist);
+
+    auto guided_dist = std::make_shared(CTX->local_host_id(),
                                         CTX->hosts().size());
-    CTX->distributor(simple_hash_dist);
+    CTX->distributor(guided_dist);
 
     LOG(INFO, "Retrieving file system configuration...");
diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp
index 2fe27e0df..f48626591 100644
--- a/src/global/rpc/distributor.cpp
+++ b/src/global/rpc/distributor.cpp
@@ -80,18 +80,25 @@ GuidedDistributor(host_t localhost, unsigned int hosts_size) :
 
     // header read 4 /fluidda-XFIEL.00000001.00000158.mpio.bin 1 8 136 3
 
-    std::string op, path;
+    /* std::string op, path;
     unsigned int original_host, destination_host;
     unsigned int chunk_id;
     size_t size, offset;
     std::ifstream mapfile;
-    mapfile.open("~/mapping.txt");
+    std::ofstream mapout;
+    mapfile.open("/home/bsc25/bsc25015/mapping.txt");
     while (mapfile >> op >> original_host >> path >> chunk_id >> size >> offset >> destination_host)
     {
-        mapping[ std::make_pair (path, chunk_id) ] = destination_host;
+        mapping[ std::make_pair (path, chunk_id) ] = destination_host;
+
+        mapout.open("/home/bsc25/bsc25015/mapout.txt", std::ofstream::app);
+
+        mapout << "C " << path << " " << chunk_id << " --> " << destination_host << std::endl;
+        mapout.close();
+
     }
     mapfile.close();
-
+*/
 }
@@ -101,11 +108,18 @@ localhost() const {
 
 host_t GuidedDistributor::
 locate_data(const string& path, const chunkid_t& chnk_id) const {
-    auto it = mapping.find( std::make_pair ( path, chnk_id) );
-
+/*  auto locate = std::make_pair(path, chnk_id);
+    auto it = mapping.find(locate);
     if (it != mapping.end())
+    {
+        std::ofstream mapout;
+        mapout.open("/home/bsc25/bsc25015/mapout.txt", std::ofstream::app);
+        mapout << path << " " << chnk_id << " --> " << it->second << std::endl;
+        mapout.close();
+
         return it->second;
-    else
+    }
+    else */
     return str_hash(path + ::to_string(chnk_id)) % hosts_size_;
 }
@@ -120,4 +134,4 @@ locate_directory_metadata(const string& path) const {
 }
 
 } // namespace rpc
-} // namespace gkfs
\ No newline at end of file
+} // namespace gkfs
--
GitLab
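Reviewer note, not part of the patch: the switch from `~/mapping.txt` to an absolute path matters because `std::ifstream::open()` hands the path to the OS verbatim and performs no shell-style `~` expansion, so the old call silently opened nothing and the mapping stayed empty. A condensed standalone sketch of the constructor's parsing loop; `load_mapping()` and the type aliases are illustrative, and the record layout follows the commented header above:

    #include <cstddef>
    #include <fstream>
    #include <map>
    #include <string>
    #include <utility>

    using host_t = unsigned int;
    using chunkid_t = unsigned int;

    // Parse whitespace-separated records of the form
    //   <op> <orig_host> <path> <chunk_id> <size> <offset> <dest_host>
    // into a (path, chunk) -> host map, as the GuidedDistributor constructor does.
    std::map<std::pair<std::string, chunkid_t>, host_t>
    load_mapping(const std::string& map_path) {
        std::map<std::pair<std::string, chunkid_t>, host_t> mapping;
        std::ifstream mapfile(map_path); // absolute path: no '~' expansion happens
        std::string op, path;
        host_t original_host, destination_host;
        chunkid_t chunk_id;
        std::size_t size, offset;
        while (mapfile >> op >> original_host >> path >> chunk_id >> size >>
               offset >> destination_host)
            mapping[{path, chunk_id}] = destination_host; // last record wins
        return mapping; // mapfile is closed by its destructor
    }

    int main() {
        return load_mapping("/tmp/mapping.txt").empty() ? 1 : 0; // example path
    }

Design-wise, reopening `mapout` in append mode for every record is costly; opening it once before the loop, or dropping the debug output entirely as patch 10 does, is preferable.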
From dcb0f6662452f0e21da0a146f5472fb894fb382d Mon Sep 17 00:00:00 2001
From: Marc Vef
Date: Wed, 18 Mar 2020 10:30:14 +0100
Subject: [PATCH 09/10] Fixing index loop bug during I/O on the daemon

---
 src/daemon/handler/srv_data.cpp | 34 ++++++++++++++++++++++++---------
 1 file changed, 25 insertions(+), 9 deletions(-)

diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp
index 289a294b8..76c78e093 100644
--- a/src/daemon/handler/srv_data.cpp
+++ b/src/daemon/handler/srv_data.cpp
@@ -48,8 +48,9 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     auto hgi = margo_get_info(handle);
     auto mid = margo_hg_info_get_instance(hgi);
     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
-    GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__,
-                                  in.path, bulk_size, in.offset);
+    GKFS_DATA->spdlogger()->debug(
+            "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
+            __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);
     /*
      * 2. Set up buffers for pull bulk transfers
      */
@@ -98,14 +99,20 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     uint64_t local_offset;
     // object for asynchronous disk IO
     gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
+
     /*
      * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk
     */
     // Start to look for a chunk that hashes to this host with the first chunk in the buffer
-    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
+    for (auto chnk_id_file = in.chunk_start;
+         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
         // Continue if chunk does not hash to this host
-        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
+        if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
+            GKFS_DATA->spdlogger()->trace(
+                    "{}() chunkid '{}' ignored as it does not hash to this host with id '{}'. chnk_id_curr '{}'",
+                    __func__, chnk_id_file, host_id, chnk_id_curr);
             continue;
+        }
         chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
         // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host
         if (chnk_id_file == in.chunk_start && in.offset > 0) {
@@ -172,6 +179,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     }
 
     // Sanity check that all chunks were detected in previous loop
+    // TODO don't proceed if that happens.
     if (chnk_size_left_host != 0)
         GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
                                      chnk_size_left_host);
@@ -221,8 +229,9 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
     auto hgi = margo_get_info(handle);
     auto mid = margo_hg_info_get_instance(hgi);
     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
-    GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__,
-                                  in.path, bulk_size, in.offset);
+    GKFS_DATA->spdlogger()->debug(
+            "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
+            __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);
 
     /*
      * 2. Set up buffers for pull bulk transfers
@@ -268,10 +277,15 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
      * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk
      */
     // Start to look for a chunk that hashes to this host with the first chunk in the buffer
-    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
+    for (auto chnk_id_file = in.chunk_start;
+         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
         // Continue if chunk does not hash to this host
-        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
+        if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
+            GKFS_DATA->spdlogger()->trace(
+                    "{}() chunkid '{}' ignored as it does not hash to this host with id '{}'. chnk_id_curr '{}'",
+                    __func__, chnk_id_file, host_id, chnk_id_curr);
             continue;
+        }
         chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list
         // Only relevant in the first iteration of the loop and if the chunk hashes to this host
         if (chnk_id_file == in.chunk_start && in.offset > 0) {
@@ -309,7 +323,8 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
         }
         try {
             // start tasklet for read operation
-            chunk_read_op.read_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0);
+            chunk_read_op.read_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr],
+                                     chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0);
         } catch (const gkfs::data::ChunkReadOpException& e) {
             // This exception is caused by setup of Argobots variables. If this fails, something is really wrong
             GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what());
@@ -318,6 +333,7 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) {
         chnk_id_curr++;
     }
     // Sanity check that all chunks were detected in previous loop
+    // TODO error out. If we continue this will crash the server when sending results back that don't exist.
    if (chnk_size_left_host != 0)
         GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
                                      chnk_size_left_host);
--
GitLab
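Reviewer note, not part of the patch: the core of this fix is the loop condition. `in.chunk_end` is an inclusive bound, so the old `chnk_id_file < in.chunk_end` dropped the request's final chunk, while the old `||` let the loop run past `chunk_end` and claim chunk ids that were never part of the request whenever fewer than `chunk_n` chunks hashed to this host. A toy standalone reproduction of the corrected loop, with a modulo stand-in for `locate_data()`; all names and numbers are illustrative:

    #include <cstdint>
    #include <cstdio>

    // Stand-in for distributor.locate_data(): chunk id modulo host count.
    static uint64_t locate(uint64_t chunk_id, uint64_t hosts) {
        return chunk_id % hosts;
    }

    int main() {
        // A request covering chunks 2..5 (chunk_end is inclusive); with two
        // hosts, host 0 owns chunks 2 and 4, so the client sends chunk_n == 2.
        const uint64_t chunk_start = 2, chunk_end = 5, chunk_n = 2;
        const uint64_t host_id = 0, hosts = 2;

        uint64_t curr = 0; // plays the role of chnk_id_curr
        // The pre-patch condition was 'id < chunk_end || curr < chunk_n': the
        // '<' skipped the inclusive end chunk and the '||' kept iterating
        // beyond chunk_end until curr reached chunk_n. The patched condition
        // stops at the inclusive end AND once all expected chunks are found.
        for (uint64_t id = chunk_start; id <= chunk_end && curr < chunk_n; id++) {
            if (locate(id, hosts) != host_id)
                continue; // chunk belongs to another daemon
            printf("host %llu handles chunk %llu (slot %llu)\n",
                   (unsigned long long) host_id, (unsigned long long) id,
                   (unsigned long long) curr);
            curr++;
        }
        return curr == chunk_n ? 0 : 1; // mirrors the handler's sanity check
    }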
Size left {}", __func__, chnk_size_left_host); -- GitLab From 1db65294254f6e3a3597d9395a017af10f1c7c02 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 31 Mar 2020 17:00:50 +0200 Subject: [PATCH 10/10] Last updates, resolved mistake with chunk_n vs chunk_id --- include/global/rpc/distributor.hpp | 4 ++- src/client/preload.cpp | 9 +++--- src/client/preload_util.cpp | 3 +- src/client/rpc/forward_data.cpp | 9 +++++- src/daemon/backend/data/chunk_storage.cpp | 2 +- src/daemon/handler/srv_data.cpp | 6 ++-- src/daemon/handler/srv_metadata.cpp | 2 +- src/daemon/ops/data.cpp | 10 +++---- src/global/rpc/distributor.cpp | 35 ++++++++++------------- utils/generate.py | 9 +++++- 10 files changed, 50 insertions(+), 39 deletions(-) diff --git a/include/global/rpc/distributor.hpp b/include/global/rpc/distributor.hpp index 36cbe08b9..bf91f5e44 100644 --- a/include/global/rpc/distributor.hpp +++ b/include/global/rpc/distributor.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include namespace gkfs { @@ -77,7 +78,8 @@ private: unsigned int hosts_size_; std::vector all_hosts_; std::hash str_hash; - std::map< std::pair, host_t > mapping; + //std::unordered_map< std::pair, host_t > mapping; +std::unordered_map< std::string, host_t > mapping; public: GuidedDistributor(host_t localhost, unsigned int hosts_size); diff --git a/src/client/preload.cpp b/src/client/preload.cpp index e078e6e23..c5a1c0543 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -98,12 +98,11 @@ void init_ld_environment_() { } /* Setup distributor */ - // auto simple_hash_dist = std::make_shared(CTX->local_host_id(), - // CTX->hosts().size()); - // CTX->distributor(simple_hash_dist); + //auto simple_hash_dist = std::make_shared(CTX->local_host_id(), + // CTX->hosts().size()); + //CTX->distributor(simple_hash_dist); - auto guided_dist = std::make_shared(CTX->local_host_id(), - CTX->hosts().size()); + auto guided_dist = std::make_shared(CTX->local_host_id(), CTX->hosts().size()); CTX->distributor(guided_dist); LOG(INFO, "Retrieving file system configuration..."); diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index e9663604a..0820cd62e 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -214,6 +214,7 @@ void load_hosts() { ::mt19937 g(rd()); // seed the random generator ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector // lookup addresses and put abstract server addresses into rpc_addresses + ::sort(host_ids.begin(), host_ids.end()); for (const auto& id: host_ids) { const auto& hostname = hosts.at(id).first; @@ -239,4 +240,4 @@ void load_hosts() { } } // namespace util -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index 3c850a218..afa98c8ad 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -55,6 +55,10 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; + + //LOG(DEBUG, "path: \"{}\", append_flag:{}, in_offset:{}, offset:{}, write_size: {}, updated_metadentry_size:{}, chnk_start:{}, chnk_end:{}, ", + // path, append_flag, in_offset, offset, write_size, updated_metadentry_size, chnk_start, chnk_end); + for (uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) { auto target = CTX->distributor()->locate_data(path, chnk_id); @@ -306,6 
+310,9 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const LOG(DEBUG, "host: {}, path: {}, chunks: {}, size: {}, offset: {}", target, path, in.chunk_n(), total_chunk_size, in.offset()); + // LOG(INFO, "read host: {}, path: {}, chunk_start: {}, chunk_end: {}", + // target, path, chnk_start, chnk_end); + } catch (const std::exception& ex) { LOG(ERROR, "Unable to send non-blocking rpc for path \"{}\" " "[peer: {}]", path, target); @@ -485,4 +492,4 @@ ChunkStat forward_get_chunk_stat() { } } // namespace rpc -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index 7ee130021..819ed1c50 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -97,7 +97,7 @@ void ChunkStorage::destroy_chunk_space(const string& file_path) const { try { // Note: remove_all does not throw an error when path doesn't exist. auto n = bfs::remove_all(chunk_dir); - log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); + log_->info("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); } catch (const bfs::filesystem_error& e) { auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, chunk_dir, e.what()); diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 76c78e093..c3c74c5e8 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -48,7 +48,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); auto bulk_size = margo_bulk_get_size(in.bulk_handle); - GKFS_DATA->spdlogger()->debug( + GKFS_DATA->spdlogger()->info( "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'", __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset); /* @@ -72,7 +72,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + gkfs::rpc::GuidedDistributor distributor(host_id, host_size); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); @@ -254,7 +254,7 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + gkfs::rpc::GuidedDistributor distributor(host_id, host_size); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 2e19ac3d1..bcd066e76 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -133,7 +133,7 @@ static hg_return_t rpc_srv_remove(hg_handle_t handle) { if (ret != HG_SUCCESS) GKFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); - GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); + GKFS_DATA->spdlogger()->info("{}() Got remove RPC with path '{}'", __func__, in.path); // Remove metadentry if exists on the node and remove all chunks for that file try { diff --git a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp index 52315e8c3..5c68b51e7 100644 --- a/src/daemon/ops/data.cpp +++ 
b/src/daemon/ops/data.cpp @@ -76,7 +76,7 @@ ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : C */ void ChunkTruncateOperation::truncate(size_t idx, size_t size) { assert(idx < task_args_.size()); - GKFS_DATA->spdlogger()->trace("ChunkMetaOperation::{}() enter: idx '{}' path '{}' size '{}'", __func__, idx, path_, + GKFS_DATA->spdlogger()->info("ChunkMetaOperation::{}() enter: idx '{}' path '{}' size '{}'", __func__, idx, path_, size); auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[idx]); // truncate file return value @@ -179,8 +179,8 @@ ChunkWriteOperation::write_async(size_t idx, const uint64_t chunk_id, const char const size_t size, const off64_t offset) { assert(idx < task_args_.size()); - GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'", __func__, - idx, path_, size, offset); + GKFS_DATA->spdlogger()->debug("ChunkWriteOperation::{}() enter: idx '{}' path '{}' chunkid '{}' size '{}' offset '{}'", __func__, + idx, path_, chunk_id, size, offset); auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // written file return value if (abt_err != ABT_SUCCESS) { @@ -297,8 +297,8 @@ void ChunkReadOperation::read_async(size_t idx, const uint64_t chunk_id, char* bulk_buf_ptr, const size_t size, const off64_t offset) { assert(idx < task_args_.size()); - GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'", __func__, - idx, path_, size, offset); + GKFS_DATA->spdlogger()->info("ChunkReadOperation::{}() enter: idx '{}' path '{}' chunk_id '{}' size '{}' offset '{}'", __func__, + idx, path_, chunk_id, size, offset); auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // read file return value if (abt_err != ABT_SUCCESS) { diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp index f48626591..d09441118 100644 --- a/src/global/rpc/distributor.cpp +++ b/src/global/rpc/distributor.cpp @@ -80,25 +80,24 @@ GuidedDistributor(host_t localhost, unsigned int hosts_size) : // header read 4 /fluidda-XFIEL.00000001.00000158.mpio.bin 1 8 136 3 - /* std::string op, path; + std::string op, path; unsigned int original_host, destination_host; - unsigned int chunk_id; + chunkid_t chunk_id; size_t size, offset; std::ifstream mapfile; std::ofstream mapout; - mapfile.open("/home/bsc25/bsc25015/mapping.txt"); - while (mapfile >> op >> original_host >> path >> chunk_id >> size >> offset >> destination_host) + mapfile.open("/home/bsc25/bsc25015/mm32.txt"); + while (mapfile >> path >> chunk_id >> destination_host) { - mapping[ std::make_pair (path, chunk_id) ] = destination_host; - - mapout.open("/home/bsc25/bsc25015/mapout.txt", std::ofstream::app); + //mapping[ std::make_pair (path, chunk_id) ] = destination_host; + mapping[ path + ::to_string(chunk_id) ] = destination_host; + // mapout.open("/home/bsc25/bsc25015/mapout.txt", std::ofstream::app); - mapout << "C " << path << " " << chunk_id << " --> " << destination_host << std::endl; - mapout.close(); + // mapout << "C " << path << " " << chunk_id << " --> " << destination_host << std::endl; + // mapout.close(); } - mapfile.close(); -*/ + mapfile.close(); } host_t GuidedDistributor:: @@ -108,19 +107,15 @@ localhost() const { host_t GuidedDistributor:: locate_data(const string& path, const chunkid_t& chnk_id) const { -/* auto locate = std::make_pair(path, chnk_id); + //auto locate = std::make_pair(path, chnk_id); + auto locate = path + 
::to_string(chnk_id); auto it = mapping.find(locate); - if (it != mapping.end()) + if (it != mapping.end()) { - std::ofstream mapout; - mapout.open("/home/bsc25/bsc25015/mapout.txt", std::ofstream::app); - mapout << path << " " << chnk_id << " --> " << it->second << std::endl; - mapout.close(); - return it->second; } - else */ - return str_hash(path + ::to_string(chnk_id)) % hosts_size_; + else + return str_hash(locate) % hosts_size_; } host_t GuidedDistributor:: diff --git a/utils/generate.py b/utils/generate.py index 47b7db3d0..089def390 100644 --- a/utils/generate.py +++ b/utils/generate.py @@ -7,9 +7,16 @@ destination = sys.argv[1] file = sys.argv[2] pattern = re.compile(r".+(read).+(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+)") +# [2020-03-30 14:58:09.938632 CEST] [81763] [info] host: 6, path: /fluidda-XFIEL.00000001.00000181.mpio.bin, chunks: 1, size: 8, offset: 168 +# [2020-03-31 13:30:06.280897 CEST] [186656] [info] read host: 8, path: /fluidda-XFIEL.00000001.00000080.mpio.bin, chunk_start: 412, chunk_end: 470 + +pattern = re.compile(r".*(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+)") + +pattern = re.compile(r".+(read).+(host: )(\d+).+(path: )(.+),.+(chunk_start: )(\d+).+(chunk_end: )(\d+)") with open(file) as f: for line in f: result = pattern.match(line) if result: #split = re.split(":\,",line) - print (result[1], result[3], result[5], result[7], result[9], result[11], destination) \ No newline at end of file + for i in range(int(result[7]), int(result[9])+1): + print (result[5], i, destination) -- GitLab
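Reviewer note, not part of the patch series: after this patch the guided mapping is keyed by the string `path + to_string(chunk_id)`, and the hash fallback consumes the same key, so explicitly mapped chunks and unmapped chunks are resolved through one code path and stay consistent across hosts. A standalone sketch of that lookup-with-fallback; the `GuidedPlacement` struct and the type aliases are illustrative reductions of `GuidedDistributor`:

    #include <cstdint>
    #include <functional>
    #include <string>
    #include <unordered_map>

    using host_t = unsigned int;
    using chunkid_t = uint64_t; // assumed width for the sketch

    // Lookup-with-fallback as in GuidedDistributor::locate_data(): explicit
    // placements win; anything unmapped falls back to hashing the same key,
    // which keeps placement consistent with hosts that loaded no mapping file.
    struct GuidedPlacement {
        std::unordered_map<std::string, host_t> mapping;
        std::hash<std::string> str_hash;
        unsigned int hosts_size;

        host_t locate(const std::string& path, chunkid_t chunk_id) const {
            auto key = path + std::to_string(chunk_id);
            auto it = mapping.find(key);
            if (it != mapping.end())
                return it->second;            // pinned by the mapping file
            return str_hash(key) % hosts_size; // default hash placement
        }
    };

    int main() {
        GuidedPlacement p{{{"/file0", 3}}, {}, 4}; // chunk 0 of /file -> host 3
        return p.locate("/file", 0) == 3 ? 0 : 1;
    }

One caveat of the concatenated key: it is ambiguous for paths ending in digits ("/f1" + "2" and "/f" + "12" both yield "/f12"); inserting a separator character that cannot occur in paths would disambiguate.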