diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp index 95babcb0c50a969df18f2bb91c4ea7d8f6938ca5..ab1f575a8e060c8c1ef64de271cb392e279f3f78 100644 --- a/include/client/rpc/rpc_types.hpp +++ b/include/client/rpc/rpc_types.hpp @@ -2092,11 +2092,13 @@ struct chunk_stat { public: output() : + m_err(), m_chunk_size(), m_chunk_total(), m_chunk_free() {} - output(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) : + output(int32_t err, uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) : + m_err(err), m_chunk_size(chunk_size), m_chunk_total(chunk_total), m_chunk_free(chunk_free) {} @@ -2111,11 +2113,17 @@ struct chunk_stat { explicit output(const rpc_chunk_stat_out_t& out) { + m_err = out.err; m_chunk_size = out.chunk_size; m_chunk_total = out.chunk_total; m_chunk_free = out.chunk_free; } + int32_t + err() const { + return m_err; + } + uint64_t chunk_size() const { return m_chunk_size; @@ -2132,6 +2140,7 @@ struct chunk_stat { } private: + int32_t m_err; uint64_t m_chunk_size; uint64_t m_chunk_total; uint64_t m_chunk_free; diff --git a/include/daemon/backend/data/chunk_storage.hpp b/include/daemon/backend/data/chunk_storage.hpp index 8a6db678439ca5e6e60ffabab4c80d6096a2a43e..1bf94d797bf76fad436c29cc9dbb44fed8e0c6a7 100644 --- a/include/daemon/backend/data/chunk_storage.hpp +++ b/include/daemon/backend/data/chunk_storage.hpp @@ -14,17 +14,16 @@ #ifndef GEKKOFS_CHUNK_STORAGE_HPP #define GEKKOFS_CHUNK_STORAGE_HPP -extern "C" { -#include -} +#include #include #include #include +#include /* Forward declarations */ namespace spdlog { - class logger; +class logger; } namespace gkfs { @@ -36,42 +35,44 @@ struct ChunkStat { unsigned long chunk_free; }; +class ChunkStorageException : public std::system_error { +public: + ChunkStorageException(const int err_code, const std::string& s) : std::system_error(err_code, + std::system_category(), s) {}; +}; + class ChunkStorage { private: static constexpr const char* LOGGER_NAME = "ChunkStorage"; - std::shared_ptr log; + std::shared_ptr log_; - std::string root_path; - size_t chunksize; + std::string root_path_; + size_t chunksize_; inline std::string absolute(const std::string& internal_path) const; static inline std::string get_chunks_dir(const std::string& file_path); - static inline std::string get_chunk_path(const std::string& file_path, unsigned int chunk_id); + static inline std::string get_chunk_path(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id); void init_chunk_space(const std::string& file_path) const; public: - ChunkStorage(const std::string& path, size_t chunksize); + ChunkStorage(std::string& path, size_t chunksize); - void write_chunk(const std::string& file_path, unsigned int chunk_id, - const char* buff, size_t size, off64_t offset, - ABT_eventual& eventual) const; - - void read_chunk(const std::string& file_path, unsigned int chunk_id, - char* buff, size_t size, off64_t offset, - ABT_eventual& eventual) const; + void destroy_chunk_space(const std::string& file_path) const; - void trim_chunk_space(const std::string& file_path, unsigned int chunk_start, - unsigned int chunk_end = std::numeric_limits::max()); + ssize_t + write_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buff, size_t size, + off64_t offset) const; - void delete_chunk(const std::string& file_path, unsigned int chunk_id); + ssize_t read_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size, + off64_t offset) const; - void truncate_chunk(const std::string& file_path, unsigned int chunk_id, off_t length); + void trim_chunk_space(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_start); - void destroy_chunk_space(const std::string& file_path) const; + void truncate_chunk_file(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length); ChunkStat chunk_stat() const; }; diff --git a/include/daemon/backend/exceptions.hpp b/include/daemon/backend/exceptions.hpp index 36e80cfc148a2be058f9f045be5fe3f00216b279..b5cc11d53ab51fe28993ffdfcfe619c7c45f61a5 100644 --- a/include/daemon/backend/exceptions.hpp +++ b/include/daemon/backend/exceptions.hpp @@ -17,14 +17,20 @@ #include #include +namespace gkfs { +namespace metadata { + class DBException : public std::runtime_error { public: - DBException(const std::string& s) : std::runtime_error(s) {}; + explicit DBException(const std::string& s) : std::runtime_error(s) {}; }; class NotFoundException : public DBException { public: - NotFoundException(const std::string& s) : DBException(s) {}; + explicit NotFoundException(const std::string& s) : DBException(s) {}; }; +} // namespace metadata +} // namespace gkfs + #endif //GEKKOFS_DB_EXCEPTIONS_HPP diff --git a/include/daemon/handler/rpc_util.hpp b/include/daemon/handler/rpc_util.hpp index 38ef1094103e5ffb2adb5ef1b48bb53c6b0d5813..dd29b4d87249af1d8d2be4983d8435b9cb31f27c 100644 --- a/include/daemon/handler/rpc_util.hpp +++ b/include/daemon/handler/rpc_util.hpp @@ -25,8 +25,8 @@ extern "C" { namespace gkfs { namespace rpc { -template -inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { +template +inline hg_return_t cleanup(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if (bulk_handle) { ret = margo_bulk_free(*bulk_handle); @@ -51,16 +51,41 @@ inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* return ret; } -template -inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) { +template +inline hg_return_t respond(hg_handle_t* handle, OutputType* output) { auto ret = HG_SUCCESS; if (output && handle) { ret = margo_respond(*handle, output); if (ret != HG_SUCCESS) return ret; } - return cleanup(handle, input, static_cast(nullptr), bulk_handle); + return ret; +} + +template +inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) { + auto ret = respond(handle, output); + if (ret != HG_SUCCESS) + return ret; + return cleanup(handle, input, static_cast(nullptr), bulk_handle); +} + +template +inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output) { + return cleanup_respond(handle, input, output, nullptr); +} +template +inline hg_return_t cleanup_respond(hg_handle_t* handle, OutputType* output) { + auto ret = respond(handle, output); + if (ret != HG_SUCCESS) + return ret; + if (handle) { + ret = margo_destroy(*handle); + if (ret != HG_SUCCESS) + return ret; + } + return ret; } } // namespace rpc diff --git a/include/daemon/ops/data.hpp b/include/daemon/ops/data.hpp new file mode 100644 index 0000000000000000000000000000000000000000..dcfb41b232fe76ad4fcdcfcfb4ee0ecf2d051c98 --- /dev/null +++ b/include/daemon/ops/data.hpp @@ -0,0 +1,219 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + SPDX-License-Identifier: MIT +*/ + +#ifndef GEKKOFS_DAEMON_DATA_HPP +#define GEKKOFS_DAEMON_DATA_HPP + +#include +#include + +#include +#include + +extern "C" { +#include +#include +} + +namespace gkfs { +namespace data { + +class ChunkOpException : public std::runtime_error { +public: + explicit ChunkOpException(const std::string& s) : std::runtime_error(s) {}; +}; + +class ChunkWriteOpException : public ChunkOpException { +public: + explicit ChunkWriteOpException(const std::string& s) : ChunkOpException(s) {}; +}; + +class ChunkReadOpException : public ChunkOpException { +public: + explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s) {}; +}; + +class ChunkMetaOpException : public ChunkOpException { +public: + explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s) {}; +}; + +/** + * Classes to encapsulate asynchronous chunk operations. + * All operations on chunk files must go through the Argobots' task queues. + * Otherwise operations may overtake operations in the queues. + * This applies to write, read, and truncate which may modify the middle of a chunk, essentially a write operation. + * + * Note: This class is not thread-safe. + * + * In the future, this class may be used to provide failure tolerance for IO tasks + * + * Base class using the CRTP idiom + */ +template +class ChunkOperation { + +protected: + + const std::string path_; + + std::vector abt_tasks_; + std::vector task_eventuals_; + +public: + + explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {}; + + ChunkOperation(std::string path, size_t n) : path_(std::move(path)) { + // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults + abt_tasks_.resize(n); + task_eventuals_.resize(n); + }; + + ~ChunkOperation() { + cancel_all_tasks(); + } + + /** + * Cleans up and cancels all tasks in flight + */ + void cancel_all_tasks() { + GKFS_DATA->spdlogger()->trace("{}() enter", __func__); + for (auto& task : abt_tasks_) { + if (task) { + ABT_task_cancel(task); + ABT_task_free(&task); + } + } + for (auto& eventual : task_eventuals_) { + if (eventual) { + ABT_eventual_reset(eventual); + ABT_eventual_free(&eventual); + } + } + abt_tasks_.clear(); + task_eventuals_.clear(); + static_cast(this)->clear_task_args(); + } +}; + + +class ChunkTruncateOperation : public ChunkOperation { + friend class ChunkOperation; + +private: + struct chunk_truncate_args { + const std::string* path; + size_t size; + ABT_eventual eventual; + }; + + std::vector task_args_; + + static void truncate_abt(void* _arg); + + void clear_task_args(); + +public: + + explicit ChunkTruncateOperation(const std::string& path); + + ChunkTruncateOperation(const std::string& path, size_t n); + + ~ChunkTruncateOperation() = default; + + void truncate(size_t idx, size_t size); + + int wait_for_tasks(); +}; + +class ChunkWriteOperation : public ChunkOperation { + friend class ChunkOperation; + +private: + + struct chunk_write_args { + const std::string* path; + const char* buf; + gkfs::rpc::chnk_id_t chnk_id; + size_t size; + off64_t off; + ABT_eventual eventual; + }; + + std::vector task_args_; + + static void write_file_abt(void* _arg); + + void clear_task_args(); + +public: + + ChunkWriteOperation(const std::string& path, size_t n); + + ~ChunkWriteOperation() = default; + + void write_async(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset); + + std::pair wait_for_tasks(); + +}; + + +class ChunkReadOperation : public ChunkOperation { + friend class ChunkOperation; + +private: + + struct chunk_read_args { + const std::string* path; + char* buf; + gkfs::rpc::chnk_id_t chnk_id; + size_t size; + off64_t off; + ABT_eventual eventual; + }; + + std::vector task_args_; + + static void read_file_abt(void* _arg); + + void clear_task_args(); + +public: + + struct bulk_args { + margo_instance_id mid; + hg_addr_t origin_addr; + hg_bulk_t origin_bulk_handle; + std::vector* origin_offsets; + hg_bulk_t local_bulk_handle; + std::vector* local_offsets; + std::vector* chunk_ids; + }; + + ChunkReadOperation(const std::string& path, size_t n); + + ~ChunkReadOperation() = default; + + void read_async(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset); + + std::pair + wait_for_tasks_and_push_back(const bulk_args& args); + +}; + +} // namespace data +} // namespace gkfs + +#endif //GEKKOFS_DAEMON_DATA_HPP diff --git a/include/daemon/ops/metadentry.hpp b/include/daemon/ops/metadentry.hpp index 1c903a3ad880cacc23127948158dae11fd3a0821..fd41aa712f0e4cd7723476a3d0614b9e99eca43c 100644 --- a/include/daemon/ops/metadentry.hpp +++ b/include/daemon/ops/metadentry.hpp @@ -35,7 +35,7 @@ void update(const std::string& path, Metadata& md); void update_size(const std::string& path, size_t io_size, off_t offset, bool append); -void remove_node(const std::string& path); +void remove(const std::string& path); } // namespace metadata } // namespace gkfs diff --git a/include/global/global_defs.hpp b/include/global/global_defs.hpp index 8bd958615eb1bc6e98e84c73f3c637e61b1d6340..4675c8e8a9c1086de0d22f2ffc7e1fcedd10ea70 100644 --- a/include/global/global_defs.hpp +++ b/include/global/global_defs.hpp @@ -18,6 +18,9 @@ namespace gkfs { // These constexpr set the RPC's identity and which handler the receiver end should use namespace rpc { + +using chnk_id_t = unsigned long; + namespace tag { constexpr auto fs_config = "rpc_srv_fs_config"; @@ -45,10 +48,6 @@ constexpr auto ofi_tcp = "ofi+tcp"; } // namespace protocol } // namespace rpc -namespace types { -// typedefs -typedef unsigned long rpc_chnk_id_t; -} // namespace types } // namespace gkfs #endif //GEKKOFS_GLOBAL_DEFS_HPP diff --git a/include/global/rpc/distributor.hpp b/include/global/rpc/distributor.hpp index 2a0b79ff3ed41783f34bc5e56b64cd42dc485acd..bf91f5e4452a06d6695e9d43334a537376950217 100644 --- a/include/global/rpc/distributor.hpp +++ b/include/global/rpc/distributor.hpp @@ -17,6 +17,9 @@ #include #include #include +#include +#include +#include namespace gkfs { namespace rpc { @@ -69,6 +72,26 @@ public: std::vector locate_directory_metadata(const std::string& path) const override; }; +class GuidedDistributor : public Distributor { +private: + host_t localhost_; + unsigned int hosts_size_; + std::vector all_hosts_; + std::hash str_hash; + //std::unordered_map< std::pair, host_t > mapping; +std::unordered_map< std::string, host_t > mapping; +public: + GuidedDistributor(host_t localhost, unsigned int hosts_size); + + host_t localhost() const override; + + host_t locate_data(const std::string& path, const chunkid_t& chnk_id) const override; + + host_t locate_file_metadata(const std::string& path) const override; + + std::vector locate_directory_metadata(const std::string& path) const override; +}; + } // namespace rpc } // namespace gkfs diff --git a/include/global/rpc/rpc_types.hpp b/include/global/rpc/rpc_types.hpp index 5e6d57f12907296787bd05135ed04280741bbcd6..65b086ad236f3039e9da5d70a53236c75cede4a9 100644 --- a/include/global/rpc/rpc_types.hpp +++ b/include/global/rpc/rpc_types.hpp @@ -23,88 +23,95 @@ extern "C" { /* visible API for RPC data types used in RPCS */ // misc generic rpc types -MERCURY_GEN_PROC(rpc_err_out_t, ((hg_int32_t) (err))) +MERCURY_GEN_PROC(rpc_err_out_t, + ((hg_int32_t) (err))) // Metadentry MERCURY_GEN_PROC(rpc_mk_node_in_t, - ((hg_const_string_t) (path))\ -((uint32_t) (mode))) + ((hg_const_string_t) (path)) + ((uint32_t) (mode))) -MERCURY_GEN_PROC(rpc_path_only_in_t, ((hg_const_string_t) (path))) +MERCURY_GEN_PROC(rpc_path_only_in_t, + ((hg_const_string_t) (path))) -MERCURY_GEN_PROC(rpc_stat_out_t, ((hg_int32_t) (err)) - ((hg_const_string_t) (db_val))) +MERCURY_GEN_PROC(rpc_stat_out_t, + ((hg_int32_t) (err)) + ((hg_const_string_t) (db_val))) -MERCURY_GEN_PROC(rpc_rm_node_in_t, ((hg_const_string_t) (path))) +MERCURY_GEN_PROC(rpc_rm_node_in_t, + ((hg_const_string_t) (path))) MERCURY_GEN_PROC(rpc_trunc_in_t, - ((hg_const_string_t) (path)) \ -((hg_uint64_t) (length))) + ((hg_const_string_t) (path)) + ((hg_uint64_t) (length))) MERCURY_GEN_PROC(rpc_update_metadentry_in_t, - ((hg_const_string_t) (path))\ -((uint64_t) (nlink))\ -((hg_uint32_t) (mode))\ -((hg_uint32_t) (uid))\ -((hg_uint32_t) (gid))\ -((hg_int64_t) (size))\ -((hg_int64_t) (blocks))\ -((hg_int64_t) (atime))\ -((hg_int64_t) (mtime))\ -((hg_int64_t) (ctime))\ -((hg_bool_t) (nlink_flag))\ -((hg_bool_t) (mode_flag))\ -((hg_bool_t) (size_flag))\ -((hg_bool_t) (block_flag))\ -((hg_bool_t) (atime_flag))\ -((hg_bool_t) (mtime_flag))\ -((hg_bool_t) (ctime_flag))) - -MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, ((hg_const_string_t) (path)) - ((hg_uint64_t) (size)) - ((hg_int64_t) (offset)) - ((hg_bool_t) (append))) - -MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, ((hg_int32_t) (err)) - ((hg_int64_t) (ret_size))) - -MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, ((hg_int32_t) (err)) - ((hg_int64_t) (ret_size))) + ((hg_const_string_t) (path)) + ((uint64_t) (nlink)) + ((hg_uint32_t) (mode)) + ((hg_uint32_t) (uid)) + ((hg_uint32_t) (gid)) + ((hg_int64_t) (size)) + ((hg_int64_t) (blocks)) + ((hg_int64_t) (atime)) + ((hg_int64_t) (mtime)) + ((hg_int64_t) (ctime)) + ((hg_bool_t) (nlink_flag)) + ((hg_bool_t) (mode_flag)) + ((hg_bool_t) (size_flag)) + ((hg_bool_t) (block_flag)) + ((hg_bool_t) (atime_flag)) + ((hg_bool_t) (mtime_flag)) + ((hg_bool_t) (ctime_flag))) + +MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, + ((hg_const_string_t) (path)) + ((hg_uint64_t) (size)) + ((hg_int64_t) (offset)) + ((hg_bool_t) (append))) + +MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, + ((hg_int32_t) (err)) + ((hg_int64_t) (ret_size))) + +MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, + ((hg_int32_t) (err)) + ((hg_int64_t) (ret_size))) #ifdef HAS_SYMLINKS MERCURY_GEN_PROC(rpc_mk_symlink_in_t, - ((hg_const_string_t) (path))\ -((hg_const_string_t) (target_path)) + ((hg_const_string_t) (path)) + ((hg_const_string_t) (target_path)) ) #endif // data MERCURY_GEN_PROC(rpc_read_data_in_t, - ((hg_const_string_t) (path))\ -((int64_t) (offset))\ -((hg_uint64_t) (host_id))\ -((hg_uint64_t) (host_size))\ -((hg_uint64_t) (chunk_n))\ -((hg_uint64_t) (chunk_start))\ -((hg_uint64_t) (chunk_end))\ -((hg_uint64_t) (total_chunk_size))\ -((hg_bulk_t) (bulk_handle))) + ((hg_const_string_t) (path)) + ((int64_t) (offset)) + ((hg_uint64_t) (host_id)) + ((hg_uint64_t) (host_size)) + ((hg_uint64_t) (chunk_n)) + ((hg_uint64_t) (chunk_start)) + ((hg_uint64_t) (chunk_end)) + ((hg_uint64_t) (total_chunk_size)) + ((hg_bulk_t) (bulk_handle))) MERCURY_GEN_PROC(rpc_data_out_t, - ((int32_t) (err))\ -((hg_size_t) (io_size))) + ((int32_t) (err)) + ((hg_size_t) (io_size))) MERCURY_GEN_PROC(rpc_write_data_in_t, - ((hg_const_string_t) (path))\ -((int64_t) (offset))\ -((hg_uint64_t) (host_id))\ -((hg_uint64_t) (host_size))\ -((hg_uint64_t) (chunk_n))\ -((hg_uint64_t) (chunk_start))\ -((hg_uint64_t) (chunk_end))\ -((hg_uint64_t) (total_chunk_size))\ -((hg_bulk_t) (bulk_handle))) + ((hg_const_string_t) (path)) + ((int64_t) (offset)) + ((hg_uint64_t) (host_id)) + ((hg_uint64_t) (host_size)) + ((hg_uint64_t) (chunk_n)) + ((hg_uint64_t) (chunk_start)) + ((hg_uint64_t) (chunk_end)) + ((hg_uint64_t) (total_chunk_size)) + ((hg_bulk_t) (bulk_handle))) MERCURY_GEN_PROC(rpc_get_dirents_in_t, ((hg_const_string_t) (path)) @@ -117,15 +124,16 @@ MERCURY_GEN_PROC(rpc_get_dirents_out_t, ) -MERCURY_GEN_PROC(rpc_config_out_t, ((hg_const_string_t) (mountdir)) - ((hg_const_string_t) (rootdir)) \ -((hg_bool_t) (atime_state)) \ -((hg_bool_t) (mtime_state)) \ -((hg_bool_t) (ctime_state)) \ -((hg_bool_t) (link_cnt_state)) \ -((hg_bool_t) (blocks_state)) \ -((hg_uint32_t) (uid)) \ -((hg_uint32_t) (gid)) \ +MERCURY_GEN_PROC(rpc_config_out_t, + ((hg_const_string_t) (mountdir)) + ((hg_const_string_t) (rootdir)) + ((hg_bool_t) (atime_state)) + ((hg_bool_t) (mtime_state)) + ((hg_bool_t) (ctime_state)) + ((hg_bool_t) (link_cnt_state)) + ((hg_bool_t) (blocks_state)) + ((hg_uint32_t) (uid)) + ((hg_uint32_t) (gid)) ) @@ -134,7 +142,8 @@ MERCURY_GEN_PROC(rpc_chunk_stat_in_t, ) MERCURY_GEN_PROC(rpc_chunk_stat_out_t, - ((hg_uint64_t) (chunk_size)) + ((hg_int32_t) (err)) + ((hg_uint64_t) (chunk_size)) ((hg_uint64_t) (chunk_total)) ((hg_uint64_t) (chunk_free)) ) diff --git a/include/global/rpc/rpc_util.hpp b/include/global/rpc/rpc_util.hpp index 2822c67c0bf6fb8a420d99cf0085c2b80511578d..cdac93a2bdd1981f09797143183dbe50a35d80f8 100644 --- a/include/global/rpc/rpc_util.hpp +++ b/include/global/rpc/rpc_util.hpp @@ -22,10 +22,16 @@ extern "C" { #include +namespace gkfs { +namespace rpc { + hg_bool_t bool_to_merc_bool(bool state); std::string get_my_hostname(bool short_hostname = false); std::string get_host_by_name(const std::string& hostname); +} // namespace rpc +} // namespace gkfs + #endif //GEKKOFS_GLOBAL_RPC_UTILS_HPP diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index 029304a7726f2474a81cd60fe492bdd854703dde..0d9bf7476e2b954d03fc1d4f35690cf63fd4e14c 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -234,7 +234,13 @@ int gkfs_stat(const string& path, struct stat* buf, bool follow_links) { } int gkfs_statfs(struct statfs* buf) { - auto blk_stat = gkfs::rpc::forward_get_chunk_stat(); + gkfs::rpc::ChunkStat blk_stat{}; + try { + blk_stat = gkfs::rpc::forward_get_chunk_stat(); + } catch (const std::exception& e) { + LOG(ERROR, "{}() Failure with error: '{}'", e.what()); + return -1; + } buf->f_type = 0; buf->f_bsize = blk_stat.chunk_size; buf->f_blocks = blk_stat.chunk_total; @@ -251,8 +257,13 @@ int gkfs_statfs(struct statfs* buf) { } int gkfs_statvfs(struct statvfs* buf) { - gkfs::preload::init_ld_env_if_needed(); - auto blk_stat = gkfs::rpc::forward_get_chunk_stat(); + gkfs::rpc::ChunkStat blk_stat{}; + try { + blk_stat = gkfs::rpc::forward_get_chunk_stat(); + } catch (const std::exception& e) { + LOG(ERROR, "{}() Failure with error: '{}'", e.what()); + return -1; + } buf->f_bsize = blk_stat.chunk_size; buf->f_blocks = blk_stat.chunk_total; buf->f_bfree = blk_stat.chunk_free; diff --git a/src/client/preload.cpp b/src/client/preload.cpp index ad09cff3291be0ce05df43a4135e746eba5be6d9..c5a1c054366dea353e6707eaa397298336fbbc80 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -98,9 +98,12 @@ void init_ld_environment_() { } /* Setup distributor */ - auto simple_hash_dist = std::make_shared(CTX->local_host_id(), - CTX->hosts().size()); - CTX->distributor(simple_hash_dist); + //auto simple_hash_dist = std::make_shared(CTX->local_host_id(), + // CTX->hosts().size()); + //CTX->distributor(simple_hash_dist); + + auto guided_dist = std::make_shared(CTX->local_host_id(), CTX->hosts().size()); + CTX->distributor(guided_dist); LOG(INFO, "Retrieving file system configuration..."); diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index 4db07f20e7eb71405b93bd22b3766ecc57986850..0820cd62e6685271b4900fbe051ba817a88e4066 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -195,7 +195,7 @@ void load_hosts() { LOG(INFO, "Hosts pool size: {}", hosts.size()); - auto local_hostname = get_my_hostname(true); + auto local_hostname = gkfs::rpc::get_my_hostname(true); bool local_host_found = false; std::vector addrs; @@ -214,6 +214,7 @@ void load_hosts() { ::mt19937 g(rd()); // seed the random generator ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector // lookup addresses and put abstract server addresses into rpc_addresses + ::sort(host_ids.begin(), host_ids.end()); for (const auto& id: host_ids) { const auto& hostname = hosts.at(id).first; @@ -239,4 +240,4 @@ void load_hosts() { } } // namespace util -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp index f68f65e4849153e97d10e0a5fa3c8d38e0642925..afa98c8addc622af1b7a78c60642fcc0e89fbf85 100644 --- a/src/client/rpc/forward_data.cpp +++ b/src/client/rpc/forward_data.cpp @@ -55,6 +55,10 @@ ssize_t forward_write(const string& path, const void* buf, const bool append_fla // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; + + //LOG(DEBUG, "path: \"{}\", append_flag:{}, in_offset:{}, offset:{}, write_size: {}, updated_metadentry_size:{}, chnk_start:{}, chnk_end:{}, ", + // path, append_flag, in_offset, offset, write_size, updated_metadentry_size, chnk_start, chnk_end); + for (uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) { auto target = CTX->distributor()->locate_data(path, chnk_id); @@ -306,6 +310,9 @@ ssize_t forward_read(const string& path, void* buf, const off64_t offset, const LOG(DEBUG, "host: {}, path: {}, chunks: {}, size: {}, offset: {}", target, path, in.chunk_n(), total_chunk_size, in.offset()); + // LOG(INFO, "read host: {}, path: {}, chunk_start: {}, chunk_end: {}", + // target, path, chnk_start, chnk_end); + } catch (const std::exception& ex) { LOG(ERROR, "Unable to send non-blocking rpc for path \"{}\" " "[peer: {}]", path, target); @@ -415,6 +422,11 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si return error ? -1 : 0; } +/** + * Performs a chunk stat RPC to all hosts + * @return rpc::ChunkStat + * @throws std::runtime_error + */ ChunkStat forward_get_chunk_stat() { std::vector> handles; @@ -444,28 +456,40 @@ ChunkStat forward_get_chunk_stat() { unsigned long chunk_total = 0; unsigned long chunk_free = 0; + int error = 0; + // wait for RPC responses for (std::size_t i = 0; i < handles.size(); ++i) { - gkfs::rpc::chunk_stat::output out; + gkfs::rpc::chunk_stat::output out{}; try { // XXX We might need a timeout here to not wait forever for an // output that never comes? out = handles[i].get().at(0); + if (out.err() != 0) { + error = out.err(); + LOG(ERROR, "Host '{}' reported err code '{}' during stat chunk.", CTX->hosts().at(i).to_string(), + error); + // we don't break here to ensure all responses are processed + continue; + } assert(out.chunk_size() == chunk_size); chunk_total += out.chunk_total(); chunk_free += out.chunk_free(); - } catch (const std::exception& ex) { - throw std::runtime_error( - fmt::format("Failed to get rpc output for target host: {}]", i)); + errno = EBUSY; + throw std::runtime_error(fmt::format("Failed to get RPC output from host: {}", i)); } } + if (error != 0) { + errno = error; + throw std::runtime_error("chunk stat failed on one host"); + } return {chunk_size, chunk_total, chunk_free}; } } // namespace rpc -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt index e4ba89177c10f43f57806913a89844e6f43347a1..945a21983032a05ed2ef1675e2e6f96f5ba85c86 100644 --- a/src/daemon/CMakeLists.txt +++ b/src/daemon/CMakeLists.txt @@ -7,10 +7,11 @@ set(DAEMON_SRC daemon.cpp util.cpp ops/metadentry.cpp + ops/data.cpp classes/fs_data.cpp classes/rpc_data.cpp - handler/srv_metadata.cpp handler/srv_data.cpp + handler/srv_metadata.cpp handler/srv_management.cpp ) set(DAEMON_HEADERS @@ -23,6 +24,7 @@ set(DAEMON_HEADERS ../../include/global/path_util.hpp ../../include/daemon/daemon.hpp ../../include/daemon/util.hpp + ../../include/daemon/ops/data.hpp ../../include/daemon/ops/metadentry.hpp ../../include/daemon/classes/fs_data.hpp ../../include/daemon/classes/rpc_data.hpp diff --git a/src/daemon/backend/data/CMakeLists.txt b/src/daemon/backend/data/CMakeLists.txt index 41f1ec0f998a2e31fd26fcf595312e1706ff239e..1a7e2a59a1dc499d9bc56b10f3f58b51edb0e048 100644 --- a/src/daemon/backend/data/CMakeLists.txt +++ b/src/daemon/backend/data/CMakeLists.txt @@ -5,6 +5,7 @@ target_sources(storage ${INCLUDE_DIR}/daemon/backend/data/chunk_storage.hpp PRIVATE ${INCLUDE_DIR}/global/path_util.hpp + ${INCLUDE_DIR}/global/global_defs.hpp ${CMAKE_CURRENT_LIST_DIR}/chunk_storage.cpp ) @@ -12,10 +13,8 @@ target_link_libraries(storage PRIVATE spdlog Boost::filesystem - ${ABT_LIBRARIES} ) -target_include_directories(storage - PRIVATE - ${ABT_INCLUDE_DIRS} - ) +#target_include_directories(storage +# PRIVATE +# ) diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp index 3d9b69fecaada122effbe1b47cac7b2cb996e5d1..819ed1c5083d933269f49633f78ce68cd50a49b2 100644 --- a/src/daemon/backend/data/chunk_storage.cpp +++ b/src/daemon/backend/data/chunk_storage.cpp @@ -15,6 +15,8 @@ #include #include +#include + #include #include @@ -28,22 +30,11 @@ using namespace std; namespace gkfs { namespace data { +// private functions + string ChunkStorage::absolute(const string& internal_path) const { assert(gkfs::path::is_relative(internal_path)); - return root_path + '/' + internal_path; -} - -ChunkStorage::ChunkStorage(const string& path, const size_t chunksize) : - root_path(path), - chunksize(chunksize) { - //TODO check path: absolute, exists, permission to write etc... - assert(gkfs::path::is_absolute(root_path)); - - /* Initialize logger */ - log = spdlog::get(LOGGER_NAME); - assert(log); - - log->debug("Chunk storage initialized with path: '{}'", root_path); + return fmt::format("{}/{}", root_path_, internal_path); } string ChunkStorage::get_chunks_dir(const string& file_path) { @@ -53,169 +44,255 @@ string ChunkStorage::get_chunks_dir(const string& file_path) { return chunk_dir; } -string ChunkStorage::get_chunk_path(const string& file_path, unsigned int chunk_id) { - return get_chunks_dir(file_path) + '/' + ::to_string(chunk_id); -} - -void ChunkStorage::destroy_chunk_space(const string& file_path) const { - auto chunk_dir = absolute(get_chunks_dir(file_path)); - try { - bfs::remove_all(chunk_dir); - } catch (const bfs::filesystem_error& e) { - log->error("Failed to remove chunk directory. Path: '{}', Error: '{}'", chunk_dir, e.what()); - } +string ChunkStorage::get_chunk_path(const string& file_path, gkfs::rpc::chnk_id_t chunk_id) { + return fmt::format("{}/{}", get_chunks_dir(file_path), chunk_id); } +/** + * Creates a chunk directory that all chunk files are placed in. + * The path to the real file will be used as the directory name + * @param file_path + * @throws ChunkStorageException on error + */ void ChunkStorage::init_chunk_space(const string& file_path) const { auto chunk_dir = absolute(get_chunks_dir(file_path)); auto err = mkdir(chunk_dir.c_str(), 0750); if (err == -1 && errno != EEXIST) { - log->error("Failed to create chunk dir. Path: '{}', Error: '{}'", chunk_dir, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to create chunk directory"); + auto err_str = fmt::format("{}() Failed to create chunk directory. File: '{}', Error: '{}'", __func__, + file_path, errno); + throw ChunkStorageException(errno, err_str); } } -/* Delete all chunks stored on this node that falls in the gap [chunk_start, chunk_end] +// public functions + +/** * - * This is pretty slow method because it cycle over all the chunks sapce for this file. + * @param path + * @param chunksize + * @throws ChunkStorageException */ -void ChunkStorage::trim_chunk_space(const string& file_path, - unsigned int chunk_start, unsigned int chunk_end) { - - auto chunk_dir = absolute(get_chunks_dir(file_path)); - const bfs::directory_iterator end; - - for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) { - auto chunk_path = chunk_file->path(); - auto chunk_id = ::stoul(chunk_path.filename().c_str()); - if (chunk_id >= chunk_start && chunk_id <= chunk_end) { - int ret = unlink(chunk_path.c_str()); - if (ret == -1) { - log->error("Failed to remove chunk file. File: '{}', Error: '{}'", chunk_path.native(), - ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to remove chunk file"); - } - } - } -} - -void ChunkStorage::delete_chunk(const string& file_path, unsigned int chunk_id) { - auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); - int ret = unlink(chunk_path.c_str()); - if (ret == -1) { - log->error("Failed to remove chunk file. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to remove chunk file"); - } +ChunkStorage::ChunkStorage(string& path, const size_t chunksize) : + root_path_(path), + chunksize_(chunksize) { + /* Initialize logger */ + log_ = spdlog::get(LOGGER_NAME); + assert(log_); + assert(gkfs::path::is_absolute(root_path_)); + // Verify that we have sufficient write access + // This will throw on error, canceling daemon initialization + auto test_file_path = "/.__chunk_dir_test"s; + init_chunk_space(test_file_path); + destroy_chunk_space(test_file_path); + log_->debug("{}() Chunk storage initialized with path: '{}'", __func__, root_path_); } -void ChunkStorage::truncate_chunk(const string& file_path, unsigned int chunk_id, off_t length) { - auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); - assert(length > 0 && (unsigned int) length <= chunksize); - int ret = truncate(chunk_path.c_str(), length); - if (ret == -1) { - log->error("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to truncate chunk file"); +/** + * Removes chunk directory with all its files + * @param file_path + * @throws ChunkStorageException + */ +void ChunkStorage::destroy_chunk_space(const string& file_path) const { + auto chunk_dir = absolute(get_chunks_dir(file_path)); + try { + // Note: remove_all does not throw an error when path doesn't exist. + auto n = bfs::remove_all(chunk_dir); + log_->info("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir); + } catch (const bfs::filesystem_error& e) { + auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__, + chunk_dir, e.what()); + throw ChunkStorageException(e.code().value(), err_str); } } -void ChunkStorage::write_chunk(const string& file_path, unsigned int chunk_id, - const char* buff, size_t size, off64_t offset, ABT_eventual& eventual) const { - - assert((offset + size) <= chunksize); +/** + * Writes a chunk file. + * On failure throws ChunkStorageException with encapsulated error code + * + * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pwrite behavior + * + * @param file_path + * @param chunk_id + * @param buff + * @param size + * @param offset + * @param eventual + * @throws ChunkStorageException (caller will handle eventual signalling) + */ +ssize_t +ChunkStorage::write_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buff, size_t size, + off64_t offset) const { + assert((offset + size) <= chunksize_); + // may throw ChunkStorageException on failure init_chunk_space(file_path); auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); int fd = open(chunk_path.c_str(), O_WRONLY | O_CREAT, 0640); if (fd < 0) { - log->error("Failed to open chunk file for write. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to open chunk file for write"); + auto err_str = fmt::format("Failed to open chunk file for write. File: '{}', Error: '{}'", chunk_path, + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } auto wrote = pwrite(fd, buff, size, offset); if (wrote < 0) { - log->error("Failed to write chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", - chunk_path, size, offset, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to write chunk file"); + auto err_str = fmt::format("Failed to write chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", + chunk_path, size, offset, ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } - ABT_eventual_set(eventual, &wrote, sizeof(size_t)); - - auto err = close(fd); - if (err < 0) { - log->error("Failed to close chunk file after write. File: '{}', Error: '{}'", + // if close fails we just write an entry into the log erroring out + if (close(fd) < 0) { + log_->warn("Failed to close chunk file after write. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - //throw ::system_error(errno, ::system_category(), "Failed to close chunk file"); } + return wrote; } -void ChunkStorage::read_chunk(const string& file_path, unsigned int chunk_id, - char* buff, size_t size, off64_t offset, ABT_eventual& eventual) const { - assert((offset + size) <= chunksize); +/** + * Read from a chunk file. + * On failure throws ChunkStorageException with encapsulated error code + * + * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pread behavior + * @param file_path + * @param chunk_id + * @param buf + * @param size + * @param offset + * @param eventual + * @throws ChunkStorageException (caller will handle eventual signalling) + */ +ssize_t +ChunkStorage::read_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size, + off64_t offset) const { + assert((offset + size) <= chunksize_); auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); int fd = open(chunk_path.c_str(), O_RDONLY); if (fd < 0) { - log->error("Failed to open chunk file for read. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to open chunk file for read"); + auto err_str = fmt::format("Failed to open chunk file for read. File: '{}', Error: '{}'", chunk_path, + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } size_t tot_read = 0; ssize_t read = 0; do { read = pread64(fd, - buff + tot_read, + buf + tot_read, size - tot_read, offset + tot_read); if (read == 0) { + /* + * A value of zero indicates end-of-file (except if the value of the size argument is also zero). + * This is not considered an error. If you keep calling read while at end-of-file, + * it will keep returning zero and doing nothing else. + * Hence, we break here. + */ break; } if (read < 0) { - log->error("Failed to read chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", - chunk_path, size, offset, ::strerror(errno)); - throw ::system_error(errno, ::system_category(), "Failed to read chunk file"); + auto err_str = fmt::format("Failed to read chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'", + chunk_path, size, offset, ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } #ifndef NDEBUG if (tot_read + read < size) { - log->warn("Read less bytes than requested: '{}'/{}. Total read was '{}'", read, size - tot_read, size); + log_->debug("Read less bytes than requested: '{}'/{}. Total read was '{}'. This is not an error!", read, + size - tot_read, size); } #endif assert(read > 0); tot_read += read; - - } while (tot_read != size); - ABT_eventual_set(eventual, &tot_read, sizeof(size_t)); - auto err = close(fd); - if (err < 0) { - log->error("Failed to close chunk file after read. File: '{}', Error: '{}'", + // if close fails we just write an entry into the log erroring out + if (close(fd) < 0) { + log_->warn("Failed to close chunk file after read. File: '{}', Error: '{}'", chunk_path, ::strerror(errno)); - //throw ::system_error(errno, ::system_category(), "Failed to close chunk file"); } + return tot_read; } +/** +* Delete all chunks starting with chunk a chunk id. +* Note eventual consistency here: While chunks are removed, there is no lock that prevents +* other processes from modifying anything in that directory. +* It is the application's responsibility to stop modifying the file while truncate is executed +* +* If an error is encountered when removing a chunk file, the function will still remove all files and +* report the error afterwards with ChunkStorageException. + * @param file_path + * @param chunk_start + * @throws ChunkStorageException + */ +void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_t chunk_start) { + + auto chunk_dir = absolute(get_chunks_dir(file_path)); + const bfs::directory_iterator end; + auto err_flag = false; + for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) { + auto chunk_path = chunk_file->path(); + auto chunk_id = std::stoul(chunk_path.filename().c_str()); + if (chunk_id >= chunk_start) { + auto err = unlink(chunk_path.c_str()); + if (err == -1 && errno != ENOENT) { + err_flag = true; + log_->warn("{}() Failed to remove chunk file. File: '{}', Error: '{}'", __func__, chunk_path.native(), + ::strerror(errno)); + } + } + } + if (err_flag) + throw ChunkStorageException(EIO, fmt::format("{}() One or more errors occurred when truncating '{}'", __func__, + file_path)); +} + +/** + * Truncates a single chunk file to a given length + * @param file_path + * @param chunk_id + * @param length + * @throws ChunkStorageException + */ +void ChunkStorage::truncate_chunk_file(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length) { + auto chunk_path = absolute(get_chunk_path(file_path, chunk_id)); + assert(length > 0 && static_cast(length) <= chunksize_); + int ret = truncate(chunk_path.c_str(), length); + if (ret == -1) { + auto err_str = fmt::format("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); + } +} + +/** + * Calls statfs on the chunk directory to get statistic on its used and free size left + * @return ChunkStat + * @throws ChunkStorageException + */ ChunkStat ChunkStorage::chunk_stat() const { struct statfs sfs{}; - if (statfs(root_path.c_str(), &sfs) != 0) { - log->error("Failed to get filesystem statistic for chunk directory." - " Error: '{}'", ::strerror(errno)); - throw ::system_error(errno, ::system_category(), - "statfs() failed on chunk directory"); + + if (statfs(root_path_.c_str(), &sfs) != 0) { + auto err_str = fmt::format("Failed to get filesystem statistic for chunk directory. Error: '{}'", + ::strerror(errno)); + throw ChunkStorageException(errno, err_str); } - log->debug("Chunksize '{}', total '{}', free '{}'", sfs.f_bsize, sfs.f_blocks, sfs.f_bavail); + log_->debug("Chunksize '{}', total '{}', free '{}'", sfs.f_bsize, sfs.f_blocks, sfs.f_bavail); auto bytes_total = static_cast(sfs.f_bsize) * static_cast(sfs.f_blocks); auto bytes_free = static_cast(sfs.f_bsize) * static_cast(sfs.f_bavail); - return {chunksize, - bytes_total / chunksize, - bytes_free / chunksize}; + return {chunksize_, + bytes_total / chunksize_, + bytes_free / chunksize_}; } } // namespace data diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp index 9fbf1d3f72e179b798b70fd189c2ba9fb8c8ba28..81968bb7fec76a9ddf14ec88cf3a67ed0e91344b 100644 --- a/src/daemon/daemon.cpp +++ b/src/daemon/daemon.cpp @@ -329,7 +329,7 @@ int main(int argc, const char* argv[]) { if (vm.count("listen")) { addr = vm["listen"].as(); } else { - addr = get_my_hostname(true); + addr = gkfs::rpc::get_my_hostname(true); } GKFS_DATA->bind_addr(fmt::format("{}://{}", RPC_PROTOCOL, addr)); diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp index 553a9eaa5391e2e7ff9235f9c069e94cc0e7eb3f..c3c74c5e81d0d2590df2248cde4a9cfa15e751eb 100644 --- a/src/daemon/handler/srv_data.cpp +++ b/src/daemon/handler/srv_data.cpp @@ -16,111 +16,19 @@ #include #include #include +#include #include #include #include - using namespace std; -struct write_chunk_args { - const std::string* path; - const char* buf; - gkfs::types::rpc_chnk_id_t chnk_id; - size_t size; - off64_t off; - ABT_eventual eventual; -}; - /** - * Used by an argobots threads. Argument args has the following fields: - * const std::string* path; - const char* buf; - const gkfs::types::rpc_chnk_id_t* chnk_id; - size_t size; - off64_t off; - ABT_eventual* eventual; - * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon. - * This function is called by tasklets, as this function cannot be allowed to block. - * @return written_size is put into eventual and returned that way - */ -void write_file_abt(void* _arg) { - // Unpack args - auto* arg = static_cast(_arg); - const std::string& path = *(arg->path); - - try { - GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, - arg->buf, arg->size, arg->off, arg->eventual); - } catch (const std::system_error& serr) { - GKFS_DATA->spdlogger()->error("{}() Error writing chunk {} of file {}", __func__, arg->chnk_id, path); - ssize_t wrote = -(serr.code().value()); - ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); - } - -} - -struct read_chunk_args { - const std::string* path; - char* buf; - gkfs::types::rpc_chnk_id_t chnk_id; - size_t size; - off64_t off; - ABT_eventual eventual; -}; - -/** - * Used by an argobots threads. Argument args has the following fields: - * const std::string* path; - char* buf; - const gkfs::types::rpc_chnk_id_t* chnk_id; - size_t size; - off64_t off; - ABT_eventual* eventual; - * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon. - * This function is called by tasklets, as this function cannot be allowed to block. - * @return read_size is put into eventual and returned that way - */ -void read_file_abt(void* _arg) { - //unpack args - auto* arg = static_cast(_arg); - const std::string& path = *(arg->path); - - try { - GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, - arg->buf, arg->size, arg->off, arg->eventual); - } catch (const std::system_error& serr) { - GKFS_DATA->spdlogger()->error("{}() Error reading chunk {} of file {}", __func__, arg->chnk_id, path); - ssize_t read = -(serr.code().value()); - ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); - } -} - -/** - * Free Argobots tasks and eventual constructs in a given vector until max_idx. - * Nothing is done for a vector if nullptr is given - * @param abt_tasks - * @param abt_eventuals - * @param max_idx + * RPC handler for an incoming write RPC + * @param handle * @return */ -void cancel_abt_io(vector* abt_tasks, vector* abt_eventuals, uint64_t max_idx) { - if (abt_tasks != nullptr) { - for (uint64_t i = 0; i < max_idx; i++) { - ABT_task_cancel(abt_tasks->at(i)); - ABT_task_free(&abt_tasks->at(i)); - } - } - if (abt_eventuals != nullptr) { - for (uint64_t i = 0; i < max_idx; i++) { - ABT_eventual_reset(abt_eventuals->at(i)); - ABT_eventual_free(&abt_eventuals->at(i)); - } - } -} - - static hg_return_t rpc_srv_write(hg_handle_t handle) { /* * 1. Setup @@ -140,8 +48,9 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); auto bulk_size = margo_bulk_get_size(in.bulk_handle); - GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__, - in.path, bulk_size, in.offset); + GKFS_DATA->spdlogger()->info( + "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'", + __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset); /* * 2. Set up buffers for pull bulk transfers */ @@ -163,9 +72,8 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + gkfs::rpc::GuidedDistributor distributor(host_id, host_size); - auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -189,32 +97,38 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size : gkfs::config::rpc::chunksize; uint64_t origin_offset; uint64_t local_offset; - // task structures for async writing - vector abt_tasks(in.chunk_n); - vector task_eventuals(in.chunk_n); - vector task_args(in.chunk_n); + // object for asynchronous disk IO + gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n}; + /* * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk */ // Start to look for a chunk that hashes to this host with the first chunk in the buffer - for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) { + for (auto chnk_id_file = in.chunk_start; + chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host - if (distributor.locate_data(in.path, chnk_id_file) != host_id) + if (distributor.locate_data(in.path, chnk_id_file) != host_id) { + GKFS_DATA->spdlogger()->trace( + "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", + __func__, chnk_id_file, host_id, chnk_id_curr); continue; + } chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { // if only 1 destination and 1 chunk (small write) the transfer_size == bulk_size - auto offset_transfer_size = (in.offset + bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size - : static_cast( - gkfs::config::rpc::chunksize - in.offset); + size_t offset_transfer_size = 0; + if (in.offset + bulk_size <= gkfs::config::rpc::chunksize) + offset_transfer_size = bulk_size; + else + offset_transfer_size = static_cast(gkfs::config::rpc::chunksize - in.offset); ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, offset_transfer_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__, chnk_id_file, in.chunk_start, in.chunk_end - 1); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } bulk_buf_ptrs[chnk_id_curr] = chnk_ptr; @@ -233,7 +147,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { if (chnk_id_curr == in.chunk_n - 1) transfer_size = chnk_size_left_host; GKFS_DATA->spdlogger()->trace( - "{}() BULK_TRANSFER hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}", + "{}() BULK_TRANSFER_PULL hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}", __func__, host_id, in.path, chnk_id_file, in.total_chunk_size, chnk_size_left_host, origin_offset, local_offset, transfer_size); // RDMA the data to here @@ -242,8 +156,8 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})", __func__, - *path, chnk_id_file, in.chunk_start, (in.chunk_end - 1)); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr); + in.path, chnk_id_file, in.chunk_start, (in.chunk_end - 1)); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } bulk_buf_ptrs[chnk_id_curr] = chnk_ptr; @@ -251,22 +165,13 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; } - // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool - // Starting tasklets for parallel I/O - ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value - auto& task_arg = task_args[chnk_id_curr]; - task_arg.path = path.get(); - task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; - task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; - task_arg.size = chnk_sizes[chnk_id_curr]; - // only the first chunk gets the offset. the chunks are sorted on the client side - task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; - task_arg.eventual = task_eventuals[chnk_id_curr]; - auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args[chnk_id_curr], - &abt_tasks[chnk_id_curr]); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() task create failed", __func__); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1); + try { + // start tasklet for writing chunk + chunk_op.write_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], + chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); + } catch (const gkfs::data::ChunkWriteOpException& e) { + // This exception is caused by setup of Argobots variables. If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while write_async err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } // next chunk @@ -274,36 +179,16 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { } // Sanity check that all chunks where detected in previous loop + // TODO don't proceed if that happens. if (chnk_size_left_host != 0) GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__, chnk_size_left_host); /* * 4. Read task results and accumulate in out.io_size */ - out.err = 0; - out.io_size = 0; - for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) { - ssize_t* task_written_size = nullptr; - // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler - auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_written_size); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed to wait for write task for chunk {}", - __func__, chnk_id_curr); - out.err = EIO; - break; - } - assert(task_written_size != nullptr); - if (*task_written_size < 0) { - GKFS_DATA->spdlogger()->error("{}() Write task failed for chunk {}", - __func__, chnk_id_curr); - out.err = -(*task_written_size); - break; - } - - out.io_size += *task_written_size; // add task written size to output size - ABT_eventual_free(&task_eventuals[chnk_id_curr]); - } + auto write_result = chunk_op.wait_for_tasks(); + out.err = write_result.first; + out.io_size = write_result.second; // Sanity check to see if all data has been written if (in.total_chunk_size != out.io_size) { @@ -315,17 +200,16 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { * 5. Respond and cleanup */ GKFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.err); - ret = gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); - // free tasks after responding - for (auto&& task : abt_tasks) { - ABT_task_join(task); - ABT_task_free(&task); - } - return ret; + return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_write) +/** + * RPC handler for an incoming read RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_read(hg_handle_t handle) { /* * 1. Setup @@ -345,8 +229,9 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); auto bulk_size = margo_bulk_get_size(in.bulk_handle); - GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__, - in.path, bulk_size, in.offset); + GKFS_DATA->spdlogger()->debug( + "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'", + __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset); /* * 2. Set up buffers for pull bulk transfers @@ -369,9 +254,8 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { } auto const host_id = in.host_id; auto const host_size = in.host_size; - gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); + gkfs::rpc::GuidedDistributor distributor(host_id, host_size); - auto path = make_shared(in.path); // chnk_ids used by this host vector chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -387,25 +271,30 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto chnk_ptr = static_cast(bulk_buf); // temporary variables auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size : gkfs::config::rpc::chunksize; - // tasks structures - vector abt_tasks(in.chunk_n); - vector task_eventuals(in.chunk_n); - vector task_args(in.chunk_n); + // object for asynchronous disk IO + gkfs::data::ChunkReadOperation chunk_read_op{in.path, in.chunk_n}; /* * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk */ // Start to look for a chunk that hashes to this host with the first chunk in the buffer - for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) { + for (auto chnk_id_file = in.chunk_start; + chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host - if (distributor.locate_data(in.path, chnk_id_file) != host_id) + if (distributor.locate_data(in.path, chnk_id_file) != host_id) { + GKFS_DATA->spdlogger()->trace( + "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", + __func__, chnk_id_file, host_id, chnk_id_curr); continue; + } chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { // if only 1 destination and 1 chunk (small read) the transfer_size == bulk_size - auto offset_transfer_size = (in.offset + bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size - : static_cast( - gkfs::config::rpc::chunksize - in.offset); + size_t offset_transfer_size = 0; + if (in.offset + bulk_size <= gkfs::config::rpc::chunksize) + offset_transfer_size = bulk_size; + else + offset_transfer_size = static_cast(gkfs::config::rpc::chunksize - in.offset); // Setting later transfer offsets local_offsets[chnk_id_curr] = 0; origin_offsets[chnk_id_curr] = 0; @@ -432,140 +321,108 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; } - // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool - // Starting tasklets for parallel I/O - ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value - auto& task_arg = task_args[chnk_id_curr]; - task_arg.path = path.get(); - task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; - task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; - task_arg.size = chnk_sizes[chnk_id_curr]; - // only the first chunk gets the offset. the chunks are sorted on the client side - task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; - task_arg.eventual = task_eventuals[chnk_id_curr]; - auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args[chnk_id_curr], - &abt_tasks[chnk_id_curr]); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() task create failed", __func__); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1); + try { + // start tasklet for read operation + chunk_read_op.read_async(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], + chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); + } catch (const gkfs::data::ChunkReadOpException& e) { + // This exception is caused by setup of Argobots variables. If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } chnk_id_curr++; } // Sanity check that all chunks where detected in previous loop + // TODO error out. If we continue this will crash the server when sending results back that don't exist. if (chnk_size_left_host != 0) GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__, chnk_size_left_host); /* * 4. Read task results and accumulate in out.io_size */ - out.err = 0; - out.io_size = 0; - for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) { - ssize_t* task_read_size = nullptr; - // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler - auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed to wait for read task for chunk {}", - __func__, chnk_id_curr); - out.err = EIO; - break; - } - assert(task_read_size != nullptr); - if (*task_read_size < 0) { - if (-(*task_read_size) == ENOENT) { - continue; - } - GKFS_DATA->spdlogger()->warn( - "{}() Read task failed for chunk {}", - __func__, chnk_id_curr); - out.err = -(*task_read_size); - break; - } - - if (*task_read_size == 0) { - continue; - } - - ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id_curr], - bulk_handle, local_offsets[chnk_id_curr], *task_read_size); - if (ret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}", - __func__, chnk_id_curr, in.path, origin_offsets[chnk_id_curr], local_offsets[chnk_id_curr], - chnk_sizes[chnk_id_curr]); - out.err = EIO; - break; - } - out.io_size += *task_read_size; // add task read size to output size - } + gkfs::data::ChunkReadOperation::bulk_args bulk_args{}; + bulk_args.mid = mid; + bulk_args.origin_addr = hgi->addr; + bulk_args.origin_bulk_handle = in.bulk_handle; + bulk_args.origin_offsets = &origin_offsets; + bulk_args.local_bulk_handle = bulk_handle; + bulk_args.local_offsets = &local_offsets; + bulk_args.chunk_ids = &chnk_ids_host; + // wait for all tasklets and push read data back to client + auto read_result = chunk_read_op.wait_for_tasks_and_push_back(bulk_args); + out.err = read_result.first; + out.io_size = read_result.second; /* * 5. Respond and cleanup */ GKFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}", __func__, out.err); - ret = gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); - // free tasks after responding - cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n); - return ret; + return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_read) +/** + * RPC handler for an incoming truncate RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_truncate(hg_handle_t handle) { rpc_trunc_in_t in{}; rpc_err_out_t out{}; - + out.err = EIO; + // Getting some information from margo auto ret = margo_get_input(handle, &in); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Could not get RPC input data with err {}", __func__, ret); - throw runtime_error("Failed to get RPC input data"); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: {}", __func__, in.path, in.length); - - unsigned int chunk_start = gkfs::util::chnk_id_for_offset(in.length, gkfs::config::rpc::chunksize); + GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); - // If we trunc in the the middle of a chunk, do not delete that chunk - auto left_pad = gkfs::util::chnk_lpad(in.length, gkfs::config::rpc::chunksize); - if (left_pad != 0) { - GKFS_DATA->storage()->truncate_chunk(in.path, chunk_start, left_pad); - ++chunk_start; + gkfs::data::ChunkTruncateOperation chunk_op{in.path}; + try { + // start tasklet for truncate operation + chunk_op.truncate(0, in.length); + } catch (const gkfs::data::ChunkMetaOpException& e) { + // This exception is caused by setup of Argobots variables. If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what()); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->storage()->trim_chunk_space(in.path, chunk_start); + // wait and get output + out.err = chunk_op.wait_for_tasks(); - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); - auto hret = margo_respond(handle, &out); - if (hret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to respond"); - } - // Destroy handle when finished - margo_free_input(handle, &in); - margo_destroy(handle); - return HG_SUCCESS; + GKFS_DATA->spdlogger()->debug("{}() Sending output response '{}'", __func__, out.err); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate) +/** + * RPC handler for an incoming chunk stat RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { - GKFS_DATA->spdlogger()->trace("{}() called", __func__); - + GKFS_DATA->spdlogger()->debug("{}() enter", __func__); rpc_chunk_stat_out_t out{}; - // Get input - auto chk_stat = GKFS_DATA->storage()->chunk_stat(); - // Create output and send it - out.chunk_size = chk_stat.chunk_size; - out.chunk_total = chk_stat.chunk_total; - out.chunk_free = chk_stat.chunk_free; - auto hret = margo_respond(handle, &out); - if (hret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); + out.err = EIO; + try { + auto chk_stat = GKFS_DATA->storage()->chunk_stat(); + out.chunk_size = chk_stat.chunk_size; + out.chunk_total = chk_stat.chunk_total; + out.chunk_free = chk_stat.chunk_free; + out.err = 0; + } catch (const gkfs::data::ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + out.err = err.code().value(); + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error when chunk stat '{}'", __func__, err.what()); + out.err = EAGAIN; } - // Destroy handle when finished - margo_destroy(handle); - return hret; + // Create output and send it back + return gkfs::rpc::cleanup_respond(&handle, &out); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat) diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index ce6de804f885f99f55fac23a22110cdcd14f1fe5..bcd066e763b1934d348a71e6bb163d9889010f30 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -18,6 +18,7 @@ #include #include +#include using namespace std; @@ -69,7 +70,7 @@ static hg_return_t rpc_srv_stat(hg_handle_t handle) { out.db_val = val.c_str(); out.err = 0; GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -100,17 +101,17 @@ static hg_return_t rpc_srv_decr_size(hg_handle_t handle) { throw runtime_error("Failed to retrieve input from handle"); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: {}", __func__, in.path, in.length); + GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); try { GKFS_DATA->mdb()->decrease_size(in.path, in.length); out.err = 0; } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__, e.what()); out.err = EIO; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -132,28 +133,25 @@ static hg_return_t rpc_srv_remove(hg_handle_t handle) { if (ret != HG_SUCCESS) GKFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); - GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); + GKFS_DATA->spdlogger()->info("{}() Got remove RPC with path '{}'", __func__, in.path); + // Remove metadentry if exists on the node and remove all chunks for that file try { - // Remove metadentry if exists on the node - // and remove all chunks for that file - gkfs::metadata::remove_node(in.path); - out.err = 0; - } catch (const NotFoundException& e) { - /* The metadentry was not found on this node, - * this is not an error. At least one node involved in this - * broadcast operation will find and delete the entry on its local - * MetadataDB. - * TODO: send the metadentry remove only to the node that actually - * has it. - */ + gkfs::metadata::remove(in.path); out.err = 0; + } catch (const gkfs::metadata::DBException& e) { + GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); + out.err = EIO; + } catch (const gkfs::data::ChunkStorageException& e) { + GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), + e.what()); + out.err = e.code().value(); } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to remove node: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); out.err = EBUSY; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -201,7 +199,7 @@ static hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { out.err = 1; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -224,7 +222,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { if (ret != HG_SUCCESS) GKFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); - GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}, append: {}", __func__, in.path, in.size, + GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, in.offset, in.append); try { @@ -233,7 +231,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { //TODO the actual size of the file could be different after the size update // do to concurrency on size out.ret_size = in.size + in.offset; - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -241,7 +239,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { out.err = EBUSY; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -270,7 +268,7 @@ static hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { try { out.ret_size = gkfs::metadata::get_size(in.path); out.err = 0; - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -301,7 +299,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { auto ret = margo_get_input(handle, &in); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Could not get RPC input data with err {}", __func__, ret); + "{}() Could not get RPC input data with err '{}'", __func__, ret); return ret; } @@ -309,7 +307,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); GKFS_DATA->spdlogger()->debug( - "{}() Got dirents RPC with path {}", __func__, in.path); + "{}() Got dirents RPC with path '{}'", __func__, in.path); auto bulk_size = margo_bulk_get_size(in.bulk_handle); //Get directory entries from local DB @@ -366,7 +364,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { out_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Failed push dirents on path {} to client", + "{}() Failed push dirents on path '{}' to client", __func__, in.path ); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); @@ -399,10 +397,10 @@ static hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { gkfs::metadata::create(in.path, md); out.err = 0; } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; } - GKFS_DATA->spdlogger()->debug("{}() Sending output err {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); diff --git a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5c68b51e7fc169793b69038927706310b548366c --- /dev/null +++ b/src/daemon/ops/data.cpp @@ -0,0 +1,392 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + SPDX-License-Identifier: MIT +*/ + +#include +#include +#include +#include + +extern "C" { +#include +} + +using namespace std; + +namespace gkfs { +namespace data { + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + size_t size; + ABT_eventual* eventual; + * This function is driven by the IO pool. so there is a maximum allowed number of concurrent operations per daemon. + * @return error is put into eventual to signal that it finished + */ +void ChunkTruncateOperation::truncate_abt(void* _arg) { + assert(_arg); + // Unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + const size_t size = arg->size; + int err_response = 0; + try { + // get chunk from where to cut off + auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); + // do not last delete chunk if it is in the middle of a chunk + auto left_pad = gkfs::util::chnk_lpad(size, gkfs::config::rpc::chunksize); + if (left_pad != 0) { + GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start, left_pad); + chunk_id_start++; + } + GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + err_response = err.code().value(); + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, + size); + err_response = EIO; + } + ABT_eventual_set(arg->eventual, &err_response, sizeof(int)); +} + +void ChunkTruncateOperation::clear_task_args() { + task_args_.clear(); +} + +ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} + +ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); +} + +/** + * Starts a tasklet for requested truncate. In essence all chunk files after the given offset is removed + */ +void ChunkTruncateOperation::truncate(size_t idx, size_t size) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->info("ChunkMetaOperation::{}() enter: idx '{}' path '{}' size '{}'", __func__, idx, path_, + size); + + auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[idx]); // truncate file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkMetaOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkMetaOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.size = size; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkMetaOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkMetaOpException(err_str); + } +} + + +int ChunkTruncateOperation::wait_for_tasks() { + GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_); + int trunc_err = 0; + /* + * gather all Eventual's information. do not throw here to properly cleanup all eventuals + * On error, cleanup eventuals and set written data to 0 as written data is corrupted + */ + for (auto& e : task_eventuals_) { + int* task_err = nullptr; + auto abt_err = ABT_eventual_wait(e, (void**) &task_err); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkTruncateOperation::{}() Error when waiting on ABT eventual", __func__); + trunc_err = EIO; + ABT_eventual_free(&e); + continue; + } + assert(task_err != nullptr); + if (*task_err != 0) { + trunc_err = *task_err; + } + ABT_eventual_free(&e); + } + return trunc_err; +} + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + const char* buf; + const gkfs::rpc::chnk_id_t* chnk_id; + size_t size; + off64_t off; + ABT_eventual* eventual; + * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon. + * This function is called by tasklets, as this function cannot be allowed to block. + * @return written_size is put into eventual to signal that it finished + */ +void ChunkWriteOperation::write_file_abt(void* _arg) { + assert(_arg); + // Unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + ssize_t wrote{0}; + try { + wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + wrote = -(err.code().value()); + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, + path); + wrote = -EIO; + } + ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); +} + +void ChunkWriteOperation::clear_task_args() { + task_args_.clear(); +} + +ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); +} + +/** + * Write buffer from a single chunk referenced by its ID. Put task into IO queue. + * On failure the write operations is aborted, throwing an error, and cleaned up. + * The caller may repeat a failed call. + * @param chunk_id + * @param bulk_buf_ptr + * @param size + * @param offset + * @throws ChunkWriteOpException + */ +void +ChunkWriteOperation::write_async(size_t idx, const uint64_t chunk_id, const char* bulk_buf_ptr, + const size_t size, + const off64_t offset) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->debug("ChunkWriteOperation::{}() enter: idx '{}' path '{}' chunkid '{}' size '{}' offset '{}'", __func__, + idx, path_, chunk_id, size, offset); + + auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // written file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkWriteOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.buf = bulk_buf_ptr; + task_arg.chnk_id = chunk_id; + task_arg.size = size; + task_arg.off = offset; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkWriteOpException(err_str); + } +} + +/** + * Waits for all Argobots tasklets to finish and report back the write error code and the size written. + * @return + */ +pair ChunkWriteOperation::wait_for_tasks() { + GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'", __func__, path_); + size_t total_written = 0; + int io_err = 0; + /* + * gather all Eventual's information. do not throw here to properly cleanup all eventuals + * On error, cleanup eventuals and set written data to 0 as written data is corrupted + */ + for (auto& e : task_eventuals_) { + ssize_t* task_size = nullptr; + auto abt_err = ABT_eventual_wait(e, (void**) &task_size); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkWriteOperation::{}() Error when waiting on ABT eventual", __func__); + io_err = EIO; + ABT_eventual_free(&e); + continue; + } + if (io_err != 0) { + ABT_eventual_free(&e); + continue; + } + assert(task_size != nullptr); + if (*task_size < 0) { + io_err = -(*task_size); + } else { + total_written += *task_size; + } + ABT_eventual_free(&e); + } + // in case of error set written size to zero as data would be corrupted + if (io_err != 0) + total_written = 0; + return make_pair(io_err, total_written); +} + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + char* buf; + const gkfs::rpc::chnk_id_t* chnk_id; + size_t size; + off64_t off; + ABT_eventual* eventual; + * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon. + * This function is called by tasklets, as this function cannot be allowed to block. + * @return read_size is put into eventual to signal that it finished + */ +void ChunkReadOperation::read_file_abt(void* _arg) { + assert(_arg); + //unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + ssize_t read = 0; + try { + // Under expected circumstances (error or no error) read_chunk will signal the eventual + read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + read = -(err.code().value()); + } catch (const ::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, + path); + read = -EIO; + } + ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); +} + +void ChunkReadOperation::clear_task_args() { + task_args_.clear(); +} + +ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); +} + +/** + * Read buffer to a single chunk referenced by its ID. Put task into IO queue. + * On failure the read operations is aborted, throwing an error, and cleaned up. + * The caller may repeat a failed call. + * @param chunk_id + * @param bulk_buf_ptr + * @param size + * @param offset + */ +void +ChunkReadOperation::read_async(size_t idx, const uint64_t chunk_id, char* bulk_buf_ptr, const size_t size, + const off64_t offset) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->info("ChunkReadOperation::{}() enter: idx '{}' path '{}' chunk_id '{}' size '{}' offset '{}'", __func__, + idx, path_, chunk_id, size, offset); + + auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // read file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkReadOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.buf = bulk_buf_ptr; + task_arg.chnk_id = chunk_id; + task_arg.size = size; + task_arg.off = offset; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkReadOpException(err_str); + } + +} + +pair ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { + GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'", __func__, path_); + assert(args.chunk_ids->size() == task_args_.size()); + size_t total_read = 0; + int io_err = 0; + + /* + * gather all Eventual's information. do not throw here to properly cleanup all eventuals + * As soon as an error is encountered, bulk_transfers will no longer be executed as the data would be corrupted + * The loop continues until all eventuals have been cleaned and freed. + */ + for (uint64_t idx = 0; idx < task_args_.size(); idx++) { + ssize_t* task_size = nullptr; + auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkReadOperation::{}() Error when waiting on ABT eventual", __func__); + io_err = EIO; + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + // error occured. stop processing but clean up + if (io_err != 0) { + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + assert(task_size != nullptr); + if (*task_size < 0) { + // sparse regions do not have chunk files and are therefore skipped + if (-(*task_size) == ENOENT) { + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + io_err = -(*task_size); // make error code > 0 + } else if (*task_size == 0) { + // read size of 0 is not an error and can happen because reading the end-of-file + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } else { + // successful case, push read data back to client + GKFS_DATA->spdlogger()->trace( + "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", + __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), + *task_size); + assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); + auto margo_err = margo_bulk_transfer(args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, + args.origin_offsets->at(idx), args.local_bulk_handle, + args.local_offsets->at(idx), *task_size); + if (margo_err != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'", __func__, + margo_err); + io_err = EBUSY; + continue; + } + total_read += *task_size; + } + ABT_eventual_free(&task_eventuals_[idx]); + } + // in case of error set read size to zero as data would be corrupted + if (io_err != 0) + total_read = 0; + return make_pair(io_err, total_read); +} + +} // namespace data +} // namespace gkfs diff --git a/src/daemon/ops/metadentry.cpp b/src/daemon/ops/metadentry.cpp index 169c7ac362b13beada5a7e35da43b2ba2e919981..d241b03bd9f3d90730686015fc8a567829600e10 100644 --- a/src/daemon/ops/metadentry.cpp +++ b/src/daemon/ops/metadentry.cpp @@ -104,9 +104,16 @@ void update_size(const string& path, size_t io_size, off64_t offset, bool append * Remove metadentry if exists and try to remove all chunks for path * @param path * @return + * @throws gkfs::metadata::DBException, gkfs::data::ChunkStorageException */ -void remove_node(const string& path) { - GKFS_DATA->mdb()->remove(path); // remove metadentry +void remove(const string& path) { + /* + * try to remove metadata from kv store but catch NotFoundException which is not an error in this case + * because removes can be broadcast to catch all data chunks but only one node will hold the kv store entry. + */ + try { + GKFS_DATA->mdb()->remove(path); // remove metadata from KV store + } catch (const NotFoundException& e) {} GKFS_DATA->storage()->destroy_chunk_space(path); // destroys all chunks for the path on this node } diff --git a/src/daemon/util.cpp b/src/daemon/util.cpp index 92fa06221083172b0b631f3359e1da3cc60e20a2..8202cde1dee58a0e8f5969952212d95199ac0036 100644 --- a/src/daemon/util.cpp +++ b/src/daemon/util.cpp @@ -31,7 +31,7 @@ void populate_hosts_file() { throw runtime_error( fmt::format("Failed to open hosts file '{}': {}", hosts_file, strerror(errno))); } - lfstream << fmt::format("{} {}", get_my_hostname(true), RPC_DATA->self_addr_str()) << std::endl; + lfstream << fmt::format("{} {}", gkfs::rpc::get_my_hostname(true), RPC_DATA->self_addr_str()) << std::endl; if (!lfstream) { throw runtime_error( fmt::format("Failed to write on hosts file '{}': {}", hosts_file, strerror(errno))); diff --git a/src/global/rpc/distributor.cpp b/src/global/rpc/distributor.cpp index ab059785302aec0a56fa224a5632c0d6ac066d44..d0944111803c062118acab4b60fe4f17ba90f44f 100644 --- a/src/global/rpc/distributor.cpp +++ b/src/global/rpc/distributor.cpp @@ -68,5 +68,65 @@ locate_directory_metadata(const string& path) const { return {localhost_}; } + +GuidedDistributor:: +GuidedDistributor(host_t localhost, unsigned int hosts_size) : + localhost_(localhost), + hosts_size_(hosts_size), + all_hosts_(hosts_size) { + ::iota(all_hosts_.begin(), all_hosts_.end(), 0); + + // Fill map + // header read 4 /fluidda-XFIEL.00000001.00000158.mpio.bin 1 8 136 3 + + + std::string op, path; + unsigned int original_host, destination_host; + chunkid_t chunk_id; + size_t size, offset; + std::ifstream mapfile; + std::ofstream mapout; + mapfile.open("/home/bsc25/bsc25015/mm32.txt"); + while (mapfile >> path >> chunk_id >> destination_host) + { + //mapping[ std::make_pair (path, chunk_id) ] = destination_host; + mapping[ path + ::to_string(chunk_id) ] = destination_host; + // mapout.open("/home/bsc25/bsc25015/mapout.txt", std::ofstream::app); + + // mapout << "C " << path << " " << chunk_id << " --> " << destination_host << std::endl; + // mapout.close(); + + } + mapfile.close(); +} + +host_t GuidedDistributor:: +localhost() const { + return localhost_; +} + +host_t GuidedDistributor:: +locate_data(const string& path, const chunkid_t& chnk_id) const { + //auto locate = std::make_pair(path, chnk_id); + auto locate = path + ::to_string(chnk_id); + auto it = mapping.find(locate); + if (it != mapping.end()) + { + return it->second; + } + else + return str_hash(locate) % hosts_size_; +} + +host_t GuidedDistributor:: +locate_file_metadata(const string& path) const { + return str_hash(path) % hosts_size_; +} + +::vector GuidedDistributor:: +locate_directory_metadata(const string& path) const { + return all_hosts_; +} + } // namespace rpc -} // namespace gkfs \ No newline at end of file +} // namespace gkfs diff --git a/src/global/rpc/rpc_util.cpp b/src/global/rpc/rpc_util.cpp index 8a38fa5976ef789b935d34c02a626d8cc04a2ed8..e5607fd330fd5c3b3e2b28ac0e040ebf7195e06f 100644 --- a/src/global/rpc/rpc_util.cpp +++ b/src/global/rpc/rpc_util.cpp @@ -24,6 +24,9 @@ extern "C" { using namespace std; +namespace gkfs { +namespace rpc { + /** * converts std bool to mercury bool * @param state @@ -89,4 +92,7 @@ string get_host_by_name(const string& hostname) { } freeaddrinfo(addr); return addr_str; -} \ No newline at end of file +} + +} // namespace rpc +} // namespace gkfs \ No newline at end of file diff --git a/utils/generate.py b/utils/generate.py new file mode 100644 index 0000000000000000000000000000000000000000..089def390597a3fd11f0cc06e0c1a0575a484948 --- /dev/null +++ b/utils/generate.py @@ -0,0 +1,22 @@ +import re +import sys +#[2020-03-03 10:50:45.513741 CET] [151530] [debug] host: 9, path: /fluidda-XFIEL.00000001.00000119.mpio.bin, chunks: 1, size: 524288, offset: 295200 +#(read).+(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+) +# Original host # path # chunkid # size # offset +destination = sys.argv[1] +file = sys.argv[2] +pattern = re.compile(r".+(read).+(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+)") + +# [2020-03-30 14:58:09.938632 CEST] [81763] [info] host: 6, path: /fluidda-XFIEL.00000001.00000181.mpio.bin, chunks: 1, size: 8, offset: 168 +# [2020-03-31 13:30:06.280897 CEST] [186656] [info] read host: 8, path: /fluidda-XFIEL.00000001.00000080.mpio.bin, chunk_start: 412, chunk_end: 470 + +pattern = re.compile(r".*(host: )(\d+).+(path: )(.+),.+(chunks: )(\d+).+(size: )(\d+).+(offset: )(\d+)") + +pattern = re.compile(r".+(read).+(host: )(\d+).+(path: )(.+),.+(chunk_start: )(\d+).+(chunk_end: )(\d+)") +with open(file) as f: + for line in f: + result = pattern.match(line) + if result: + #split = re.split(":\,",line) + for i in range(int(result[7]), int(result[9])+1): + print (result[5], i, destination)