diff --git a/include/client/rpc/rpc_types.hpp b/include/client/rpc/rpc_types.hpp
index 85aec4586165d7c473283a42ec815016f9953e54..edfac30e383b0e829a32c92361154b9931243282 100644
--- a/include/client/rpc/rpc_types.hpp
+++ b/include/client/rpc/rpc_types.hpp
@@ -2092,11 +2092,13 @@ struct chunk_stat {
     public:
         output() :
+            m_err(),
             m_chunk_size(),
             m_chunk_total(),
             m_chunk_free() {}

-        output(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
+        output(int32_t err, uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
+            m_err(err),
             m_chunk_size(chunk_size),
             m_chunk_total(chunk_total),
             m_chunk_free(chunk_free) {}
@@ -2111,11 +2113,17 @@ struct chunk_stat {

         explicit output(const rpc_chunk_stat_out_t& out) {
+            m_err = out.err;
             m_chunk_size = out.chunk_size;
             m_chunk_total = out.chunk_total;
             m_chunk_free = out.chunk_free;
         }

+        int32_t
+        err() const {
+            return m_err;
+        }
+
         uint64_t
         chunk_size() const {
             return m_chunk_size;
@@ -2132,6 +2140,7 @@ struct chunk_stat {
         }

     private:
+        int32_t m_err;
         uint64_t m_chunk_size;
         uint64_t m_chunk_total;
         uint64_t m_chunk_free;
diff --git a/include/daemon/backend/data/chunk_storage.hpp b/include/daemon/backend/data/chunk_storage.hpp
index 8a6db678439ca5e6e60ffabab4c80d6096a2a43e..934cfe513e3cc0e064e48a81b7d3cef474aead9c 100644
--- a/include/daemon/backend/data/chunk_storage.hpp
+++ b/include/daemon/backend/data/chunk_storage.hpp
@@ -14,17 +14,16 @@
 #ifndef GEKKOFS_CHUNK_STORAGE_HPP
 #define GEKKOFS_CHUNK_STORAGE_HPP

-extern "C" {
-#include <abt.h>
-}
+#include <global/global_defs.hpp>

 #include <limits>
 #include <string>
 #include <memory>
+#include <system_error>

 /* Forward declarations */
 namespace spdlog {
-    class logger;
+class logger;
 }

 namespace gkfs {
@@ -36,42 +35,43 @@ struct ChunkStat {
     unsigned long chunk_free;
 };

+class ChunkStorageException : public std::system_error {
+public:
+    ChunkStorageException(const int err_code, const std::string& s) : std::system_error(err_code,
+                                                                                        std::generic_category(), s) {};
+};
+
 class ChunkStorage {
 private:
-    static constexpr const char* LOGGER_NAME = "ChunkStorage";
-    std::shared_ptr<spdlog::logger> log;
+    std::shared_ptr<spdlog::logger> log_;

-    std::string root_path;
-    size_t chunksize;
+    std::string root_path_;
+    size_t chunksize_;

     inline std::string absolute(const std::string& internal_path) const;

     static inline std::string get_chunks_dir(const std::string& file_path);

-    static inline std::string get_chunk_path(const std::string& file_path, unsigned int chunk_id);
+    static inline std::string get_chunk_path(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id);

     void init_chunk_space(const std::string& file_path) const;

 public:
-    ChunkStorage(const std::string& path, size_t chunksize);
+    ChunkStorage(std::string& path, size_t chunksize);

-    void write_chunk(const std::string& file_path, unsigned int chunk_id,
-                     const char* buff, size_t size, off64_t offset,
-                     ABT_eventual& eventual) const;
-
-    void read_chunk(const std::string& file_path, unsigned int chunk_id,
-                    char* buff, size_t size, off64_t offset,
-                    ABT_eventual& eventual) const;
+    void destroy_chunk_space(const std::string& file_path) const;

-    void trim_chunk_space(const std::string& file_path, unsigned int chunk_start,
-                          unsigned int chunk_end = std::numeric_limits<unsigned int>::max());
+    ssize_t
+    write_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buf, size_t size,
+                off64_t offset) const;

-    void delete_chunk(const std::string& file_path, unsigned int chunk_id);
+    ssize_t read_chunk(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size,
+                       off64_t offset) const;

-    void truncate_chunk(const std::string& file_path, unsigned int chunk_id, off_t length);
+    void trim_chunk_space(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_start);

-    void destroy_chunk_space(const std::string& file_path) const;
+    void truncate_chunk_file(const std::string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length);

     ChunkStat chunk_stat() const;
 };
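Both changes above serve one end-to-end error path: ChunkStorageException carries a plain errno value (via std::system_error), and the new err field in the chunk stat output carries that value back to the client. A minimal sketch of the intended daemon-side pattern; the handler body shown here is illustrative and not part of this patch:

    // Illustrative: convert a ChunkStorageException into the errno-valued
    // err field of the chunk stat RPC output.
    rpc_chunk_stat_out_t out{};
    try {
        auto chunk_stat = GKFS_DATA->storage()->chunk_stat();
        out.err = 0;
        out.chunk_size = chunk_stat.chunk_size;
        out.chunk_total = chunk_stat.chunk_total;
        out.chunk_free = chunk_stat.chunk_free;
    } catch (const gkfs::data::ChunkStorageException& e) {
        out.err = e.code().value(); // the errno passed at construction time
    }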
diff --git a/include/daemon/backend/data/data_module.hpp b/include/daemon/backend/data/data_module.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..63c2fc4da2096de8ab048eecbbd1c3028418ded1
--- /dev/null
+++ b/include/daemon/backend/data/data_module.hpp
@@ -0,0 +1,53 @@
+/*
+  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
+  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
+
+  This software was partially supported by the
+  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
+
+  This software was partially supported by the
+  ADA-FS project under the SPPEXA project funded by the DFG.
+
+  SPDX-License-Identifier: MIT
+*/
+
+#ifndef GEKKOFS_DAEMON_DATA_LOGGING_HPP
+#define GEKKOFS_DAEMON_DATA_LOGGING_HPP
+
+#include <spdlog/spdlog.h>
+
+namespace gkfs {
+namespace data {
+
+class DataModule {
+
+private:
+    DataModule() {}
+
+    std::shared_ptr<spdlog::logger> log_;
+
+public:
+
+    static constexpr const char* LOGGER_NAME = "DataModule";
+
+    static DataModule* getInstance() {
+        static DataModule instance;
+        return &instance;
+    }
+
+    DataModule(DataModule const&) = delete;
+
+    void operator=(DataModule const&) = delete;
+
+    const std::shared_ptr<spdlog::logger>& log() const;
+
+    void log(const std::shared_ptr<spdlog::logger>& log);
+
+};
+
+#define GKFS_DATA_MOD (static_cast<gkfs::data::DataModule*>(gkfs::data::DataModule::getInstance()))
+
+} // namespace data
+} // namespace gkfs
+
+#endif //GEKKOFS_DAEMON_DATA_LOGGING_HPP
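DataModule is a small singleton whose only job is to hold the spdlog logger for the data backend; the GKFS_DATA_MOD macro is the access point for code (such as the FileHandle below) that has no ChunkStorage instance at hand. A usage sketch, assuming the "DataModule" logger has already been registered with spdlog (daemon.cpp below adds it to the logger list):

    // Wire the registered spdlog logger into the singleton once at startup,
    // exactly as the ChunkStorage constructor does:
    GKFS_DATA_MOD->log(spdlog::get(gkfs::data::DataModule::LOGGER_NAME));
    // Afterwards any data backend code can log through the module:
    GKFS_DATA_MOD->log()->debug("data module logger initialized");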
diff --git a/include/daemon/backend/data/file_handle.hpp b/include/daemon/backend/data/file_handle.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..19e26b2b8bc7ad37115fd99b3e2ac97e55c7adaf
--- /dev/null
+++ b/include/daemon/backend/data/file_handle.hpp
@@ -0,0 +1,99 @@
+/*
+  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
+  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
+
+  This software was partially supported by the
+  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
+
+  This software was partially supported by the
+  ADA-FS project under the SPPEXA project funded by the DFG.
+
+  SPDX-License-Identifier: MIT
+*/
+
+#ifndef GEKKOFS_DAEMON_FILE_HANDLE_HPP
+#define GEKKOFS_DAEMON_FILE_HANDLE_HPP
+
+#include <daemon/backend/data/data_module.hpp>
+
+extern "C" {
+#include <unistd.h>
+}
+
+namespace gkfs {
+namespace data {
+
+/**
+ * File handle to encapsulate a file descriptor, allowing RAII closing of the file descriptor
+ */
+class FileHandle {
+
+private:
+
+    constexpr static const int init_value{-1};
+
+    int fd_{init_value};
+    std::string path_{};
+
+public:
+
+    FileHandle() = default;
+
+    explicit FileHandle(int fd, std::string path) noexcept :
+        fd_(fd), path_(std::move(path)) {}
+
+    FileHandle(FileHandle&& rhs) = default;
+
+    FileHandle(const FileHandle& other) = delete;
+
+    FileHandle& operator=(FileHandle&& rhs) = default;
+
+    FileHandle& operator=(const FileHandle& other) = delete;
+
+    explicit operator bool() const noexcept {
+        return valid();
+    }
+
+    bool operator!() const noexcept {
+        return !valid();
+    }
+
+    bool
+    valid() const noexcept {
+        return fd_ != init_value;
+    }
+
+    int
+    native() const noexcept {
+        return fd_;
+    }
+
+    /**
+     * Closes file descriptor and resets it to initial value
+     * @return
+     */
+    bool close() noexcept {
+        if (fd_ != init_value) {
+            if (::close(fd_) < 0) {
+                GKFS_DATA_MOD->log()->warn("{}() Failed to close file descriptor '{}' path '{}' errno '{}'", __func__,
+                                           fd_, path_, ::strerror(errno));
+                return false;
+            }
+        }
+        fd_ = init_value;
+        return true;
+    }
+
+    ~FileHandle() {
+        if (fd_ != init_value)
+            close();
+    }
+
+
+};
+
+} // namespace data
+} // namespace gkfs
+
+
+#endif //GEKKOFS_DAEMON_FILE_HANDLE_HPP
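Because the destructor closes any still-open descriptor, call sites can open, validate, and use the descriptor without pairing every early return with a close(). A sketch of the intended pattern (read_chunk in chunk_storage.cpp below follows it exactly):

    // RAII pattern: fh closes the descriptor when it goes out of scope,
    // including on the exception path.
    gkfs::data::FileHandle fh(open(chunk_path.c_str(), O_RDONLY), chunk_path);
    if (!fh.valid())
        throw gkfs::data::ChunkStorageException(errno, "Failed to open chunk file");
    auto read = pread64(fh.native(), buf, size, offset);
    // no explicit close() needed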
diff --git a/include/daemon/backend/exceptions.hpp b/include/daemon/backend/exceptions.hpp
index 36e80cfc148a2be058f9f045be5fe3f00216b279..b5cc11d53ab51fe28993ffdfcfe619c7c45f61a5 100644
--- a/include/daemon/backend/exceptions.hpp
+++ b/include/daemon/backend/exceptions.hpp
@@ -17,14 +17,20 @@
 #include <stdexcept>
 #include <string>

+namespace gkfs {
+namespace metadata {
+
 class DBException : public std::runtime_error {
 public:
-    DBException(const std::string& s) : std::runtime_error(s) {};
+    explicit DBException(const std::string& s) : std::runtime_error(s) {};
 };

 class NotFoundException : public DBException {
 public:
-    NotFoundException(const std::string& s) : DBException(s) {};
+    explicit NotFoundException(const std::string& s) : DBException(s) {};
 };

+} // namespace metadata
+} // namespace gkfs
+
 #endif //GEKKOFS_DB_EXCEPTIONS_HPP
diff --git a/include/daemon/handler/rpc_util.hpp b/include/daemon/handler/rpc_util.hpp
index 38ef1094103e5ffb2adb5ef1b48bb53c6b0d5813..dd29b4d87249af1d8d2be4983d8435b9cb31f27c 100644
--- a/include/daemon/handler/rpc_util.hpp
+++ b/include/daemon/handler/rpc_util.hpp
@@ -25,8 +25,8 @@ extern "C" {
 namespace gkfs {
 namespace rpc {

-template <typename I, typename O>
-inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) {
+template <typename InputType, typename OutputType>
+inline hg_return_t cleanup(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) {
     auto ret = HG_SUCCESS;
     if (bulk_handle) {
         ret = margo_bulk_free(*bulk_handle);
@@ -51,16 +51,41 @@ inline hg_return_t cleanup(hg_handle_t* handle, I* input, O* output, hg_bulk_t*
     return ret;
 }

-template <typename I, typename O>
-inline hg_return_t cleanup_respond(hg_handle_t* handle, I* input, O* output, hg_bulk_t* bulk_handle) {
+template <typename OutputType>
+inline hg_return_t respond(hg_handle_t* handle, OutputType* output) {
     auto ret = HG_SUCCESS;
     if (output && handle) {
         ret = margo_respond(*handle, output);
         if (ret != HG_SUCCESS)
             return ret;
     }
-    return cleanup(handle, input, static_cast<O*>(nullptr), bulk_handle);
+    return ret;
+}
+
+template <typename InputType, typename OutputType>
+inline hg_return_t
+cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output, hg_bulk_t* bulk_handle) {
+    auto ret = respond(handle, output);
+    if (ret != HG_SUCCESS)
+        return ret;
+    return cleanup(handle, input, static_cast<OutputType*>(nullptr), bulk_handle);
+}
+
+template <typename InputType, typename OutputType>
+inline hg_return_t cleanup_respond(hg_handle_t* handle, InputType* input, OutputType* output) {
+    return cleanup_respond(handle, input, output, nullptr);
+}
+
+template <typename OutputType>
+inline hg_return_t cleanup_respond(hg_handle_t* handle, OutputType* output) {
+    auto ret = respond(handle, output);
+    if (ret != HG_SUCCESS)
+        return ret;
+    if (handle) {
+        ret = margo_destroy(*handle);
+        if (ret != HG_SUCCESS)
+            return ret;
+    }
+    return ret;
 }

 } // namespace rpc
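The refactoring splits respond() out of cleanup_respond() and adds two overloads, so a handler can respond and tear down in one call whether or not it holds input and bulk handles. Typical use at the end of a Margo handler; rpc_srv_example is an illustrative name, and srv_data.cpp below uses the four-argument form:

    static hg_return_t rpc_srv_example(hg_handle_t handle) {
        rpc_err_out_t out{};
        // ... handler work sets out.err; in and bulk_handle come from
        // margo_get_input() and margo_bulk_create() as usual ...
        // respond to the client, then free input/output/bulk and destroy the handle
        return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
    }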
diff --git a/include/daemon/ops/data.hpp b/include/daemon/ops/data.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..f11bb154b937310945503c9b6c31d2dbe98b56ca
--- /dev/null
+++ b/include/daemon/ops/data.hpp
@@ -0,0 +1,217 @@
+/*
+  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
+  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
+
+  This software was partially supported by the
+  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
+
+  This software was partially supported by the
+  ADA-FS project under the SPPEXA project funded by the DFG.
+
+  SPDX-License-Identifier: MIT
+*/
+
+#ifndef GEKKOFS_DAEMON_DATA_HPP
+#define GEKKOFS_DAEMON_DATA_HPP
+
+#include <daemon/daemon.hpp>
+#include <global/global_defs.hpp>
+
+#include <string>
+#include <vector>
+
+extern "C" {
+#include <abt.h>
+#include <margo.h>
+}
+
+namespace gkfs {
+namespace data {
+
+class ChunkOpException : public std::runtime_error {
+public:
+    explicit ChunkOpException(const std::string& s) : std::runtime_error(s) {};
+};
+
+class ChunkWriteOpException : public ChunkOpException {
+public:
+    explicit ChunkWriteOpException(const std::string& s) : ChunkOpException(s) {};
+};
+
+class ChunkReadOpException : public ChunkOpException {
+public:
+    explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s) {};
+};
+
+class ChunkMetaOpException : public ChunkOpException {
+public:
+    explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s) {};
+};
+
+/**
+ * Classes to encapsulate asynchronous chunk operations.
+ * All operations on chunk files must go through the Argobots' task queues.
+ * Otherwise operations may overtake operations in the queues.
+ * This applies to write, read, and truncate which may modify the middle of a chunk, essentially a write operation.
+ *
+ * Note: This class is not thread-safe.
+ *
+ * In the future, this class may be used to provide failure tolerance for IO tasks
+ *
+ * Base class using the CRTP idiom
+ */
+template <class OperationType>
+class ChunkOperation {
+
+protected:
+
+    const std::string path_;
+
+    std::vector<ABT_task> abt_tasks_;
+    std::vector<ABT_eventual> task_eventuals_;
+
+public:
+
+    explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {};
+
+    ChunkOperation(std::string path, size_t n) : path_(std::move(path)) {
+        // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults
+        abt_tasks_.resize(n);
+        task_eventuals_.resize(n);
+    };
+
+    ~ChunkOperation() {
+        cancel_all_tasks();
+    }
+
+    /**
+     * Cleans up and cancels all tasks in flight
+     */
+    void cancel_all_tasks() {
+        GKFS_DATA->spdlogger()->trace("{}() enter", __func__);
+        for (auto& task : abt_tasks_) {
+            if (task) {
+                ABT_task_cancel(task);
+                ABT_task_free(&task);
+            }
+        }
+        for (auto& eventual : task_eventuals_) {
+            if (eventual) {
+                ABT_eventual_reset(eventual);
+                ABT_eventual_free(&eventual);
+            }
+        }
+        abt_tasks_.clear();
+        task_eventuals_.clear();
+        static_cast<OperationType*>(this)->clear_task_args();
+    }
+};
+
+
+class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> {
+    friend class ChunkOperation<ChunkTruncateOperation>;
+
+private:
+    struct chunk_truncate_args {
+        const std::string* path;
+        size_t size;
+        ABT_eventual eventual;
+    };
+
+    struct chunk_truncate_args task_arg_{};
+
+    static void truncate_abt(void* _arg);
+
+    void clear_task_args();
+
+public:
+
+    explicit ChunkTruncateOperation(const std::string& path);
+
+    ~ChunkTruncateOperation() = default;
+
+    void truncate(size_t size);
+
+    int wait_for_task();
+};
+
+class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> {
+    friend class ChunkOperation<ChunkWriteOperation>;
+
+private:
+
+    struct chunk_write_args {
+        const std::string* path;
+        const char* buf;
+        gkfs::rpc::chnk_id_t chnk_id;
+        size_t size;
+        off64_t off;
+        ABT_eventual eventual;
+    };
+
+    std::vector<chunk_write_args> task_args_;
+
+    static void write_file_abt(void* _arg);
+
+    void clear_task_args();
+
+public:
+
+    ChunkWriteOperation(const std::string& path, size_t n);
+
+    ~ChunkWriteOperation() = default;
+
+    void write_nonblock(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset);
+
+    std::pair<int, size_t> wait_for_tasks();
+
+};
+
+
+class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> {
+    friend class ChunkOperation<ChunkReadOperation>;
+
+private:
+
+    struct chunk_read_args {
+        const std::string* path;
+        char* buf;
+        gkfs::rpc::chnk_id_t chnk_id;
+        size_t size;
+        off64_t off;
+        ABT_eventual eventual;
+    };
+
+    std::vector<chunk_read_args> task_args_;
+
+    static void read_file_abt(void* _arg);
+
+    void clear_task_args();
+
+public:
+
+    struct bulk_args {
+        margo_instance_id mid;
+        hg_addr_t origin_addr;
+        hg_bulk_t origin_bulk_handle;
+        std::vector<size_t>* origin_offsets;
+        hg_bulk_t local_bulk_handle;
+        std::vector<size_t>* local_offsets;
+        std::vector<uint64_t>* chunk_ids;
+    };
+
+    ChunkReadOperation(const std::string& path, size_t n);
+
+    ~ChunkReadOperation() = default;
+
+    void read_nonblock(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset);
+
+    std::pair<int, size_t>
+    wait_for_tasks_and_push_back(const bulk_args& args);
+
+};
+
+} // namespace data
+} // namespace gkfs
+
+#endif //GEKKOFS_DAEMON_DATA_HPP
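ChunkOperation and its subclasses absorb the ABT_task/ABT_eventual bookkeeping that srv_data.cpp used to do by hand: the *_nonblock() calls start one tasklet per chunk on the daemon's IO pool, and wait_for_tasks() blocks on the eventuals and folds the per-task results into one (error, io_size) pair. The handler-side flow reduces to the following sketch (buffer and size vectors abbreviated; compare rpc_srv_write below):

    gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
    for (uint64_t idx = 0; idx < in.chunk_n; ++idx) {
        // one tasklet per chunk; only the first chunk of a request carries an offset
        chunk_op.write_nonblock(idx, chnk_ids[idx], buf_ptrs[idx], sizes[idx],
                                idx == 0 ? in.offset : 0);
    }
    auto res = chunk_op.wait_for_tasks(); // std::pair<int, size_t>
    out.err = res.first;
    out.io_size = res.second;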
diff --git a/include/daemon/ops/metadentry.hpp b/include/daemon/ops/metadentry.hpp
index 1c903a3ad880cacc23127948158dae11fd3a0821..fd41aa712f0e4cd7723476a3d0614b9e99eca43c 100644
--- a/include/daemon/ops/metadentry.hpp
+++ b/include/daemon/ops/metadentry.hpp
@@ -35,7 +35,7 @@ void update(const std::string& path, Metadata& md);

 void update_size(const std::string& path, size_t io_size, off_t offset, bool append);

-void remove_node(const std::string& path);
+void remove(const std::string& path);

 } // namespace metadata
 } // namespace gkfs
diff --git a/include/global/global_defs.hpp b/include/global/global_defs.hpp
index ffe78f41f15fc42e93f486114dc06b79ffd1746f..ae099824f5b864f6663659c16137c7c7f7dfd9b7 100644
--- a/include/global/global_defs.hpp
+++ b/include/global/global_defs.hpp
@@ -18,6 +18,9 @@ namespace gkfs {

 // These constexpr set the RPC's identity and which handler the receiver end should use
 namespace rpc {
+
+using chnk_id_t = unsigned long;
+
 namespace tag {

 constexpr auto fs_config = "rpc_srv_fs_config";
@@ -46,10 +49,6 @@ constexpr auto ofi_verbs = "ofi+verbs";
 } // namespace protocol
 } // namespace rpc

-namespace types {
-// typedefs
-typedef unsigned long rpc_chnk_id_t;
-} // namespace types
 } // namespace gkfs

 #endif //GEKKOFS_GLOBAL_DEFS_HPP
diff --git a/include/global/rpc/rpc_types.hpp b/include/global/rpc/rpc_types.hpp
index 5e6d57f12907296787bd05135ed04280741bbcd6..65b086ad236f3039e9da5d70a53236c75cede4a9 100644
--- a/include/global/rpc/rpc_types.hpp
+++ b/include/global/rpc/rpc_types.hpp
@@ -23,88 +23,95 @@ extern "C" {
 /* visible API for RPC data types used in RPCS */

 // misc generic rpc types
-MERCURY_GEN_PROC(rpc_err_out_t, ((hg_int32_t) (err)))
+MERCURY_GEN_PROC(rpc_err_out_t,
+                 ((hg_int32_t) (err)))

 // Metadentry
 MERCURY_GEN_PROC(rpc_mk_node_in_t,
-                 ((hg_const_string_t) (path))\
-((uint32_t) (mode)))
+                 ((hg_const_string_t) (path))
+                 ((uint32_t) (mode)))

-MERCURY_GEN_PROC(rpc_path_only_in_t, ((hg_const_string_t) (path)))
+MERCURY_GEN_PROC(rpc_path_only_in_t,
+                 ((hg_const_string_t) (path)))

-MERCURY_GEN_PROC(rpc_stat_out_t, ((hg_int32_t) (err))
-                 ((hg_const_string_t) (db_val)))
+MERCURY_GEN_PROC(rpc_stat_out_t,
+                 ((hg_int32_t) (err))
+                 ((hg_const_string_t) (db_val)))

-MERCURY_GEN_PROC(rpc_rm_node_in_t, ((hg_const_string_t) (path)))
+MERCURY_GEN_PROC(rpc_rm_node_in_t,
+                 ((hg_const_string_t) (path)))

 MERCURY_GEN_PROC(rpc_trunc_in_t,
-                 ((hg_const_string_t) (path)) \
-((hg_uint64_t) (length)))
+                 ((hg_const_string_t) (path))
+                 ((hg_uint64_t) (length)))

 MERCURY_GEN_PROC(rpc_update_metadentry_in_t,
-                 ((hg_const_string_t) (path))\
-((uint64_t) (nlink))\
-((hg_uint32_t) (mode))\
-((hg_uint32_t) (uid))\
-((hg_uint32_t) (gid))\
-((hg_int64_t) (size))\
-((hg_int64_t) (blocks))\
-((hg_int64_t) (atime))\
-((hg_int64_t) (mtime))\
-((hg_int64_t) (ctime))\
-((hg_bool_t) (nlink_flag))\
-((hg_bool_t) (mode_flag))\
-((hg_bool_t) (size_flag))\
-((hg_bool_t) (block_flag))\
-((hg_bool_t) (atime_flag))\
-((hg_bool_t) (mtime_flag))\
-((hg_bool_t) (ctime_flag)))
+                 ((hg_const_string_t) (path))
+                 ((uint64_t) (nlink))
+                 ((hg_uint32_t) (mode))
+                 ((hg_uint32_t) (uid))
+                 ((hg_uint32_t) (gid))
+                 ((hg_int64_t) (size))
+                 ((hg_int64_t) (blocks))
+                 ((hg_int64_t) (atime))
+                 ((hg_int64_t) (mtime))
+                 ((hg_int64_t) (ctime))
+                 ((hg_bool_t) (nlink_flag))
+                 ((hg_bool_t) (mode_flag))
+                 ((hg_bool_t) (size_flag))
+                 ((hg_bool_t) (block_flag))
+                 ((hg_bool_t) (atime_flag))
+                 ((hg_bool_t) (mtime_flag))
+                 ((hg_bool_t) (ctime_flag)))

-MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t, ((hg_const_string_t) (path))
-                 ((hg_uint64_t) (size))
-                 ((hg_int64_t) (offset))
-                 ((hg_bool_t) (append)))
+MERCURY_GEN_PROC(rpc_update_metadentry_size_in_t,
+                 ((hg_const_string_t) (path))
+                 ((hg_uint64_t) (size))
+                 ((hg_int64_t) (offset))
+                 ((hg_bool_t) (append)))

-MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t, ((hg_int32_t) (err))
-                 ((hg_int64_t) (ret_size)))
+MERCURY_GEN_PROC(rpc_update_metadentry_size_out_t,
+                 ((hg_int32_t) (err))
+                 ((hg_int64_t) (ret_size)))

-MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t, ((hg_int32_t) (err))
-                 ((hg_int64_t) (ret_size)))
+MERCURY_GEN_PROC(rpc_get_metadentry_size_out_t,
+                 ((hg_int32_t) (err))
+                 ((hg_int64_t) (ret_size)))

 #ifdef HAS_SYMLINKS
 MERCURY_GEN_PROC(rpc_mk_symlink_in_t,
-                 ((hg_const_string_t) (path))\
-((hg_const_string_t) (target_path))
+                 ((hg_const_string_t) (path))
+                 ((hg_const_string_t) (target_path))
 )
 #endif

 // data
 MERCURY_GEN_PROC(rpc_read_data_in_t,
-                 ((hg_const_string_t) (path))\
-((int64_t) (offset))\
-((hg_uint64_t) (host_id))\
-((hg_uint64_t) (host_size))\
-((hg_uint64_t) (chunk_n))\
-((hg_uint64_t) (chunk_start))\
-((hg_uint64_t) (chunk_end))\
-((hg_uint64_t) (total_chunk_size))\
-((hg_bulk_t) (bulk_handle)))
+                 ((hg_const_string_t) (path))
+                 ((int64_t) (offset))
+                 ((hg_uint64_t) (host_id))
+                 ((hg_uint64_t) (host_size))
+                 ((hg_uint64_t) (chunk_n))
+                 ((hg_uint64_t) (chunk_start))
+                 ((hg_uint64_t) (chunk_end))
+                 ((hg_uint64_t) (total_chunk_size))
+                 ((hg_bulk_t) (bulk_handle)))

 MERCURY_GEN_PROC(rpc_data_out_t,
-                 ((int32_t) (err))\
-((hg_size_t) (io_size)))
+                 ((int32_t) (err))
+                 ((hg_size_t) (io_size)))

 MERCURY_GEN_PROC(rpc_write_data_in_t,
-                 ((hg_const_string_t) (path))\
-((int64_t) (offset))\
-((hg_uint64_t) (host_id))\
-((hg_uint64_t) (host_size))\
-((hg_uint64_t) (chunk_n))\
-((hg_uint64_t) (chunk_start))\
-((hg_uint64_t) (chunk_end))\
-((hg_uint64_t) (total_chunk_size))\
-((hg_bulk_t) (bulk_handle)))
+                 ((hg_const_string_t) (path))
+                 ((int64_t) (offset))
+                 ((hg_uint64_t) (host_id))
+                 ((hg_uint64_t) (host_size))
+                 ((hg_uint64_t) (chunk_n))
+                 ((hg_uint64_t) (chunk_start))
+                 ((hg_uint64_t) (chunk_end))
+                 ((hg_uint64_t) (total_chunk_size))
+                 ((hg_bulk_t) (bulk_handle)))

 MERCURY_GEN_PROC(rpc_get_dirents_in_t,
                  ((hg_const_string_t) (path))
@@ -117,15 +124,16 @@ MERCURY_GEN_PROC(rpc_get_dirents_out_t,
 )

-MERCURY_GEN_PROC(rpc_config_out_t, ((hg_const_string_t) (mountdir))
-                 ((hg_const_string_t) (rootdir)) \
-((hg_bool_t) (atime_state)) \
-((hg_bool_t) (mtime_state)) \
-((hg_bool_t) (ctime_state)) \
-((hg_bool_t) (link_cnt_state)) \
-((hg_bool_t) (blocks_state)) \
-((hg_uint32_t) (uid)) \
-((hg_uint32_t) (gid)) \
+MERCURY_GEN_PROC(rpc_config_out_t,
+                 ((hg_const_string_t) (mountdir))
+                 ((hg_const_string_t) (rootdir))
+                 ((hg_bool_t) (atime_state))
+                 ((hg_bool_t) (mtime_state))
+                 ((hg_bool_t) (ctime_state))
+                 ((hg_bool_t) (link_cnt_state))
+                 ((hg_bool_t) (blocks_state))
+                 ((hg_uint32_t) (uid))
+                 ((hg_uint32_t) (gid))
 )


@@ -134,7 +142,8 @@ MERCURY_GEN_PROC(rpc_chunk_stat_in_t,
 )

 MERCURY_GEN_PROC(rpc_chunk_stat_out_t,
-                 ((hg_uint64_t) (chunk_size))
+                 ((hg_int32_t) (err))
+                 ((hg_uint64_t) (chunk_size))
                  ((hg_uint64_t) (chunk_total))
                  ((hg_uint64_t) (chunk_free))
 )
diff --git a/include/global/rpc/rpc_util.hpp b/include/global/rpc/rpc_util.hpp
index 2822c67c0bf6fb8a420d99cf0085c2b80511578d..cdac93a2bdd1981f09797143183dbe50a35d80f8 100644
--- a/include/global/rpc/rpc_util.hpp
+++ b/include/global/rpc/rpc_util.hpp
@@ -22,10 +22,16 @@ extern "C" {

 #include <string>

+namespace gkfs {
+namespace rpc {
+
 hg_bool_t bool_to_merc_bool(bool state);

 std::string get_my_hostname(bool short_hostname = false);

 std::string get_host_by_name(const std::string& hostname);

+} // namespace rpc
+} // namespace gkfs
+
 #endif //GEKKOFS_GLOBAL_RPC_UTILS_HPP
diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp
index c944e6fc459883aa46c59e74917c05704fb57f2b..2f69aa0c4e3afe2a64195702e93055f1a864a12a 100644
--- a/src/client/gkfs_functions.cpp
+++ b/src/client/gkfs_functions.cpp
@@ -271,7 +271,13 @@ int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask,
 #endif

 int gkfs_statfs(struct statfs* buf) {
-    auto blk_stat = gkfs::rpc::forward_get_chunk_stat();
+    gkfs::rpc::ChunkStat blk_stat{};
+    try {
+        blk_stat = gkfs::rpc::forward_get_chunk_stat();
+    } catch (const std::exception& e) {
+        LOG(ERROR, "{}() Failure with error: '{}'", __func__, e.what());
+        return -1;
+    }
     buf->f_type = 0;
     buf->f_bsize = blk_stat.chunk_size;
     buf->f_blocks = blk_stat.chunk_total;
@@ -288,8 +294,13 @@ int gkfs_statfs(struct statfs* buf) {
 }

 int gkfs_statvfs(struct statvfs* buf) {
-    gkfs::preload::init_ld_env_if_needed();
-    auto blk_stat = gkfs::rpc::forward_get_chunk_stat();
+    gkfs::rpc::ChunkStat blk_stat{};
+    try {
+        blk_stat = gkfs::rpc::forward_get_chunk_stat();
+    } catch (const std::exception& e) {
+        LOG(ERROR, "{}() Failure with error: '{}'", __func__, e.what());
+        return -1;
+    }
     buf->f_bsize = blk_stat.chunk_size;
     buf->f_blocks = blk_stat.chunk_total;
     buf->f_bfree = blk_stat.chunk_free;
diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp
index 4db07f20e7eb71405b93bd22b3766ecc57986850..e9663604ad51a1d7c2662aab0626c3d30b926d6f 100644
--- a/src/client/preload_util.cpp
+++ b/src/client/preload_util.cpp
@@ -195,7 +195,7 @@ void load_hosts() {

     LOG(INFO, "Hosts pool size: {}", hosts.size());

-    auto local_hostname = get_my_hostname(true);
+    auto local_hostname = gkfs::rpc::get_my_hostname(true);
     bool local_host_found = false;

     std::vector<hermes::endpoint> addrs;
diff --git a/src/client/rpc/forward_data.cpp b/src/client/rpc/forward_data.cpp
index f68f65e4849153e97d10e0a5fa3c8d38e0642925..3c850a2182236fb7ed6d05907efa88a9b040f6db 100644
--- a/src/client/rpc/forward_data.cpp
+++ b/src/client/rpc/forward_data.cpp
@@ -415,6 +415,11 @@ int forward_truncate(const std::string& path, size_t current_size, size_t new_si
     return error ? -1 : 0;
 }

+/**
+ * Performs a chunk stat RPC to all hosts
+ * @return rpc::ChunkStat
+ * @throws std::runtime_error
+ */
 ChunkStat forward_get_chunk_stat() {

     std::vector<hermes::rpc_handle<gkfs::rpc::chunk_stat>> handles;
@@ -444,25 +449,37 @@ ChunkStat forward_get_chunk_stat() {
     unsigned long chunk_total = 0;
     unsigned long chunk_free = 0;

+    int error = 0;
+
     // wait for RPC responses
     for (std::size_t i = 0; i < handles.size(); ++i) {

-        gkfs::rpc::chunk_stat::output out;
+        gkfs::rpc::chunk_stat::output out{};

         try {
             // XXX We might need a timeout here to not wait forever for an
             // output that never comes?
             out = handles[i].get().at(0);
+            if (out.err() != 0) {
+                error = out.err();
+                LOG(ERROR, "Host '{}' reported err code '{}' during stat chunk.", CTX->hosts().at(i).to_string(),
+                    error);
+                // we don't break here to ensure all responses are processed
+                continue;
+            }
             assert(out.chunk_size() == chunk_size);
             chunk_total += out.chunk_total();
             chunk_free += out.chunk_free();
         } catch (const std::exception& ex) {
-            throw std::runtime_error(
-                    fmt::format("Failed to get rpc output for target host: {}]", i));
+            errno = EBUSY;
+            throw std::runtime_error(fmt::format("Failed to get RPC output from host: {}", i));
         }
     }
+    if (error != 0) {
+        errno = error;
+        throw std::runtime_error("chunk stat failed on one host");
+    }

     return {chunk_size, chunk_total, chunk_free};
 }
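forward_get_chunk_stat() now reports failures by storing the daemon-provided error code in errno and throwing; callers are expected to translate that into a POSIX-style return, which is exactly what the reworked gkfs_statfs()/gkfs_statvfs() above do:

    try {
        blk_stat = gkfs::rpc::forward_get_chunk_stat();
    } catch (const std::exception& e) {
        // errno already holds the error code set before the throw
        return -1;
    }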
diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp
index f66a09001dd40a3b0b4d772a5139c4f334122c3e..884e7ee14a0a14b2e0157f531c608988bc6b3f46 100644
--- a/src/client/rpc/forward_metadata.cpp
+++ b/src/client/rpc/forward_metadata.cpp
@@ -129,24 +129,30 @@ int forward_remove(const std::string& path, const bool remove_metadentry_only, c
     // Small files
     if (static_cast<std::size_t>(size / gkfs::config::rpc::chunksize) < CTX->hosts().size()) {
-
-        auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
+        const auto metadata_host_id = CTX->distributor()->locate_file_metadata(path);
+        const auto endp_metadata = CTX->hosts().at(metadata_host_id);

         try {
-            LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
+            LOG(DEBUG, "Sending RPC to host: {}", endp_metadata.to_string());
             gkfs::rpc::remove::input in(path);
-            handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(endp, in));
+            handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(endp_metadata, in));

             uint64_t chnk_start = 0;
             uint64_t chnk_end = size / gkfs::config::rpc::chunksize;

             for (uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
-                const auto target = CTX->hosts().at(
-                        CTX->distributor()->locate_data(path, chnk_id));
-
-                LOG(DEBUG, "Sending RPC to host: {}", target.to_string());
-
-                handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(target, in));
+                const auto chnk_host_id = CTX->distributor()->locate_data(path, chnk_id);
+                /*
+                 * If the chunk host matches the metadata host the remove request has already been sent
+                 * as part of the metadata remove request.
+                 */
+                if (chnk_host_id == metadata_host_id)
+                    continue;
+                const auto endp_chnk = CTX->hosts().at(chnk_host_id);
+
+                LOG(DEBUG, "Sending RPC to host: {}", endp_chnk.to_string());
+
+                handles.emplace_back(ld_network_service->post<gkfs::rpc::remove>(endp_chnk, in));
             }
         } catch (const std::exception& ex) {
             LOG(ERROR, "Failed to send reduced remove requests");
diff --git a/src/daemon/CMakeLists.txt b/src/daemon/CMakeLists.txt
index e4ba89177c10f43f57806913a89844e6f43347a1..945a21983032a05ed2ef1675e2e6f96f5ba85c86 100644
--- a/src/daemon/CMakeLists.txt
+++ b/src/daemon/CMakeLists.txt
@@ -7,10 +7,11 @@ set(DAEMON_SRC
     daemon.cpp
     util.cpp
     ops/metadentry.cpp
+    ops/data.cpp
    classes/fs_data.cpp
    classes/rpc_data.cpp
-    handler/srv_metadata.cpp
    handler/srv_data.cpp
+    handler/srv_metadata.cpp
    handler/srv_management.cpp
    )
set(DAEMON_HEADERS
@@ -23,6 +24,7 @@ set(DAEMON_HEADERS
    ../../include/global/path_util.hpp
    ../../include/daemon/daemon.hpp
    ../../include/daemon/util.hpp
+    ../../include/daemon/ops/data.hpp
    ../../include/daemon/ops/metadentry.hpp
    ../../include/daemon/classes/fs_data.hpp
    ../../include/daemon/classes/rpc_data.hpp
diff --git a/src/daemon/backend/data/CMakeLists.txt b/src/daemon/backend/data/CMakeLists.txt
index 41f1ec0f998a2e31fd26fcf595312e1706ff239e..9535714be84359270aadad00d4b833bf89380c5d 100644
--- a/src/daemon/backend/data/CMakeLists.txt
+++ b/src/daemon/backend/data/CMakeLists.txt
@@ -5,17 +5,20 @@ target_sources(storage
    ${INCLUDE_DIR}/daemon/backend/data/chunk_storage.hpp
    PRIVATE
    ${INCLUDE_DIR}/global/path_util.hpp
+    ${INCLUDE_DIR}/global/global_defs.hpp
+    ${INCLUDE_DIR}/daemon/backend/data/data_module.hpp
+    ${INCLUDE_DIR}/daemon/backend/data/file_handle.hpp
    ${CMAKE_CURRENT_LIST_DIR}/chunk_storage.cpp
+    ${CMAKE_CURRENT_LIST_DIR}/data_module.cpp
    )

target_link_libraries(storage
    PRIVATE
    spdlog
    Boost::filesystem
-    ${ABT_LIBRARIES}
+    -ldl
    )

-target_include_directories(storage
-    PRIVATE
-    ${ABT_INCLUDE_DIRS}
-    )
+#target_include_directories(storage
+#    PRIVATE
+#    )
diff --git a/src/daemon/backend/data/chunk_storage.cpp b/src/daemon/backend/data/chunk_storage.cpp
index 3d9b69fecaada122effbe1b47cac7b2cb996e5d1..b4c4e116b8e0bcf3445451b20ec237ccc193dd97 100644
--- a/src/daemon/backend/data/chunk_storage.cpp
+++ b/src/daemon/backend/data/chunk_storage.cpp
@@ -11,10 +11,13 @@
   SPDX-License-Identifier: MIT
 */

+#include <daemon/backend/data/file_handle.hpp>
 #include <daemon/backend/data/chunk_storage.hpp>
+#include <daemon/backend/data/data_module.hpp>
 #include <global/path_util.hpp>
 #include <global/global_defs.hpp>
+
 #include <fmt/format.h>
 #include <boost/filesystem.hpp>
@@ -28,22 +31,11 @@ using namespace std;
 namespace gkfs {
 namespace data {

+// private functions
+
 string ChunkStorage::absolute(const string& internal_path) const {
     assert(gkfs::path::is_relative(internal_path));
-    return root_path + '/' + internal_path;
-}
-
-ChunkStorage::ChunkStorage(const string& path, const size_t chunksize) :
-        root_path(path),
-        chunksize(chunksize) {
-    //TODO check path: absolute, exists, permission to write etc...
-    assert(gkfs::path::is_absolute(root_path));
-
-    /* Initialize logger */
-    log = spdlog::get(LOGGER_NAME);
-    assert(log);
-
-    log->debug("Chunk storage initialized with path: '{}'", root_path);
+    return fmt::format("{}/{}", root_path_, internal_path);
 }

 string ChunkStorage::get_chunks_dir(const string& file_path) {
@@ -53,169 +45,268 @@ string ChunkStorage::get_chunks_dir(const string& file_path) {
     return chunk_dir;
 }

-string ChunkStorage::get_chunk_path(const string& file_path, unsigned int chunk_id) {
-    return get_chunks_dir(file_path) + '/' + ::to_string(chunk_id);
-}
-
-void ChunkStorage::destroy_chunk_space(const string& file_path) const {
-    auto chunk_dir = absolute(get_chunks_dir(file_path));
-    try {
-        bfs::remove_all(chunk_dir);
-    } catch (const bfs::filesystem_error& e) {
-        log->error("Failed to remove chunk directory. Path: '{}', Error: '{}'", chunk_dir, e.what());
-    }
+string ChunkStorage::get_chunk_path(const string& file_path, gkfs::rpc::chnk_id_t chunk_id) {
+    return fmt::format("{}/{}", get_chunks_dir(file_path), chunk_id);
 }

+/**
+ * Creates a chunk directory that all chunk files are placed in.
+ * The path to the real file will be used as the directory name
+ * @param file_path
+ * @throws ChunkStorageException on error
+ */
 void ChunkStorage::init_chunk_space(const string& file_path) const {
     auto chunk_dir = absolute(get_chunks_dir(file_path));
     auto err = mkdir(chunk_dir.c_str(), 0750);
     if (err == -1 && errno != EEXIST) {
-        log->error("Failed to create chunk dir. Path: '{}', Error: '{}'", chunk_dir, ::strerror(errno));
-        throw ::system_error(errno, ::system_category(), "Failed to create chunk directory");
+        auto err_str = fmt::format("{}() Failed to create chunk directory. File: '{}', Error: '{}'", __func__,
+                                   file_path, errno);
+        throw ChunkStorageException(errno, err_str);
     }
 }

-/* Delete all chunks stored on this node that falls in the gap [chunk_start, chunk_end]
+// public functions
+
+/**
  *
- * This is pretty slow method because it cycle over all the chunks sapce for this file.
+ * @param path
+ * @param chunksize
+ * @throws ChunkStorageException
  */
-void ChunkStorage::trim_chunk_space(const string& file_path,
-                                    unsigned int chunk_start, unsigned int chunk_end) {
-
-    auto chunk_dir = absolute(get_chunks_dir(file_path));
-    const bfs::directory_iterator end;
-
-    for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) {
-        auto chunk_path = chunk_file->path();
-        auto chunk_id = ::stoul(chunk_path.filename().c_str());
-        if (chunk_id >= chunk_start && chunk_id <= chunk_end) {
-            int ret = unlink(chunk_path.c_str());
-            if (ret == -1) {
-                log->error("Failed to remove chunk file. File: '{}', Error: '{}'", chunk_path.native(),
-                           ::strerror(errno));
-                throw ::system_error(errno, ::system_category(), "Failed to remove chunk file");
-            }
-        }
-    }
-}
+ChunkStorage::ChunkStorage(string& path, const size_t chunksize) :
+        root_path_(path),
+        chunksize_(chunksize) {
+    /* Get logger instance and set it for data module and chunk storage */
+    GKFS_DATA_MOD->log(spdlog::get(GKFS_DATA_MOD->LOGGER_NAME));
+    assert(GKFS_DATA_MOD->log());
+    log_ = spdlog::get(GKFS_DATA_MOD->LOGGER_NAME);
+    assert(log_);
+    assert(gkfs::path::is_absolute(root_path_));
+    // Verify that we have sufficient access for chunk directories
+    if (access(root_path_.c_str(), W_OK | R_OK) != 0) {
+        auto err_str = fmt::format("{}() Insufficient permissions to create chunk directories in path '{}'", __func__,
+                                   root_path_);
+        throw ChunkStorageException(EPERM, err_str);
+    }
+    log_->debug("{}() Chunk storage initialized with path: '{}'", __func__, root_path_);
+}

-void ChunkStorage::delete_chunk(const string& file_path, unsigned int chunk_id) {
-    auto chunk_path = absolute(get_chunk_path(file_path, chunk_id));
-    int ret = unlink(chunk_path.c_str());
-    if (ret == -1) {
-        log->error("Failed to remove chunk file. File: '{}', Error: '{}'", chunk_path, ::strerror(errno));
-        throw ::system_error(errno, ::system_category(), "Failed to remove chunk file");
-    }
-}
-
-void ChunkStorage::truncate_chunk(const string& file_path, unsigned int chunk_id, off_t length) {
-    auto chunk_path = absolute(get_chunk_path(file_path, chunk_id));
-    assert(length > 0 && (unsigned int) length <= chunksize);
-    int ret = truncate(chunk_path.c_str(), length);
-    if (ret == -1) {
-        log->error("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path, ::strerror(errno));
-        throw ::system_error(errno, ::system_category(), "Failed to truncate chunk file");
-    }
-}
+/**
+ * Removes chunk directory with all its files
+ * @param file_path
+ * @throws ChunkStorageException
+ */
+void ChunkStorage::destroy_chunk_space(const string& file_path) const {
+    auto chunk_dir = absolute(get_chunks_dir(file_path));
+    try {
+        // Note: remove_all does not throw an error when path doesn't exist.
+        auto n = bfs::remove_all(chunk_dir);
+        log_->debug("{}() Removed '{}' files from '{}'", __func__, n, chunk_dir);
+    } catch (const bfs::filesystem_error& e) {
+        auto err_str = fmt::format("{}() Failed to remove chunk directory. Path: '{}', Error: '{}'", __func__,
+                                   chunk_dir, e.what());
+        throw ChunkStorageException(e.code().value(), err_str);
+    }
+}

-void ChunkStorage::write_chunk(const string& file_path, unsigned int chunk_id,
-                               const char* buff, size_t size, off64_t offset, ABT_eventual& eventual) const {
-
-    assert((offset + size) <= chunksize);
+/**
+ * Writes a chunk file.
+ * On failure throws ChunkStorageException with encapsulated error code
+ *
+ * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pwrite behavior
+ *
+ * @param file_path
+ * @param chunk_id
+ * @param buf
+ * @param size
+ * @param offset
+ * @return number of bytes written
+ * @throws ChunkStorageException (caller will handle eventual signalling)
+ */
+ssize_t
+ChunkStorage::write_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, const char* buf, size_t size,
+                          off64_t offset) const {

+    assert((offset + size) <= chunksize_);
+    // may throw ChunkStorageException on failure
     init_chunk_space(file_path);

     auto chunk_path = absolute(get_chunk_path(file_path, chunk_id));

-    int fd = open(chunk_path.c_str(), O_WRONLY | O_CREAT, 0640);
-    if (fd < 0) {
-        log->error("Failed to open chunk file for write. File: '{}', Error: '{}'", chunk_path, ::strerror(errno));
-        throw ::system_error(errno, ::system_category(), "Failed to open chunk file for write");
-    }
-    auto wrote = pwrite(fd, buff, size, offset);
-    if (wrote < 0) {
-        log->error("Failed to write chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'",
-                   chunk_path, size, offset, ::strerror(errno));
-        throw ::system_error(errno, ::system_category(), "Failed to write chunk file");
+    FileHandle fh(open(chunk_path.c_str(), O_WRONLY | O_CREAT, 0640), chunk_path);
+    if (!fh.valid()) {
+        auto err_str = fmt::format("{}() Failed to open chunk file for write. File: '{}', Error: '{}'", __func__,
+                                   chunk_path, ::strerror(errno));
+        throw ChunkStorageException(errno, err_str);
     }

-    ABT_eventual_set(eventual, &wrote, sizeof(size_t));
+    size_t wrote_total{};
+    ssize_t wrote{};

-    auto err = close(fd);
-    if (err < 0) {
-        log->error("Failed to close chunk file after write. File: '{}', Error: '{}'",
-                   chunk_path, ::strerror(errno));
-        //throw ::system_error(errno, ::system_category(), "Failed to close chunk file");
-    }
+    do {
+        wrote = pwrite(fh.native(),
+                       buf + wrote_total,
+                       size - wrote_total,
+                       offset + wrote_total);
+
+        if (wrote < 0) {
+            // retry if a signal or anything else has interrupted the write system call
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
+                continue;
+            auto err_str = fmt::format(
+                    "{}() Failed to write chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'",
+                    __func__, chunk_path, size, offset, ::strerror(errno));
+            throw ChunkStorageException(errno, err_str);
+        }
+        wrote_total += wrote;
+    } while (wrote_total != size);
+
+    // file is closed via the file handle's destructor.
+    return wrote_total;
 }

-void ChunkStorage::read_chunk(const string& file_path, unsigned int chunk_id,
-                              char* buff, size_t size, off64_t offset, ABT_eventual& eventual) const {
-    assert((offset + size) <= chunksize);
+/**
+ * Read from a chunk file.
+ * On failure throws ChunkStorageException with encapsulated error code
+ *
+ * Refer to https://www.gnu.org/software/libc/manual/html_node/I_002fO-Primitives.html for pread behavior
+ * @param file_path
+ * @param chunk_id
+ * @param buf
+ * @param size
+ * @param offset
+ * @return number of bytes read
+ * @throws ChunkStorageException (caller will handle eventual signalling)
+ */
+ssize_t
+ChunkStorage::read_chunk(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, char* buf, size_t size,
+                         off64_t offset) const {
+    assert((offset + size) <= chunksize_);
     auto chunk_path = absolute(get_chunk_path(file_path, chunk_id));

-    int fd = open(chunk_path.c_str(), O_RDONLY);
-    if (fd < 0) {
-        log->error("Failed to open chunk file for read. File: '{}', Error: '{}'", chunk_path, ::strerror(errno));
-        throw ::system_error(errno, ::system_category(), "Failed to open chunk file for read");
+
+    FileHandle fh(open(chunk_path.c_str(), O_RDONLY), chunk_path);
+    if (!fh.valid()) {
+        auto err_str = fmt::format("{}() Failed to open chunk file for read. File: '{}', Error: '{}'", __func__,
+                                   chunk_path, ::strerror(errno));
+        throw ChunkStorageException(errno, err_str);
     }
-    size_t tot_read = 0;
+    size_t read_total = 0;
     ssize_t read = 0;

     do {
-        read = pread64(fd,
-                       buff + tot_read,
-                       size - tot_read,
-                       offset + tot_read);
+        read = pread64(fh.native(),
+                       buf + read_total,
+                       size - read_total,
+                       offset + read_total);

         if (read == 0) {
+            /*
+             * A value of zero indicates end-of-file (except if the value of the size argument is also zero).
+             * This is not considered an error. If you keep calling read while at end-of-file,
+             * it will keep returning zero and doing nothing else.
+             * Hence, we break here.
+             */
             break;
         }

         if (read < 0) {
-            log->error("Failed to read chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'",
-                       chunk_path, size, offset, ::strerror(errno));
-            throw ::system_error(errno, ::system_category(), "Failed to read chunk file");
+            // retry if a signal or anything else has interrupted the read system call
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
+                continue;
+            auto err_str = fmt::format("Failed to read chunk file. File: '{}', size: '{}', offset: '{}', Error: '{}'",
+                                       chunk_path, size, offset, ::strerror(errno));
+            throw ChunkStorageException(errno, err_str);
         }

 #ifndef NDEBUG
-        if (tot_read + read < size) {
-            log->warn("Read less bytes than requested: '{}'/{}. Total read was '{}'", read, size - tot_read, size);
+        if (read_total + read < size) {
+            log_->debug("Read less bytes than requested: '{}'/{}. Total read was '{}'. This is not an error!", read,
+                        size - read_total, size);
         }
 #endif
         assert(read > 0);
-        tot_read += read;
+        read_total += read;
+    } while (read_total != size);
+    // file is closed via the file handle's destructor.
+    return read_total;
+}

-    } while (tot_read != size);
-
-    ABT_eventual_set(eventual, &tot_read, sizeof(size_t));
-
-    auto err = close(fd);
-    if (err < 0) {
-        log->error("Failed to close chunk file after read. File: '{}', Error: '{}'",
-                   chunk_path, ::strerror(errno));
-        //throw ::system_error(errno, ::system_category(), "Failed to close chunk file");
-    }
-}
+/**
+ * Delete all chunks starting with a given chunk id.
+ * Note eventual consistency here: While chunks are removed, there is no lock that prevents
+ * other processes from modifying anything in that directory.
+ * It is the application's responsibility to stop modifying the file while truncate is executed
+ *
+ * If an error is encountered when removing a chunk file, the function will still remove all files and
+ * report the error afterwards with ChunkStorageException.
+ * @param file_path
+ * @param chunk_start
+ * @throws ChunkStorageException
+ */
+void ChunkStorage::trim_chunk_space(const string& file_path, gkfs::rpc::chnk_id_t chunk_start) {

+    auto chunk_dir = absolute(get_chunks_dir(file_path));
+    const bfs::directory_iterator end;
+    auto err_flag = false;
+    for (bfs::directory_iterator chunk_file(chunk_dir); chunk_file != end; ++chunk_file) {
+        auto chunk_path = chunk_file->path();
+        auto chunk_id = std::stoul(chunk_path.filename().c_str());
+        if (chunk_id >= chunk_start) {
+            auto err = unlink(chunk_path.c_str());
+            if (err == -1 && errno != ENOENT) {
+                err_flag = true;
+                log_->warn("{}() Failed to remove chunk file. File: '{}', Error: '{}'", __func__, chunk_path.native(),
+                           ::strerror(errno));
+            }
+        }
+    }
+    if (err_flag)
+        throw ChunkStorageException(EIO, fmt::format("{}() One or more errors occurred when truncating '{}'", __func__,
+                                                     file_path));
+}

+/**
+ * Truncates a single chunk file to a given length
+ * @param file_path
+ * @param chunk_id
+ * @param length
+ * @throws ChunkStorageException
+ */
+void ChunkStorage::truncate_chunk_file(const string& file_path, gkfs::rpc::chnk_id_t chunk_id, off_t length) {
+    auto chunk_path = absolute(get_chunk_path(file_path, chunk_id));
+    assert(length > 0 && static_cast<size_t>(length) <= chunksize_);
+    auto ret = truncate(chunk_path.c_str(), length);
+    if (ret == -1) {
+        auto err_str = fmt::format("Failed to truncate chunk file. File: '{}', Error: '{}'", chunk_path,
+                                   ::strerror(errno));
+        throw ChunkStorageException(errno, err_str);
+    }
+}

+/**
+ * Calls statfs on the chunk directory to get statistic on its used and free size left
+ * @return ChunkStat
+ * @throws ChunkStorageException
+ */
 ChunkStat ChunkStorage::chunk_stat() const {
     struct statfs sfs{};
-    if (statfs(root_path.c_str(), &sfs) != 0) {
-        log->error("Failed to get filesystem statistic for chunk directory."
-                   " Error: '{}'", ::strerror(errno));
-        throw ::system_error(errno, ::system_category(),
-                             "statfs() failed on chunk directory");
+
+    if (statfs(root_path_.c_str(), &sfs) != 0) {
+        auto err_str = fmt::format("Failed to get filesystem statistic for chunk directory. Error: '{}'",
+                                   ::strerror(errno));
+        throw ChunkStorageException(errno, err_str);
     }

-    log->debug("Chunksize '{}', total '{}', free '{}'", sfs.f_bsize, sfs.f_blocks, sfs.f_bavail);
+    log_->debug("Chunksize '{}', total '{}', free '{}'", sfs.f_bsize, sfs.f_blocks, sfs.f_bavail);
     auto bytes_total = static_cast<unsigned long long>(sfs.f_bsize) * static_cast<unsigned long long>(sfs.f_blocks);
     auto bytes_free = static_cast<unsigned long long>(sfs.f_bsize) * static_cast<unsigned long long>(sfs.f_bavail);
-    return {chunksize,
-            bytes_total / chunksize,
-            bytes_free / chunksize};
+    return {chunksize_,
+            bytes_total / chunksize_,
+            bytes_free / chunksize_};
 }

 } // namespace data
diff --git a/src/daemon/backend/data/data_module.cpp b/src/daemon/backend/data/data_module.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..42391cc0f81092c2ba77f0950f66aacb290c9624
--- /dev/null
+++ b/src/daemon/backend/data/data_module.cpp
@@ -0,0 +1,28 @@
+/*
+  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
+  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
+
+  This software was partially supported by the
+  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
+
+  This software was partially supported by the
+  ADA-FS project under the SPPEXA project funded by the DFG.
+
+  SPDX-License-Identifier: MIT
+*/
+
+#include <daemon/backend/data/data_module.hpp>
+
+namespace gkfs {
+namespace data {
+
+const std::shared_ptr<spdlog::logger>& DataModule::log() const {
+    return log_;
+}
+
+void DataModule::log(const std::shared_ptr<spdlog::logger>& log) {
+    DataModule::log_ = log;
+}
+
+} // namespace data
+} // namespace gkfs
\ No newline at end of file
diff --git a/src/daemon/daemon.cpp b/src/daemon/daemon.cpp
index a7a920627495254d71fd8cd063ac67208f5f925d..5caf65c13ef5fdcf6cdaca92c09e3aa04ed4308e 100644
--- a/src/daemon/daemon.cpp
+++ b/src/daemon/daemon.cpp
@@ -269,7 +269,7 @@ void initialize_loggers() {
     auto logger_names = std::vector<std::string>{
         "main",
         "MetadataDB",
-        "ChunkStorage",
+        "DataModule",
     };

     gkfs::log::setup(logger_names, level, path);
@@ -347,9 +347,8 @@ int main(int argc, const char* argv[]) {
         }
     } else {
         if (RPC_PROTOCOL != string(gkfs::rpc::protocol::ofi_verbs))
-            addr = get_my_hostname(true);
+            addr = gkfs::rpc::get_my_hostname(true);
     }
-
     GKFS_DATA->bind_addr(fmt::format("{}://{}", RPC_PROTOCOL, addr));

     string hosts_file;
diff --git a/src/daemon/handler/srv_data.cpp b/src/daemon/handler/srv_data.cpp
index 553a9eaa5391e2e7ff9235f9c069e94cc0e7eb3f..9b8cca9d71c543f1343e02b041c6f71fecf40c13 100644
--- a/src/daemon/handler/srv_data.cpp
+++ b/src/daemon/handler/srv_data.cpp
@@ -16,111 +16,19 @@
 #include <daemon/daemon.hpp>
 #include <daemon/handler/rpc_defs.hpp>
 #include <daemon/handler/rpc_util.hpp>
+#include <daemon/ops/data.hpp>
 #include <global/rpc/rpc_types.hpp>
 #include <global/rpc/distributor.hpp>
 #include <global/chunk_calc_util.hpp>
-
 using namespace std;

-struct write_chunk_args {
-    const std::string* path;
-    const char* buf;
-    gkfs::types::rpc_chnk_id_t chnk_id;
-    size_t size;
-    off64_t off;
-    ABT_eventual eventual;
-};
-
-/**
- * Used by an argobots threads. Argument args has the following fields:
- * const std::string* path;
-   const char* buf;
-   const gkfs::types::rpc_chnk_id_t* chnk_id;
-   size_t size;
-   off64_t off;
-   ABT_eventual* eventual;
- * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon.
- * This function is called by tasklets, as this function cannot be allowed to block.
- * @return written_size is put into eventual and returned that way
- */
-void write_file_abt(void* _arg) {
-    // Unpack args
-    auto* arg = static_cast<write_chunk_args*>(_arg);
-    const std::string& path = *(arg->path);
-
-    try {
-        GKFS_DATA->storage()->write_chunk(path, arg->chnk_id,
-                                          arg->buf, arg->size, arg->off, arg->eventual);
-    } catch (const std::system_error& serr) {
-        GKFS_DATA->spdlogger()->error("{}() Error writing chunk {} of file {}", __func__, arg->chnk_id, path);
-        ssize_t wrote = -(serr.code().value());
-        ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t));
-    }
-
-}
-
-struct read_chunk_args {
-    const std::string* path;
-    char* buf;
-    gkfs::types::rpc_chnk_id_t chnk_id;
-    size_t size;
-    off64_t off;
-    ABT_eventual eventual;
-};
-
-/**
- * Used by an argobots threads. Argument args has the following fields:
- * const std::string* path;
-   char* buf;
-   const gkfs::types::rpc_chnk_id_t* chnk_id;
-   size_t size;
-   off64_t off;
-   ABT_eventual* eventual;
- * This function is driven by the IO pool. so there is a maximum allowed number of concurrent IO operations per daemon.
- * This function is called by tasklets, as this function cannot be allowed to block.
- * @return read_size is put into eventual and returned that way
- */
-void read_file_abt(void* _arg) {
-    //unpack args
-    auto* arg = static_cast<read_chunk_args*>(_arg);
-    const std::string& path = *(arg->path);
-
-    try {
-        GKFS_DATA->storage()->read_chunk(path, arg->chnk_id,
-                                         arg->buf, arg->size, arg->off, arg->eventual);
-    } catch (const std::system_error& serr) {
-        GKFS_DATA->spdlogger()->error("{}() Error reading chunk {} of file {}", __func__, arg->chnk_id, path);
-        ssize_t read = -(serr.code().value());
-        ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t));
-    }
-}
-
-/**
- * Free Argobots tasks and eventual constructs in a given vector until max_idx.
- * Nothing is done for a vector if nullptr is given
- * @param abt_tasks
- * @param abt_eventuals
- * @param max_idx
- * @return
- */
-void cancel_abt_io(vector<ABT_task>* abt_tasks, vector<ABT_eventual>* abt_eventuals, uint64_t max_idx) {
-    if (abt_tasks != nullptr) {
-        for (uint64_t i = 0; i < max_idx; i++) {
-            ABT_task_cancel(abt_tasks->at(i));
-            ABT_task_free(&abt_tasks->at(i));
-        }
-    }
-    if (abt_eventuals != nullptr) {
-        for (uint64_t i = 0; i < max_idx; i++) {
-            ABT_eventual_reset(abt_eventuals->at(i));
-            ABT_eventual_free(&abt_eventuals->at(i));
-        }
-    }
-}
-
-
+/**
+ * RPC handler for an incoming write RPC
+ * @param handle
+ * @return
+ */
 static hg_return_t rpc_srv_write(hg_handle_t handle) {
     /*
      * 1. Setup
@@ -140,8 +48,9 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     auto hgi = margo_get_info(handle);
     auto mid = margo_hg_info_get_instance(hgi);
     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
-    GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__,
-                                  in.path, bulk_size, in.offset);
+    GKFS_DATA->spdlogger()->debug(
+            "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
+            __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset);
     /*
      * 2. Set up buffers for pull bulk transfers
      */
@@ -165,7 +74,6 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     auto const host_size = in.host_size;
     gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size);

-    auto path = make_shared<string>(in.path);
     // chnk_ids used by this host
     vector<uint64_t> chnk_ids_host(in.chunk_n);
     // counter to track how many chunks have been assigned
@@ -189,32 +97,38 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size : gkfs::config::rpc::chunksize;
     uint64_t origin_offset;
     uint64_t local_offset;
-    // task structures for async writing
-    vector<ABT_task> abt_tasks(in.chunk_n);
-    vector<ABT_eventual> task_eventuals(in.chunk_n);
-    vector<write_chunk_args> task_args(in.chunk_n);
+    // object for asynchronous disk IO
+    gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
+
     /*
      * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk
      */
     // Start to look for a chunk that hashes to this host with the first chunk in the buffer
-    for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) {
+    for (auto chnk_id_file = in.chunk_start;
+         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) {
         // Continue if chunk does not hash to this host
-        if (distributor.locate_data(in.path, chnk_id_file) != host_id)
+        if (distributor.locate_data(in.path, chnk_id_file) != host_id) {
+            GKFS_DATA->spdlogger()->trace(
+                    "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
+                    __func__, chnk_id_file, host_id, chnk_id_curr);
chnk_id_curr '{}'", + __func__, chnk_id_file, host_id, chnk_id_curr); continue; + } chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // offset case. Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { // if only 1 destination and 1 chunk (small write) the transfer_size == bulk_size - auto offset_transfer_size = (in.offset + bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size - : static_cast( - gkfs::config::rpc::chunksize - in.offset); + size_t offset_transfer_size = 0; + if (in.offset + bulk_size <= gkfs::config::rpc::chunksize) + offset_transfer_size = bulk_size; + else + offset_transfer_size = static_cast(gkfs::config::rpc::chunksize - in.offset); ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr, in.bulk_handle, 0, bulk_handle, 0, offset_transfer_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {}", __func__, chnk_id_file, in.chunk_start, in.chunk_end - 1); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } bulk_buf_ptrs[chnk_id_curr] = chnk_ptr; @@ -233,7 +147,7 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { if (chnk_id_curr == in.chunk_n - 1) transfer_size = chnk_size_left_host; GKFS_DATA->spdlogger()->trace( - "{}() BULK_TRANSFER hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}", + "{}() BULK_TRANSFER_PULL hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}", __func__, host_id, in.path, chnk_id_file, in.total_chunk_size, chnk_size_left_host, origin_offset, local_offset, transfer_size); // RDMA the data to here @@ -242,8 +156,8 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})", __func__, - *path, chnk_id_file, in.chunk_start, (in.chunk_end - 1)); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr); + in.path, chnk_id_file, in.chunk_start, (in.chunk_end - 1)); + out.err = EBUSY; return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } bulk_buf_ptrs[chnk_id_curr] = chnk_ptr; @@ -251,22 +165,13 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) { chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; } - // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool - // Starting tasklets for parallel I/O - ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value - auto& task_arg = task_args[chnk_id_curr]; - task_arg.path = path.get(); - task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; - task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; - task_arg.size = chnk_sizes[chnk_id_curr]; - // only the first chunk gets the offset. the chunks are sorted on the client side - task_arg.off = (chnk_id_file == in.chunk_start) ? 
-        task_arg.eventual = task_eventuals[chnk_id_curr];
-        auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args[chnk_id_curr],
-                                       &abt_tasks[chnk_id_curr]);
-        if (abt_ret != ABT_SUCCESS) {
-            GKFS_DATA->spdlogger()->error("{}() task create failed", __func__);
-            cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1);
+        try {
+            // start tasklet for writing chunk
+            chunk_op.write_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr],
+                                    chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0);
+        } catch (const gkfs::data::ChunkWriteOpException& e) {
+            // This exception is caused by setup of Argobots variables. If this fails, something is really wrong
+            GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'", __func__, e.what());
             return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
         }
         // next chunk
@@ -274,36 +179,16 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
         chnk_id_curr++;
     }
     // Sanity check that all chunks where detected in previous loop
+    // TODO don't proceed if that happens.
     if (chnk_size_left_host != 0)
         GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__,
                                      chnk_size_left_host);
     /*
     * 4. Read task results and accumulate in out.io_size
     */
-    out.err = 0;
-    out.io_size = 0;
-    for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) {
-        ssize_t* task_written_size = nullptr;
-        // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler
-        auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_written_size);
-        if (abt_ret != ABT_SUCCESS) {
-            GKFS_DATA->spdlogger()->error(
-                    "{}() Failed to wait for write task for chunk {}",
-                    __func__, chnk_id_curr);
-            out.err = EIO;
-            break;
-        }
-        assert(task_written_size != nullptr);
-        if (*task_written_size < 0) {
-            GKFS_DATA->spdlogger()->error("{}() Write task failed for chunk {}",
-                                          __func__, chnk_id_curr);
-            out.err = -(*task_written_size);
-            break;
-        }
-
-        out.io_size += *task_written_size; // add task written size to output size
-        ABT_eventual_free(&task_eventuals[chnk_id_curr]);
-    }
+    auto write_result = chunk_op.wait_for_tasks();
+    out.err = write_result.first;
+    out.io_size = write_result.second;

     // Sanity check to see if all data has been written
     if (in.total_chunk_size != out.io_size) {
@@ -315,17 +200,16 @@ static hg_return_t rpc_srv_write(hg_handle_t handle) {
     /*
     * 5. Respond and cleanup
     */
     GKFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.err);
-    ret = gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
-    // free tasks after responding
-    for (auto&& task : abt_tasks) {
-        ABT_task_join(task);
-        ABT_task_free(&task);
-    }
-    return ret;
+    return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
 }

 DEFINE_MARGO_RPC_HANDLER(rpc_srv_write)

+/**
+ * RPC handler for an incoming read RPC
+ * @param handle
+ * @return
+ */
 static hg_return_t rpc_srv_read(hg_handle_t handle) {
     /*
      * 1. Setup
Setup @@ -345,8 +229,9 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); auto bulk_size = margo_bulk_get_size(in.bulk_handle); - GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}", __func__, - in.path, bulk_size, in.offset); + GKFS_DATA->spdlogger()->debug( + "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'", + __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n, in.total_chunk_size, bulk_size, in.offset); /* * 2. Set up buffers for pull bulk transfers @@ -371,7 +256,6 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto const host_size = in.host_size; gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size); - auto path = make_shared<string>(in.path); // chnk_ids used by this host vector<uint64_t> chnk_ids_host(in.chunk_n); // counter to track how many chunks have been assigned @@ -387,25 +271,30 @@ static hg_return_t rpc_srv_read(hg_handle_t handle) { auto chnk_ptr = static_cast<char*>(bulk_buf); // temporary variables auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize) ? bulk_size : gkfs::config::rpc::chunksize; - // tasks structures - vector<ABT_task> abt_tasks(in.chunk_n); - vector<ABT_eventual> task_eventuals(in.chunk_n); - vector task_args(in.chunk_n); + // object for asynchronous disk IO + gkfs::data::ChunkReadOperation chunk_read_op{in.path, in.chunk_n}; /* * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk */ // Start to look for a chunk that hashes to this host with the first chunk in the buffer - for (auto chnk_id_file = in.chunk_start; chnk_id_file < in.chunk_end || chnk_id_curr < in.chunk_n; chnk_id_file++) { + for (auto chnk_id_file = in.chunk_start; + chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n; chnk_id_file++) { // Continue if chunk does not hash to this host - if (distributor.locate_data(in.path, chnk_id_file) != host_id) + if (distributor.locate_data(in.path, chnk_id_file) != host_id) { + GKFS_DATA->spdlogger()->trace( + "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'", + __func__, chnk_id_file, host_id, chnk_id_curr); continue; + } chnk_ids_host[chnk_id_curr] = chnk_id_file; // save this id to host chunk list // Only relevant in the first iteration of the loop and if the chunk hashes to this host if (chnk_id_file == in.chunk_start && in.offset > 0) { // if only 1 destination and 1 chunk (small read) the transfer_size == bulk_size - auto offset_transfer_size = (in.offset + bulk_size <= gkfs::config::rpc::chunksize) ?
bulk_size - : static_cast<size_t>( - gkfs::config::rpc::chunksize - in.offset); + size_t offset_transfer_size = 0; + if (in.offset + bulk_size <= gkfs::config::rpc::chunksize) + offset_transfer_size = bulk_size; + else + offset_transfer_size = static_cast<size_t>(gkfs::config::rpc::chunksize - in.offset); // Setting later transfer offsets local_offsets[chnk_id_curr] = 0; origin_offsets[chnk_id_curr] = 0; @@ -432,140 +321,108 @@ chnk_ptr += transfer_size; chnk_size_left_host -= transfer_size; } - // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool - // Starting tasklets for parallel I/O - ABT_eventual_create(sizeof(ssize_t), &task_eventuals[chnk_id_curr]); // written file return value - auto& task_arg = task_args[chnk_id_curr]; - task_arg.path = path.get(); - task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; - task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; - task_arg.size = chnk_sizes[chnk_id_curr]; - // only the first chunk gets the offset. the chunks are sorted on the client side - task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; - task_arg.eventual = task_eventuals[chnk_id_curr]; - auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args[chnk_id_curr], - &abt_tasks[chnk_id_curr]); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() task create failed", __func__); - cancel_abt_io(&abt_tasks, &task_eventuals, chnk_id_curr + 1); + try { + // start tasklet for read operation + chunk_read_op.read_nonblock(chnk_id_curr, chnk_ids_host[chnk_id_curr], bulk_buf_ptrs[chnk_id_curr], + chnk_sizes[chnk_id_curr], (chnk_id_file == in.chunk_start) ? in.offset : 0); + } catch (const gkfs::data::ChunkReadOpException& e) { + // This exception is caused by setup of Argobots variables. If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'", __func__, e.what()); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } chnk_id_curr++; } // Sanity check that all chunks were detected in previous loop + // TODO error out. If we continue this will crash the server when sending results back that don't exist. if (chnk_size_left_host != 0) GKFS_DATA->spdlogger()->warn("{}() Not all chunks were detected!!! Size left {}", __func__, chnk_size_left_host); /* * 4. Read task results and accumulate in out.io_size */ - out.err = 0; - out.io_size = 0; - for (chnk_id_curr = 0; chnk_id_curr < in.chunk_n; chnk_id_curr++) { - ssize_t* task_read_size = nullptr; - // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler - auto abt_ret = ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size); - if (abt_ret != ABT_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed to wait for read task for chunk {}", - __func__, chnk_id_curr); - out.err = EIO; - break; - } - assert(task_read_size != nullptr); - if (*task_read_size < 0) { - if (-(*task_read_size) == ENOENT) { - continue; - } - GKFS_DATA->spdlogger()->warn( - "{}() Read task failed for chunk {}", - __func__, chnk_id_curr); - out.err = -(*task_read_size); - break; - } - - if (*task_read_size == 0) { - continue; - } - - ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id_curr], - bulk_handle, local_offsets[chnk_id_curr], *task_read_size); - if (ret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error( - "{}() Failed push chnkid {} on path {} to client. 
origin offset {} local offset {} chunk size {}", - __func__, chnk_id_curr, in.path, origin_offsets[chnk_id_curr], local_offsets[chnk_id_curr], - chnk_sizes[chnk_id_curr]); - out.err = EIO; - break; - } - out.io_size += *task_read_size; // add task read size to output size - } + gkfs::data::ChunkReadOperation::bulk_args bulk_args{}; + bulk_args.mid = mid; + bulk_args.origin_addr = hgi->addr; + bulk_args.origin_bulk_handle = in.bulk_handle; + bulk_args.origin_offsets = &origin_offsets; + bulk_args.local_bulk_handle = bulk_handle; + bulk_args.local_offsets = &local_offsets; + bulk_args.chunk_ids = &chnk_ids_host; + // wait for all tasklets and push read data back to client + auto read_result = chunk_read_op.wait_for_tasks_and_push_back(bulk_args); + out.err = read_result.first; + out.io_size = read_result.second; /* * 5. Respond and cleanup */ GKFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}", __func__, out.err); - ret = gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); - // free tasks after responding - cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n); - return ret; + return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_read) +/** + * RPC handler for an incoming truncate RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_truncate(hg_handle_t handle) { rpc_trunc_in_t in{}; rpc_err_out_t out{}; - + out.err = EIO; + // Getting some information from margo auto ret = margo_get_input(handle, &in); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Could not get RPC input data with err {}", __func__, ret); - throw runtime_error("Failed to get RPC input data"); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: {}", __func__, in.path, in.length); - - unsigned int chunk_start = gkfs::util::chnk_id_for_offset(in.length, gkfs::config::rpc::chunksize); + GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); - // If we trunc in the middle of a chunk, do not delete that chunk - auto left_pad = gkfs::util::chnk_lpad(in.length, gkfs::config::rpc::chunksize); - if (left_pad != 0) { - GKFS_DATA->storage()->truncate_chunk(in.path, chunk_start, left_pad); - ++chunk_start; + gkfs::data::ChunkTruncateOperation chunk_op{in.path}; + try { + // start tasklet for truncate operation + chunk_op.truncate(in.length); + } catch (const gkfs::data::ChunkMetaOpException& e) { + // This exception is caused by setup of Argobots variables. 
If this fails, something is really wrong + GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__, e.what()); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } - GKFS_DATA->storage()->trim_chunk_space(in.path, chunk_start); + // wait and get output + out.err = chunk_op.wait_for_task(); - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); - auto hret = margo_respond(handle, &out); - if (hret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to respond"); - } - // Destroy handle when finished - margo_free_input(handle, &in); - margo_destroy(handle); - return HG_SUCCESS; + GKFS_DATA->spdlogger()->debug("{}() Sending output response '{}'", __func__, out.err); + return gkfs::rpc::cleanup_respond(&handle, &in, &out); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate) +/** + * RPC handler for an incoming chunk stat RPC + * @param handle + * @return + */ static hg_return_t rpc_srv_get_chunk_stat(hg_handle_t handle) { - GKFS_DATA->spdlogger()->trace("{}() called", __func__); - + GKFS_DATA->spdlogger()->debug("{}() enter", __func__); rpc_chunk_stat_out_t out{}; - // Get input - auto chk_stat = GKFS_DATA->storage()->chunk_stat(); - // Create output and send it - out.chunk_size = chk_stat.chunk_size; - out.chunk_total = chk_stat.chunk_total; - out.chunk_free = chk_stat.chunk_free; - auto hret = margo_respond(handle, &out); - if (hret != HG_SUCCESS) { - GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); + out.err = EIO; + try { + auto chk_stat = GKFS_DATA->storage()->chunk_stat(); + out.chunk_size = chk_stat.chunk_size; + out.chunk_total = chk_stat.chunk_total; + out.chunk_free = chk_stat.chunk_free; + out.err = 0; + } catch (const gkfs::data::ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + out.err = err.code().value(); + } catch (const std::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error during chunk stat '{}'", __func__, err.what()); + out.err = EAGAIN; + } - // Destroy handle when finished - margo_destroy(handle); - return hret; + // Create output and send it back + return gkfs::rpc::cleanup_respond(&handle, &out); } DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat) diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index ce6de804f885f99f55fac23a22110cdcd14f1fe5..2e19ac3d1541b6c200d61d7d46f99f6ce0a5b572 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -18,6 +18,7 @@ #include #include +#include using namespace std; @@ -69,7 +70,7 @@ static hg_return_t rpc_srv_stat(hg_handle_t handle) { out.db_val = val.c_str(); out.err = 0; GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__, out.db_val); - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -100,17 +101,17 @@ static hg_return_t rpc_srv_decr_size(hg_handle_t handle) { throw runtime_error("Failed to retrieve input from handle"); } - GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: {}", __func__, in.path, in.length); + GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__, in.path, in.length); try { GKFS_DATA->mdb()->decrease_size(in.path, in.length); out.err = 0; } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: {}", __func__, e.what()); + 
GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'", __func__, e.what()); out.err = EIO; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -134,26 +135,23 @@ static hg_return_t rpc_srv_remove(hg_handle_t handle) { assert(ret == HG_SUCCESS); GKFS_DATA->spdlogger()->debug("{}() Got remove node RPC with path '{}'", __func__, in.path); + // Remove metadentry if exists on the node and remove all chunks for that file try { - // Remove metadentry if exists on the node - // and remove all chunks for that file - gkfs::metadata::remove_node(in.path); - out.err = 0; - } catch (const NotFoundException& e) { - /* The metadentry was not found on this node, - * this is not an error. At least one node involved in this - * broadcast operation will find and delete the entry on its local - * MetadataDB. - * TODO: send the metadentry remove only to the node that actually - * has it. - */ + gkfs::metadata::remove(in.path); out.err = 0; + } catch (const gkfs::metadata::DBException& e) { + GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__, in.path, e.what()); + out.err = EIO; + } catch (const gkfs::data::ChunkStorageException& e) { + GKFS_DATA->spdlogger()->error("{}(): path '{}' errcode '{}' message '{}'", __func__, in.path, e.code().value(), + e.what()); + out.err = e.code().value(); } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to remove node: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__, in.path, e.what()); out.err = EBUSY; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -201,7 +199,7 @@ static hg_return_t rpc_srv_update_metadentry(hg_handle_t handle) { out.err = 1; } - GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -224,7 +222,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { if (ret != HG_SUCCESS) GKFS_DATA->spdlogger()->error("{}() Failed to retrieve input from handle", __func__); assert(ret == HG_SUCCESS); - GKFS_DATA->spdlogger()->debug("{}() path: {}, size: {}, offset: {}, append: {}", __func__, in.path, in.size, + GKFS_DATA->spdlogger()->debug("{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__, in.path, in.size, in.offset, in.append); try { @@ -233,7 +231,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { //TODO the actual size of the file could be different after the size update // due to concurrency on size out.ret_size = in.size + in.offset; - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -241,7 +239,7 @@ static hg_return_t rpc_srv_update_metadentry_size(hg_handle_t handle) { out.err = EBUSY; } - 
GKFS_DATA->spdlogger()->debug("{}() Sending output {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); @@ -270,7 +268,7 @@ static hg_return_t rpc_srv_get_metadentry_size(hg_handle_t handle) { try { out.ret_size = gkfs::metadata::get_size(in.path); out.err = 0; - } catch (const NotFoundException& e) { + } catch (const gkfs::metadata::NotFoundException& e) { GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__, in.path); out.err = ENOENT; } catch (const std::exception& e) { @@ -301,7 +299,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { auto ret = margo_get_input(handle, &in); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Could not get RPC input data with err {}", __func__, ret); + "{}() Could not get RPC input data with err '{}'", __func__, ret); return ret; } @@ -309,7 +307,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { auto hgi = margo_get_info(handle); auto mid = margo_hg_info_get_instance(hgi); GKFS_DATA->spdlogger()->debug( - "{}() Got dirents RPC with path {}", __func__, in.path); + "{}() Got dirents RPC with path '{}'", __func__, in.path); auto bulk_size = margo_bulk_get_size(in.bulk_handle); //Get directory entries from local DB @@ -366,7 +364,7 @@ static hg_return_t rpc_srv_get_dirents(hg_handle_t handle) { out_size); if (ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( - "{}() Failed push dirents on path {} to client", + "{}() Failed push dirents on path '{}' to client", __func__, in.path ); return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle); @@ -399,10 +397,10 @@ static hg_return_t rpc_srv_mk_symlink(hg_handle_t handle) { gkfs::metadata::create(in.path, md); out.err = 0; } catch (const std::exception& e) { - GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: {}", __func__, e.what()); + GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'", __func__, e.what()); out.err = -1; } - GKFS_DATA->spdlogger()->debug("{}() Sending output err {}", __func__, out.err); + GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__, out.err); auto hret = margo_respond(handle, &out); if (hret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__); diff --git a/src/daemon/ops/data.cpp b/src/daemon/ops/data.cpp new file mode 100644 index 0000000000000000000000000000000000000000..020918a84bc32981f04b75d3deca84cc155a3fed --- /dev/null +++ b/src/daemon/ops/data.cpp @@ -0,0 +1,397 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + SPDX-License-Identifier: MIT +*/ + +#include +#include +#include +#include + +extern "C" { +#include +} + +using namespace std; + +namespace gkfs { +namespace data { + +/* ------------------------------------------------------------------------ + * -------------------------- TRUNCATE ------------------------------------ + * ------------------------------------------------------------------------*/ + + +/** + * Used by an argobots tasklet. 
Argument args has the following fields: + * const string* path; + size_t size; + ABT_eventual* eventual; + * This function is driven by the IO pool, so there is a maximum allowed number of concurrent operations per daemon. + * @return error is put into eventual to signal that it finished + */ +void ChunkTruncateOperation::truncate_abt(void* _arg) { + assert(_arg); + // Unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + const size_t size = arg->size; + int err_response = 0; + try { + // get chunk from where to cut off + auto chunk_id_start = gkfs::util::chnk_id_for_offset(size, gkfs::config::rpc::chunksize); + // do not delete the chunk that holds the new end of file if the length falls in the middle of a chunk + auto left_pad = gkfs::util::chnk_lpad(size, gkfs::config::rpc::chunksize); + if (left_pad != 0) { + GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start, left_pad); + chunk_id_start++; + } + GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + err_response = err.code().value(); + } catch (const std::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error truncating file '{}' to length '{}'", __func__, path, + size); + err_response = EIO; + } + ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response)); +} + +void ChunkTruncateOperation::clear_task_args() { + task_arg_ = {}; +} + +ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkOperation{path, 1} {} + +/** + * Starts a tasklet for the requested truncate. In essence, all chunk files after the given offset are removed. + * Only one truncate call is allowed at a time + */ +void ChunkTruncateOperation::truncate(size_t size) { + assert(!task_eventuals_[0]); + GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__, path_, + size); + // sizeof(int) comes from truncate's return type + auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[0]); // truncate file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkMetaOpException(err_str); + } + + auto& task_arg = task_arg_; + task_arg.path = &path_; + task_arg.size = size; + task_arg.eventual = task_eventuals_[0]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_, &abt_tasks_[0]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkMetaOpException(err_str); + } + +} + + +int ChunkTruncateOperation::wait_for_task() { + GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_); + int trunc_err = 0; + + int* task_err = nullptr; + auto abt_err = ABT_eventual_wait(task_eventuals_[0], (void**) &task_err); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkTruncateOperation::{}() Error when waiting on ABT eventual", __func__); + ABT_eventual_free(&task_eventuals_[0]); + return EIO; + } + assert(task_err != nullptr); + if (*task_err != 0) { + trunc_err = *task_err; + } + ABT_eventual_free(&task_eventuals_[0]); + return trunc_err; +} + +/* ------------------------------------------------------------------------ + * ----------------------------- WRITE ------------------------------------ + * 
------------------------------------------------------------------------*/ + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + const char* buf; + const gkfs::rpc::chnk_id_t* chnk_id; + size_t size; + off64_t off; + ABT_eventual* eventual; + * This function is driven by the IO pool, so there is a maximum allowed number of concurrent IO operations per daemon. + * This function is called by tasklets, as it cannot be allowed to block. + * @return written_size is put into eventual to signal that it finished + */ +void ChunkWriteOperation::write_file_abt(void* _arg) { + assert(_arg); + // Unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + ssize_t wrote{0}; + try { + wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + wrote = -(err.code().value()); + } catch (const std::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error writing chunk {} of file {}", __func__, arg->chnk_id, + path); + wrote = -EIO; + } + ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote)); +} + +void ChunkWriteOperation::clear_task_args() { + task_args_.clear(); +} + +ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); +} + +/** + * Write a buffer to a single chunk referenced by its ID. Put task into IO queue. + * On failure the write operation is aborted, throwing an error, and cleaned up. + * The caller may repeat a failed call. + * @param chunk_id + * @param bulk_buf_ptr + * @param size + * @param offset + * @throws ChunkWriteOpException + */ +void +ChunkWriteOperation::write_nonblock(size_t idx, const uint64_t chunk_id, const char* bulk_buf_ptr, + const size_t size, + const off64_t offset) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'", __func__, + idx, path_, size, offset); + // sizeof(ssize_t) comes from pwrite's return type + auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // written file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkWriteOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.buf = bulk_buf_ptr; + task_arg.chnk_id = chunk_id; + task_arg.size = size; + task_arg.off = offset; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkWriteOpException(err_str); + } +} + +/** + * Waits for all Argobots tasklets to finish and report back the write error code and the size written. + * @return + */ +pair<int, size_t> ChunkWriteOperation::wait_for_tasks() { + GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'", __func__, path_); + size_t total_written = 0; + int io_err = 0; + /* + * gather all eventuals' information. 
do not throw here to properly clean up all eventuals + * On error, clean up eventuals and set written data to 0 as written data is corrupted + */ + for (auto& e : task_eventuals_) { + ssize_t* task_size = nullptr; + auto abt_err = ABT_eventual_wait(e, (void**) &task_size); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkWriteOperation::{}() Error when waiting on ABT eventual", __func__); + io_err = EIO; + ABT_eventual_free(&e); + continue; + } + if (io_err != 0) { + ABT_eventual_free(&e); + continue; + } + assert(task_size != nullptr); + if (*task_size < 0) { + io_err = -(*task_size); + } else { + total_written += *task_size; + } + ABT_eventual_free(&e); + } + // in case of error set written size to zero as data would be corrupted + if (io_err != 0) + total_written = 0; + return make_pair(io_err, total_written); +} + +/* ------------------------------------------------------------------------ + * -------------------------- READ ---------------------------------------- + * ------------------------------------------------------------------------*/ + +/** + * Used by an argobots tasklet. Argument args has the following fields: + * const string* path; + char* buf; + const gkfs::rpc::chnk_id_t* chnk_id; + size_t size; + off64_t off; + ABT_eventual* eventual; + * This function is driven by the IO pool, so there is a maximum allowed number of concurrent IO operations per daemon. + * This function is called by tasklets, as it cannot be allowed to block. + * @return read_size is put into eventual to signal that it finished + */ +void ChunkReadOperation::read_file_abt(void* _arg) { + assert(_arg); + //unpack args + auto* arg = static_cast(_arg); + const string& path = *(arg->path); + ssize_t read = 0; + try { + // read_chunk returns the number of bytes read or throws; the result is set on the eventual below + read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf, arg->size, arg->off); + } catch (const ChunkStorageException& err) { + GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what()); + read = -(err.code().value()); + } catch (const std::exception& err) { + GKFS_DATA->spdlogger()->error("{}() Unexpected error reading chunk {} of file {}", __func__, arg->chnk_id, + path); + read = -EIO; + } + ABT_eventual_set(arg->eventual, &read, sizeof(read)); +} + +void ChunkReadOperation::clear_task_args() { + task_args_.clear(); +} + +ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { + task_args_.resize(n); +} + +/** + * Read a single chunk referenced by its ID into the buffer. Put task into IO queue. + * On failure the read operation is aborted, throwing an error, and cleaned up. + * The caller may repeat a failed call. 
+ * @param chunk_id + * @param bulk_buf_ptr + * @param size + * @param offset + * @throws ChunkReadOpException + */ +void +ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id, char* bulk_buf_ptr, const size_t size, + const off64_t offset) { + assert(idx < task_args_.size()); + GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'", __func__, + idx, path_, size, offset); + // sizeof(ssize_t) comes from pread's return type + auto abt_err = ABT_eventual_create(sizeof(ssize_t), &task_eventuals_[idx]); // read file return value + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'", + __func__, abt_err); + throw ChunkReadOpException(err_str); + } + + auto& task_arg = task_args_[idx]; + task_arg.path = &path_; + task_arg.buf = bulk_buf_ptr; + task_arg.chnk_id = chunk_id; + task_arg.size = size; + task_arg.off = offset; + task_arg.eventual = task_eventuals_[idx]; + + abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args_[idx], &abt_tasks_[idx]); + if (abt_err != ABT_SUCCESS) { + auto err_str = fmt::format("ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, + abt_err); + throw ChunkReadOpException(err_str); + } + +} + +pair<int, size_t> ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { + GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'", __func__, path_); + assert(args.chunk_ids->size() == task_args_.size()); + size_t total_read = 0; + int io_err = 0; + + /* + * gather all eventuals' information. do not throw here to properly clean up all eventuals + * As soon as an error is encountered, bulk_transfers will no longer be executed as the data would be corrupted + * The loop continues until all eventuals have been cleaned and freed. + */ + for (uint64_t idx = 0; idx < task_args_.size(); idx++) { + ssize_t* task_size = nullptr; + auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); + if (abt_err != ABT_SUCCESS) { + GKFS_DATA->spdlogger()->error("ChunkReadOperation::{}() Error when waiting on ABT eventual", __func__); + io_err = EIO; + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + // error occurred. 
stop processing but clean up + if (io_err != 0) { + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + assert(task_size != nullptr); + if (*task_size < 0) { + // sparse regions do not have chunk files and are therefore skipped + if (-(*task_size) == ENOENT) { + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + io_err = -(*task_size); // make error code > 0 + } else if (*task_size == 0) { + // read size of 0 is not an error and can happen when the read hits the end-of-file + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } else { + // successful case, push read data back to client + GKFS_DATA->spdlogger()->trace( + "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", + __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), + *task_size); + assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); + auto margo_err = margo_bulk_transfer(args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, + args.origin_offsets->at(idx), args.local_bulk_handle, + args.local_offsets->at(idx), *task_size); + if (margo_err != HG_SUCCESS) { + GKFS_DATA->spdlogger()->error( + "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'", __func__, + margo_err); + io_err = EBUSY; + ABT_eventual_free(&task_eventuals_[idx]); + continue; + } + total_read += *task_size; + } + ABT_eventual_free(&task_eventuals_[idx]); + } + // in case of error set read size to zero as data would be corrupted + if (io_err != 0) + total_read = 0; + return make_pair(io_err, total_read); +} + +} // namespace data +} // namespace gkfs diff --git a/src/daemon/ops/metadentry.cpp b/src/daemon/ops/metadentry.cpp index 169c7ac362b13beada5a7e35da43b2ba2e919981..d241b03bd9f3d90730686015fc8a567829600e10 100644 --- a/src/daemon/ops/metadentry.cpp +++ b/src/daemon/ops/metadentry.cpp @@ -104,9 +104,16 @@ void update_size(const string& path, size_t io_size, off64_t offset, bool append * Remove metadentry if exists and try to remove all chunks for path * @param path * @return + * @throws gkfs::metadata::DBException, gkfs::data::ChunkStorageException */ -void remove_node(const string& path) { - GKFS_DATA->mdb()->remove(path); // remove metadentry +void remove(const string& path) { + /* + * try to remove metadata from kv store but catch NotFoundException which is not an error in this case + * because removes are broadcast to all nodes to remove all data chunks, while only one node holds the kv store entry. 
+ */ + try { + GKFS_DATA->mdb()->remove(path); // remove metadata from KV store + } catch (const NotFoundException& e) {} GKFS_DATA->storage()->destroy_chunk_space(path); // destroys all chunks for the path on this node } diff --git a/src/daemon/util.cpp b/src/daemon/util.cpp index 92fa06221083172b0b631f3359e1da3cc60e20a2..8202cde1dee58a0e8f5969952212d95199ac0036 100644 --- a/src/daemon/util.cpp +++ b/src/daemon/util.cpp @@ -31,7 +31,7 @@ void populate_hosts_file() { throw runtime_error( fmt::format("Failed to open hosts file '{}': {}", hosts_file, strerror(errno))); } - lfstream << fmt::format("{} {}", get_my_hostname(true), RPC_DATA->self_addr_str()) << std::endl; + lfstream << fmt::format("{} {}", gkfs::rpc::get_my_hostname(true), RPC_DATA->self_addr_str()) << std::endl; if (!lfstream) { throw runtime_error( fmt::format("Failed to write on hosts file '{}': {}", hosts_file, strerror(errno))); } diff --git a/src/global/rpc/rpc_util.cpp b/src/global/rpc/rpc_util.cpp index 8a38fa5976ef789b935d34c02a626d8cc04a2ed8..e5607fd330fd5c3b3e2b28ac0e040ebf7195e06f 100644 --- a/src/global/rpc/rpc_util.cpp +++ b/src/global/rpc/rpc_util.cpp @@ -24,6 +24,9 @@ extern "C" { using namespace std; +namespace gkfs { +namespace rpc { + /** * converts std bool to mercury bool * @param state @@ -89,4 +92,7 @@ string get_host_by_name(const string& hostname) { } freeaddrinfo(addr); return addr_str; -} \ No newline at end of file +} + +} // namespace rpc +} // namespace gkfs \ No newline at end of file diff --git a/tests/integration/data/test_truncate.py b/tests/integration/data/test_truncate.py new file mode 100644 index 0000000000000000000000000000000000000000..743832a9e6b0398aecb945a4437ebd55b874c130 --- /dev/null +++ b/tests/integration/data/test_truncate.py @@ -0,0 +1,120 @@ +################################################################################ +# Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# SPDX-License-Identifier: MIT # +################################################################################ + +# import harness +# from pathlib import Path +import os +import stat + + +# @pytest.mark.xfail(reason="invalid errno returned on success") +def test_truncate(gkfs_daemon, gkfs_client): + """Testing truncate: + 1. create a large file over multiple chunks + 2. truncate it in the middle and compare it with a fresh file with equal contents (exactly at chunk border) + 3. truncate it again so that it truncates in the middle of a chunk and compare with fresh file + TODO chunksize needs to be respected to make sure chunk border and in the middle of chunk truncates are honored + """ + truncfile = gkfs_daemon.mountdir / "trunc_file" + + # open and create test file + ret = gkfs_client.open(truncfile, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + + assert ret.retval != -1 + + # write a multi-MiB file (16 MiB) + buf_length = 16777216 + ret = gkfs_client.write_random(truncfile, buf_length) + + assert ret.retval == buf_length + assert ret.errno == 115 # FIXME: Should be 0! + + ret = gkfs_client.stat(truncfile) + + assert ret.errno == 115 # FIXME: Should be 0! 
+ assert ret.statbuf.st_size == buf_length + + # truncate it + # split exactly in the middle + trunc_size = buf_length // 2 + ret = gkfs_client.truncate(truncfile, trunc_size) + + assert ret.errno == 115 # FIXME: Should be 0! + assert ret.retval == 0 + # check file length + ret = gkfs_client.stat(truncfile) + + assert ret.errno == 115 # FIXME: Should be 0! + assert ret.statbuf.st_size == trunc_size + + # verify contents by writing a new file (random content is seeded) and checksum both + truncfile_verify = gkfs_daemon.mountdir / "trunc_file_verify" + + # open and create test file + ret = gkfs_client.open(truncfile_verify, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + + assert ret.retval != -1 + + # write trunc_size of data to new file + ret = gkfs_client.write_random(truncfile_verify, trunc_size) + + assert ret.retval == trunc_size + assert ret.errno == 115 # FIXME: Should be 0! + + ret = gkfs_client.stat(truncfile_verify) + + assert ret.errno == 115 # FIXME: Should be 0! + assert ret.statbuf.st_size == trunc_size + + ret = gkfs_client.file_compare(truncfile, truncfile_verify, trunc_size) + + assert ret.retval == 0 + assert ret.errno == 115 # FIXME: Should be 0! + + # trunc at byte 712345 (middle of chunk) + # TODO feed chunksize into test to make sure it is always in the middle of the chunk + trunc_size = 712345 + ret = gkfs_client.truncate(truncfile, trunc_size) + + assert ret.errno == 115 # FIXME: Should be 0! + assert ret.retval == 0 + + # check file length + ret = gkfs_client.stat(truncfile) + + assert ret.errno == 115 # FIXME: Should be 0! + assert ret.statbuf.st_size == trunc_size + + # verify contents by writing a new file (random content is seeded) and checksum both + truncfile_verify_2 = gkfs_daemon.mountdir / "trunc_file_verify_2" + + # open and create test file + ret = gkfs_client.open(truncfile_verify_2, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + + assert ret.retval != -1 + + # write trunc_size of data to new file + ret = gkfs_client.write_random(truncfile_verify_2, trunc_size) + + assert ret.retval == trunc_size + assert ret.errno == 115 # FIXME: Should be 0! + + ret = gkfs_client.stat(truncfile_verify_2) + + assert ret.errno == 115 # FIXME: Should be 0! + assert ret.statbuf.st_size == trunc_size + + ret = gkfs_client.file_compare(truncfile, truncfile_verify_2, trunc_size) + + assert ret.retval == 0 + assert ret.errno == 115 # FIXME: Should be 0! 
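The two truncate points exercised above map directly onto the chunk arithmetic that truncate_abt performs in src/daemon/ops/data.cpp: the chunk holding the new end of file is truncated in place when the length is not chunk-aligned, and everything after it is trimmed. A minimal sketch of that arithmetic follows; the helper bodies are assumptions inferred from their names and call sites, and the 512 KiB chunksize is only an example stand-in for gkfs::config::rpc::chunksize.

// Sketch of the chunk arithmetic behind the two truncate cases in the test
// above (assumed helper bodies; example chunksize).
#include <cstddef>
#include <cstdint>

constexpr std::size_t chunksize = 524288; // assumed 512 KiB example value

constexpr std::uint64_t chnk_id_for_offset(std::uint64_t offset, std::size_t chnk_size) {
    return offset / chnk_size; // id of the chunk holding this byte offset
}

constexpr std::size_t chnk_lpad(std::uint64_t offset, std::size_t chnk_size) {
    return offset % chnk_size; // bytes kept in that chunk; 0 on a chunk border
}

// Case "split exactly in the middle": truncate at 8 MiB, an exact chunk border.
// No chunk file is shortened in place; trimming starts at chunk 16.
static_assert(chnk_id_for_offset(8388608, chunksize) == 16, "first chunk to trim");
static_assert(chnk_lpad(8388608, chunksize) == 0, "no in-place truncate needed");

// Case "trunc at byte 712345": mid-chunk. Chunk 1 keeps 188057 bytes and is
// truncated in place; trimming starts at chunk 2.
static_assert(chnk_id_for_offset(712345, chunksize) == 1, "chunk holding new EOF");
static_assert(chnk_lpad(712345, chunksize) == 188057, "bytes kept in chunk 1");

int main() { return 0; }

This mirrors the branch in truncate_abt: a non-zero left pad triggers truncate_chunk_file on the chunk holding the new end of file before trim_chunk_space removes the rest.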
diff --git a/tests/integration/harness/CMakeLists.txt b/tests/integration/harness/CMakeLists.txt index 13e6d5d956deaaf81a02def3238661de19dddef4..f2f55409d8e279e46f22642527e2822344607963 100644 --- a/tests/integration/harness/CMakeLists.txt +++ b/tests/integration/harness/CMakeLists.txt @@ -27,6 +27,9 @@ add_executable(gkfs.io gkfs.io/statx.cpp gkfs.io/lseek.cpp gkfs.io/write_validate.cpp + gkfs.io/write_random.cpp + gkfs.io/truncate.cpp + gkfs.io/util/file_compare.cpp ) include(FetchContent) diff --git a/tests/integration/harness/gkfs.io/binary_buffer.hpp b/tests/integration/harness/gkfs.io/binary_buffer.hpp index b09fff2db88e633dff6b5c690b253a770ca7ee9b..3806aaa9e0d56925b3870a6f8450459cff964eb8 100644 --- a/tests/integration/harness/gkfs.io/binary_buffer.hpp +++ b/tests/integration/harness/gkfs.io/binary_buffer.hpp @@ -15,8 +15,10 @@ #define IO_BINARY_BUFFER_HPP #include +#include #include + namespace io { struct buffer { @@ -30,14 +32,16 @@ struct buffer { std::copy(s.begin(), s.end(), std::back_inserter(m_data)); } + buffer(std::vector data) : m_data(std::move(data)) {} + bool - operator==(nullptr_t) const { - return m_data.size() == 0; + operator==(nullptr_t) const { + return m_data.empty(); } bool - operator!=(nullptr_t) const { - return m_data.size() != 0; + operator!=(nullptr_t) const { + return !m_data.empty(); } auto diff --git a/tests/integration/harness/gkfs.io/commands.hpp b/tests/integration/harness/gkfs.io/commands.hpp index 0fc3041d852aee4254e3ea2b2134a888c138a6a0..91bac7a3ae4080e1c673d743fc840c5348fd7b7e 100644 --- a/tests/integration/harness/gkfs.io/commands.hpp +++ b/tests/integration/harness/gkfs.io/commands.hpp @@ -70,4 +70,14 @@ lseek_init(CLI::App& app); void write_validate_init(CLI::App& app); +void +write_random_init(CLI::App& app); + +void +truncate_init(CLI::App& app); + +// UTIL +void +file_compare_init(CLI::App& app); + #endif // IO_COMMANDS_HPP diff --git a/tests/integration/harness/gkfs.io/main.cpp b/tests/integration/harness/gkfs.io/main.cpp index 426df6a7e12e7dd801c42c8015ff9ad0af6b5406..b04eb2a8699ab65b16287fe9d3601515487d9b73 100644 --- a/tests/integration/harness/gkfs.io/main.cpp +++ b/tests/integration/harness/gkfs.io/main.cpp @@ -39,6 +39,10 @@ init_commands(CLI::App& app) { #endif lseek_init(app); write_validate_init(app); + write_random_init(app); + truncate_init(app); + // util + file_compare_init(app); } diff --git a/tests/integration/harness/gkfs.io/truncate.cpp b/tests/integration/harness/gkfs.io/truncate.cpp new file mode 100644 index 0000000000000000000000000000000000000000..db6c6156e0caae6e52c77ac227273909772f54b5 --- /dev/null +++ b/tests/integration/harness/gkfs.io/truncate.cpp @@ -0,0 +1,109 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. 
+ + SPDX-License-Identifier: MIT +*/ + +/* C++ includes */ +#include +#include +#include +#include +#include +#include + +/* C includes */ +#include +#include + +using json = nlohmann::json; + +struct truncate_options { + bool verbose{}; + std::string path{}; + ::off_t length{}; + + REFL_DECL_STRUCT(truncate_options, + REFL_DECL_MEMBER(bool, verbose), + REFL_DECL_MEMBER(std::string, path), + REFL_DECL_MEMBER(::off_t, length) + ); +}; + +struct truncate_output { + int retval; + int errnum; + + REFL_DECL_STRUCT(truncate_output, + REFL_DECL_MEMBER(int, retval), + REFL_DECL_MEMBER(int, errnum) + ); +}; + +void +to_json(json& record, + const truncate_output& out) { + record = serialize(out); +} + +void +truncate_exec(const truncate_options& opts) { + + auto rv = ::truncate(opts.path.c_str(), opts.length); + if (rv == -1) { + if (opts.verbose) { + fmt::print("truncate(path=\"{}\", length={}) = {}, errno: {} [{}]\n", + opts.path, opts.length, rv, errno, ::strerror(errno)); + return; + } + } + + json out = truncate_output{rv, errno}; + fmt::print("{}\n", out.dump(2)); +} + +void +truncate_init(CLI::App& app) { + + // Create the option and subcommand objects + auto opts = std::make_shared<truncate_options>(); + auto* cmd = app.add_subcommand( + "truncate", + "Execute the truncate() system call"); + + // Add options to cmd, binding them to opts + cmd->add_flag( + "-v,--verbose", + opts->verbose, + "Produce human readable output" + ); + + cmd->add_option( + "path", + opts->path, + "Path to file" + ) + ->required() + ->type_name(""); + + cmd->add_option( + "length", + opts->length, + "Truncate to precisely length bytes" + ) + ->required() + ->type_name(""); + + cmd->callback([opts]() { + truncate_exec(*opts); + }); +} + + diff --git a/tests/integration/harness/gkfs.io/util/file_compare.cpp b/tests/integration/harness/gkfs.io/util/file_compare.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4c1234fb76fc2b0d0fe477813b559f939cd1aca0 --- /dev/null +++ b/tests/integration/harness/gkfs.io/util/file_compare.cpp @@ -0,0 +1,172 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. 
+ + SPDX-License-Identifier: MIT +*/ + +/* C++ includes */ +#include +#include +#include +#include +#include +#include +#include + +/* C includes */ +#include +#include +#include + +using json = nlohmann::json; + +struct file_compare_options { + bool verbose{}; + std::string path_1{}; + std::string path_2{}; + size_t count{}; + + REFL_DECL_STRUCT(file_compare_options, + REFL_DECL_MEMBER(bool, verbose), + REFL_DECL_MEMBER(std::string, path_1), + REFL_DECL_MEMBER(std::string, path_2), + REFL_DECL_MEMBER(size_t, count) + ); +}; + +struct file_compare_output { + int retval; + int errnum; + + REFL_DECL_STRUCT(file_compare_output, + REFL_DECL_MEMBER(int, retval), + REFL_DECL_MEMBER(int, errnum) + ); +}; + +void +to_json(json& record, + const file_compare_output& out) { + record = serialize(out); +} + +int open_file(const std::string& path, bool verbose) { + auto fd = ::open(path.c_str(), O_RDONLY); + if (fd == -1) { + if (verbose) { + fmt::print("open(pathname=\"{}\") = {}, errno: {} [{}]\n", + path, fd, errno, ::strerror(errno)); + return -1; + } + json out = file_compare_output{fd, errno}; + fmt::print("{}\n", out.dump(2)); + return -1; + } + return fd; +} + +size_t read_file(io::buffer& buf, int fd, size_t count) { + ssize_t rv{}; + size_t total{}; + do { + rv = ::read(fd, buf.data() + total, count - total); + if (rv > 0) + total += rv; + } while (rv > 0 && total < count); + + if (rv < 0) { + json out = file_compare_output{(int) rv, errno}; + fmt::print("{}\n", out.dump(2)); + return 0; + } + return total; +} + +void +file_compare_exec(const file_compare_options& opts) { + + // Open both files + auto fd_1 = open_file(opts.path_1, opts.verbose); + if (fd_1 == -1) { + return; + } + auto fd_2 = open_file(opts.path_2, opts.verbose); + if (fd_2 == -1) { + return; + } + + // read both files + io::buffer buf_1(opts.count); + auto rv = read_file(buf_1, fd_1, opts.count); + if (rv == 0) + return; + + io::buffer buf_2(opts.count); + rv = read_file(buf_2, fd_2, opts.count); + if (rv == 0) + return; + + // memcmp both files to check if they're equal + auto comp_rv = memcmp(buf_1.data(), buf_2.data(), opts.count); + if (comp_rv != 0) { + if (opts.verbose) { + fmt::print("memcmp(path_1='{}', path_2='{}', count='{}') = '{}'\n", + opts.path_1, opts.path_2, opts.count, comp_rv); + return; + } + } + + json out = file_compare_output{comp_rv, errno}; + fmt::print("{}\n", out.dump(2)); +} + +void +file_compare_init(CLI::App& app) { + + // Create the option and subcommand objects + auto opts = std::make_shared<file_compare_options>(); + auto* cmd = app.add_subcommand( + "file_compare", + "Compare the content of two files"); + + // Add options to cmd, binding them to opts + cmd->add_flag( + "-v,--verbose", + opts->verbose, + "Produce human readable output" + ); + + cmd->add_option( + "path_1", + opts->path_1, + "Path to first file" + ) + ->required() + ->type_name(""); + + cmd->add_option( + "path_2", + opts->path_2, + "Path to second file" + ) + ->required() + ->type_name(""); + + cmd->add_option( + "count", + opts->count, + "How many bytes to compare of each file" + ) + ->required() + ->type_name(""); + + cmd->callback([opts]() { + file_compare_exec(*opts); + }); +} \ No newline at end of file diff --git a/tests/integration/harness/gkfs.io/write.cpp b/tests/integration/harness/gkfs.io/write.cpp index 8c819d82c019150a7ce78a3dc91dee5a2b155acd..fff2534140ef0ccd6ef0558bccd5b464d11a0560 100644 --- a/tests/integration/harness/gkfs.io/write.cpp +++ b/tests/integration/harness/gkfs.io/write.cpp @@ -66,7 +66,7 @@ write_exec(const write_options& 
opts) { if(fd == -1) { if(opts.verbose) { - fmt::print("write(pathname=\"{}\", buf=\"{}\" count={}) = {}, errno: {} [{}]\n", + fmt::print("open(pathname=\"{}\", buf=\"{}\" count={}) = {}, errno: {} [{}]\n", opts.pathname, opts.data, opts.count, fd, errno, ::strerror(errno)); return; } } diff --git a/tests/integration/harness/gkfs.io/write_random.cpp b/tests/integration/harness/gkfs.io/write_random.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bafc14d3ec1258ca16000c503a2f62ebbfb21151 --- /dev/null +++ b/tests/integration/harness/gkfs.io/write_random.cpp @@ -0,0 +1,138 @@ +/* + Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain + Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany + + This software was partially supported by the + EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). + + This software was partially supported by the + ADA-FS project under the SPPEXA project funded by the DFG. + + SPDX-License-Identifier: MIT +*/ + +/* C++ includes */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* C includes */ +#include +#include +#include + +#ifndef CHAR_BIT +#define CHAR_BIT 8 +#endif +constexpr int seed = 42; + +using json = nlohmann::json; + +struct write_random_options { + bool verbose{}; + std::string pathname{}; + ::size_t count{}; + + REFL_DECL_STRUCT(write_random_options, + REFL_DECL_MEMBER(bool, verbose), + REFL_DECL_MEMBER(std::string, pathname), + REFL_DECL_MEMBER(::size_t, count) + ); +}; + +struct write_random_output { + ::ssize_t retval; + int errnum; + + REFL_DECL_STRUCT(write_random_output, + REFL_DECL_MEMBER(::ssize_t, retval), + REFL_DECL_MEMBER(int, errnum) + ); +}; + +void to_json(json& record, + const write_random_output& out) { + record = serialize(out); +} + +/** + * Writes `count` random bytes to file + * @param opts + */ +void write_random_exec(const write_random_options& opts) { + + int fd = ::open(opts.pathname.c_str(), O_WRONLY); + + if (fd == -1) { + if (opts.verbose) { + fmt::print("open(pathname=\"{}\", count={}) = {}, errno: {} [{}]\n", + opts.pathname, opts.count, fd, errno, ::strerror(errno)); + return; + } + + json out = write_random_output{fd, errno}; + fmt::print("{}\n", out.dump(2)); + return; + } + // random number generator with seed, producing CHAR_BIT-wide byte values + std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char> engine{seed}; + // create buffer for opts.count + std::vector<unsigned char> data(opts.count); + std::generate(begin(data), end(data), std::ref(engine)); + // pass data to buffer + io::buffer buf(data); + + auto rv = ::write(fd, buf.data(), opts.count); + + if (opts.verbose) { + fmt::print("write(pathname=\"{}\", count={}) = {}, errno: {} [{}]\n", + opts.pathname, opts.count, rv, errno, ::strerror(errno)); + return; + } + + json out = write_random_output{rv, errno}; + fmt::print("{}\n", out.dump(2)); +} + +void write_random_init(CLI::App& app) { + + // Create the option and subcommand objects + auto opts = std::make_shared<write_random_options>(); + auto* cmd = app.add_subcommand( + "write_random", + "Execute the write() system call with random data"); + + // Add options to cmd, binding them to opts + cmd->add_flag( + "-v,--verbose", + opts->verbose, + "Produce human readable output" + ); + + cmd->add_option( + "pathname", + opts->pathname, + "File name" + ) + ->required() + ->type_name(""); + + cmd->add_option( + "count", + opts->count, + "Number of random bytes to write" + ) + ->required() + ->type_name(""); + + cmd->callback([opts]() { + write_random_exec(*opts); + }); +} \ No newline at end of file diff --git 
a/tests/integration/harness/io.py b/tests/integration/harness/io.py index d5acf60bf5a79314b05551113cc3bf2a12e1f1ca..a9fca62c52be3877a9ed0e770c5f086d73561ee4 100644 --- a/tests/integration/harness/io.py +++ b/tests/integration/harness/io.py @@ -273,7 +273,7 @@ class StatOutputSchema(Schema): class StatxOutputSchema(Schema): - """Schema to deserialize the results of a stat() execution""" + """Schema to deserialize the results of a statx() execution""" retval = fields.Integer(required=True) statbuf = fields.Nested(StructStatxSchema, required=True) @@ -285,7 +285,7 @@ class StatxOutputSchema(Schema): class LseekOutputSchema(Schema): - """Schema to deserialize the results of an open() execution""" + """Schema to deserialize the results of an lseek() execution""" retval = fields.Integer(required=True) errno = Errno(data_key='errnum', required=True) @@ -304,6 +304,38 @@ class WriteValidateOutputSchema(Schema): def make_object(self, data, **kwargs): return namedtuple('WriteValidateReturn', ['retval', 'errno'])(**data) + +class WriteRandomOutputSchema(Schema): + """Schema to deserialize the results of a write_random() execution""" + + retval = fields.Integer(required=True) + errno = Errno(data_key='errnum', required=True) + + @post_load + def make_object(self, data, **kwargs): + return namedtuple('WriteRandomReturn', ['retval', 'errno'])(**data) + + +class TruncateOutputSchema(Schema): + """Schema to deserialize the results of a truncate() execution""" + retval = fields.Integer(required=True) + errno = Errno(data_key='errnum', required=True) + + @post_load + def make_object(self, data, **kwargs): + return namedtuple('TruncateReturn', ['retval', 'errno'])(**data) + + +# UTIL +class FileCompareOutputSchema(Schema): + """Schema to deserialize the results of a file_compare execution""" + retval = fields.Integer(required=True) + errno = Errno(data_key='errnum', required=True) + + @post_load + def make_object(self, data, **kwargs): + return namedtuple('FileCompareReturn', ['retval', 'errno'])(**data) + class IOParser: OutputSchemas = { @@ -323,11 +355,15 @@ class IOParser: 'stat' : StatOutputSchema(), 'statx' : StatxOutputSchema(), 'lseek' : LseekOutputSchema(), + 'write_random': WriteRandomOutputSchema(), 'write_validate' : WriteValidateOutputSchema(), + 'truncate': TruncateOutputSchema(), + # UTIL + 'file_compare': FileCompareOutputSchema(), } def parse(self, command, output): if command in self.OutputSchemas: return self.OutputSchemas[command].loads(output) else: - raise ValueError(f"Unknown I/O command {cmd}") + raise ValueError(f"Unknown I/O command {command}")
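All of the daemon-side operations in this change funnel through the same Argobots cycle: create an eventual sized for the task's result type, create a tasklet in the IO pool, wait on the eventual, free it. The following self-contained sketch shows that cycle in isolation; work_abt and the use of the primary execution stream's main pool are illustrative stand-ins for the GekkoFS task functions and RPC_DATA->io_pool(), not project code.

#include <abt.h>
#include <cassert>
#include <cstdio>

struct task_args {
    int input;
    ABT_eventual eventual; // signaled by the tasklet when it is done
};

// Tasklets must not block: compute, then publish the result via the eventual.
static void work_abt(void* _arg) {
    auto* arg = static_cast<task_args*>(_arg);
    int result = arg->input * 2;
    ABT_eventual_set(arg->eventual, &result, sizeof(result));
}

int main(int argc, char** argv) {
    ABT_init(argc, argv);

    // Stand-in for RPC_DATA->io_pool(): the primary xstream's main pool.
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_xstream_self(&xstream);
    ABT_xstream_get_main_pools(xstream, 1, &pool);

    task_args arg{21, ABT_EVENTUAL_NULL};
    ABT_eventual_create(sizeof(int), &arg.eventual); // sized for the result type
    // NULL handle: the tasklet is unnamed and freed automatically on completion.
    ABT_task_create(pool, work_abt, &arg, NULL);

    int* result = NULL;
    // Blocks only the calling ULT and yields to the pool's scheduler, which is
    // why the RPC handlers above can wait without stalling the daemon.
    ABT_eventual_wait(arg.eventual, (void**) &result);
    assert(result != NULL && *result == 42);
    printf("tasklet result: %d\n", *result);
    ABT_eventual_free(&arg.eventual);

    ABT_finalize();
    return 0;
}

The wait/free discipline is the part worth copying: every eventual is freed exactly once on every path, which is what the wait_for_tasks loops above go out of their way to guarantee even after an error.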