Loading include/daemon/ops/data.hpp +50 −11 Original line number Diff line number Diff line Loading @@ -58,8 +58,9 @@ public: * * In the future, this class may be used to provide failure tolerance for IO tasks * * Abstract base class without public constructor: * Base class using the CRTP idiom */ template<class OperationType> class ChunkOperation { protected: Loading @@ -69,17 +70,46 @@ protected: std::vector<ABT_task> abt_tasks_; std::vector<ABT_eventual> task_eventuals_; virtual void cancel_all_tasks(); public: explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {}; explicit ChunkOperation(const std::string& path); ChunkOperation(std::string path, size_t n) : path_(std::move(path)) { // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults abt_tasks_.resize(n); task_eventuals_.resize(n); }; ChunkOperation(std::string path, size_t n); ~ChunkOperation() { cancel_all_tasks(); } ~ChunkOperation(); /** * Cleans up and cancels all tasks in flight */ void cancel_all_tasks() { GKFS_DATA->spdlogger()->trace("{}() enter", __func__); for (auto& task : abt_tasks_) { if (task) { ABT_task_cancel(task); ABT_task_free(&task); } } for (auto& eventual : task_eventuals_) { if (eventual) { ABT_eventual_reset(eventual); ABT_eventual_free(&eventual); } } abt_tasks_.clear(); task_eventuals_.clear(); static_cast<OperationType*>(this)->clear_task_args(); } }; class ChunkTruncateOperation : public ChunkOperation { class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> { friend class ChunkOperation<ChunkTruncateOperation>; private: struct chunk_truncate_args { Loading @@ -92,20 +122,24 @@ private: static void truncate_abt(void* _arg); void clear_task_args(); public: explicit ChunkTruncateOperation(const std::string& path); ChunkTruncateOperation(const std::string& path, size_t n); void cancel_all_tasks() override; ~ChunkTruncateOperation() = default; void truncate(size_t idx, size_t size); int wait_for_tasks(); }; class ChunkWriteOperation : public ChunkOperation { class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> { friend class ChunkOperation<ChunkWriteOperation>; private: struct chunk_write_args { Loading @@ -121,11 +155,13 @@ private: static void write_file_abt(void* _arg); void clear_task_args(); public: ChunkWriteOperation(const std::string& path, size_t n); void cancel_all_tasks() override; ~ChunkWriteOperation() = default; void write_async(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset); Loading @@ -134,7 +170,8 @@ public: }; class ChunkReadOperation : public ChunkOperation { class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> { friend class ChunkOperation<ChunkReadOperation>; private: Loading @@ -151,6 +188,8 @@ private: static void read_file_abt(void* _arg); void clear_task_args(); public: struct bulk_args { Loading @@ -165,7 +204,7 @@ public: ChunkReadOperation(const std::string& path, size_t n); void cancel_all_tasks() override; ~ChunkReadOperation() = default; void read_async(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset); Loading src/daemon/ops/data.cpp +12 −49 Original line number Diff line number Diff line Loading @@ -25,39 +25,6 @@ using namespace std; namespace gkfs { namespace data { /** * Cleans up and cancels all tasks in flight */ void ChunkOperation::cancel_all_tasks() { GKFS_DATA->spdlogger()->trace("{}() enter", __func__); for (auto& task : abt_tasks_) { if (task) { ABT_task_cancel(task); ABT_task_free(&task); } } for (auto& eventual : task_eventuals_) { if (eventual) { ABT_eventual_reset(eventual); ABT_eventual_free(&eventual); } } abt_tasks_.clear(); task_eventuals_.clear(); } ChunkOperation::ChunkOperation(const string& path) : ChunkOperation(path, 1) {} ChunkOperation::ChunkOperation(string path, size_t n) : path_(std::move(path)) { // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults abt_tasks_.resize(n); task_eventuals_.resize(n); } ChunkOperation::~ChunkOperation() { cancel_all_tasks(); } /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading @@ -94,17 +61,16 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { ABT_eventual_set(arg->eventual, &err_response, sizeof(int)); } void ChunkTruncateOperation::clear_task_args() { task_args_.clear(); } ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } void ChunkTruncateOperation::cancel_all_tasks() { ChunkOperation::cancel_all_tasks(); task_args_.clear(); } /** * Starts a tasklet for requested truncate. In essence all chunk files after the given offset is removed */ Loading Loading @@ -190,13 +156,12 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); } ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); void ChunkWriteOperation::clear_task_args() { task_args_.clear(); } void ChunkWriteOperation::cancel_all_tasks() { ChunkOperation::cancel_all_tasks(); task_args_.clear(); ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } /** Loading Loading @@ -311,13 +276,12 @@ void ChunkReadOperation::read_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); } ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); void ChunkReadOperation::clear_task_args() { task_args_.clear(); } void ChunkReadOperation::cancel_all_tasks() { ChunkOperation::cancel_all_tasks(); task_args_.clear(); ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } /** Loading Loading @@ -403,7 +367,6 @@ pair<int, size_t> ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_ar "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); // TODO try, repeat do-while assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer(args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, args.origin_offsets->at(idx), args.local_bulk_handle, Loading Loading
include/daemon/ops/data.hpp +50 −11 Original line number Diff line number Diff line Loading @@ -58,8 +58,9 @@ public: * * In the future, this class may be used to provide failure tolerance for IO tasks * * Abstract base class without public constructor: * Base class using the CRTP idiom */ template<class OperationType> class ChunkOperation { protected: Loading @@ -69,17 +70,46 @@ protected: std::vector<ABT_task> abt_tasks_; std::vector<ABT_eventual> task_eventuals_; virtual void cancel_all_tasks(); public: explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {}; explicit ChunkOperation(const std::string& path); ChunkOperation(std::string path, size_t n) : path_(std::move(path)) { // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults abt_tasks_.resize(n); task_eventuals_.resize(n); }; ChunkOperation(std::string path, size_t n); ~ChunkOperation() { cancel_all_tasks(); } ~ChunkOperation(); /** * Cleans up and cancels all tasks in flight */ void cancel_all_tasks() { GKFS_DATA->spdlogger()->trace("{}() enter", __func__); for (auto& task : abt_tasks_) { if (task) { ABT_task_cancel(task); ABT_task_free(&task); } } for (auto& eventual : task_eventuals_) { if (eventual) { ABT_eventual_reset(eventual); ABT_eventual_free(&eventual); } } abt_tasks_.clear(); task_eventuals_.clear(); static_cast<OperationType*>(this)->clear_task_args(); } }; class ChunkTruncateOperation : public ChunkOperation { class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> { friend class ChunkOperation<ChunkTruncateOperation>; private: struct chunk_truncate_args { Loading @@ -92,20 +122,24 @@ private: static void truncate_abt(void* _arg); void clear_task_args(); public: explicit ChunkTruncateOperation(const std::string& path); ChunkTruncateOperation(const std::string& path, size_t n); void cancel_all_tasks() override; ~ChunkTruncateOperation() = default; void truncate(size_t idx, size_t size); int wait_for_tasks(); }; class ChunkWriteOperation : public ChunkOperation { class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> { friend class ChunkOperation<ChunkWriteOperation>; private: struct chunk_write_args { Loading @@ -121,11 +155,13 @@ private: static void write_file_abt(void* _arg); void clear_task_args(); public: ChunkWriteOperation(const std::string& path, size_t n); void cancel_all_tasks() override; ~ChunkWriteOperation() = default; void write_async(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset); Loading @@ -134,7 +170,8 @@ public: }; class ChunkReadOperation : public ChunkOperation { class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> { friend class ChunkOperation<ChunkReadOperation>; private: Loading @@ -151,6 +188,8 @@ private: static void read_file_abt(void* _arg); void clear_task_args(); public: struct bulk_args { Loading @@ -165,7 +204,7 @@ public: ChunkReadOperation(const std::string& path, size_t n); void cancel_all_tasks() override; ~ChunkReadOperation() = default; void read_async(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset); Loading
src/daemon/ops/data.cpp +12 −49 Original line number Diff line number Diff line Loading @@ -25,39 +25,6 @@ using namespace std; namespace gkfs { namespace data { /** * Cleans up and cancels all tasks in flight */ void ChunkOperation::cancel_all_tasks() { GKFS_DATA->spdlogger()->trace("{}() enter", __func__); for (auto& task : abt_tasks_) { if (task) { ABT_task_cancel(task); ABT_task_free(&task); } } for (auto& eventual : task_eventuals_) { if (eventual) { ABT_eventual_reset(eventual); ABT_eventual_free(&eventual); } } abt_tasks_.clear(); task_eventuals_.clear(); } ChunkOperation::ChunkOperation(const string& path) : ChunkOperation(path, 1) {} ChunkOperation::ChunkOperation(string path, size_t n) : path_(std::move(path)) { // Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults abt_tasks_.resize(n); task_eventuals_.resize(n); } ChunkOperation::~ChunkOperation() { cancel_all_tasks(); } /** * Used by an argobots tasklet. Argument args has the following fields: * const string* path; Loading Loading @@ -94,17 +61,16 @@ void ChunkTruncateOperation::truncate_abt(void* _arg) { ABT_eventual_set(arg->eventual, &err_response, sizeof(int)); } void ChunkTruncateOperation::clear_task_args() { task_args_.clear(); } ChunkTruncateOperation::ChunkTruncateOperation(const string& path) : ChunkTruncateOperation{path, 1} {} ChunkTruncateOperation::ChunkTruncateOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } void ChunkTruncateOperation::cancel_all_tasks() { ChunkOperation::cancel_all_tasks(); task_args_.clear(); } /** * Starts a tasklet for requested truncate. In essence all chunk files after the given offset is removed */ Loading Loading @@ -190,13 +156,12 @@ void ChunkWriteOperation::write_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &wrote, sizeof(ssize_t)); } ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); void ChunkWriteOperation::clear_task_args() { task_args_.clear(); } void ChunkWriteOperation::cancel_all_tasks() { ChunkOperation::cancel_all_tasks(); task_args_.clear(); ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } /** Loading Loading @@ -311,13 +276,12 @@ void ChunkReadOperation::read_file_abt(void* _arg) { ABT_eventual_set(arg->eventual, &read, sizeof(ssize_t)); } ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); void ChunkReadOperation::clear_task_args() { task_args_.clear(); } void ChunkReadOperation::cancel_all_tasks() { ChunkOperation::cancel_all_tasks(); task_args_.clear(); ChunkReadOperation::ChunkReadOperation(const string& path, size_t n) : ChunkOperation{path, n} { task_args_.resize(n); } /** Loading Loading @@ -403,7 +367,6 @@ pair<int, size_t> ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_ar "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); // TODO try, repeat do-while assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer(args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, args.origin_offsets->at(idx), args.local_bulk_handle, Loading