Loading ifs/src/daemon/handler/h_data.cpp +23 −29 Original line number Diff line number Diff line Loading @@ -431,17 +431,17 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { size_t* task_read_size; // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size); if (task_read_size == nullptr || *task_read_size == 0) { ADAFS_DATA->spdlogger()->error("{}() Reading file task for chunk {} failed and did return anything.", __func__, chnk_id_curr); /* * XXX We have to talk about how chunk errors are handled? Should we try to read again? * In any case we just ignore this for now and return the out.io_size with as much has been read * After all, we can decide on the semantics. */ } else { assert(task_read_size != nullptr); assert(*task_read_size >= 0); if(*task_read_size == 0){ ADAFS_DATA->spdlogger()->warn("{}() Read task for chunk {} returned 0 bytes", __func__, chnk_id_curr); continue; } ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id_curr], bulk_handle, local_offsets[chnk_id_curr], chnk_sizes[chnk_id_curr]); bulk_handle, local_offsets[chnk_id_curr], *task_read_size); if (ret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error( "{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}", Loading @@ -452,12 +452,9 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { } out.io_size += *task_read_size; // add task read size to output size } ABT_eventual_free(&task_eventuals[chnk_id_curr]); } // Sanity check to see if all data has been written if (in.total_chunk_size != out.io_size) ADAFS_DATA->spdlogger()->warn("{}() total chunk size {} and out.io_size {} mismatch!", __func__, in.total_chunk_size, out.io_size); ADAFS_DATA->spdlogger()->debug("{}() total chunk size read {}/{}", __func__, out.io_size, in.total_chunk_size); /* * 5. Respond and cleanup */ Loading @@ -465,10 +462,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res); ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle); // free tasks after responding for (auto&& task : abt_tasks) { ABT_task_join(task); ABT_task_free(&task); } cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n); return ret; } Loading Loading
ifs/src/daemon/handler/h_data.cpp +23 −29 Original line number Diff line number Diff line Loading @@ -431,17 +431,17 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { size_t* task_read_size; // wait causes the calling ult to go into BLOCKED state, implicitly yielding to the pool scheduler ABT_eventual_wait(task_eventuals[chnk_id_curr], (void**) &task_read_size); if (task_read_size == nullptr || *task_read_size == 0) { ADAFS_DATA->spdlogger()->error("{}() Reading file task for chunk {} failed and did return anything.", __func__, chnk_id_curr); /* * XXX We have to talk about how chunk errors are handled? Should we try to read again? * In any case we just ignore this for now and return the out.io_size with as much has been read * After all, we can decide on the semantics. */ } else { assert(task_read_size != nullptr); assert(*task_read_size >= 0); if(*task_read_size == 0){ ADAFS_DATA->spdlogger()->warn("{}() Read task for chunk {} returned 0 bytes", __func__, chnk_id_curr); continue; } ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, origin_offsets[chnk_id_curr], bulk_handle, local_offsets[chnk_id_curr], chnk_sizes[chnk_id_curr]); bulk_handle, local_offsets[chnk_id_curr], *task_read_size); if (ret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error( "{}() Failed push chnkid {} on path {} to client. origin offset {} local offset {} chunk size {}", Loading @@ -452,12 +452,9 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { } out.io_size += *task_read_size; // add task read size to output size } ABT_eventual_free(&task_eventuals[chnk_id_curr]); } // Sanity check to see if all data has been written if (in.total_chunk_size != out.io_size) ADAFS_DATA->spdlogger()->warn("{}() total chunk size {} and out.io_size {} mismatch!", __func__, in.total_chunk_size, out.io_size); ADAFS_DATA->spdlogger()->debug("{}() total chunk size read {}/{}", __func__, out.io_size, in.total_chunk_size); /* * 5. Respond and cleanup */ Loading @@ -465,10 +462,7 @@ static hg_return_t rpc_srv_read_data(hg_handle_t handle) { ADAFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__, out.res); ret = rpc_cleanup_respond(&handle, &in, &out, &bulk_handle); // free tasks after responding for (auto&& task : abt_tasks) { ABT_task_join(task); ABT_task_free(&task); } cancel_abt_io(&abt_tasks, &task_eventuals, in.chunk_n); return ret; } Loading