Loading include/proxy/proxy_data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ struct margo_client_ids { hg_id_t rpc_create_id; hg_id_t rpc_stat_id; hg_id_t rpc_remove_id; hg_id_t rpc_decr_size_id; hg_id_t rpc_remove_data_id; hg_id_t rpc_update_metadentry_size_id; hg_id_t rpc_write_id; Loading src/daemon/daemon.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -249,6 +249,8 @@ register_proxy_server_rpcs(margo_instance_id mid) { rpc_stat_out_t, rpc_srv_stat); MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t, rpc_rm_metadata_out_t, rpc_srv_remove_metadata); MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t, rpc_err_out_t, rpc_srv_decr_size); MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t, rpc_err_out_t, rpc_srv_remove_data); MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size, Loading src/proxy/proxy.cpp +3 −0 Original line number Diff line number Diff line Loading @@ -147,6 +147,9 @@ register_client_rpcs(margo_instance_id mid) { PROXY_DATA->rpc_client_ids().rpc_remove_id = MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t, rpc_rm_metadata_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_decr_size_id = MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t, rpc_err_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_remove_data_id = MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t, rpc_err_out_t, NULL); Loading src/proxy/rpc/forward_metadata.cpp +42 −1 Original line number Diff line number Diff line Loading @@ -248,7 +248,48 @@ forward_remove(const std::string& path) { int forward_decr_size(const std::string& path, size_t length) { return EIO; hg_handle_t rpc_handle = nullptr; rpc_trunc_in_t daemon_in{}; rpc_err_out_t daemon_out{}; int err = 0; // fill in daemon_in.path = path.c_str(); daemon_in.length = length; // Create handle PROXY_DATA->log()->debug("{}() Creating Margo handle ...", __func__); auto endp = PROXY_DATA->rpc_endpoints().at( PROXY_DATA->distributor()->locate_file_metadata(path, 0)); auto ret = margo_create(PROXY_DATA->client_rpc_mid(), endp, PROXY_DATA->rpc_client_ids().rpc_decr_size_id, &rpc_handle); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error("{}() Critical error", __func__); return EBUSY; } ret = margo_forward(rpc_handle, &daemon_in); if(ret == HG_SUCCESS) { // Get response PROXY_DATA->log()->trace("{}() Waiting for response", __func__); ret = margo_get_output(rpc_handle, &daemon_out); if(ret == HG_SUCCESS) { PROXY_DATA->log()->debug("{}() Got response success: {}", __func__, daemon_out.err); err = daemon_out.err; margo_free_output(rpc_handle, &daemon_out); } else { // something is wrong err = EBUSY; PROXY_DATA->log()->error("{}() while getting rpc output", __func__); } } else { // something is wrong err = EBUSY; PROXY_DATA->log()->error("{}() sending rpc failed", __func__); } /* clean up resources consumed by this rpc */ margo_destroy(rpc_handle); return err; } pair<int, off64_t> Loading Loading
include/proxy/proxy_data.hpp +1 −0 Original line number Diff line number Diff line Loading @@ -28,6 +28,7 @@ struct margo_client_ids { hg_id_t rpc_create_id; hg_id_t rpc_stat_id; hg_id_t rpc_remove_id; hg_id_t rpc_decr_size_id; hg_id_t rpc_remove_data_id; hg_id_t rpc_update_metadentry_size_id; hg_id_t rpc_write_id; Loading
src/daemon/daemon.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -249,6 +249,8 @@ register_proxy_server_rpcs(margo_instance_id mid) { rpc_stat_out_t, rpc_srv_stat); MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t, rpc_rm_metadata_out_t, rpc_srv_remove_metadata); MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t, rpc_err_out_t, rpc_srv_decr_size); MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t, rpc_err_out_t, rpc_srv_remove_data); MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size, Loading
src/proxy/proxy.cpp +3 −0 Original line number Diff line number Diff line Loading @@ -147,6 +147,9 @@ register_client_rpcs(margo_instance_id mid) { PROXY_DATA->rpc_client_ids().rpc_remove_id = MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t, rpc_rm_metadata_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_decr_size_id = MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t, rpc_err_out_t, NULL); PROXY_DATA->rpc_client_ids().rpc_remove_data_id = MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t, rpc_err_out_t, NULL); Loading
src/proxy/rpc/forward_metadata.cpp +42 −1 Original line number Diff line number Diff line Loading @@ -248,7 +248,48 @@ forward_remove(const std::string& path) { int forward_decr_size(const std::string& path, size_t length) { return EIO; hg_handle_t rpc_handle = nullptr; rpc_trunc_in_t daemon_in{}; rpc_err_out_t daemon_out{}; int err = 0; // fill in daemon_in.path = path.c_str(); daemon_in.length = length; // Create handle PROXY_DATA->log()->debug("{}() Creating Margo handle ...", __func__); auto endp = PROXY_DATA->rpc_endpoints().at( PROXY_DATA->distributor()->locate_file_metadata(path, 0)); auto ret = margo_create(PROXY_DATA->client_rpc_mid(), endp, PROXY_DATA->rpc_client_ids().rpc_decr_size_id, &rpc_handle); if(ret != HG_SUCCESS) { PROXY_DATA->log()->error("{}() Critical error", __func__); return EBUSY; } ret = margo_forward(rpc_handle, &daemon_in); if(ret == HG_SUCCESS) { // Get response PROXY_DATA->log()->trace("{}() Waiting for response", __func__); ret = margo_get_output(rpc_handle, &daemon_out); if(ret == HG_SUCCESS) { PROXY_DATA->log()->debug("{}() Got response success: {}", __func__, daemon_out.err); err = daemon_out.err; margo_free_output(rpc_handle, &daemon_out); } else { // something is wrong err = EBUSY; PROXY_DATA->log()->error("{}() while getting rpc output", __func__); } } else { // something is wrong err = EBUSY; PROXY_DATA->log()->error("{}() sending rpc failed", __func__); } /* clean up resources consumed by this rpc */ margo_destroy(rpc_handle); return err; } pair<int, off64_t> Loading