Loading src/lib/detail/impl.cpp +21 −30 Original line number Diff line number Diff line Loading @@ -298,45 +298,36 @@ update_job(const server& srv, const job& job, admire::error_code remove_job(const server& srv, const job& job) { (void) srv; (void) job; return admire::error_code::snafu; #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}}}", "body: {{job_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job); std::quoted(rpc_client.self_address()), job.id()); const auto rpc_job = api::convert(job); ADM_remove_job_in_t in{rpc_job.get()}; ADM_remove_job_out_t out; if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id()); call_rv.has_value()) { const auto rpc = endp.call("ADM_remove_job", &in, &out); const scord::network::generic_response resp{call_rv.value()}; if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return rv; std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, out.op_id); return admire::error_code::success; #endif LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } tl::expected<admire::adhoc_storage, admire::error_code> Loading src/scord/rpc_handlers.cpp +43 −63 Original line number Diff line number Diff line Loading @@ -157,6 +157,49 @@ update_job(const request& req, admire::job_id job_id, req.respond(resp); } void remove_job(const request& req, admire::job_id job_id) { using scord::network::get_address; const auto rpc_name = "ADM_"s + __FUNCTION__; const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job_id: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), job_id); admire::error_code ec; auto& jm = scord::job_manager::instance(); const auto jm_result = jm.remove(job_id); if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client const auto& job_info = jm_result.value(); if(const auto adhoc_storage = job_info->requirements()->adhoc_storage(); adhoc_storage.has_value()) { auto& adhoc_manager = scord::adhoc_storage_manager::instance(); ec = adhoc_manager.remove_client_info(adhoc_storage->id()); } } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, job_id); ec = jm_result.error(); } const auto resp = generic_response{rpc_id, ec}; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), ec); req.respond(resp); } void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, Loading Loading @@ -364,69 +407,6 @@ deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) { } // namespace scord::network::handlers static void ADM_remove_job(hg_handle_t h) { using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; ADM_remove_job_in_t in; ADM_remove_job_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); const admire::job job(in.job); const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job); admire::error_code ec; auto& jm = scord::job_manager::instance(); const auto jm_result = jm.remove(job.id()); if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client const auto& job_info = jm_result.value(); if(const auto adhoc_storage = job_info->requirements()->adhoc_storage(); adhoc_storage.has_value()) { auto& adhoc_manager = scord::adhoc_storage_manager::instance(); ec = adhoc_manager.remove_client_info(adhoc_storage->id()); } } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, job.id()); ec = jm_result.error(); } out.op_id = rpc_id; out.retval = ec; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); ret = margo_free_input(h, &in); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); } DEFINE_MARGO_RPC_HANDLER(ADM_remove_job); static void ADM_register_pfs_storage(hg_handle_t h) { Loading src/scord/rpc_handlers.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -56,6 +56,9 @@ void update_job(const request& req, admire::job_id job_id, const admire::job::resources& new_resources); void remove_job(const request& req, admire::job_id job_id); } // namespace scord::network::handlers #include <margo.h> Loading @@ -67,9 +70,6 @@ extern "C" { // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { /// ADM_remove_job DECLARE_MARGO_RPC_HANDLER(ADM_remove_job); /// ADM_register_pfs_storage DECLARE_MARGO_RPC_HANDLER(ADM_register_pfs_storage); Loading src/scord/scord.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -194,6 +194,8 @@ main(int argc, char* argv[]) { scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); daemon.set_handler("ADM_remove_job"s, scord::network::handlers::remove_job); #if 0 const auto rpc_registration_cb = [](auto&& ctx) { Loading Loading
src/lib/detail/impl.cpp +21 −30 Original line number Diff line number Diff line Loading @@ -298,45 +298,36 @@ update_job(const server& srv, const job& job, admire::error_code remove_job(const server& srv, const job& job) { (void) srv; (void) job; return admire::error_code::snafu; #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}}}", "body: {{job_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job); std::quoted(rpc_client.self_address()), job.id()); const auto rpc_job = api::convert(job); ADM_remove_job_in_t in{rpc_job.get()}; ADM_remove_job_out_t out; if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id()); call_rv.has_value()) { const auto rpc = endp.call("ADM_remove_job", &in, &out); const scord::network::generic_response resp{call_rv.value()}; if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return rv; std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, out.op_id); return admire::error_code::success; #endif LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } tl::expected<admire::adhoc_storage, admire::error_code> Loading
src/scord/rpc_handlers.cpp +43 −63 Original line number Diff line number Diff line Loading @@ -157,6 +157,49 @@ update_job(const request& req, admire::job_id job_id, req.respond(resp); } void remove_job(const request& req, admire::job_id job_id) { using scord::network::get_address; const auto rpc_name = "ADM_"s + __FUNCTION__; const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job_id: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), job_id); admire::error_code ec; auto& jm = scord::job_manager::instance(); const auto jm_result = jm.remove(job_id); if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client const auto& job_info = jm_result.value(); if(const auto adhoc_storage = job_info->requirements()->adhoc_storage(); adhoc_storage.has_value()) { auto& adhoc_manager = scord::adhoc_storage_manager::instance(); ec = adhoc_manager.remove_client_info(adhoc_storage->id()); } } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, job_id); ec = jm_result.error(); } const auto resp = generic_response{rpc_id, ec}; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), ec); req.respond(resp); } void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, Loading Loading @@ -364,69 +407,6 @@ deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) { } // namespace scord::network::handlers static void ADM_remove_job(hg_handle_t h) { using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; ADM_remove_job_in_t in; ADM_remove_job_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); const admire::job job(in.job); const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job); admire::error_code ec; auto& jm = scord::job_manager::instance(); const auto jm_result = jm.remove(job.id()); if(jm_result) { // if the job was using an adhoc storage instance, inform the // appropriate adhoc_storage that the job is no longer its client const auto& job_info = jm_result.value(); if(const auto adhoc_storage = job_info->requirements()->adhoc_storage(); adhoc_storage.has_value()) { auto& adhoc_manager = scord::adhoc_storage_manager::instance(); ec = adhoc_manager.remove_client_info(adhoc_storage->id()); } } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, job.id()); ec = jm_result.error(); } out.op_id = rpc_id; out.retval = ec; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); ret = margo_free_input(h, &in); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); } DEFINE_MARGO_RPC_HANDLER(ADM_remove_job); static void ADM_register_pfs_storage(hg_handle_t h) { Loading
src/scord/rpc_handlers.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -56,6 +56,9 @@ void update_job(const request& req, admire::job_id job_id, const admire::job::resources& new_resources); void remove_job(const request& req, admire::job_id job_id); } // namespace scord::network::handlers #include <margo.h> Loading @@ -67,9 +70,6 @@ extern "C" { // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { /// ADM_remove_job DECLARE_MARGO_RPC_HANDLER(ADM_remove_job); /// ADM_register_pfs_storage DECLARE_MARGO_RPC_HANDLER(ADM_register_pfs_storage); Loading
src/scord/scord.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -194,6 +194,8 @@ main(int argc, char* argv[]) { scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); daemon.set_handler("ADM_remove_job"s, scord::network::handlers::remove_job); #if 0 const auto rpc_registration_cb = [](auto&& ctx) { Loading