diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 8f476ff292f37d206c509367218b10afc21eb88d..04ad737aa507708da6839d879433d03aa873887a 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -25,7 +25,9 @@ #include #include #include +#include #include "common.h" +#include #define NADHOC_NODES 25 #define NINPUTS 10 @@ -96,7 +98,7 @@ main(int argc, char* argv[]) { } // 2. Register the adhoc storage - if(ADM_register_adhoc_storage(server, adhoc_name, ADM_STORAGE_GEKKOFS, + if(ADM_register_adhoc_storage(server, adhoc_name, ADM_STORAGE_DATACLAY, adhoc_ctx, &adhoc_storage) != ADM_SUCCESS) { fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n", ADM_strerror(ret)); @@ -118,7 +120,8 @@ main(int argc, char* argv[]) { } // We can now request the deployment to the server - if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) { + if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != + ADM_SUCCESS) { fprintf(stderr, "ADM_deploy_adhoc_storage() failed: %s\n", ADM_strerror(ret)); goto cleanup; diff --git a/examples/cxx/ADM_deploy_adhoc_storage.cpp b/examples/cxx/ADM_deploy_adhoc_storage.cpp index c23cffd3a506ec9c8710703b2ddc81cf6e363d2e..cc4ae7a23558af9cbd7b305eec038902947e1788 100644 --- a/examples/cxx/ADM_deploy_adhoc_storage.cpp +++ b/examples/cxx/ADM_deploy_adhoc_storage.cpp @@ -24,7 +24,12 @@ #include #include +#include "common.hpp" +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -38,21 +43,32 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_storage_t adhoc_storage{}; - ADM_return_t ret = ADM_SUCCESS; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false}; try { - ret = admire::deploy_adhoc_storage(server, adhoc_storage); - } catch(const std::exception& e) { - fmt::print(stderr, "FATAL: ADM_deploy_adhoc_storage() failed: {}\n", - e.what()); - exit(EXIT_FAILURE); - } + const auto adhoc_storage = admire::register_adhoc_storage( + server, name, admire::storage::type::dataclay, + adhoc_storage_ctx); - if(ret != ADM_SUCCESS) { fmt::print(stdout, - "ADM_deploy_adhoc_storage() remote procedure not completed " + "ADM_register_adhoc_storage() remote procedure completed " "successfully\n"); + + admire::deploy_adhoc_storage(server, adhoc_storage); + + } catch(const std::exception& e) { + fmt::print( + stderr, + "FATAL: ADM_register_adhoc_storage() or ADM_deploy_adhoc_storage() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index f877d7f234fb2f8580a9268dc13757078f9b86d2..b18c19b649a62c5034d534bab6c6df284ecda8e6 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -55,7 +55,7 @@ main(int argc, char* argv[]) { try { const auto adhoc_storage = admire::register_adhoc_storage( - server, name, admire::storage::type::gekkofs, + server, name, admire::storage::type::dataclay, adhoc_storage_ctx); fmt::print(stdout, diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 6eaf0a8e687e35a72985a9f830bcaec1dc31afba..b624c8281bafa00d1c243dac1c89c1e3c64edb86 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -174,6 +174,7 @@ typedef struct adm_pfs_context { const char* c_mount; } adm_pfs_context; + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_pfs_context, // NOLINT @@ -371,9 +372,12 @@ MERCURY_GEN_PROC( ); /// ADM_deploy_adhoc_storage -MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((hg_uint64_t) (id))) -MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, + ((hg_uint64_t) (op_id)) + ((hg_int32_t) (retval)) +); /// ADM_register_pfs_storage MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 98bc455a22e47b832ab93cb81cdfd37efd83fb2c..e3e74f11655525d482433ce5e8e6a28da629bc18 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -283,30 +283,16 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } } -ADM_return_t -deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage) { - - (void) srv; - (void) adhoc_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_deploy_adhoc_storage(...)"); - - ADM_deploy_adhoc_storage_in_t in{}; - ADM_deploy_adhoc_storage_out_t out; - - const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out); +void +deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { + + const auto ec = detail::deploy_adhoc_storage(srv, adhoc_storage); - if(out.ret < 0) { - LOGGER_ERROR("ADM_deploy_adhoc_storage() = {}", out.ret); - return static_cast(out.ret); + if(!ec) { + throw std::runtime_error(fmt::format( + "ADM_deploy_adhoc_storage() error: {}", ec.message())); } - - LOGGER_INFO("ADM_deploy_adhoc_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + } ADM_return_t diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 4e690b182e2b573678662b726f42e071fa2a9e60..980fcb277834831ade37c3efbd4139e9915f9b27 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -69,8 +69,8 @@ update_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage, void remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); -ADM_return_t -deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage); +void +deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); ADM_return_t register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 5cfd5ecab33d751380e056fe961cec8de3c6106e..fd5744e5d9db65f82d89cfc07b4e21fc20f4cb8f 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -125,7 +125,8 @@ ADM_deploy_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) { const admire::server srv{server}; - return admire::deploy_adhoc_storage(srv, adhoc_storage); + return admire::detail::deploy_adhoc_storage( + srv, admire::adhoc_storage{adhoc_storage}); } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 6e5bd92210a74e349a669135b5e1cc9cbf055274..e2d84d41d55ed3febaa4b2b644536a143be111f3 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -357,6 +357,41 @@ register_adhoc_storage(const server& srv, const std::string& name, return rpc_adhoc_storage; } +admire::error_code +deploy_adhoc_storage(const server& srv, + const adhoc_storage& adhoc_storage) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_storage: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), adhoc_storage); + + ADM_deploy_adhoc_storage_in_t in{adhoc_storage.id()}; + ADM_deploy_adhoc_storage_out_t out; + + const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out); + + if(const auto rv = admire::error_code{out.retval}; !rv) { + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), rv, out.op_id); + return rv; + } + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}}] [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + admire::error_code::success, out.op_id); + + return admire::error_code::success; +} + tl::expected transfer_datasets(const server& srv, const job& job, const std::vector& sources, diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 03bf2252da4e5b93f1c99c650ef0647d4dbfed7f..51aa88e981649703e0954eca699ce5717a534011 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -65,6 +65,10 @@ update_adhoc_storage(const server& srv, admire::error_code remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +admire::error_code +deploy_adhoc_storage(const server& srv, + const adhoc_storage& adhoc_storage); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 9ac3c96bd769848291301a1c652fd97a57415dfe..8bc7ef8190466f97f1ce2649eadfdf35aa263572 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -31,6 +31,10 @@ #include "job_manager.hpp" #include "adhoc_storage_manager.hpp" +// Process running +#include +#include + struct remote_procedure { static std::uint64_t new_id() { @@ -439,11 +443,97 @@ ADM_deploy_adhoc_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; - LOGGER_INFO("ADM_deploy_adhoc_storage()"); + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + in.id); - out.ret = 0; + auto ec = admire::error_code::success; + auto& adhoc_manager = scord::adhoc_storage_manager::instance(); + + if(const auto am_result = adhoc_manager.find(in.id); + am_result.has_value()) { + const auto& storage_info = am_result.value(); + const auto adhoc_storage = storage_info->adhoc_storage(); + ec = admire::error_code::success; + if(adhoc_storage.type() == admire::storage::type::gekkofs) { + const auto adhoc_ctx = + (admire::adhoc_storage::ctx*) adhoc_storage.context().get(); + /* Number of nodes */ + const std::string nodes = + std::to_string(adhoc_ctx->resources().nodes().size()); + + /* Walltime */ + const std::string walltime = std::to_string(adhoc_ctx->walltime()); + + /* Launch script */ + switch(const auto pid = fork()) { + case 0: { + std::vector args; + args.push_back("gkfs"); + // args.push_back("-c"); + // args.push_back("gkfs.conf"); + args.push_back("-n"); + args.push_back(nodes.c_str()); + // args.push_back("-w"); + // args.push_back(walltime.c_str()); + args.push_back("--srun"); + args.push_back("start"); + args.push_back(NULL); + std::vector env; + env.push_back(NULL); + + execvpe("gkfs", const_cast(args.data()), + const_cast(env.data())); + LOGGER_INFO( + "ADM_deploy_adhoc_storage() script didn't execute"); + exit(EXIT_FAILURE); + break; + } + case -1: { + ec = admire::error_code::other; + LOGGER_ERROR("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + rpc_id, std::quoted(__FUNCTION__), + std::quoted(get_address(h)), ec); + break; + } + default: { + int wstatus = 0; + pid_t retwait = waitpid(pid, &wstatus, 0); + if(retwait == -1) { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error waitpid code: {}\"", + rpc_id, retwait); + ec = admire::error_code::other; + } else { + if(WEXITSTATUS(wstatus) != 0) { + ec = admire::error_code::other; + } else { + ec = admire::error_code::success; + } + } + break; + } + } + } + + } else { + ec = am_result.error(); + LOGGER_ERROR("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + rpc_id, std::quoted(__FUNCTION__), + std::quoted(get_address(h)), ec); + } + + out.retval = ec; + + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS);