Loading src/lib/detail/impl.cpp +98 −98 Original line number Diff line number Diff line Loading @@ -373,104 +373,6 @@ register_adhoc_storage(const server& srv, const std::string& name, return tl::make_unexpected(admire::error_code::other); } admire::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{adhoc_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), adhoc_storage.id()); if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); call_rv.has_value()) { const scord::network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping) { (void) srv; (void) job; (void) sources; (void) targets; (void) limits; (void) mapping; return tl::make_unexpected(admire::error_code::snafu); #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO( "rpc id: {} name: {} from: {} => " "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job, sources, targets, limits, mapping); const auto rpc_job = api::convert(job); const auto rpc_sources = api::convert(sources); const auto rpc_targets = api::convert(targets); const auto rpc_qos_limits = api::convert(limits); ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(), rpc_targets.get(), rpc_qos_limits.get(), static_cast<ADM_transfer_mapping_t>(mapping)}; ADM_transfer_datasets_out_t out; [[maybe_unused]] const auto rpc = endp.call("ADM_transfer_datasets", &in, &out); if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return tl::make_unexpected(rv); } const admire::transfer tx = api::convert(out.tx); LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}, transfer: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, tx, out.op_id); return tx; #endif } admire::error_code update_adhoc_storage(const server& srv, const adhoc_storage::ctx& new_ctx, const adhoc_storage& adhoc_storage) { Loading Loading @@ -686,4 +588,102 @@ remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage) { #endif } admire::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{adhoc_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), adhoc_storage.id()); if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); call_rv.has_value()) { const scord::network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping) { (void) srv; (void) job; (void) sources; (void) targets; (void) limits; (void) mapping; return tl::make_unexpected(admire::error_code::snafu); #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO( "rpc id: {} name: {} from: {} => " "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job, sources, targets, limits, mapping); const auto rpc_job = api::convert(job); const auto rpc_sources = api::convert(sources); const auto rpc_targets = api::convert(targets); const auto rpc_qos_limits = api::convert(limits); ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(), rpc_targets.get(), rpc_qos_limits.get(), static_cast<ADM_transfer_mapping_t>(mapping)}; ADM_transfer_datasets_out_t out; [[maybe_unused]] const auto rpc = endp.call("ADM_transfer_datasets", &in, &out); if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return tl::make_unexpected(rv); } const admire::transfer tx = api::convert(out.tx); LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}, transfer: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, tx, out.op_id); return tx; #endif } } // namespace admire::detail src/lib/detail/impl.hpp +7 −7 Original line number Diff line number Diff line Loading @@ -46,13 +46,6 @@ update_job(const server& srv, const job& job, admire::error_code remove_job(const server& srv, const job& job); tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping); tl::expected<admire::adhoc_storage, admire::error_code> register_adhoc_storage(const server& srv, const std::string& name, enum adhoc_storage::type type, Loading @@ -79,6 +72,13 @@ update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, admire::error_code remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage); tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping); } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP src/scord/rpc_handlers.hpp +14 −13 Original line number Diff line number Diff line Loading @@ -32,19 +32,6 @@ namespace scord::network::handlers { void ping(const scord::network::request& req); void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, const admire::adhoc_storage::ctx& ctx); void update_adhoc_storage(const request& req, std::uint64_t adhoc_id, const admire::adhoc_storage::ctx& new_ctx); void remove_adhoc_storage(const request& req, std::uint64_t adhoc_id); void deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id); void register_job(const scord::network::request& req, Loading @@ -59,6 +46,20 @@ update_job(const request& req, admire::job_id job_id, void remove_job(const request& req, admire::job_id job_id); void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, const admire::adhoc_storage::ctx& ctx); void update_adhoc_storage(const request& req, std::uint64_t adhoc_id, const admire::adhoc_storage::ctx& new_ctx); void remove_adhoc_storage(const request& req, std::uint64_t adhoc_id); void deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id); } // namespace scord::network::handlers #include <margo.h> Loading src/scord/scord.cpp +6 −6 Original line number Diff line number Diff line Loading @@ -182,6 +182,12 @@ main(int argc, char* argv[]) { scord::network::server daemon(cfg); daemon.set_handler("ADM_ping"s, scord::network::handlers::ping); daemon.set_handler("ADM_register_job"s, scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); daemon.set_handler("ADM_remove_job"s, scord::network::handlers::remove_job); daemon.set_handler("ADM_register_adhoc_storage"s, scord::network::handlers::register_adhoc_storage); daemon.set_handler("ADM_update_adhoc_storage"s, Loading @@ -190,12 +196,6 @@ main(int argc, char* argv[]) { scord::network::handlers::remove_adhoc_storage); daemon.set_handler("ADM_deploy_adhoc_storage"s, scord::network::handlers::deploy_adhoc_storage); daemon.set_handler("ADM_register_job"s, scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); daemon.set_handler("ADM_remove_job"s, scord::network::handlers::remove_job); #if 0 const auto rpc_registration_cb = [](auto&& ctx) { Loading Loading
src/lib/detail/impl.cpp +98 −98 Original line number Diff line number Diff line Loading @@ -373,104 +373,6 @@ register_adhoc_storage(const server& srv, const std::string& name, return tl::make_unexpected(admire::error_code::other); } admire::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{adhoc_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), adhoc_storage.id()); if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); call_rv.has_value()) { const scord::network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping) { (void) srv; (void) job; (void) sources; (void) targets; (void) limits; (void) mapping; return tl::make_unexpected(admire::error_code::snafu); #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO( "rpc id: {} name: {} from: {} => " "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job, sources, targets, limits, mapping); const auto rpc_job = api::convert(job); const auto rpc_sources = api::convert(sources); const auto rpc_targets = api::convert(targets); const auto rpc_qos_limits = api::convert(limits); ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(), rpc_targets.get(), rpc_qos_limits.get(), static_cast<ADM_transfer_mapping_t>(mapping)}; ADM_transfer_datasets_out_t out; [[maybe_unused]] const auto rpc = endp.call("ADM_transfer_datasets", &in, &out); if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return tl::make_unexpected(rv); } const admire::transfer tx = api::convert(out.tx); LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}, transfer: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, tx, out.op_id); return tx; #endif } admire::error_code update_adhoc_storage(const server& srv, const adhoc_storage::ctx& new_ctx, const adhoc_storage& adhoc_storage) { Loading Loading @@ -686,4 +588,102 @@ remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage) { #endif } admire::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { scord::network::client rpc_client{srv.protocol()}; const auto rpc_id = ::api::remote_procedure::new_id(); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{adhoc_id: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), adhoc_storage.id()); if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); call_rv.has_value()) { const scord::network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(endp.address()), resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_ERROR("rpc call failed"); return admire::error_code::other; } tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping) { (void) srv; (void) job; (void) sources; (void) targets; (void) limits; (void) mapping; return tl::make_unexpected(admire::error_code::snafu); #if 0 scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO( "rpc id: {} name: {} from: {} => " "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc_client.self_address()), job, sources, targets, limits, mapping); const auto rpc_job = api::convert(job); const auto rpc_sources = api::convert(sources); const auto rpc_targets = api::convert(targets); const auto rpc_qos_limits = api::convert(limits); ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(), rpc_targets.get(), rpc_qos_limits.get(), static_cast<ADM_transfer_mapping_t>(mapping)}; ADM_transfer_datasets_out_t out; [[maybe_unused]] const auto rpc = endp.call("ADM_transfer_datasets", &in, &out); if(const auto rv = admire::error_code{out.retval}; !rv) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), rv, out.op_id); return tl::make_unexpected(rv); } const admire::transfer tx = api::convert(out.tx); LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}, transfer: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), std::quoted(rpc.origin()), admire::error_code::success, tx, out.op_id); return tx; #endif } } // namespace admire::detail
src/lib/detail/impl.hpp +7 −7 Original line number Diff line number Diff line Loading @@ -46,13 +46,6 @@ update_job(const server& srv, const job& job, admire::error_code remove_job(const server& srv, const job& job); tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping); tl::expected<admire::adhoc_storage, admire::error_code> register_adhoc_storage(const server& srv, const std::string& name, enum adhoc_storage::type type, Loading @@ -79,6 +72,13 @@ update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, admire::error_code remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage); tl::expected<transfer, error_code> transfer_datasets(const server& srv, const job& job, const std::vector<dataset>& sources, const std::vector<dataset>& targets, const std::vector<qos::limit>& limits, transfer::mapping mapping); } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP
src/scord/rpc_handlers.hpp +14 −13 Original line number Diff line number Diff line Loading @@ -32,19 +32,6 @@ namespace scord::network::handlers { void ping(const scord::network::request& req); void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, const admire::adhoc_storage::ctx& ctx); void update_adhoc_storage(const request& req, std::uint64_t adhoc_id, const admire::adhoc_storage::ctx& new_ctx); void remove_adhoc_storage(const request& req, std::uint64_t adhoc_id); void deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id); void register_job(const scord::network::request& req, Loading @@ -59,6 +46,20 @@ update_job(const request& req, admire::job_id job_id, void remove_job(const request& req, admire::job_id job_id); void register_adhoc_storage(const request& req, const std::string& name, enum admire::adhoc_storage::type type, const admire::adhoc_storage::ctx& ctx); void update_adhoc_storage(const request& req, std::uint64_t adhoc_id, const admire::adhoc_storage::ctx& new_ctx); void remove_adhoc_storage(const request& req, std::uint64_t adhoc_id); void deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id); } // namespace scord::network::handlers #include <margo.h> Loading
src/scord/scord.cpp +6 −6 Original line number Diff line number Diff line Loading @@ -182,6 +182,12 @@ main(int argc, char* argv[]) { scord::network::server daemon(cfg); daemon.set_handler("ADM_ping"s, scord::network::handlers::ping); daemon.set_handler("ADM_register_job"s, scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); daemon.set_handler("ADM_remove_job"s, scord::network::handlers::remove_job); daemon.set_handler("ADM_register_adhoc_storage"s, scord::network::handlers::register_adhoc_storage); daemon.set_handler("ADM_update_adhoc_storage"s, Loading @@ -190,12 +196,6 @@ main(int argc, char* argv[]) { scord::network::handlers::remove_adhoc_storage); daemon.set_handler("ADM_deploy_adhoc_storage"s, scord::network::handlers::deploy_adhoc_storage); daemon.set_handler("ADM_register_job"s, scord::network::handlers::register_job); daemon.set_handler("ADM_update_job"s, scord::network::handlers::update_job); daemon.set_handler("ADM_remove_job"s, scord::network::handlers::remove_job); #if 0 const auto rpc_registration_cb = [](auto&& ctx) { Loading