Loading include/config.hpp +12 −0 Original line number Diff line number Diff line Loading @@ -50,6 +50,18 @@ namespace io { * If buffer is not zeroed, sparse regions contain invalid data. */ constexpr auto zero_buffer_before_read = false; /* * When the daemon handler serves a read request, it starts tasklets (one for each * chunk) from the io pool to read all chunks of that read request in parallel. * Then another thread waits for the first tasklet to finish before * initiating the bulk transfer back to the client for this chunk. * This continues in sequence, allowing gaps between bulk transfers while * waiting. Although this is CPU efficient, it does not provide the highest I/O throughput. * If spin_lock_read is enabled, all tasklets are polled and each chunk is transferred as soon as its tasklet has finished, * regardless of their order, minimizing the gap between bulk transfers. * Due to spinning in a loop, this increases CPU utilization. */ constexpr auto spin_lock_read = true; } // namespace io namespace log { Loading include/daemon/ops/data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -202,6 +202,7 @@ private: size_t size; off64_t off; ABT_eventual eventual; bool bulk_transfer_done = false; }; std::vector<struct chunk_read_args> task_args_; Loading src/daemon/ops/data.cpp +129 −47 Original line number Diff line number Diff line Loading @@ -384,6 +384,7 @@ ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id, task_arg.size = size; task_arg.off = offset; task_arg.eventual = task_eventuals_[idx]; task_arg.bulk_transfer_done = false; abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args_[idx], &abt_tasks_[idx]); Loading @@ -409,34 +410,52 @@ ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { * longer be executed as the data would be corrupted. The loop continues * until all eventuals have been cleaned and freed. */ // TODO refactor both if/else. They have redundant code. 
if(gkfs::config::io::spin_lock_read) { uint64_t bulk_transfer_cnt = 0; do { for(uint64_t idx = 0; idx < task_args_.size(); idx++) { if(task_args_[idx].bulk_transfer_done) continue; ssize_t* task_size = nullptr; auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); int is_ready = 0; auto abt_err = ABT_eventual_test( task_eventuals_[idx], (void**) &task_size, &is_ready); if(abt_err != ABT_SUCCESS) { GKFS_DATA->spdlogger()->error( "ChunkReadOperation::{}() Error when waiting on ABT eventual", "ChunkReadOperation::{}() Error when testing on ABT eventual", __func__); io_err = EIO; bulk_transfer_cnt = task_args_.size(); ABT_eventual_free(&task_eventuals_[idx]); continue; } // not ready yet, try next if(is_ready == ABT_FALSE) continue; // error occured. stop processing but clean up if(io_err != 0) { task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); continue; } assert(task_size != nullptr); if(*task_size < 0) { // sparse regions do not have chunk files and are therefore skipped // sparse regions do not have chunk files and are therefore // skipped if(-(*task_size) == ENOENT) { task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); continue; } io_err = -(*task_size); // make error code > 0 } else if(*task_size == 0) { // read size of 0 is not an error and can happen because reading the // end-of-file // read size of 0 is not an error and can happen because // reading the end-of-file task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); continue; } else { Loading @@ -444,8 +463,68 @@ ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { GKFS_DATA->spdlogger()->trace( "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), 
args.local_offsets->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer( args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, args.origin_offsets->at(idx), args.local_bulk_handle, args.local_offsets->at(idx), *task_size); if(margo_err != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'", __func__, margo_err); io_err = EBUSY; continue; } total_read += *task_size; } task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); } } while(bulk_transfer_cnt != task_args_.size()); } else { for(uint64_t idx = 0; idx < task_args_.size(); idx++) { ssize_t* task_size = nullptr; auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); if(abt_err != ABT_SUCCESS) { GKFS_DATA->spdlogger()->error( "ChunkReadOperation::{}() Error when waiting on ABT eventual", __func__); io_err = EIO; ABT_eventual_free(&task_eventuals_[idx]); continue; } // error occured. 
stop processing but clean up if(io_err != 0) { ABT_eventual_free(&task_eventuals_[idx]); continue; } assert(task_size != nullptr); if(*task_size < 0) { // sparse regions do not have chunk files and are therefore // skipped if(-(*task_size) == ENOENT) { ABT_eventual_free(&task_eventuals_[idx]); continue; } io_err = -(*task_size); // make error code > 0 } else if(*task_size == 0) { // read size of 0 is not an error and can happen because reading // the end-of-file ABT_eventual_free(&task_eventuals_[idx]); continue; } else { // successful case, push read data back to client GKFS_DATA->spdlogger()->trace( "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer( args.mid, HG_BULK_PUSH, args.origin_addr, Loading @@ -463,6 +542,9 @@ ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { } ABT_eventual_free(&task_eventuals_[idx]); } } // in case of error set read size to zero as data would be corrupted if(io_err != 0) total_read = 0; Loading Loading
include/config.hpp +12 −0 Original line number Diff line number Diff line Loading @@ -50,6 +50,18 @@ namespace io { * If buffer is not zeroed, sparse regions contain invalid data. */ constexpr auto zero_buffer_before_read = false; /* * When the daemon handler serves a read request, it starts tasklets (one for each * chunk) from the io pool to read all chunks of that read request in parallel. * Then another thread waits for the first tasklet to finish before * initiating the bulk transfer back to the client for this chunk. * This continues in sequence, allowing gaps between bulk transfers while * waiting. Although this is CPU efficient, it does not provide the highest I/O throughput. * If spin_lock_read is enabled, all tasklets are polled and each chunk is transferred as soon as its tasklet has finished, * regardless of their order, minimizing the gap between bulk transfers. * Due to spinning in a loop, this increases CPU utilization. */ constexpr auto spin_lock_read = true; } // namespace io namespace log { Loading
include/daemon/ops/data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -202,6 +202,7 @@ private: size_t size; off64_t off; ABT_eventual eventual; bool bulk_transfer_done = false; }; std::vector<struct chunk_read_args> task_args_; Loading
src/daemon/ops/data.cpp +129 −47 Original line number Diff line number Diff line Loading @@ -384,6 +384,7 @@ ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id, task_arg.size = size; task_arg.off = offset; task_arg.eventual = task_eventuals_[idx]; task_arg.bulk_transfer_done = false; abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args_[idx], &abt_tasks_[idx]); Loading @@ -409,34 +410,52 @@ ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { * longer be executed as the data would be corrupted The loop continues * until all eventuals have been cleaned and freed. */ // TODO refactor both if/else. They have redundant code. if(gkfs::config::io::spin_lock_read) { uint64_t bulk_transfer_cnt = 0; do { for(uint64_t idx = 0; idx < task_args_.size(); idx++) { if(task_args_[idx].bulk_transfer_done) continue; ssize_t* task_size = nullptr; auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); int is_ready = 0; auto abt_err = ABT_eventual_test( task_eventuals_[idx], (void**) &task_size, &is_ready); if(abt_err != ABT_SUCCESS) { GKFS_DATA->spdlogger()->error( "ChunkReadOperation::{}() Error when waiting on ABT eventual", "ChunkReadOperation::{}() Error when testing on ABT eventual", __func__); io_err = EIO; bulk_transfer_cnt = task_args_.size(); ABT_eventual_free(&task_eventuals_[idx]); continue; } // not ready yet, try next if(is_ready == ABT_FALSE) continue; // error occured. 
stop processing but clean up if(io_err != 0) { task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); continue; } assert(task_size != nullptr); if(*task_size < 0) { // sparse regions do not have chunk files and are therefore skipped // sparse regions do not have chunk files and are therefore // skipped if(-(*task_size) == ENOENT) { task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); continue; } io_err = -(*task_size); // make error code > 0 } else if(*task_size == 0) { // read size of 0 is not an error and can happen because reading the // end-of-file // read size of 0 is not an error and can happen because // reading the end-of-file task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); continue; } else { Loading @@ -444,8 +463,68 @@ ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { GKFS_DATA->spdlogger()->trace( "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer( args.mid, HG_BULK_PUSH, args.origin_addr, args.origin_bulk_handle, args.origin_offsets->at(idx), args.local_bulk_handle, args.local_offsets->at(idx), *task_size); if(margo_err != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'", __func__, margo_err); io_err = EBUSY; continue; } total_read += *task_size; } task_args_[idx].bulk_transfer_done = true; bulk_transfer_cnt++; ABT_eventual_free(&task_eventuals_[idx]); } } while(bulk_transfer_cnt != task_args_.size()); } else { for(uint64_t idx = 0; idx < task_args_.size(); 
idx++) { ssize_t* task_size = nullptr; auto abt_err = ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size); if(abt_err != ABT_SUCCESS) { GKFS_DATA->spdlogger()->error( "ChunkReadOperation::{}() Error when waiting on ABT eventual", __func__); io_err = EIO; ABT_eventual_free(&task_eventuals_[idx]); continue; } // error occured. stop processing but clean up if(io_err != 0) { ABT_eventual_free(&task_eventuals_[idx]); continue; } assert(task_size != nullptr); if(*task_size < 0) { // sparse regions do not have chunk files and are therefore // skipped if(-(*task_size) == ENOENT) { ABT_eventual_free(&task_eventuals_[idx]); continue; } io_err = -(*task_size); // make error code > 0 } else if(*task_size == 0) { // read size of 0 is not an error and can happen because reading // the end-of-file ABT_eventual_free(&task_eventuals_[idx]); continue; } else { // successful case, push read data back to client GKFS_DATA->spdlogger()->trace( "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'", __func__, path_, args.chunk_ids->at(idx), args.origin_offsets->at(idx), args.local_offsets->at(idx), *task_size); assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx)); auto margo_err = margo_bulk_transfer( args.mid, HG_BULK_PUSH, args.origin_addr, Loading @@ -463,6 +542,9 @@ ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) { } ABT_eventual_free(&task_eventuals_[idx]); } } // in case of error set read size to zero as data would be corrupted if(io_err != 0) total_read = 0; Loading