Loading include/daemon/handler/rpc_util.hpp +40 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,8 @@ extern "C" { namespace gkfs::rpc { // Convenience methods for Margo RPC handlers (server side) template <typename InputType, typename OutputType> inline hg_return_t cleanup(hg_handle_t* handle, InputType* input, OutputType* output, Loading Loading @@ -95,6 +97,44 @@ cleanup_respond(hg_handle_t* handle, OutputType* output) { return ret; } // Convenience methods for Margo RPC client template <typename OutputType> inline hg_return_t margo_client_cleanup(hg_handle_t* handle, OutputType* output, margo_instance_id* mid, hg_addr_t* addr, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if(bulk_handle) { ret = margo_bulk_free(*bulk_handle); if(ret != HG_SUCCESS) return ret; } if(output && handle) { ret = margo_free_output(*handle, output); if(ret != HG_SUCCESS) return ret; } if(mid && addr) { ret = margo_addr_free(*mid, *addr); if(ret != HG_SUCCESS) return ret; } if(handle) { ret = margo_destroy(*handle); if(ret != HG_SUCCESS) return ret; } return ret; } template <typename OutputType> inline hg_return_t margo_client_cleanup(hg_handle_t* handle, OutputType* output, margo_instance_id* mid, hg_addr_t* addr) { return margo_client_cleanup(handle, output, mid, addr, nullptr); } } // namespace gkfs::rpc Loading include/daemon/relocation/config_manager.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ std::vector<std::pair<std::string, std::string>> read_hosts_file(); void test_d2d_rpc(); do_relocation(); } // namespace gkfs::relocation Loading include/daemon/relocation/transmitter.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ namespace gkfs::relocation { void transmit_metadata(gkfs::rpc::host_t localhost); transmit_metadata_and_data(gkfs::rpc::host_t localhost); } // namespace gkfs::relocation Loading src/daemon/daemon.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -429,7 +429,7 @@ parse_input(const po::variables_map& vm) { if(gkfs::config::dynamic_placement && vm.count("start-relocation")) { cout << "Starting relocation...\n"; gkfs::relocation::test_d2d_rpc(); gkfs::relocation::do_relocation(); // Don't create any directories and no interest in the other options exit(0); } Loading src/daemon/handler/srv_management.cpp +0 −30 Original line number Diff line number Diff line Loading @@ -14,12 +14,8 @@ #include <daemon/daemon.hpp> #include <daemon/handler/rpc_defs.hpp> #include <daemon/handler/rpc_util.hpp> #include <daemon/relocation/transmitter.hpp> #include <global/rpc/distributor.hpp> #include <global/rpc/rpc_types.hpp> #include <iostream> using namespace std; Loading Loading @@ -59,32 +55,6 @@ rpc_srv_get_fs_config(hg_handle_t handle) { return HG_SUCCESS; } hg_return_t rpc_srv_relocation_start(hg_handle_t handle) { cout << "TODO(dauer) relocation start received\n"; rpc_relocation_start_in_t in{}; rpc_err_out_t out{}; // out.err = EIO; // Getting some information from margo auto ret = margo_get_input(handle, &in); if(ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Could not get RPC input data with err {}", __func__, ret); return gkfs::rpc::cleanup_respond(&handle, &in, &out); } // GKFS_DATA->mdb()->print_all(); auto const host_id = in.host_id; gkfs::relocation::transmit_metadata(host_id); out.err = 0; // TODO(dauer) return gkfs::rpc::cleanup_respond(&handle, &in, &out); } } // namespace DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_fs_config) DEFINE_MARGO_RPC_HANDLER(rpc_srv_relocation_start) Loading
include/daemon/handler/rpc_util.hpp +40 −0 Original line number Diff line number Diff line Loading @@ -24,6 +24,8 @@ extern "C" { namespace gkfs::rpc { // Convenience methods for Margo RPC handlers (server side) template <typename InputType, typename OutputType> inline hg_return_t cleanup(hg_handle_t* handle, InputType* input, OutputType* output, Loading Loading @@ -95,6 +97,44 @@ cleanup_respond(hg_handle_t* handle, OutputType* output) { return ret; } // Convenience methods for Margo RPC client template <typename OutputType> inline hg_return_t margo_client_cleanup(hg_handle_t* handle, OutputType* output, margo_instance_id* mid, hg_addr_t* addr, hg_bulk_t* bulk_handle) { auto ret = HG_SUCCESS; if(bulk_handle) { ret = margo_bulk_free(*bulk_handle); if(ret != HG_SUCCESS) return ret; } if(output && handle) { ret = margo_free_output(*handle, output); if(ret != HG_SUCCESS) return ret; } if(mid && addr) { ret = margo_addr_free(*mid, *addr); if(ret != HG_SUCCESS) return ret; } if(handle) { ret = margo_destroy(*handle); if(ret != HG_SUCCESS) return ret; } return ret; } template <typename OutputType> inline hg_return_t margo_client_cleanup(hg_handle_t* handle, OutputType* output, margo_instance_id* mid, hg_addr_t* addr) { return margo_client_cleanup(handle, output, mid, addr, nullptr); } } // namespace gkfs::rpc Loading
include/daemon/relocation/config_manager.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -24,7 +24,7 @@ std::vector<std::pair<std::string, std::string>> read_hosts_file(); void test_d2d_rpc(); do_relocation(); } // namespace gkfs::relocation Loading
include/daemon/relocation/transmitter.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -19,7 +19,7 @@ namespace gkfs::relocation { void transmit_metadata(gkfs::rpc::host_t localhost); transmit_metadata_and_data(gkfs::rpc::host_t localhost); } // namespace gkfs::relocation Loading
src/daemon/daemon.cpp +1 −1 Original line number Diff line number Diff line Loading @@ -429,7 +429,7 @@ parse_input(const po::variables_map& vm) { if(gkfs::config::dynamic_placement && vm.count("start-relocation")) { cout << "Starting relocation...\n"; gkfs::relocation::test_d2d_rpc(); gkfs::relocation::do_relocation(); // Don't create any directories and no interest in the other options exit(0); } Loading
src/daemon/handler/srv_management.cpp +0 −30 Original line number Diff line number Diff line Loading @@ -14,12 +14,8 @@ #include <daemon/daemon.hpp> #include <daemon/handler/rpc_defs.hpp> #include <daemon/handler/rpc_util.hpp> #include <daemon/relocation/transmitter.hpp> #include <global/rpc/distributor.hpp> #include <global/rpc/rpc_types.hpp> #include <iostream> using namespace std; Loading Loading @@ -59,32 +55,6 @@ rpc_srv_get_fs_config(hg_handle_t handle) { return HG_SUCCESS; } hg_return_t rpc_srv_relocation_start(hg_handle_t handle) { cout << "TODO(dauer) relocation start received\n"; rpc_relocation_start_in_t in{}; rpc_err_out_t out{}; // out.err = EIO; // Getting some information from margo auto ret = margo_get_input(handle, &in); if(ret != HG_SUCCESS) { GKFS_DATA->spdlogger()->error( "{}() Could not get RPC input data with err {}", __func__, ret); return gkfs::rpc::cleanup_respond(&handle, &in, &out); } // GKFS_DATA->mdb()->print_all(); auto const host_id = in.host_id; gkfs::relocation::transmit_metadata(host_id); out.err = 0; // TODO(dauer) return gkfs::rpc::cleanup_respond(&handle, &in, &out); } } // namespace DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_fs_config) DEFINE_MARGO_RPC_HANDLER(rpc_srv_relocation_start)