Loading src/daemon/handler/h_data.cpp +18 −20 Original line number Original line Diff line number Diff line Loading @@ -191,7 +191,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) { // task structures for async writing // task structures for async writing vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<unique_ptr<struct write_chunk_args>> task_args(in.chunk_n); vector<struct write_chunk_args> task_args(in.chunk_n); /* /* * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk */ */ Loading Loading @@ -251,16 +251,15 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) { // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Starting tasklets for parallel I/O // Starting tasklets for parallel I/O ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value auto task_arg = make_unique<struct write_chunk_args>(); auto& task_arg = task_args[chnk_id_curr]; task_arg->path = path.get(); task_arg.path = path.get(); task_arg->buf = bulk_buf_ptrs[chnk_id_curr]; task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; task_arg->chnk_id = chnk_ids_host[chnk_id_curr]; task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; task_arg->size = chnk_sizes[chnk_id_curr]; task_arg.size = chnk_sizes[chnk_id_curr]; // only the first chunk gets the offset. the chunks are sorted on the client side // only the first chunk gets the offset. the chunks are sorted on the client side task_arg->off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg->eventual = task_eventuals[chnk_id_curr]; task_arg.eventual = task_eventuals[chnk_id_curr]; task_args[chnk_id_curr] = std::move(task_arg); auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args[chnk_id_curr], auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*task_args[chnk_id_curr]), &abt_tasks[chnk_id_curr]); &abt_tasks[chnk_id_curr]); if (abt_ret != ABT_SUCCESS) { if (abt_ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); Loading Loading @@ -380,7 +379,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { // tasks structures // tasks structures vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<unique_ptr<struct read_chunk_args>> task_args(in.chunk_n); vector<struct read_chunk_args> task_args(in.chunk_n); /* /* * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk */ */ Loading Loading @@ -423,16 +422,15 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Starting tasklets for parallel I/O // Starting tasklets for parallel I/O ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value auto task_arg = make_unique<read_chunk_args>(); auto& task_arg = task_args[chnk_id_curr]; task_arg->path = path.get(); task_arg.path = path.get(); task_arg->buf = bulk_buf_ptrs[chnk_id_curr]; task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; task_arg->chnk_id = chnk_ids_host[chnk_id_curr]; task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; task_arg->size = chnk_sizes[chnk_id_curr]; task_arg.size = chnk_sizes[chnk_id_curr]; // only the first chunk gets the offset. the chunks are sorted on the client side // only the first chunk gets the offset. the chunks are sorted on the client side task_arg->off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg->eventual = task_eventuals[chnk_id_curr]; task_arg.eventual = task_eventuals[chnk_id_curr]; task_args[chnk_id_curr] = std::move(task_arg); auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args[chnk_id_curr], auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*task_args[chnk_id_curr]), &abt_tasks[chnk_id_curr]); &abt_tasks[chnk_id_curr]); if (abt_ret != ABT_SUCCESS) { if (abt_ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); Loading Loading
src/daemon/handler/h_data.cpp +18 −20 Original line number Original line Diff line number Diff line Loading @@ -191,7 +191,7 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) { // task structures for async writing // task structures for async writing vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<unique_ptr<struct write_chunk_args>> task_args(in.chunk_n); vector<struct write_chunk_args> task_args(in.chunk_n); /* /* * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk * 3. Calculate chunk sizes that correspond to this host, transfer data, and start tasks to write to disk */ */ Loading Loading @@ -251,16 +251,15 @@ static hg_return_t rpc_srv_write_data(hg_handle_t handle) { // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Starting tasklets for parallel I/O // Starting tasklets for parallel I/O ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value auto task_arg = make_unique<struct write_chunk_args>(); auto& task_arg = task_args[chnk_id_curr]; task_arg->path = path.get(); task_arg.path = path.get(); task_arg->buf = bulk_buf_ptrs[chnk_id_curr]; task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; task_arg->chnk_id = chnk_ids_host[chnk_id_curr]; task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; task_arg->size = chnk_sizes[chnk_id_curr]; task_arg.size = chnk_sizes[chnk_id_curr]; // only the first chunk gets the offset. the chunks are sorted on the client side // only the first chunk gets the offset. the chunks are sorted on the client side task_arg->off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg->eventual = task_eventuals[chnk_id_curr]; task_arg.eventual = task_eventuals[chnk_id_curr]; task_args[chnk_id_curr] = std::move(task_arg); auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &task_args[chnk_id_curr], auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &(*task_args[chnk_id_curr]), &abt_tasks[chnk_id_curr]); &abt_tasks[chnk_id_curr]); if (abt_ret != ABT_SUCCESS) { if (abt_ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); Loading Loading @@ -380,7 +379,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { // tasks structures // tasks structures vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_task> abt_tasks(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<ABT_eventual> task_eventuals(in.chunk_n); vector<unique_ptr<struct read_chunk_args>> task_args(in.chunk_n); vector<struct read_chunk_args> task_args(in.chunk_n); /* /* * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk * 3. Calculate chunk sizes that correspond to this host and start tasks to read from disk */ */ Loading Loading @@ -423,16 +422,15 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Delegate chunk I/O operation to local FS to an I/O dedicated ABT pool // Starting tasklets for parallel I/O // Starting tasklets for parallel I/O ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value ABT_eventual_create(sizeof(size_t), &task_eventuals[chnk_id_curr]); // written file return value auto task_arg = make_unique<read_chunk_args>(); auto& task_arg = task_args[chnk_id_curr]; task_arg->path = path.get(); task_arg.path = path.get(); task_arg->buf = bulk_buf_ptrs[chnk_id_curr]; task_arg.buf = bulk_buf_ptrs[chnk_id_curr]; task_arg->chnk_id = chnk_ids_host[chnk_id_curr]; task_arg.chnk_id = chnk_ids_host[chnk_id_curr]; task_arg->size = chnk_sizes[chnk_id_curr]; task_arg.size = chnk_sizes[chnk_id_curr]; // only the first chunk gets the offset. the chunks are sorted on the client side // only the first chunk gets the offset. the chunks are sorted on the client side task_arg->off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg.off = (chnk_id_file == in.chunk_start) ? in.offset : 0; task_arg->eventual = task_eventuals[chnk_id_curr]; task_arg.eventual = task_eventuals[chnk_id_curr]; task_args[chnk_id_curr] = std::move(task_arg); auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &task_args[chnk_id_curr], auto abt_ret = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &(*task_args[chnk_id_curr]), &abt_tasks[chnk_id_curr]); &abt_tasks[chnk_id_curr]); if (abt_ret != ABT_SUCCESS) { if (abt_ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); ADAFS_DATA->spdlogger()->error("{}() task create failed", __func__); Loading