Loading ifs/src/preload/rpc/ld_rpc_data_ws.cpp +92 −80 Original line number Diff line number Diff line Loading @@ -25,22 +25,30 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); map<uint64_t, vector<uint64_t>> target_chnks{}; // contains the target ids, used to access the target_chnks map. First idx is chunk with potential offset vector<uint64_t> targets{}; // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) { auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size); if (target_chnks.count(target) == 0) { target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id})); targets.push_back(target); } else dest_ids[recipient].push_back(i); target_chnks[target].push_back(chnk_id); // set first and last chnk targets if (chnk_id == chnk_start) chnk_start_target = target; if (chnk_id == chnk_end - 1) chnk_end_target = target; } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); vector<rpc_write_data_in_t> rpc_in(dest_n); auto target_n = targets.size(); vector<hg_handle_t> rpc_handles(target_n); vector<margo_request> rpc_waiters(target_n); vector<rpc_write_data_in_t> rpc_in(target_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = const_cast<void*>(buf); hg_bulk_t ipc_bulk_handle = nullptr; Loading @@ -59,31 +67,29 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk for (uint64_t target = 0; target < target_n; target++) { auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; // total chunk_size for target if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract if (target == chnk_end_target && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE)); // RPC auto chnk_ids = &dest_ids[dest_idx[i]]; // fill in rpc_in[i].path = path.c_str(); rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination rpc_in[i].chunk_start = chnk_start; // chunk start id of this write rpc_in[i].chunk_end = chnk_end; // chunk end id of this write rpc_in[i].total_chunk_size = total_chunk_size; // total size to write rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]); // Fill RPC input rpc_in[target].path = path.c_str(); rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination rpc_in[target].chunk_start = chnk_start; // chunk start id of this write rpc_in[target].chunk_end = chnk_end; // chunk end id of this write rpc_in[target].total_chunk_size = total_chunk_size; // total size to write rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_write_data_id, rpc_write_data_id, targets[target], rpc_handles[target], false); // Send RPC ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { for (uint64_t j = 0; j < target + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer Loading @@ -97,28 +103,28 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { for (uint64_t target = 0; target < target_n; target++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); ret = margo_wait(rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); ret = margo_get_output(rpc_handles[target], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); margo_free_output(rpc_handles[target], &out); margo_destroy(rpc_handles[target]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); Loading @@ -138,22 +144,30 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); map<uint64_t, vector<uint64_t>> target_chnks{}; // contains the recipient ids, used to access the target_chnks map. First idx is chunk with potential offset vector<uint64_t> targets{}; // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) { auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size); if (target_chnks.count(target) == 0) { target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id})); targets.push_back(target); } else dest_ids[recipient].push_back(i); target_chnks[target].push_back(chnk_id); // set first and last chnk targets if (chnk_id == chnk_start) chnk_start_target = target; if (chnk_id == chnk_end - 1) chnk_end_target = target; } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); vector<rpc_read_data_in_t> rpc_in(dest_n); auto target_n = targets.size(); vector<hg_handle_t> rpc_handles(target_n); vector<margo_request> rpc_waiters(target_n); vector<rpc_read_data_in_t> rpc_in(target_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = buf; hg_bulk_t ipc_bulk_handle = nullptr; Loading @@ -172,31 +186,29 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk for (uint64_t target = 0; target < target_n; target++) { auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract if (target == chnk_end_target && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE)); // RPC auto chnk_ids = &dest_ids[dest_idx[i]]; // fill in rpc_in[i].path = path.c_str(); rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination rpc_in[i].chunk_start = chnk_start; // chunk start id of this write rpc_in[i].chunk_end = chnk_end; // chunk end id of this write rpc_in[i].total_chunk_size = total_chunk_size; // total size to write rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]); // Fill RPC input rpc_in[target].path = path.c_str(); rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination rpc_in[target].chunk_start = chnk_start; // chunk start id of this write rpc_in[target].chunk_end = chnk_end; // chunk end id of this write rpc_in[target].total_chunk_size = total_chunk_size; // total size to write rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_read_data_id, rpc_read_data_id, targets[target], rpc_handles[target], false); // Send RPC ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { for (uint64_t j = 0; j < target + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer Loading @@ -210,28 +222,28 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { for (uint64_t target = 0; target < target_n; target++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); ret = margo_wait(rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); ret = margo_get_output(rpc_handles[target], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); margo_free_output(rpc_handles[target], &out); margo_destroy(rpc_handles[target]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); Loading Loading
ifs/src/preload/rpc/ld_rpc_data_ws.cpp +92 −80 Original line number Diff line number Diff line Loading @@ -25,22 +25,30 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); map<uint64_t, vector<uint64_t>> target_chnks{}; // contains the target ids, used to access the target_chnks map. First idx is chunk with potential offset vector<uint64_t> targets{}; // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) { auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size); if (target_chnks.count(target) == 0) { target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id})); targets.push_back(target); } else dest_ids[recipient].push_back(i); target_chnks[target].push_back(chnk_id); // set first and last chnk targets if (chnk_id == chnk_start) chnk_start_target = target; if (chnk_id == chnk_end - 1) chnk_end_target = target; } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); vector<rpc_write_data_in_t> rpc_in(dest_n); auto target_n = targets.size(); vector<hg_handle_t> rpc_handles(target_n); vector<margo_request> rpc_waiters(target_n); vector<rpc_write_data_in_t> rpc_in(target_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = const_cast<void*>(buf); hg_bulk_t ipc_bulk_handle = nullptr; Loading @@ -59,31 +67,29 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk for (uint64_t target = 0; target < target_n; target++) { auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; // total chunk_size for target if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract if (target == chnk_end_target && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE)); // RPC auto chnk_ids = &dest_ids[dest_idx[i]]; // fill in rpc_in[i].path = path.c_str(); rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination rpc_in[i].chunk_start = chnk_start; // chunk start id of this write rpc_in[i].chunk_end = chnk_end; // chunk end id of this write rpc_in[i].total_chunk_size = total_chunk_size; // total size to write rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]); // Fill RPC input rpc_in[target].path = path.c_str(); rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination rpc_in[target].chunk_start = chnk_start; // chunk start id of this write rpc_in[target].chunk_end = chnk_end; // chunk end id of this write rpc_in[target].total_chunk_size = total_chunk_size; // total size to write rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_write_data_id, rpc_write_data_id, targets[target], rpc_handles[target], false); // Send RPC ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { for (uint64_t j = 0; j < target + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer Loading @@ -97,28 +103,28 @@ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_fl // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { for (uint64_t target = 0; target < target_n; target++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); ret = margo_wait(rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); ret = margo_get_output(rpc_handles[target], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); margo_free_output(rpc_handles[target], &out); margo_destroy(rpc_handles[target]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); Loading @@ -138,22 +144,30 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); map<uint64_t, vector<uint64_t>> target_chnks{}; // contains the recipient ids, used to access the target_chnks map. First idx is chunk with potential offset vector<uint64_t> targets{}; // targets for the first and last chunk as they need special treatment uint64_t chnk_start_target = 0; uint64_t chnk_end_target = 0; for (uint64_t chnk_id = chnk_start; chnk_id < chnk_end; chnk_id++) { auto target = adafs_hash_path_chunk(path, chnk_id, fs_config->host_size); if (target_chnks.count(target) == 0) { target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id})); targets.push_back(target); } else dest_ids[recipient].push_back(i); target_chnks[target].push_back(chnk_id); // set first and last chnk targets if (chnk_id == chnk_start) chnk_start_target = target; if (chnk_id == chnk_end - 1) chnk_end_target = target; } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); vector<rpc_read_data_in_t> rpc_in(dest_n); auto target_n = targets.size(); vector<hg_handle_t> rpc_handles(target_n); vector<margo_request> rpc_waiters(target_n); vector<rpc_read_data_in_t> rpc_in(target_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = buf; hg_bulk_t ipc_bulk_handle = nullptr; Loading @@ -172,31 +186,29 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk for (uint64_t target = 0; target < target_n; target++) { auto total_chunk_size = target_chnks[targets[target]].size() * CHUNKSIZE; if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract if (target == chnk_end_target && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE)); // RPC auto chnk_ids = &dest_ids[dest_idx[i]]; // fill in rpc_in[i].path = path.c_str(); rpc_in[i].offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset rpc_in[i].chunk_n = chnk_ids->size(); // number of chunks handled by that destination rpc_in[i].chunk_start = chnk_start; // chunk start id of this write rpc_in[i].chunk_end = chnk_end; // chunk end id of this write rpc_in[i].total_chunk_size = total_chunk_size; // total size to write rpc_in[i].bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], false); ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]); // Fill RPC input rpc_in[target].path = path.c_str(); rpc_in[target].offset = offset % CHUNKSIZE;// first offset in targets is the chunk with a potential offset rpc_in[target].chunk_n = target_chnks[targets[target]].size(); // number of chunks handled by that destination rpc_in[target].chunk_start = chnk_start; // chunk start id of this write rpc_in[target].chunk_end = chnk_end; // chunk end id of this write rpc_in[target].total_chunk_size = total_chunk_size; // total size to write rpc_in[target].bulk_handle = (targets[target] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; margo_create_wrap(ipc_read_data_id, rpc_read_data_id, targets[target], rpc_handles[target], false); // Send RPC ret = margo_iforward(rpc_handles[target], &rpc_in[target], &rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { for (uint64_t j = 0; j < target + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer Loading @@ -210,28 +222,28 @@ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { for (uint64_t target = 0; target < target_n; target++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); ret = margo_wait(rpc_waiters[target]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); targets[target]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); ret = margo_get_output(rpc_handles[target], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[target]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); margo_free_output(rpc_handles[target], &out); margo_destroy(rpc_handles[target]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); Loading