Loading ifs/include/preload/rpc/ld_rpc_data_ws.hpp +5 −1 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ extern "C" { #include <iostream> // XXX these two structs can be merged. How to deal with const void* then? struct write_args { std::shared_ptr<std::string> path; size_t total_chunk_size; Loading @@ -42,6 +41,11 @@ struct read_args { ABT_eventual eventual; }; ssize_t rpc_send_write(const std::string& path, const void* buf, const bool append_flag, const off64_t in_offset, const size_t write_size, const int64_t updated_metadentry_size); ssize_t rpc_send_read(const std::string& path, void* buf, const off64_t offset, const size_t read_size); void rpc_send_write_abt(void* _arg); void rpc_send_read_abt(void* _arg); Loading ifs/src/preload/adafs_functions.cpp +12 −119 Original line number Diff line number Diff line #include <preload/adafs_functions.hpp> #include <preload/rpc/ld_rpc_metadentry.hpp> #include <preload/rpc/ld_rpc_data_ws.hpp> #include <global/rpc/rpc_utils.hpp> using namespace std; Loading Loading @@ -169,136 +168,30 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) { auto adafs_fd = file_map.get(fd); auto path = make_shared<string>(adafs_fd->path()); auto append_flag = adafs_fd->get_flag(OpenFile_flags::append); int err = 0; ssize_t ret = 0; long updated_size = 0; auto write_size = static_cast<size_t>(0); err = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size); if (err != 0) { ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err); ret = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size); if (ret != 0) { ld_logger->error("{}() update_metadentry_size failed with ret {}", __func__, ret); return 0; // ERR } if (append_flag) offset = updated_size - count; auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end) if ((offset + count) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids auto dest_n = dest_idx.size(); vector<ABT_eventual> eventuals(dest_n); vector<unique_ptr<struct write_args>> thread_args(dest_n); for (uint64_t i = 0; i < dest_n; i++) { ABT_eventual_create(sizeof(size_t), &eventuals[i]); auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE)); auto args = make_unique<write_args>(); args->path = path; // path args->total_chunk_size = total_chunk_size; // total size to write args->in_size = count; args->in_offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset args->buf = buf;// pointer to write buffer args->chnk_start = chnk_start;// append flag when file was opened args->chnk_end = chnk_end;// append flag when file was opened args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination args->recipient = dest_idx[i];// recipient args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size thread_args[i] = std::move(args); ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr); ret = rpc_send_write(*path, buf, append_flag, offset, count, updated_size); if (ret < 0) { ld_logger->warn("{}() rpc_send_write failed with ret {}", __func__, ret); } // Sum written sizes for (uint64_t i = 0; i < dest_n; i++) { size_t* thread_ret_size; ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size); if (thread_ret_size == nullptr || *thread_ret_size == 0) { // TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i); } else write_size += *thread_ret_size; ABT_eventual_free(&eventuals[i]); } return write_size; return ret; // return written size or -1 as error } ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) { init_ld_env_if_needed(); auto adafs_fd = file_map.get(fd); auto path = make_shared<string>(adafs_fd->path()); auto read_size = static_cast<size_t>(0); auto err = 0; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end) if ((offset + count) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids auto dest_n = dest_idx.size(); vector<ABT_eventual> eventuals(dest_n); vector<unique_ptr<struct read_args>> thread_args(dest_n); for (uint64_t i = 0; i < dest_n; i++) { ABT_eventual_create(sizeof(size_t), &eventuals[i]); auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE)); auto args = make_unique<read_args>(); args->path = path; args->total_chunk_size = total_chunk_size; args->in_size = count;// total size to read args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk args->buf = buf; args->chnk_start = chnk_start; args->chnk_end = chnk_end; args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination args->recipient = dest_idx[i];// recipient args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size thread_args[i] = std::move(args); // Threads are implicitly released once calling function finishes ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr); } // Sum read sizes for (uint64_t i = 0; i < dest_n; i++) { size_t* thread_ret_size; ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size); if (thread_ret_size == nullptr || *thread_ret_size == 0) { err = -1; ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i); } else read_size += *thread_ret_size; ABT_eventual_free(&eventuals[i]); auto ret = rpc_send_read(*path, buf, offset, count); if (ret < 0) { ld_logger->warn("{}() rpc_send_read failed with ret {}", __func__, ret); } // XXX check how much we need to deal with the read_size // XXX check that we don't try to read past end of the file return err == 0 ? read_size : 0; return ret; // return read size or -1 as error } No newline at end of file ifs/src/preload/rpc/ld_rpc_data_ws.cpp +239 −0 Original line number Diff line number Diff line #include <preload/rpc/ld_rpc_data_ws.hpp> #include <global/rpc/rpc_utils.hpp> using namespace std; // TODO If we decide to keep this functionality with one segment, the function can be merged mostly. // Code is mostly redundant /** * Sends an RPC request to a specific node to pull all chunks that belong to him */ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_flag, const off64_t in_offset, const size_t write_size, const int64_t updated_metadentry_size) { // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks off64_t offset = in_offset; if (append_flag) offset = updated_metadentry_size - write_size; auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number // last chunk number (right-open) [chnk_start,chnk_end) auto chnk_end = static_cast<uint64_t>((offset + write_size) / CHUNKSIZE + 1); if ((offset + write_size) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = const_cast<void*>(buf); hg_bulk_t ipc_bulk_handle = nullptr; hg_bulk_t rpc_bulk_handle = nullptr; auto size = make_shared<size_t>(write_size); auto ret = margo_bulk_create(ld_margo_rpc_id, 1, &bulk_buf, size.get(), HG_BULK_READ_ONLY, &rpc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } ret = margo_bulk_create(ld_margo_ipc_id, 1, &bulk_buf, size.get(), HG_BULK_READ_ONLY, &ipc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE)); // RPC hg_addr_t svr_addr = HG_ADDR_NULL; auto chnk_ids = &dest_ids[dest_idx[i]]; rpc_write_data_in_t in{}; // fill in in.path = path.c_str(); in.offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset in.chunk_n = chnk_ids->size(); // number of chunks handled by that destination in.chunk_start = chnk_start; // chunk start id of this write in.chunk_end = chnk_end; // chunk end id of this write in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return -1; } } // Wait for RPC responses and then get response and add it to out_size which is the written size // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return (err < 0) ? err : out_size; } /** * Sends an RPC request to a specific node to push all chunks that belong to him */ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const size_t read_size) { // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number // last chunk number (right-open) [chnk_start,chnk_end) auto chnk_end = static_cast<uint64_t>((offset + read_size) / CHUNKSIZE + 1); if ((offset + read_size) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = buf; hg_bulk_t ipc_bulk_handle = nullptr; hg_bulk_t rpc_bulk_handle = nullptr; auto size = make_shared<size_t>(read_size); auto ret = margo_bulk_create(ld_margo_rpc_id, 1, &bulk_buf, size.get(), HG_BULK_WRITE_ONLY, &rpc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } ret = margo_bulk_create(ld_margo_ipc_id, 1, &bulk_buf, size.get(), HG_BULK_WRITE_ONLY, &ipc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE)); // RPC hg_addr_t svr_addr = HG_ADDR_NULL; auto chnk_ids = &dest_ids[dest_idx[i]]; rpc_read_data_in_t in{}; // fill in in.path = path.c_str(); in.offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset in.chunk_n = chnk_ids->size(); // number of chunks handled by that destination in.chunk_start = chnk_start; // chunk start id of this write in.chunk_end = chnk_end; // chunk end id of this write in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return -1; } } // Wait for RPC responses and then get response and add it to out_size which is the read size // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return (err < 0) ? err : out_size; } /** * Called by an argobots thread in pwrite() and sends all chunks that go to the same destination at once * @param _arg <struct write_args*> Loading Loading
ifs/include/preload/rpc/ld_rpc_data_ws.hpp +5 −1 Original line number Diff line number Diff line Loading @@ -15,7 +15,6 @@ extern "C" { #include <iostream> // XXX these two structs can be merged. How to deal with const void* then? struct write_args { std::shared_ptr<std::string> path; size_t total_chunk_size; Loading @@ -42,6 +41,11 @@ struct read_args { ABT_eventual eventual; }; ssize_t rpc_send_write(const std::string& path, const void* buf, const bool append_flag, const off64_t in_offset, const size_t write_size, const int64_t updated_metadentry_size); ssize_t rpc_send_read(const std::string& path, void* buf, const off64_t offset, const size_t read_size); void rpc_send_write_abt(void* _arg); void rpc_send_read_abt(void* _arg); Loading
ifs/src/preload/adafs_functions.cpp +12 −119 Original line number Diff line number Diff line #include <preload/adafs_functions.hpp> #include <preload/rpc/ld_rpc_metadentry.hpp> #include <preload/rpc/ld_rpc_data_ws.hpp> #include <global/rpc/rpc_utils.hpp> using namespace std; Loading Loading @@ -169,136 +168,30 @@ ssize_t adafs_pwrite_ws(int fd, const void* buf, size_t count, off64_t offset) { auto adafs_fd = file_map.get(fd); auto path = make_shared<string>(adafs_fd->path()); auto append_flag = adafs_fd->get_flag(OpenFile_flags::append); int err = 0; ssize_t ret = 0; long updated_size = 0; auto write_size = static_cast<size_t>(0); err = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size); if (err != 0) { ld_logger->error("{}() update_metadentry_size failed with err {}", __func__, err); ret = rpc_send_update_metadentry_size(*path, count, offset, append_flag, updated_size); if (ret != 0) { ld_logger->error("{}() update_metadentry_size failed with ret {}", __func__, ret); return 0; // ERR } if (append_flag) offset = updated_size - count; auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end) if ((offset + count) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids auto dest_n = dest_idx.size(); vector<ABT_eventual> eventuals(dest_n); vector<unique_ptr<struct write_args>> thread_args(dest_n); for (uint64_t i = 0; i < dest_n; i++) { ABT_eventual_create(sizeof(size_t), &eventuals[i]); auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE)); auto args = make_unique<write_args>(); args->path = path; // path args->total_chunk_size = total_chunk_size; // total size to write args->in_size = count; args->in_offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset args->buf = buf;// pointer to write buffer args->chnk_start = chnk_start;// append flag when file was opened args->chnk_end = chnk_end;// append flag when file was opened args->chnk_ids = &dest_ids[dest_idx[i]];// pointer to list of chunk ids that all go to the same destination args->recipient = dest_idx[i];// recipient args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size thread_args[i] = std::move(args); ABT_thread_create(io_pool, rpc_send_write_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr); ret = rpc_send_write(*path, buf, append_flag, offset, count, updated_size); if (ret < 0) { ld_logger->warn("{}() rpc_send_write failed with ret {}", __func__, ret); } // Sum written sizes for (uint64_t i = 0; i < dest_n; i++) { size_t* thread_ret_size; ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size); if (thread_ret_size == nullptr || *thread_ret_size == 0) { // TODO error handling if write of a thread failed. all data needs to be deleted and size update reverted ld_logger->error("{}() Writing thread {} did not write anything. NO ACTION WAS DONE", __func__, i); } else write_size += *thread_ret_size; ABT_eventual_free(&eventuals[i]); } return write_size; return ret; // return written size or -1 as error } ssize_t adafs_pread_ws(int fd, void* buf, size_t count, off64_t offset) { init_ld_env_if_needed(); auto adafs_fd = file_map.get(fd); auto path = make_shared<string>(adafs_fd->path()); auto read_size = static_cast<size_t>(0); auto err = 0; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number auto chnk_end = (offset + count) / CHUNKSIZE + 1; // last chunk number (right-open) [chnk_start,chnk_end) if ((offset + count) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(*path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // Create an Argobots thread per destination, fill an appropriate struct with its destination chunk ids auto dest_n = dest_idx.size(); vector<ABT_eventual> eventuals(dest_n); vector<unique_ptr<struct read_args>> thread_args(dest_n); for (uint64_t i = 0; i < dest_n; i++) { ABT_eventual_create(sizeof(size_t), &eventuals[i]); auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + count) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + count) % CHUNKSIZE)); auto args = make_unique<read_args>(); args->path = path; args->total_chunk_size = total_chunk_size; args->in_size = count;// total size to read args->in_offset = offset % CHUNKSIZE;// reading offset only for the first chunk args->buf = buf; args->chnk_start = chnk_start; args->chnk_end = chnk_end; args->chnk_ids = &dest_ids[dest_idx[i]]; // pointer to list of chunk ids that all go to the same destination args->recipient = dest_idx[i];// recipient args->eventual = eventuals[i];// pointer to an eventual which has allocated memory for storing the written size thread_args[i] = std::move(args); // Threads are implicitly released once calling function finishes ABT_thread_create(io_pool, rpc_send_read_abt, &(*thread_args[i]), ABT_THREAD_ATTR_NULL, nullptr); } // Sum read sizes for (uint64_t i = 0; i < dest_n; i++) { size_t* thread_ret_size; ABT_eventual_wait(eventuals[i], (void**) &thread_ret_size); if (thread_ret_size == nullptr || *thread_ret_size == 0) { err = -1; ld_logger->error("{}() Reading thread {} did not read anything. NO ACTION WAS DONE", __func__, i); } else read_size += *thread_ret_size; ABT_eventual_free(&eventuals[i]); auto ret = rpc_send_read(*path, buf, offset, count); if (ret < 0) { ld_logger->warn("{}() rpc_send_read failed with ret {}", __func__, ret); } // XXX check how much we need to deal with the read_size // XXX check that we don't try to read past end of the file return err == 0 ? read_size : 0; return ret; // return read size or -1 as error } No newline at end of file
ifs/src/preload/rpc/ld_rpc_data_ws.cpp +239 −0 Original line number Diff line number Diff line #include <preload/rpc/ld_rpc_data_ws.hpp> #include <global/rpc/rpc_utils.hpp> using namespace std; // TODO If we decide to keep this functionality with one segment, the function can be merged mostly. // Code is mostly redundant /** * Sends an RPC request to a specific node to pull all chunks that belong to him */ ssize_t rpc_send_write(const string& path, const void* buf, const bool append_flag, const off64_t in_offset, const size_t write_size, const int64_t updated_metadentry_size) { // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks off64_t offset = in_offset; if (append_flag) offset = updated_metadentry_size - write_size; auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number // last chunk number (right-open) [chnk_start,chnk_end) auto chnk_end = static_cast<uint64_t>((offset + write_size) / CHUNKSIZE + 1); if ((offset + write_size) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = const_cast<void*>(buf); hg_bulk_t ipc_bulk_handle = nullptr; hg_bulk_t rpc_bulk_handle = nullptr; auto size = make_shared<size_t>(write_size); auto ret = margo_bulk_create(ld_margo_rpc_id, 1, &bulk_buf, size.get(), HG_BULK_READ_ONLY, &rpc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } ret = margo_bulk_create(ld_margo_ipc_id, 1, &bulk_buf, size.get(), HG_BULK_READ_ONLY, &ipc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + write_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + write_size) % CHUNKSIZE)); // RPC hg_addr_t svr_addr = HG_ADDR_NULL; auto chnk_ids = &dest_ids[dest_idx[i]]; rpc_write_data_in_t in{}; // fill in in.path = path.c_str(); in.offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset in.chunk_n = chnk_ids->size(); // number of chunks handled by that destination in.chunk_start = chnk_start; // chunk start id of this write in.chunk_end = chnk_end; // chunk end id of this write in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_write_data_id, rpc_write_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return -1; } } // Wait for RPC responses and then get response and add it to out_size which is the written size // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return (err < 0) ? err : out_size; } /** * Sends an RPC request to a specific node to push all chunks that belong to him */ ssize_t rpc_send_read(const string& path, void* buf, const off64_t offset, const size_t read_size) { // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks auto chnk_start = static_cast<uint64_t>(offset) / CHUNKSIZE; // first chunk number // last chunk number (right-open) [chnk_start,chnk_end) auto chnk_end = static_cast<uint64_t>((offset + read_size) / CHUNKSIZE + 1); if ((offset + read_size) % CHUNKSIZE == 0) chnk_end--; // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer map<uint64_t, vector<uint64_t>> dest_ids{}; // contains the recipient ids, used to access the dest_ids map. First idx is chunk with potential offset vector<uint64_t> dest_idx{}; for (uint64_t i = chnk_start; i < chnk_end; i++) { auto recipient = adafs_hash_path_chunk(path, i, fs_config->host_size); if (dest_ids.count(recipient) == 0) { dest_ids.insert(make_pair(recipient, vector<uint64_t>{i})); dest_idx.push_back(recipient); } else dest_ids[recipient].push_back(i); } // some helper variables for async RPC auto dest_n = dest_idx.size(); vector<hg_handle_t> rpc_handles(dest_n); vector<margo_request> rpc_waiters(dest_n); // register local target buffer for bulk access for IPC and RPC margo instance auto bulk_buf = buf; hg_bulk_t ipc_bulk_handle = nullptr; hg_bulk_t rpc_bulk_handle = nullptr; auto size = make_shared<size_t>(read_size); auto ret = margo_bulk_create(ld_margo_rpc_id, 1, &bulk_buf, size.get(), HG_BULK_WRITE_ONLY, &rpc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } ret = margo_bulk_create(ld_margo_ipc_id, 1, &bulk_buf, size.get(), HG_BULK_WRITE_ONLY, &ipc_bulk_handle); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to create rpc bulk handle", __func__); errno = EBUSY; return -1; } // Issue non-blocking RPC requests and wait for the result later for (uint64_t i = 0; i < dest_n; i++) { auto total_chunk_size = dest_ids[dest_idx[i]].size() * CHUNKSIZE; if (i == 0) // receiver of first chunk must subtract the offset from first chunk total_chunk_size -= (offset % CHUNKSIZE); if (i == dest_n - 1 && ((offset + read_size) % CHUNKSIZE) != 0) // receiver of last chunk must subtract total_chunk_size -= (CHUNKSIZE - ((offset + read_size) % CHUNKSIZE)); // RPC hg_addr_t svr_addr = HG_ADDR_NULL; auto chnk_ids = &dest_ids[dest_idx[i]]; rpc_read_data_in_t in{}; // fill in in.path = path.c_str(); in.offset = offset % CHUNKSIZE;// first offset in dest_idx is the chunk with a potential offset in.chunk_n = chnk_ids->size(); // number of chunks handled by that destination in.chunk_start = chnk_start; // chunk start id of this write in.chunk_end = chnk_end; // chunk end id of this write in.total_chunk_size = total_chunk_size; // total size to write in.bulk_handle = (dest_idx[i] == fs_config->host_id) ? ipc_bulk_handle : rpc_bulk_handle; // TODO remove svr_addr from this call margo_create_wrap(ipc_read_data_id, rpc_read_data_id, dest_idx[i], rpc_handles[i], svr_addr, false); ret = margo_iforward(rpc_handles[i], &in, &rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; for (uint64_t j = 0; j < i + 1; j++) { margo_destroy(rpc_handles[j]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return -1; } } // Wait for RPC responses and then get response and add it to out_size which is the read size // All potential outputs are served to free resources regardless of errors, although an errorcode is set. ssize_t out_size = 0; ssize_t err = 0; for (uint64_t i = 0; i < dest_n; i++) { // XXX We might need a timeout here to not wait forever for an output that never comes? ret = margo_wait(rpc_waiters[i]); if (ret != HG_SUCCESS) { ld_logger->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, dest_idx[i]); errno = EBUSY; err = -1; } // decode response rpc_data_out_t out{}; ret = margo_get_output(rpc_handles[i], &out); if (ret != HG_SUCCESS) { ld_logger->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, dest_idx[i]); err = -1; } ld_logger->debug("{}() Got response {}", __func__, out.res); if (out.res != 0) errno = out.res; out_size += static_cast<size_t>(out.io_size); margo_free_output(rpc_handles[i], &out); margo_destroy(rpc_handles[i]); } // free bulk handles for buffer margo_bulk_free(rpc_bulk_handle); margo_bulk_free(ipc_bulk_handle); return (err < 0) ? err : out_size; } /** * Called by an argobots thread in pwrite() and sends all chunks that go to the same destination at once * @param _arg <struct write_args*> Loading