Loading src/lib/detail/impl.cpp +24 −32 Original line number Diff line number Diff line Loading @@ -259,48 +259,40 @@ register_job(const server& srv, const job::resources& job_resources, admire::error_code update_job(const server& srv, const job& job, const job::resources& job_resources) { const job::resources& new_resources) { (void) srv; (void) job; (void) job_resources; return admire::error_code::snafu; #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}, job_resources: {}}}", "body: {{job_id: {}, new_resources: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job, job_resources); const auto rpc_job = api::convert(job); const auto rpc_job_resources = api::convert(job_resources); std::quoted(rpc_client.self_address()), job.id(), new_resources); ADM_update_job_in_t in{rpc_job.get(), rpc_job_resources.get()}; ADM_update_job_out_t out; if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id(), new_resources); call_rv.has_value()) { const auto rpc = endp.call("ADM_update_job", &in, &out); const scord::network::generic_response resp{call_rv.value()}; if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return rv; std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, out.op_id); return admire::error_code::success; #endif LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } admire::error_code Loading src/lib/detail/impl.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -41,7 +41,7 @@ register_job(const server& srv, const job::resources& job_resources, admire::error_code update_job(const server& srv, const job& job, const job::resources& job_resources); const job::resources& new_resources); admire::error_code remove_job(const server& srv, const job& job); Loading src/scord/rpc_handlers.cpp +32 −53 Original line number Diff line number Diff line Loading @@ -125,6 +125,38 @@ register_job(const scord::network::request& req, req.respond(resp); } void update_job(const request& req, admire::job_id job_id, const admire::job::resources& new_resources) { using scord::network::get_address; const auto rpc_name = "ADM_"s + __FUNCTION__; const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job_id: {}, new_resources: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), job_id, new_resources); auto& jm = scord::job_manager::instance(); const auto ec = jm.update(job_id, new_resources); if(!ec) { LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, ec); } const auto resp = generic_response{rpc_id, ec}; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), ec); req.respond(resp); } void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, Loading Loading @@ -332,59 +364,6 @@ deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) { } // namespace scord::network::handlers static void ADM_update_job(hg_handle_t h) { using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; ADM_update_job_in_t in; ADM_update_job_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); const admire::job job(in.job); const admire::job::resources job_resources(in.job_resources); const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}, job_resources: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job, job_resources); auto& jm = scord::job_manager::instance(); const auto ec = jm.update(job.id(), job_resources); if(ec != ADM_SUCCESS) { LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, ec); } out.op_id = rpc_id; out.retval = ec; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); ret = margo_free_input(h, &in); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); } DEFINE_MARGO_RPC_HANDLER(ADM_update_job); static void ADM_remove_job(hg_handle_t h) { Loading src/scord/rpc_handlers.hpp +4 −3 Original line number Diff line number Diff line Loading @@ -52,6 +52,10 @@ register_job(const scord::network::request& req, const admire::job_requirements& job_requirements, admire::slurm_job_id slurm_id); void update_job(const request& req, admire::job_id job_id, const admire::job::resources& new_resources); } // namespace scord::network::handlers #include <margo.h> Loading @@ -63,9 +67,6 @@ extern "C" { // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { /// ADM_update_job DECLARE_MARGO_RPC_HANDLER(ADM_update_job); /// ADM_remove_job DECLARE_MARGO_RPC_HANDLER(ADM_remove_job); Loading src/scord/scord.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -192,6 +192,8 @@ main(int argc, char* argv[]) { scord::network::handlers::deploy_adhoc_storage); daemon.set_handler("ADM_register_job"s, scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); #if 0 const auto rpc_registration_cb = [](auto&& ctx) { Loading Loading
src/lib/detail/impl.cpp +24 −32 Original line number Diff line number Diff line Loading @@ -259,48 +259,40 @@ register_job(const server& srv, const job::resources& job_resources, admire::error_code update_job(const server& srv, const job& job, const job::resources& job_resources) { const job::resources& new_resources) { (void) srv; (void) job; (void) job_resources; return admire::error_code::snafu; #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}, job_resources: {}}}", "body: {{job_id: {}, new_resources: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job, job_resources); const auto rpc_job = api::convert(job); const auto rpc_job_resources = api::convert(job_resources); std::quoted(rpc_client.self_address()), job.id(), new_resources); ADM_update_job_in_t in{rpc_job.get(), rpc_job_resources.get()}; ADM_update_job_out_t out; if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id(), new_resources); call_rv.has_value()) { const auto rpc = endp.call("ADM_update_job", &in, &out); const scord::network::generic_response resp{call_rv.value()}; if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return rv; std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, out.op_id); return admire::error_code::success; #endif LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } admire::error_code Loading
src/lib/detail/impl.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -41,7 +41,7 @@ register_job(const server& srv, const job::resources& job_resources, admire::error_code update_job(const server& srv, const job& job, const job::resources& job_resources); const job::resources& new_resources); admire::error_code remove_job(const server& srv, const job& job); Loading
src/scord/rpc_handlers.cpp +32 −53 Original line number Diff line number Diff line Loading @@ -125,6 +125,38 @@ register_job(const scord::network::request& req, req.respond(resp); } void update_job(const request& req, admire::job_id job_id, const admire::job::resources& new_resources) { using scord::network::get_address; const auto rpc_name = "ADM_"s + __FUNCTION__; const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job_id: {}, new_resources: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), job_id, new_resources); auto& jm = scord::job_manager::instance(); const auto ec = jm.update(job_id, new_resources); if(!ec) { LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, ec); } const auto resp = generic_response{rpc_id, ec}; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), ec); req.respond(resp); } void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, Loading Loading @@ -332,59 +364,6 @@ deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) { } // namespace scord::network::handlers static void ADM_update_job(hg_handle_t h) { using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; ADM_update_job_in_t in; ADM_update_job_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); const admire::job job(in.job); const admire::job::resources job_resources(in.job_resources); const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}, job_resources: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job, job_resources); auto& jm = scord::job_manager::instance(); const auto ec = jm.update(job.id(), job_resources); if(ec != ADM_SUCCESS) { LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, ec); } out.op_id = rpc_id; out.retval = ec; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); ret = margo_free_input(h, &in); assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); } DEFINE_MARGO_RPC_HANDLER(ADM_update_job); static void ADM_remove_job(hg_handle_t h) { Loading
src/scord/rpc_handlers.hpp +4 −3 Original line number Diff line number Diff line Loading @@ -52,6 +52,10 @@ register_job(const scord::network::request& req, const admire::job_requirements& job_requirements, admire::slurm_job_id slurm_id); void update_job(const request& req, admire::job_id job_id, const admire::job::resources& new_resources); } // namespace scord::network::handlers #include <margo.h> Loading @@ -63,9 +67,6 @@ extern "C" { // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { /// ADM_update_job DECLARE_MARGO_RPC_HANDLER(ADM_update_job); /// ADM_remove_job DECLARE_MARGO_RPC_HANDLER(ADM_remove_job); Loading
src/scord/scord.cpp +2 −0 Original line number Diff line number Diff line Loading @@ -192,6 +192,8 @@ main(int argc, char* argv[]) { scord::network::handlers::deploy_adhoc_storage); daemon.set_handler("ADM_register_job"s, scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); #if 0 const auto rpc_registration_cb = [](auto&& ctx) { Loading