From 37d115003f0ecec509be72f008f6b3559e074686 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 23 Nov 2022 19:30:12 +0100 Subject: [PATCH] API: Refactor ADM_update_pfs_storage() --- examples/c/ADM_update_pfs_storage.c | 2 +- examples/cxx/ADM_update_pfs_storage.cpp | 25 ++++++++-------- src/common/net/proto/rpc_types.h | 12 ++++++-- src/lib/admire.cpp | 31 ++++++-------------- src/lib/admire.h | 4 +-- src/lib/admire.hpp | 6 ++-- src/lib/c_wrapper.cpp | 12 ++++---- src/lib/detail/impl.cpp | 38 +++++++++++++++++++++++++ src/lib/detail/impl.hpp | 7 +++-- src/scord/rpc_handlers.cpp | 25 ++++++++++++++-- 10 files changed, 109 insertions(+), 53 deletions(-) diff --git a/examples/c/ADM_update_pfs_storage.c b/examples/c/ADM_update_pfs_storage.c index f67f610f..55b369d6 100644 --- a/examples/c/ADM_update_pfs_storage.c +++ b/examples/c/ADM_update_pfs_storage.c @@ -89,7 +89,7 @@ main(int argc, char* argv[]) { } // We can now request the update to the server - if((ret = ADM_update_pfs_storage(server, new_pfs_ctx, pfs_storage)) != + if((ret = ADM_update_pfs_storage(server, pfs_storage, new_pfs_ctx)) != ADM_SUCCESS) { fprintf(stderr, "ADM_update_pfs_storage() failed: %s\n", ADM_strerror(ret)); diff --git a/examples/cxx/ADM_update_pfs_storage.cpp b/examples/cxx/ADM_update_pfs_storage.cpp index fa2b6de6..6a090a6c 100644 --- a/examples/cxx/ADM_update_pfs_storage.cpp +++ b/examples/cxx/ADM_update_pfs_storage.cpp @@ -37,25 +37,26 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_pfs_context_t ctx{}; - ADM_pfs_storage_t pfs_storage{}; - ADM_return_t ret = ADM_SUCCESS; + std::string pfs_name = "gpfs_scratch"; + std::string pfs_mount = "/gpfs/scratch"; + std::string new_pfs_mount = "/gpfs/scratch2"; try { - ret = admire::update_pfs_storage(server, ctx, pfs_storage); + + const auto pfs_storage = admire::register_pfs_storage( + server, pfs_name, admire::pfs_storage::type::gpfs, + admire::pfs_storage::ctx{pfs_mount}); + + admire::update_pfs_storage(server, pfs_storage, + admire::pfs_storage::ctx{new_pfs_mount}); } catch(const std::exception& e) { - fmt::print(stderr, "FATAL: ADM_update_pfs_storage() failed: {}\n", + fmt::print(stderr, + "FATAL: ADM_update_pfs_storage() or " + "ADM_update_pfs_storage() failed: {}\n", e.what()); exit(EXIT_FAILURE); } - if(ret != ADM_SUCCESS) { - fmt::print(stdout, - "ADM_update_pfs_storage() remote procedure not completed " - "successfully\n"); - exit(EXIT_FAILURE); - } - fmt::print(stdout, "ADM_update_pfs_storage() remote procedure completed " "successfully\n"); } diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index b7c503db..952e0df1 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -403,9 +403,17 @@ MERCURY_GEN_PROC( ); /// ADM_update_pfs_storage -MERCURY_GEN_PROC(ADM_update_pfs_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC( + ADM_update_pfs_storage_in_t, + ((ADM_pfs_context_t) (pfs_storage_ctx)) + ((hg_uint64_t) (server_id)) +); -MERCURY_GEN_PROC(ADM_update_pfs_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC( + ADM_update_pfs_storage_out_t, + ((hg_uint64_t) (op_id)) + ((hg_int32_t) (retval)) +); /// ADM_remove_pfs_storage MERCURY_GEN_PROC(ADM_remove_pfs_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index c0b5cbd7..8edaa4f8 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -285,7 +285,7 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { void deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { - + const auto ec = detail::deploy_adhoc_storage(srv, adhoc_storage); if(!ec) { @@ -308,31 +308,18 @@ register_pfs_storage(const server& srv, const std::string& name, return rv.value(); } -ADM_return_t -update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, - ADM_pfs_storage_t pfs_storage) { - (void) srv; - (void) ctx; - (void) pfs_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_update_pfs_storage(...)"); - ADM_update_pfs_storage_in_t in{}; - ADM_update_pfs_storage_out_t out; +void +update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, + const admire::pfs_storage::ctx& pfs_storage_ctx) { - const auto rpc = endp.call("ADM_update_pfs_storage", &in, &out); + const auto ec = + detail::update_pfs_storage(srv, pfs_storage, pfs_storage_ctx); - if(out.ret < 0) { - LOGGER_ERROR("ADM_update_pfs_storage() = {}", out.ret); - return static_cast(out.ret); + if(!ec) { + throw std::runtime_error(fmt::format( + "ADM_update_pfs_storage() error: {}", ec.message())); } - - LOGGER_INFO("ADM_update_pfs_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index a7ba56ee..186b8329 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -186,8 +186,8 @@ ADM_register_pfs_storage(ADM_server_t server, const char* name, * successfully. */ ADM_return_t -ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, - ADM_pfs_storage_t adhoc_storage); +ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_storage_t adhoc_storage, + ADM_pfs_context_t ctx); /** * Remove an already-registered PFS storage tier. diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 054d5f45..0b7a61a7 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -77,9 +77,9 @@ register_pfs_storage(const server& srv, const std::string& name, enum admire::pfs_storage::type type, const admire::pfs_storage::ctx& ctx); -ADM_return_t -update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, - ADM_pfs_storage_t pfs_storage); +void +update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, + const admire::pfs_storage::ctx& pfs_storage_ctx); ADM_return_t remove_pfs_storage(const server& srv, ADM_pfs_storage_t pfs_storage); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 8d7d7139..01bf9112 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -133,7 +133,7 @@ ADM_deploy_adhoc_storage(ADM_server_t server, } ADM_return_t -ADM_register_pfs_storage(ADM_server_t server, const char* name, +ADM_register_pfs_storage(ADM_server_t server, const char* name, ADM_pfs_storage_type_t type, ADM_pfs_context_t ctx, ADM_pfs_storage_t* pfs_storage) { @@ -152,12 +152,12 @@ ADM_register_pfs_storage(ADM_server_t server, const char* name, } ADM_return_t -ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, - ADM_pfs_storage_t pfs_storage) { +ADM_update_pfs_storage(ADM_server_t server, ADM_pfs_storage_t pfs_storage, + ADM_pfs_context_t ctx) { - const admire::server srv{server}; - - return admire::update_pfs_storage(srv, ctx, pfs_storage); + return admire::detail::update_pfs_storage(admire::server{server}, + admire::pfs_storage{pfs_storage}, + admire::pfs_storage::ctx{ctx}); } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index b582115f..5f915abd 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -556,4 +556,42 @@ register_pfs_storage(const server& srv, const std::string& name, return rpc_pfs_storage; } +admire::error_code +update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, + const admire::pfs_storage::ctx& pfs_storage_ctx) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{pfs_storage_id: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), pfs_storage.id()); + + const auto rpc_ctx = api::convert(pfs_storage_ctx); + + ADM_update_pfs_storage_in_t in{rpc_ctx.get(), pfs_storage.id()}; + ADM_update_pfs_storage_out_t out; + + const auto rpc = endp.call("ADM_update_pfs_storage", &in, &out); + + if(const auto rv = admire::error_code{out.retval}; !rv) { + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), rv, out.op_id); + return rv; + } + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), admire::error_code::success, + out.op_id); + + return admire::error_code::success; +} + } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 98909dcc..bf595d15 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -66,13 +66,16 @@ admire::error_code remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); admire::error_code -deploy_adhoc_storage(const server& srv, - const adhoc_storage& adhoc_storage); +deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); tl::expected register_pfs_storage(const server& srv, const std::string& name, enum pfs_storage::type type, const pfs_storage::ctx& ctx); +admire::error_code +update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, + const admire::pfs_storage::ctx& pfs_storage_ctx); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 47371427..31b09b48 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -623,11 +623,30 @@ ADM_update_pfs_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; + const admire::pfs_storage::ctx pfs_storage_ctx(in.pfs_storage_ctx); + const std::uint64_t server_id(in.server_id); + + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{pfs_storage_id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + server_id); - LOGGER_INFO("ADM_update_pfs_storage()"); + auto& pfs_manager = scord::pfs_storage_manager::instance(); + const auto ec = pfs_manager.update(server_id, pfs_storage_ctx); - out.ret = 0; + if(!ec) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error updating pfs_storage: {}\"", + rpc_id, ec); + } + + out.op_id = rpc_id; + out.retval = ec; + + LOGGER_INFO("rpc id: {} name: {} to: {} => " + "body: {{retval: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab