Loading include/proxy/proxy_data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ struct margo_client_ids { hg_id_t rpc_update_metadentry_size_id; hg_id_t rpc_write_id; hg_id_t rpc_read_id; hg_id_t rpc_truncate_id; hg_id_t rpc_chunk_stat_id; hg_id_t rpc_get_dirents_extended_id; }; Loading include/proxy/rpc/forward_data.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,9 @@ std::pair<int, ssize_t> forward_read(const std::string& path, void* buf, int64_t offset, size_t read_size); int forward_truncate(const std::string& path, size_t current_size, size_t new_size); std::pair<int, ChunkStat> forward_get_chunk_stat(); Loading src/daemon/daemon.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -258,6 +258,8 @@ register_proxy_server_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended, rpc_get_dirents_in_t, rpc_get_dirents_out_t, rpc_srv_get_dirents_extended); MARGO_REGISTER(mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t, rpc_srv_truncate); // proxy daemon specific RPCs MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_daemon_write, rpc_proxy_daemon_write_in_t, rpc_data_out_t, Loading src/proxy/proxy.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -132,6 +132,8 @@ register_client_rpcs(margo_instance_id mid) { PROXY_DATA->rpc_client_ids().rpc_read_id = MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_daemon_read, rpc_proxy_daemon_read_in_t, rpc_data_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_truncate_id = MARGO_REGISTER( mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_chunk_stat_id = MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t, rpc_chunk_stat_out_t, NULL); Loading src/proxy/rpc/forward_data.cpp +103 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include <common/rpc/distributor.hpp> #include <map> #include <unordered_set> using namespace std; Loading Loading @@ -343,6 +344,108 @@ forward_read(const std::string& path, void* buf, const int64_t offset, return ::make_pair(err, out_size); } int forward_truncate(const std::string& path, size_t current_size, size_t new_size) { rpc_trunc_in_t daemon_in{}; rpc_err_out_t daemon_out{}; hg_return_t ret{}; bool err = false; // fill in daemon_in.path = path.c_str(); daemon_in.length = new_size; // import pow2-optimized arithmetic functions using namespace gkfs::utils::arithmetic; // Find out which data servers need to delete data chunks in order to // contact only them const unsigned int chunk_start = block_index(new_size, gkfs::config::rpc::chunksize); const unsigned int chunk_end = block_index(current_size - new_size - 1, gkfs::config::rpc::chunksize); std::unordered_set<unsigned int> hosts; for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end; ++chunk_id) { hosts.insert(PROXY_DATA->distributor()->locate_data(path, chunk_id, 0)); } // some helper variables for async RPC vector<hg_handle_t> rpc_handles(hosts.size()); vector<margo_request> rpc_waiters(hosts.size()); unsigned int req_num = 0; // Issue non-blocking RPC requests and wait for the result later for(const auto& host : hosts) { ret = margo_create(PROXY_DATA->client_rpc_mid(), PROXY_DATA->rpc_endpoints().at(host), PROXY_DATA->rpc_client_ids().rpc_truncate_id, &rpc_handles[req_num]); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to create Mercury handle for host: ", __func__, host); break; } // Send RPC ret = margo_iforward(rpc_handles[req_num], &daemon_in, &rpc_waiters[req_num]); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, host); break; } req_num++; } if(req_num < hosts.size()) { // An error occurred. Cleanup and return PROXY_DATA->log()->error( "{}() Error -> sent only some requests {}/{}. Cancelling request...", __func__, req_num, hosts.size()); for(unsigned int i = 0; i < req_num; ++i) { margo_destroy(rpc_handles[i]); } // TODO Ideally wait for dangling responses return EIO; } // Wait for RPC responses and then get response for(unsigned int i = 0; i < hosts.size(); ++i) { ret = margo_wait(rpc_waiters[i]); if(ret == HG_SUCCESS) { ret = margo_get_output(rpc_handles[i], &daemon_out); if(ret == HG_SUCCESS) { if(daemon_out.err) { PROXY_DATA->log()->error("{}() received error response: {}", __func__, daemon_out.err); err = true; } } else { // Get output failed PROXY_DATA->log()->error("{}() while getting rpc output", __func__); err = true; } } else { // Wait failed PROXY_DATA->log()->error("{}() Failed while waiting for response", __func__); err = true; } /* clean up resources consumed by this rpc */ margo_free_output(rpc_handles[i], &daemon_out); margo_destroy(rpc_handles[i]); } if(err) { errno = EBUSY; return -1; } return 0; } pair<int, ChunkStat> forward_get_chunk_stat() { int err = 0; Loading Loading
include/proxy/proxy_data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -32,6 +32,7 @@ struct margo_client_ids { hg_id_t rpc_update_metadentry_size_id; hg_id_t rpc_write_id; hg_id_t rpc_read_id; hg_id_t rpc_truncate_id; hg_id_t rpc_chunk_stat_id; hg_id_t rpc_get_dirents_extended_id; }; Loading
include/proxy/rpc/forward_data.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -27,6 +27,9 @@ std::pair<int, ssize_t> forward_read(const std::string& path, void* buf, int64_t offset, size_t read_size); int forward_truncate(const std::string& path, size_t current_size, size_t new_size); std::pair<int, ChunkStat> forward_get_chunk_stat(); Loading
src/daemon/daemon.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -258,6 +258,8 @@ register_proxy_server_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended, rpc_get_dirents_in_t, rpc_get_dirents_out_t, rpc_srv_get_dirents_extended); MARGO_REGISTER(mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t, rpc_srv_truncate); // proxy daemon specific RPCs MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_daemon_write, rpc_proxy_daemon_write_in_t, rpc_data_out_t, Loading
src/proxy/proxy.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -132,6 +132,8 @@ register_client_rpcs(margo_instance_id mid) { PROXY_DATA->rpc_client_ids().rpc_read_id = MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_daemon_read, rpc_proxy_daemon_read_in_t, rpc_data_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_truncate_id = MARGO_REGISTER( mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_chunk_stat_id = MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t, rpc_chunk_stat_out_t, NULL); Loading
src/proxy/rpc/forward_data.cpp +103 −0 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include <common/rpc/distributor.hpp> #include <map> #include <unordered_set> using namespace std; Loading Loading @@ -343,6 +344,108 @@ forward_read(const std::string& path, void* buf, const int64_t offset, return ::make_pair(err, out_size); } int forward_truncate(const std::string& path, size_t current_size, size_t new_size) { rpc_trunc_in_t daemon_in{}; rpc_err_out_t daemon_out{}; hg_return_t ret{}; bool err = false; // fill in daemon_in.path = path.c_str(); daemon_in.length = new_size; // import pow2-optimized arithmetic functions using namespace gkfs::utils::arithmetic; // Find out which data servers need to delete data chunks in order to // contact only them const unsigned int chunk_start = block_index(new_size, gkfs::config::rpc::chunksize); const unsigned int chunk_end = block_index(current_size - new_size - 1, gkfs::config::rpc::chunksize); std::unordered_set<unsigned int> hosts; for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end; ++chunk_id) { hosts.insert(PROXY_DATA->distributor()->locate_data(path, chunk_id, 0)); } // some helper variables for async RPC vector<hg_handle_t> rpc_handles(hosts.size()); vector<margo_request> rpc_waiters(hosts.size()); unsigned int req_num = 0; // Issue non-blocking RPC requests and wait for the result later for(const auto& host : hosts) { ret = margo_create(PROXY_DATA->client_rpc_mid(), PROXY_DATA->rpc_endpoints().at(host), PROXY_DATA->rpc_client_ids().rpc_truncate_id, &rpc_handles[req_num]); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to create Mercury handle for host: ", __func__, host); break; } // Send RPC ret = margo_iforward(rpc_handles[req_num], &daemon_in, &rpc_waiters[req_num]); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, host); break; } req_num++; } if(req_num < hosts.size()) { // An error occurred. Cleanup and return PROXY_DATA->log()->error( "{}() Error -> sent only some requests {}/{}. Cancelling request...", __func__, req_num, hosts.size()); for(unsigned int i = 0; i < req_num; ++i) { margo_destroy(rpc_handles[i]); } // TODO Ideally wait for dangling responses return EIO; } // Wait for RPC responses and then get response for(unsigned int i = 0; i < hosts.size(); ++i) { ret = margo_wait(rpc_waiters[i]); if(ret == HG_SUCCESS) { ret = margo_get_output(rpc_handles[i], &daemon_out); if(ret == HG_SUCCESS) { if(daemon_out.err) { PROXY_DATA->log()->error("{}() received error response: {}", __func__, daemon_out.err); err = true; } } else { // Get output failed PROXY_DATA->log()->error("{}() while getting rpc output", __func__); err = true; } } else { // Wait failed PROXY_DATA->log()->error("{}() Failed while waiting for response", __func__); err = true; } /* clean up resources consumed by this rpc */ margo_free_output(rpc_handles[i], &daemon_out); margo_destroy(rpc_handles[i]); } if(err) { errno = EBUSY; return -1; } return 0; } pair<int, ChunkStat> forward_get_chunk_stat() { int err = 0; Loading