diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index b20bf515d4456caf6c38bfd0635ed4800014cf39..3024bdb98620f9647fcce5c6fbca05a1ae9cbe75 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -78,7 +78,12 @@ main(int argc, char* argv[]) { fprintf(stdout, "ADM_register_adhoc_storage() remote procedure completed " "successfully\n"); - ret = ADM_update_adhoc_storage(server, ctx, adhoc_storage); + ADM_adhoc_context_t new_ctx = ADM_adhoc_context_create( + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 200, false); + assert(new_ctx); + + ret = ADM_update_adhoc_storage(server, new_ctx, adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stderr, @@ -93,6 +98,20 @@ main(int argc, char* argv[]) { "successfully\n"); cleanup: + for(int i = 0; i < NINPUTS; ++i) { + ADM_dataset_destroy(inputs[i]); + } + + for(int i = 0; i < NOUTPUTS; ++i) { + ADM_dataset_destroy(outputs[i]); + } + + ADM_storage_destroy(adhoc_storage); + + ADM_adhoc_context_destroy(ctx); + + ADM_adhoc_context_destroy(new_ctx); + ADM_server_destroy(server); exit(exit_status); } diff --git a/examples/cxx/ADM_update_adhoc_storage.cpp b/examples/cxx/ADM_update_adhoc_storage.cpp index fec1c4b57b5b55428bd38bee57b331bc30128214..a353d4578b55187550bd36fff0c5232777641d5c 100644 --- a/examples/cxx/ADM_update_adhoc_storage.cpp +++ b/examples/cxx/ADM_update_adhoc_storage.cpp @@ -24,7 +24,11 @@ #include #include +#include "common.hpp" +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -38,25 +42,43 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_adhoc_context_t ctx{}; - ADM_storage_t adhoc_storage{}; - ADM_return_t ret = ADM_SUCCESS; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); + const auto new_adhoc_nodes = prepare_nodes(NADHOC_NODES * 2); + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false}; + + const auto new_adhoc_storage_ctx = admire::adhoc_storage::ctx{ + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{new_adhoc_nodes}, 200, false}; try { - ret = admire::update_adhoc_storage(server, ctx, adhoc_storage); - } catch(const std::exception& e) { - fmt::print(stderr, "FATAL: ADM_update_adhoc_storage() failed: {}\n", - e.what()); - exit(EXIT_FAILURE); - } + const auto adhoc_storage = admire::register_adhoc_storage( + server, name, admire::storage::type::gekkofs, + adhoc_storage_ctx); + + const auto ret = admire::update_adhoc_storage( + server, new_adhoc_storage_ctx, adhoc_storage); + + if(!ret) { + fmt::print(stderr, "FATAL: ADM_update_adhoc_storage() failed: {}\n", + ret.message()); + exit(EXIT_FAILURE); + } - if(ret != ADM_SUCCESS) { fmt::print(stdout, - "ADM_update_adhoc_storage() remote procedure not completed " + "ADM_update_adhoc_storage() remote procedure completed " "successfully\n"); + exit(EXIT_SUCCESS); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - - fmt::print(stdout, "ADM_update_adhoc_storage() remote procedure completed " - "successfully\n"); } diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 8d13791bcd88b4910ac8b8e86e90454f4c5c2bb3..cfaf9c79cb71b6fc3cd0cc47a0fe36f55fa6ccff 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -402,6 +402,7 @@ struct adhoc_storage : public storage { execution_mode exec_mode, access_type access_type, adhoc_storage::resources res, std::uint32_t walltime, bool should_flush); + explicit adhoc_storage(ADM_storage_t storage); adhoc_storage(enum storage::type type, std::string name, std::uint64_t id, ADM_adhoc_context_t ctx); adhoc_storage(enum storage::type type, std::string name, std::uint64_t id, diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 0f1a05d7de659eac0e2f613ead2f75fc5d4a3ce7..2a337e98349df2b49ca8aae6a20554ed14d60aaa 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -629,7 +629,6 @@ ADM_data_operation_destroy(ADM_data_operation_t op) { return ret; } - ADM_adhoc_context_t ADM_adhoc_context_create(ADM_adhoc_mode_t exec_mode, ADM_adhoc_access_t access_type, @@ -1380,7 +1379,6 @@ private: adhoc_storage::ctx m_ctx; }; - adhoc_storage::adhoc_storage(enum storage::type type, std::string name, std::uint64_t id, execution_mode exec_mode, access_type access_type, @@ -1391,6 +1389,24 @@ adhoc_storage::adhoc_storage(enum storage::type type, std::string name, adhoc_storage::ctx{exec_mode, access_type, std::move(res), walltime, should_flush})) {} +adhoc_storage::adhoc_storage(ADM_storage_t st) + : storage(static_cast(st->s_type), st->s_name, + st->s_id) { + + switch(st->s_type) { + case ADM_STORAGE_LUSTRE: + case ADM_STORAGE_GPFS: + throw std::runtime_error( + fmt::format("Invalid type {} for adhoc_storage", + static_cast(st->s_type))); + break; + + default: + break; + } + m_pimpl = std::make_unique(adhoc_storage::ctx{st->s_adhoc_ctx}); +} + adhoc_storage::adhoc_storage(enum storage::type type, std::string name, std::uint64_t id, ADM_adhoc_context_t ctx) : storage(type, std::move(name), id), diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 4161ef5d0f4ee33d96ddfefe53e9051182a1df38..dc828276f6f9d8020707d15a856614aeeee5779b 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -346,9 +346,17 @@ MERCURY_GEN_PROC( ); /// ADM_update_adhoc_storage -MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC( + ADM_update_adhoc_storage_in_t, + ((ADM_adhoc_context_t)(adhoc_storage_ctx)) + ((hg_uint64_t)(server_id)) +); -MERCURY_GEN_PROC(ADM_update_adhoc_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC( + ADM_update_adhoc_storage_out_t, + ((hg_uint64_t) (op_id)) + ((hg_int32_t) (retval)) +); /// ADM_remove_adhoc_storage MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 54a511242a80599489d865e9fdb6da526e046582..debc4b8ed59bf5aa1510c122c0aff6ebae10862c 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -248,32 +248,11 @@ register_adhoc_storage(const server& srv, const std::string& name, return rv.value(); } -ADM_return_t -update_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, - ADM_storage_t adhoc_storage) { - - (void) srv; - (void) ctx; - (void) adhoc_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_update_adhoc_storage(...)"); - - ADM_update_adhoc_storage_in_t in{}; - ADM_update_adhoc_storage_out_t out; - - const auto rpc = endp.call("ADM_update_adhoc_storage", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_update_adhoc_storage() = {}", out.ret); - return static_cast(out.ret); - } - - LOGGER_INFO("ADM_update_adhoc_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; +admire::error_code +update_adhoc_storage(const server& srv, + const adhoc_storage::ctx& adhoc_storage_ctx, + const adhoc_storage& adhoc_storage) { + return detail::update_adhoc_storage(srv, adhoc_storage_ctx, adhoc_storage); } ADM_return_t diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 00e44943881c11c48687cfc0205a947038250b8f..ceab7c6dab4228045138b5167e7a57065b7e3d1c 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -62,9 +62,10 @@ register_adhoc_storage(const server& srv, const std::string& name, enum adhoc_storage::type type, const adhoc_storage::ctx& ctx); -ADM_return_t -update_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, - ADM_storage_t adhoc_storage); +admire::error_code +update_adhoc_storage(const server& srv, + const adhoc_storage::ctx& adhoc_storage_ctx, + const adhoc_storage& adhoc_storage); ADM_return_t remove_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 05faa3f793f820e21688eef380121d30df384131..a841b31c72ed83c3e8429071421a065436572598 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -106,7 +106,9 @@ ADM_update_adhoc_storage(ADM_server_t server, ADM_adhoc_context_t ctx, const admire::server srv{server}; - return admire::update_adhoc_storage(srv, ctx, adhoc_storage); + return admire::detail::update_adhoc_storage( + srv, admire::adhoc_storage::ctx{ctx}, + admire::adhoc_storage{adhoc_storage}); } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 91451d9e03610ec9d3363180f1ffa80ae0c723d7..5d7c34c124b5f018e5021b977d9df88126e11adb 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -407,4 +407,43 @@ transfer_datasets(const server& srv, const job& job, return tx; } +admire::error_code +update_adhoc_storage(const server& srv, + const adhoc_storage::ctx& adhoc_storage_ctx, + const adhoc_storage& adhoc_storage) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: name: {} from: {} => " + "body: {{id: {}, adhoc_ctx{}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), adhoc_storage.id(), + adhoc_storage_ctx); + + const auto rpc_ctx = api::convert(adhoc_storage_ctx); + + ADM_update_adhoc_storage_in_t in{rpc_ctx.get(), adhoc_storage.id()}; + ADM_update_adhoc_storage_out_t out; + + const auto rpc = endp.call("ADM_update_adhoc_storage", &in, &out); + + if(const auto rv = admire::error_code{out.retval}; !rv) { + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), rv, + out.op_id); + return rv; + } + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + admire::error_code::success, out.op_id); + + return admire::error_code::success; +} + } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 613b960e6113454a81f007f4b691467af140066c..f80a1cfb85b0081d7b418c320347da1b5811b4fe 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -57,6 +57,11 @@ register_adhoc_storage(const server& srv, const std::string& name, enum adhoc_storage::type type, const adhoc_storage::ctx& ctx); +admire::error_code +update_adhoc_storage(const server& srv, + const adhoc_storage::ctx& adhoc_storage_ctx, + const adhoc_storage& adhoc_storage); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index b172e67e023f29e12b804a83020736e6d4007647..3c32c84ee4e4038269089360a8188199484ad28c 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -336,11 +336,31 @@ ADM_update_adhoc_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; + const admire::adhoc_storage::ctx adhoc_storage_ctx(in.adhoc_storage_ctx); + const std::uint64_t server_id(in.server_id); - LOGGER_INFO("ADM_update_adhoc_storage()"); + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_storage_id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + server_id); - out.ret = 0; + auto& adhoc_manager = scord::adhoc_storage_manager::instance(); + const auto ec = adhoc_manager.update(server_id, adhoc_storage_ctx); + + if(!ec) { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error updating adhoc_storage: {}\"", + rpc_id, ec); + } + + out.op_id = rpc_id; + out.retval = ec; + + LOGGER_INFO("rpc id: {} name: {} to: {} => " + "body: {{retval: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp index 64afe54db165a12d3bf2621f196be26454511906..77b09e574e4ceb1905b6a7428b88d95cd9345978 100644 --- a/src/scord/scord.cpp +++ b/src/scord/scord.cpp @@ -224,7 +224,6 @@ main(int argc, char* argv[]) { REGISTER_RPC(ctx, "ADM_input", ADM_input_in_t, ADM_input_out_t, ADM_input, true); - REGISTER_RPC(ctx, "ADM_output", ADM_output_in_t, ADM_output_out_t, ADM_output, true);