Loading include/daemon/ops/data.hpp +2 −4 Original line number Diff line number Diff line Loading @@ -118,7 +118,7 @@ private: ABT_eventual eventual; }; std::vector<struct chunk_truncate_args> task_args_; struct chunk_truncate_args task_arg_{}; static void truncate_abt(void* _arg); Loading @@ -128,11 +128,9 @@ public: explicit ChunkTruncateOperation(const std::string& path); ChunkTruncateOperation(const std::string& path, size_t n); ~ChunkTruncateOperation() = default; void truncate(size_t idx, size_t size); void truncate(size_t size); int wait_for_tasks(); }; Loading src/daemon/handler/srv_data.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -382,7 +382,7 @@ static hg_return_t rpc_srv_truncate(hg_handle_t handle) { gkfs::data::ChunkTruncateOperation chunk_op{in.path}; try { // start tasklet for truncate operation chunk_op.truncate(0, in.length); chunk_op.truncate(in.length); } catch (const gkfs::data::ChunkMetaOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what()); Loading src/daemon/ops/data.cpp +39 −33 Original line number Diff line number Diff line Loading @@ -25,6 +25,12 @@ using namespace std; namespace gkfs { namespace data { /* ------------------------------------------------------------------------ * -------------------------- TRUNCATE ------------------------------------ * ------------------------------------------------------------------------*/ /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading @@ -62,69 +68,65 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { } void ChunkTruncateOperation::clear_task_args() { task_args_.clear(); task_arg_ = {}; } ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkOperation{path, 1} {} /** * Starts a tasklet for requested truncate. In essence all chunk files after the given offset is removed * Only one truncate call is allowed at a time */ void ChunkTruncateOperation::truncate(size_t idx, size_t size) { assert(idx < task_args_.size()); GKFS_DATA->spdlogger()->trace("ChunkMetaOperation::{}() enter: idx '{}' path '{}' size '{}'", __func__, idx, path_, void ChunkTruncateOperation::truncate(size_t size) { assert(!task_eventuals_[0]); GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__, path_, size); auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[idx]); // truncate file return value auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[0]); // truncate file return value if (abt_err != ABT_SUCCESS) { auto err_str = fmt::format("ChunkMetaOperation::{}() Failed to create ABT eventual with abt_err '{}'", auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'", __func__, abt_err); throw ChunkMetaOpException(err_str); } auto& task_arg = task_args_[idx]; auto& task_arg = task_arg_; task_arg.path = &path_; task_arg.size = size; task_arg.eventual = task_eventuals_[idx]; task_arg.eventual = task_eventuals_[0]; abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_args_[idx], &abt_tasks_[idx]); abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_, &abt_tasks_[0]); if (abt_err != ABT_SUCCESS) { auto err_str = fmt::format("ChunkMetaOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, abt_err); throw ChunkMetaOpException(err_str); } } int ChunkTruncateOperation::wait_for_tasks() { GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_); int trunc_err = 0; /* * gather all Eventual's information. do not throw here to properly cleanup all eventuals * On error, cleanup eventuals and set written data to 0 as written data is corrupted */ for (auto& e : task_eventuals_) { int* task_err = nullptr; auto abt_err = ABT_eventual_wait(e, (void**) &task_err); auto abt_err = ABT_eventual_wait(task_eventuals_[0], (void**) &task_err); if (abt_err != ABT_SUCCESS) { GKFS_DATA->spdlogger()->error("ChunkTruncateOperation::{}() Error when waiting on ABT eventual", __func__); trunc_err = EIO; ABT_eventual_free(&e); continue; ABT_eventual_free(&task_eventuals_[0]); return EIO; } assert(task_err != nullptr); if (*task_err != 0) { trunc_err = *task_err; } ABT_eventual_free(&e); } ABT_eventual_free(&task_eventuals_[0]); return trunc_err; } /* ------------------------------------------------------------------------ * ----------------------------- WRITE ------------------------------------ * ------------------------------------------------------------------------*/ /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading @@ -244,6 +246,10 @@ pair<int, size_t> ChunkWriteOperation::wait_for_tasks() { return make_pair(io_err, total_written); } /* ------------------------------------------------------------------------ * -------------------------- READ ---------------------------------------- * ------------------------------------------------------------------------*/ /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading
include/daemon/ops/data.hpp +2 −4 Original line number Diff line number Diff line Loading @@ -118,7 +118,7 @@ private: ABT_eventual eventual; }; std::vector<struct chunk_truncate_args> task_args_; struct chunk_truncate_args task_arg_{}; static void truncate_abt(void* _arg); Loading @@ -128,11 +128,9 @@ public: explicit ChunkTruncateOperation(const std::string& path); ChunkTruncateOperation(const std::string& path, size_t n); ~ChunkTruncateOperation() = default; void truncate(size_t idx, size_t size); void truncate(size_t size); int wait_for_tasks(); }; Loading
src/daemon/handler/srv_data.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -382,7 +382,7 @@ static hg_return_t rpc_srv_truncate(hg_handle_t handle) { gkfs::data::ChunkTruncateOperation chunk_op{in.path}; try { // start tasklet for truncate operation chunk_op.truncate(0, in.length); chunk_op.truncate(in.length); } catch (const gkfs::data::ChunkMetaOpException& e) { // This exception is caused by setup of Argobots variables. If this fails, something is really wrong GKFS_DATA->spdlogger()->error("{}() while read_async err '{}'", __func__, e.what()); Loading
src/daemon/ops/data.cpp +39 −33 Original line number Diff line number Diff line Loading @@ -25,6 +25,12 @@ using namespace std; namespace gkfs { namespace data { /* ------------------------------------------------------------------------ * -------------------------- TRUNCATE ------------------------------------ * ------------------------------------------------------------------------*/ /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading @@ -62,69 +68,65 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { } void ChunkTruncateOperation::clear_task_args() { task_args_.clear(); task_arg_ = {}; } ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkOperation{path, 1} {} /** * Starts a tasklet for requested truncate. In essence all chunk files after the given offset is removed * Only one truncate call is allowed at a time */ void ChunkTruncateOperation::truncate(size_t idx, size_t size) { assert(idx < task_args_.size()); GKFS_DATA->spdlogger()->trace("ChunkMetaOperation::{}() enter: idx '{}' path '{}' size '{}'", __func__, idx, path_, void ChunkTruncateOperation::truncate(size_t size) { assert(!task_eventuals_[0]); GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__, path_, size); auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[idx]); // truncate file return value auto abt_err = ABT_eventual_create(sizeof(int), &task_eventuals_[0]); // truncate file return value if (abt_err != ABT_SUCCESS) { auto err_str = fmt::format("ChunkMetaOperation::{}() Failed to create ABT eventual with abt_err '{}'", auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'", __func__, abt_err); throw ChunkMetaOpException(err_str); } auto& task_arg = task_args_[idx]; auto& task_arg = task_arg_; task_arg.path = &path_; task_arg.size = size; task_arg.eventual = task_eventuals_[idx]; task_arg.eventual = task_eventuals_[0]; abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_args_[idx], &abt_tasks_[idx]); abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_, &abt_tasks_[0]); if (abt_err != ABT_SUCCESS) { auto err_str = fmt::format("ChunkMetaOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, auto err_str = fmt::format("ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'", __func__, abt_err); throw ChunkMetaOpException(err_str); } } int ChunkTruncateOperation::wait_for_tasks() { GKFS_DATA->spdlogger()->trace("ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_); int trunc_err = 0; /* * gather all Eventual's information. do not throw here to properly cleanup all eventuals * On error, cleanup eventuals and set written data to 0 as written data is corrupted */ for (auto& e : task_eventuals_) { int* task_err = nullptr; auto abt_err = ABT_eventual_wait(e, (void**) &task_err); auto abt_err = ABT_eventual_wait(task_eventuals_[0], (void**) &task_err); if (abt_err != ABT_SUCCESS) { GKFS_DATA->spdlogger()->error("ChunkTruncateOperation::{}() Error when waiting on ABT eventual", __func__); trunc_err = EIO; ABT_eventual_free(&e); continue; ABT_eventual_free(&task_eventuals_[0]); return EIO; } assert(task_err != nullptr); if (*task_err != 0) { trunc_err = *task_err; } ABT_eventual_free(&e); } ABT_eventual_free(&task_eventuals_[0]); return trunc_err; } /* ------------------------------------------------------------------------ * ----------------------------- WRITE ------------------------------------ * ------------------------------------------------------------------------*/ /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading @@ -244,6 +246,10 @@ pair<int, size_t> ChunkWriteOperation::wait_for_tasks() { return make_pair(io_err, total_written); } /* ------------------------------------------------------------------------ * -------------------------- READ ---------------------------------------- * ------------------------------------------------------------------------*/ /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading