Loading include/proxy/proxy_data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ struct margo_client_ids { hg_id_t rpc_write_id; hg_id_t rpc_read_id; hg_id_t rpc_chunk_stat_id; hg_id_t rpc_get_dirents_extended_id; }; class ProxyData { Loading include/proxy/rpc/forward_metadata.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,10 @@ std::pair<int, off64_t> forward_update_metadentry_size(const std::string& path, const size_t size, const off64_t offset, const bool append_flag); std::pair<int, size_t> forward_get_dirents_single(const std::string& path, int server, void* buf, const size_t bulk_size); } // namespace gkfs::rpc Loading include/proxy/rpc/rpc_defs.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -35,4 +35,6 @@ DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_write) DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_chunk_stat) DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_get_dirents_extended) #endif // GKFS_PROXY_RPC_DEFS_HPP src/proxy/proxy.cpp +6 −0 Original line number Diff line number Diff line Loading @@ -55,6 +55,9 @@ register_server_ipcs(margo_instance_id mid) { rpc_update_metadentry_size_in_t, rpc_update_metadentry_size_out_t, proxy_rpc_srv_update_metadentry_size) MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_get_dirents_extended, rpc_proxy_get_dirents_in_t, rpc_get_dirents_out_t, proxy_rpc_srv_get_dirents_extended) } void Loading Loading @@ -141,6 +144,9 @@ register_client_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size, rpc_update_metadentry_size_in_t, rpc_update_metadentry_size_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_get_dirents_extended_id = MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended, rpc_get_dirents_in_t, rpc_get_dirents_out_t, NULL); } void Loading src/proxy/rpc/forward_metadata.cpp +71 −0 Original line number Diff line number Diff line Loading @@ -299,4 +299,75 @@ forward_update_metadentry_size(const string& path, const size_t size, return make_pair(err, ret_size); } pair<int, size_t> forward_get_dirents_single(const std::string& path, int server, void* buf, size_t bulk_size) { hg_bulk_t bulk_handle = nullptr; hg_handle_t rpc_handle = nullptr; rpc_get_dirents_in_t daemon_in{}; // register local target buffer for bulk access auto* bulk_buf = buf; auto size = make_shared<size_t>(bulk_size); // XXX Why shared ptr? auto ret = margo_bulk_create(PROXY_DATA->client_rpc_mid(), 1, &bulk_buf, size.get(), HG_BULK_WRITE_ONLY, &bulk_handle); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error("{}() Failed to create rpc bulk handle", __func__); return ::make_pair(EBUSY, 0); } daemon_in.path = path.c_str(); daemon_in.bulk_handle = bulk_handle; auto* endp = PROXY_DATA->rpc_endpoints().at(server); ret = margo_create(PROXY_DATA->client_rpc_mid(), endp, PROXY_DATA->rpc_client_ids().rpc_get_dirents_extended_id, &rpc_handle); if(ret != HG_SUCCESS) { margo_destroy(rpc_handle); margo_bulk_free(bulk_handle); return ::make_pair(EBUSY, 0); } // Send RPC margo_request rpc_waiter{}; ret = margo_iforward(rpc_handle, &daemon_in, &rpc_waiter); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, target); margo_destroy(rpc_handle); margo_bulk_free(bulk_handle); return ::make_pair(EBUSY, 0); } PROXY_DATA->log()->debug("{}() 1 RPC sent, waiting for reply ...", __func__); int err = 0; size_t dirents_size = 0; ret = margo_wait(rpc_waiter); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, server); err = EBUSY; } // decode response rpc_get_dirents_out_t daemon_out{}; ret = margo_get_output(rpc_handle, &daemon_out); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Failed to get rpc output for path {} recipient {}", __func__, path, server); err = EBUSY; } PROXY_DATA->log()->debug( "{}() Got response from target '{}': err '{}' with dirent_size '{}'", __func__, server, daemon_out.err, daemon_out.dirents_size); if(daemon_out.err != 0) err = daemon_out.err; else dirents_size = daemon_out.dirents_size; margo_free_output(rpc_handle, &daemon_out); margo_destroy(rpc_handle); margo_bulk_free(bulk_handle); return ::make_pair(err, dirents_size); } } // namespace gkfs::rpc Loading
include/proxy/proxy_data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -33,6 +33,7 @@ struct margo_client_ids { hg_id_t rpc_write_id; hg_id_t rpc_read_id; hg_id_t rpc_chunk_stat_id; hg_id_t rpc_get_dirents_extended_id; }; class ProxyData { Loading
include/proxy/rpc/forward_metadata.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -31,6 +31,10 @@ std::pair<int, off64_t> forward_update_metadentry_size(const std::string& path, const size_t size, const off64_t offset, const bool append_flag); std::pair<int, size_t> forward_get_dirents_single(const std::string& path, int server, void* buf, const size_t bulk_size); } // namespace gkfs::rpc Loading
include/proxy/rpc/rpc_defs.hpp +2 −0 Original line number Diff line number Diff line Loading @@ -35,4 +35,6 @@ DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_write) DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_chunk_stat) DECLARE_MARGO_RPC_HANDLER(proxy_rpc_srv_get_dirents_extended) #endif // GKFS_PROXY_RPC_DEFS_HPP
src/proxy/proxy.cpp +6 −0 Original line number Diff line number Diff line Loading @@ -55,6 +55,9 @@ register_server_ipcs(margo_instance_id mid) { rpc_update_metadentry_size_in_t, rpc_update_metadentry_size_out_t, proxy_rpc_srv_update_metadentry_size) MARGO_REGISTER(mid, gkfs::rpc::tag::proxy_get_dirents_extended, rpc_proxy_get_dirents_in_t, rpc_get_dirents_out_t, proxy_rpc_srv_get_dirents_extended) } void Loading Loading @@ -141,6 +144,9 @@ register_client_rpcs(margo_instance_id mid) { MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size, rpc_update_metadentry_size_in_t, rpc_update_metadentry_size_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_get_dirents_extended_id = MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended, rpc_get_dirents_in_t, rpc_get_dirents_out_t, NULL); } void Loading
src/proxy/rpc/forward_metadata.cpp +71 −0 Original line number Diff line number Diff line Loading @@ -299,4 +299,75 @@ forward_update_metadentry_size(const string& path, const size_t size, return make_pair(err, ret_size); } pair<int, size_t> forward_get_dirents_single(const std::string& path, int server, void* buf, size_t bulk_size) { hg_bulk_t bulk_handle = nullptr; hg_handle_t rpc_handle = nullptr; rpc_get_dirents_in_t daemon_in{}; // register local target buffer for bulk access auto* bulk_buf = buf; auto size = make_shared<size_t>(bulk_size); // XXX Why shared ptr? auto ret = margo_bulk_create(PROXY_DATA->client_rpc_mid(), 1, &bulk_buf, size.get(), HG_BULK_WRITE_ONLY, &bulk_handle); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error("{}() Failed to create rpc bulk handle", __func__); return ::make_pair(EBUSY, 0); } daemon_in.path = path.c_str(); daemon_in.bulk_handle = bulk_handle; auto* endp = PROXY_DATA->rpc_endpoints().at(server); ret = margo_create(PROXY_DATA->client_rpc_mid(), endp, PROXY_DATA->rpc_client_ids().rpc_get_dirents_extended_id, &rpc_handle); if(ret != HG_SUCCESS) { margo_destroy(rpc_handle); margo_bulk_free(bulk_handle); return ::make_pair(EBUSY, 0); } // Send RPC margo_request rpc_waiter{}; ret = margo_iforward(rpc_handle, &daemon_in, &rpc_waiter); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path, target); margo_destroy(rpc_handle); margo_bulk_free(bulk_handle); return ::make_pair(EBUSY, 0); } PROXY_DATA->log()->debug("{}() 1 RPC sent, waiting for reply ...", __func__); int err = 0; size_t dirents_size = 0; ret = margo_wait(rpc_waiter); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, server); err = EBUSY; } // decode response rpc_get_dirents_out_t daemon_out{}; ret = margo_get_output(rpc_handle, &daemon_out); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error( "{}() Failed to get rpc output for path {} recipient {}", __func__, path, server); err = EBUSY; } PROXY_DATA->log()->debug( "{}() Got response from target '{}': err '{}' with dirent_size '{}'", __func__, server, daemon_out.err, daemon_out.dirents_size); if(daemon_out.err != 0) err = daemon_out.err; else dirents_size = daemon_out.dirents_size; margo_free_output(rpc_handle, &daemon_out); margo_destroy(rpc_handle); margo_bulk_free(bulk_handle); return ::make_pair(err, dirents_size); } } // namespace gkfs::rpc