diff --git a/examples/cxx/ADM_remove_adhoc_storage.cpp b/examples/cxx/ADM_remove_adhoc_storage.cpp index c803204545a7369c788cf5608d886d4a0d978b8e..3c1336ed911e6245947e7a114af847a4c75fd929 100644 --- a/examples/cxx/ADM_remove_adhoc_storage.cpp +++ b/examples/cxx/ADM_remove_adhoc_storage.cpp @@ -22,9 +22,14 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include "fmt/format.h" -#include "admire.hpp" +#include +#include +#include "common.hpp" +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -38,24 +43,28 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_storage_t adhoc_storage{}; - ADM_return_t ret = ADM_SUCCESS; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false}; try { - ret = admire::remove_adhoc_storage(server, adhoc_storage); + const auto adhoc_storage = admire::register_adhoc_storage( + server, name, admire::storage::type::gekkofs, + adhoc_storage_ctx); + admire::remove_adhoc_storage(server, adhoc_storage); + fmt::print(stdout, + "ADM_remove_adhoc_storage() remote procedure completed " + "successfully\n"); + exit(EXIT_SUCCESS); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_remove_adhoc_storage() failed: {}\n", e.what()); exit(EXIT_FAILURE); } - - if(ret != ADM_SUCCESS) { - fmt::print(stdout, - "ADM_remove_adhoc_storage() remote procedure not completed " - "successfully\n"); - exit(EXIT_FAILURE); - } - - fmt::print(stdout, "ADM_remove_adhoc_storage() remote procedure completed " - "successfully\n"); } diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index dc828276f6f9d8020707d15a856614aeeee5779b..6eaf0a8e687e35a72985a9f830bcaec1dc31afba 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -327,7 +327,7 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_remove_job_out_t, ((hg_uint64_t) (op_id)) - ((int32_t) (retval)) + ((hg_int32_t) (retval)) ); /// ADM_register_adhoc_storage @@ -341,8 +341,8 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_register_adhoc_storage_out_t, ((hg_uint64_t) (op_id)) - ((int32_t) (retval)) - ((uint64_t) (id)) + ((hg_int32_t) (retval)) + ((hg_uint64_t) (id)) ); /// ADM_update_adhoc_storage @@ -359,9 +359,16 @@ MERCURY_GEN_PROC( ); /// ADM_remove_adhoc_storage -MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC( + ADM_remove_adhoc_storage_in_t, + ((hg_uint64_t) (server_id)) +); -MERCURY_GEN_PROC(ADM_remove_adhoc_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC( + ADM_remove_adhoc_storage_out_t, + ((hg_uint64_t) (op_id)) + ((hg_int32_t) (retval)) +); /// ADM_deploy_adhoc_storage MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index debc4b8ed59bf5aa1510c122c0aff6ebae10862c..42c2263087286e0b648c27f4679a407a94b29422 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -255,30 +255,9 @@ update_adhoc_storage(const server& srv, return detail::update_adhoc_storage(srv, adhoc_storage_ctx, adhoc_storage); } -ADM_return_t -remove_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage) { - - (void) srv; - (void) adhoc_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_remove_adhoc_storage(...)"); - - ADM_remove_adhoc_storage_in_t in{}; - ADM_remove_adhoc_storage_out_t out; - - const auto rpc = endp.call("ADM_remove_adhoc_storage", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_remove_adhoc_storage() = {}", out.ret); - return static_cast(out.ret); - } - - LOGGER_INFO("ADM_remove_adhoc_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; +admire::error_code +remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { + return detail::remove_adhoc_storage(srv, adhoc_storage); } ADM_return_t diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index ceab7c6dab4228045138b5167e7a57065b7e3d1c..7cd9045bf3670331999ac796b63327fddf32bb06 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -67,8 +67,8 @@ update_adhoc_storage(const server& srv, const adhoc_storage::ctx& adhoc_storage_ctx, const adhoc_storage& adhoc_storage); -ADM_return_t -remove_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage); +admire::error_code +remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); ADM_return_t deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index a841b31c72ed83c3e8429071421a065436572598..d4dd6aa15fbc9fe07a161f6ea0c0915bcdcbcb0a 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -116,7 +116,8 @@ ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) { const admire::server srv{server}; - return admire::remove_adhoc_storage(srv, adhoc_storage); + return admire::detail::remove_adhoc_storage( + srv, admire::adhoc_storage{adhoc_storage}); } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 5d7c34c124b5f018e5021b977d9df88126e11adb..a98db77ad8c552198c2f41dcef7c17e659c0203d 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -446,4 +446,37 @@ update_adhoc_storage(const server& srv, return admire::error_code::success; } +admire::error_code +remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_storage_id: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), adhoc_storage.id()); + + ADM_remove_adhoc_storage_in_t in{adhoc_storage.id()}; + ADM_remove_adhoc_storage_out_t out; + + const auto rpc = endp.call("ADM_remove_adhoc_storage", &in, &out); + + if(const auto rv = admire::error_code{out.retval}; !rv) { + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), rv, out.op_id); + return rv; + } + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), admire::error_code::success, + out.op_id); + return admire::error_code::success; +} } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index f80a1cfb85b0081d7b418c320347da1b5811b4fe..03bf2252da4e5b93f1c99c650ef0647d4dbfed7f 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -62,6 +62,9 @@ update_adhoc_storage(const server& srv, const adhoc_storage::ctx& adhoc_storage_ctx, const adhoc_storage& adhoc_storage); +admire::error_code +remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 3c32c84ee4e4038269089360a8188199484ad28c..9ac3c96bd769848291301a1c652fd97a57415dfe 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -389,11 +389,28 @@ ADM_remove_adhoc_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_storage_id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + in.server_id); - LOGGER_INFO("ADM_remove_adhoc_storage()"); - out.ret = 0; + auto& adhoc_manager = scord::adhoc_storage_manager::instance(); + admire::error_code ec = adhoc_manager.remove(in.server_id); + + if(!ec) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, + in.server_id); + } + + out.op_id = rpc_id; + out.retval = ec; + + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS);