Loading src/lib/detail/impl.cpp +23 −28 Original line number Diff line number Diff line Loading @@ -533,43 +533,38 @@ update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, admire::error_code remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage) { (void) srv; (void) pfs_storage; return admire::error_code::snafu; #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{pfs_storage_id: {}}}", "body: {{pfs_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), pfs_storage.id()); ADM_remove_pfs_storage_in_t in{pfs_storage.id()}; ADM_remove_pfs_storage_out_t out; if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, pfs_storage.id()); call_rv.has_value()) { const auto rpc = endp.call("ADM_remove_pfs_storage", &in, &out); const scord::network::generic_response resp{call_rv.value()}; if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return rv; } std::quoted(endp.address()), resp.error_code(), resp.op_id()); LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, out.op_id); return resp.error_code(); } } return admire::error_code::success; #endif LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } admire::error_code Loading src/scord/rpc_handlers.cpp +14 −34 Original line number Diff line number Diff line Loading @@ -475,58 +475,38 @@ update_pfs_storage(const request& req, std::uint64_t pfs_id, req.respond(resp); } } // namespace scord::network::handlers static void ADM_remove_pfs_storage(hg_handle_t h) { using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; ADM_remove_pfs_storage_in_t in; ADM_remove_pfs_storage_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); void remove_pfs_storage(const request& req, std::uint64_t pfs_id) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); using scord::network::get_address; const auto rpc_name = "ADM_"s + __FUNCTION__; const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{pfs_storage_id: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), in.server_id); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{pfs_id: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), pfs_id); auto& pfs_manager = scord::pfs_storage_manager::instance(); admire::error_code ec = pfs_manager.remove(in.server_id); admire::error_code ec = pfs_manager.remove(pfs_id); if(!ec) { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing pfs storage: {}\"", rpc_id, in.server_id); rpc_id, pfs_id); } out.op_id = rpc_id; out.retval = ec; const auto resp = generic_response{rpc_id, ec}; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); ret = margo_free_input(h, &in); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); req.respond(resp); } DEFINE_MARGO_RPC_HANDLER(ADM_remove_pfs_storage); } // namespace scord::network::handlers /** * Specifes the origin location in a storage tier where input is located, as Loading src/scord/rpc_handlers.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -69,6 +69,9 @@ void update_pfs_storage(const request& req, std::uint64_t pfs_id, const admire::pfs_storage::ctx& new_ctx); void remove_pfs_storage(const request& req, std::uint64_t pfs_id); } // namespace scord::network::handlers #include <margo.h> Loading @@ -80,9 +83,6 @@ extern "C" { // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { /// ADM_remove_pfs_storage DECLARE_MARGO_RPC_HANDLER(ADM_remove_pfs_storage); /// ADM_input DECLARE_MARGO_RPC_HANDLER(ADM_input); Loading src/scord/scord.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -195,6 +195,7 @@ main(int argc, char* argv[]) { daemon.set_handler(EXPAND(deploy_adhoc_storage)); daemon.set_handler(EXPAND(register_pfs_storage)); daemon.set_handler(EXPAND(update_pfs_storage)); daemon.set_handler(EXPAND(remove_pfs_storage)); #undef EXPAND Loading Loading
src/lib/detail/impl.cpp +23 −28 Original line number Diff line number Diff line Loading @@ -533,43 +533,38 @@ update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, admire::error_code remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage) { (void) srv; (void) pfs_storage; return admire::error_code::snafu; #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{pfs_storage_id: {}}}", "body: {{pfs_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), pfs_storage.id()); ADM_remove_pfs_storage_in_t in{pfs_storage.id()}; ADM_remove_pfs_storage_out_t out; if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, pfs_storage.id()); call_rv.has_value()) { const auto rpc = endp.call("ADM_remove_pfs_storage", &in, &out); const scord::network::generic_response resp{call_rv.value()}; if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return rv; } std::quoted(endp.address()), resp.error_code(), resp.op_id()); LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, out.op_id); return resp.error_code(); } } return admire::error_code::success; #endif LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } admire::error_code Loading
src/scord/rpc_handlers.cpp +14 −34 Original line number Diff line number Diff line Loading @@ -475,58 +475,38 @@ update_pfs_storage(const request& req, std::uint64_t pfs_id, req.respond(resp); } } // namespace scord::network::handlers static void ADM_remove_pfs_storage(hg_handle_t h) { using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; ADM_remove_pfs_storage_in_t in; ADM_remove_pfs_storage_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); void remove_pfs_storage(const request& req, std::uint64_t pfs_id) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); using scord::network::get_address; const auto rpc_name = "ADM_"s + __FUNCTION__; const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{pfs_storage_id: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), in.server_id); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{pfs_id: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), pfs_id); auto& pfs_manager = scord::pfs_storage_manager::instance(); admire::error_code ec = pfs_manager.remove(in.server_id); admire::error_code ec = pfs_manager.remove(pfs_id); if(!ec) { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing pfs storage: {}\"", rpc_id, in.server_id); rpc_id, pfs_id); } out.op_id = rpc_id; out.retval = ec; const auto resp = generic_response{rpc_id, ec}; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); ret = margo_free_input(h, &in); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); req.respond(resp); } DEFINE_MARGO_RPC_HANDLER(ADM_remove_pfs_storage); } // namespace scord::network::handlers /** * Specifes the origin location in a storage tier where input is located, as Loading
src/scord/rpc_handlers.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -69,6 +69,9 @@ void update_pfs_storage(const request& req, std::uint64_t pfs_id, const admire::pfs_storage::ctx& new_ctx); void remove_pfs_storage(const request& req, std::uint64_t pfs_id); } // namespace scord::network::handlers #include <margo.h> Loading @@ -80,9 +83,6 @@ extern "C" { // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { /// ADM_remove_pfs_storage DECLARE_MARGO_RPC_HANDLER(ADM_remove_pfs_storage); /// ADM_input DECLARE_MARGO_RPC_HANDLER(ADM_input); Loading
src/scord/scord.cpp +1 −0 Original line number Diff line number Diff line Loading @@ -195,6 +195,7 @@ main(int argc, char* argv[]) { daemon.set_handler(EXPAND(deploy_adhoc_storage)); daemon.set_handler(EXPAND(register_pfs_storage)); daemon.set_handler(EXPAND(update_pfs_storage)); daemon.set_handler(EXPAND(remove_pfs_storage)); #undef EXPAND Loading