From 1e3790b1ea531d3af0977a6bd43f94b0ae24fdcd Mon Sep 17 00:00:00 2001 From: amanzano Date: Wed, 7 Sep 2022 10:06:00 +0200 Subject: [PATCH 01/17] Rebase changes done --- examples/cxx/ADM_register_adhoc_storage.cpp | 7 ++++- src/common/api/admire_types.hpp | 9 ++++++ src/common/api/types.cpp | 19 +++++++++++- src/common/net/proto/rpc_types.h | 4 +-- src/lib/admire.cpp | 28 +++++------------- src/lib/admire.h | 3 +- src/lib/admire.hpp | 5 ++++ src/lib/c_wrapper.cpp | 14 +++++++-- src/lib/detail/impl.cpp | 32 ++++++++++++++++++++- src/lib/detail/impl.hpp | 4 +++ src/scord/rpc_handlers.cpp | 25 ++++++++++++---- 11 files changed, 116 insertions(+), 34 deletions(-) diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 50fe30bd..2d2c88fa 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -25,6 +25,9 @@ #include #include +#define NINPUTS 10 +#define NOUTPUTS 5 + int main(int argc, char* argv[]) { @@ -38,12 +41,14 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + ADM_job_t job{}; + std::string id; ADM_adhoc_context_t ctx{}; ADM_storage_t adhoc_storage{}; ADM_return_t ret = ADM_SUCCESS; try { - ret = admire::register_adhoc_storage(server, ctx, &adhoc_storage); + ret = admire::register_adhoc_storage(server, job, id, ctx, &adhoc_storage); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", e.what()); diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 396006a8..d2918956 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -301,6 +301,11 @@ struct adhoc_storage : public storage { adhoc_storage(enum storage::type type, std::string id, ADM_adhoc_context_t ctx); adhoc_storage(const adhoc_storage& other) noexcept; + /*adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, + ADM_adhoc_context_t ctx); + adhoc_storage(enum storage::type type, std::string id, + const admire::adhoc_storage::ctx& ctx);*/ + adhoc_storage(adhoc_storage&&) noexcept = default; adhoc_storage& operator=(const adhoc_storage&) noexcept; @@ -308,6 +313,10 @@ struct adhoc_storage : public storage { operator=(adhoc_storage&&) noexcept = default; ~adhoc_storage() override; + + std::uint64_t + id() const; + std::shared_ptr context() const final; diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 85636c77..355c213b 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1112,8 +1112,12 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { + static std::uint64_t generate_id() { + return 42; + } + public: - explicit impl(adhoc_storage::ctx ctx) : m_ctx(std::move(ctx)) {} + explicit impl(adhoc_storage::ctx ctx) : m_id(generate_id()), m_ctx(std::move(ctx)) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& @@ -1121,12 +1125,15 @@ public: impl& operator=(impl&&) noexcept = default; + std::uint64_t id() const { return m_id; } + adhoc_storage::ctx context() const { return m_ctx; } private: + std::uint64_t m_id; adhoc_storage::ctx m_ctx; }; @@ -1154,6 +1161,16 @@ adhoc_storage::operator=(const adhoc_storage& other) noexcept { return *this; } +/*adhoc_storage::adhoc_storage(enum storage::type type, std::string id, const adhoc_storage::ctx& ctx) + : storage(type, std::move(id)), + m_pimpl(std::make_unique(ctx)) {}*/ + +std::uint64_t +adhoc_storage::id() const { + return m_pimpl->id(); +} + + std::shared_ptr adhoc_storage::context() const { return std::make_shared(m_pimpl->context()); diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 3f60cc22..e37972ea 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -296,9 +296,9 @@ MERCURY_GEN_PROC( ); /// ADM_register_adhoc_storage -MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((ADM_job_t) (job))((hg_const_string_t) (id))((ADM_adhoc_context_t)(ctx))); -MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval))((ADM_storage_t)(adhoc_storage))); /// ADM_update_adhoc_storage MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 78e2007b..be97b69f 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -232,31 +232,17 @@ remove_job(const server& srv, const job& job) { } ADM_return_t -register_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, - ADM_storage_t* adhoc_storage) { +register_adhoc_storage(const server& srv, const job& job, const std::string& id, + const adhoc_storage::ctx& ctx) { - (void) srv; - (void) ctx; - (void) adhoc_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_register_adhoc_storage(...)"); + const auto rv = detail::register_adhoc_storage(srv, job, id, ctx); - ADM_register_adhoc_storage_in_t in{}; - ADM_register_adhoc_storage_out_t out; - - const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_register_adhoc_storage() = {}", out.ret); - return static_cast(out.ret); + if(!rv) { + throw std::runtime_error(fmt::format("ADM_register_adhoc_storage() error: {}", + ADM_strerror(rv.error()))); } - LOGGER_INFO("ADM_register_adhoc_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + return rv.value(); } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index ea9008d7..eb976c72 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -94,7 +94,8 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); * successfully. */ ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_adhoc_context_t ctx, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, std::string id, + ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 52ffb122..02055360 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -57,8 +57,13 @@ ADM_return_t remove_job(const server& srv, const job& job); ADM_return_t +<<<<<<< HEAD register_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage); +======= +register_adhoc_storage(const server& srv, const job& job, const std::string& id, + const adhoc_storage::ctx& ctx); +>>>>>>> 3eee4b5 (Files updated, some errors fixed) ADM_return_t update_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index cd58fbc4..03d73163 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -79,12 +79,22 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) { } ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_adhoc_context_t ctx, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, std::string id, + ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage) { const admire::server srv{server}; - return admire::register_adhoc_storage(srv, ctx, adhoc_storage); + const auto rv = + admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); + + if(!rv) { + return rv.error(); + } + + *adhoc_storage = admire::api::convert(*rv).release(); + + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 07ba498c..bdb8bf88 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -307,6 +307,37 @@ remove_job(const server& srv, const job& job) { return ADM_SUCCESS; } +tl::expected +register_adhoc_storage(const server& srv, const job& job, const std::string& id, + const adhoc_storage::ctx& ctx) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job); + + const auto rpc_job = api::convert(job); + const auto rpc_id = id.c_str(); + const auto rpc_ctx = api::convert(ctx); + + //ADM_register_adhoc_storage_in_t in{rpc_job.get()}; + ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_id, rpc_ctx.get()}; + ADM_register_adhoc_storage_out_t out; + + endp.call("ADM_register_adhoc_storage", &in, &out); + + if(out.retval < 0) { + const auto retval = static_cast(out.retval); + LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + return retval; + } + + LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + return ADM_SUCCESS; + +} + tl::expected transfer_datasets(const server& srv, const job& job, const std::vector& sources, @@ -356,5 +387,4 @@ transfer_datasets(const server& srv, const job& job, return tx; } - } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index bf524d89..662f4e47 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -50,6 +50,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +tl::expected +register_adhoc_storage(const server& srv, const job& job, const std::string& id, + const adhoc_storage::ctx& ctx); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 12efe2b5..2f2d476d 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -219,14 +219,29 @@ ADM_register_adhoc_storage(hg_handle_t h) { [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); - ret = margo_get_input(h, &in); - assert(ret == HG_SUCCESS); + const admire::job job(in.job); + const std::string id(in.id); + const admire::adhoc_storage::ctx ctx(in.ctx); + // const admire::job_requirements reqs(&in.reqs); - out.ret = -1; + const auto server_id = remote_procedure::new_id(); + LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", server_id, __FUNCTION__, job); - LOGGER_INFO("ADM_register_adhoc_storage()"); + const auto adhoc_storage = admire::adhoc_storage( + admire::adhoc_storage::type::gekkofs, id, ctx); //ctx.get() - out.ret = 0; + + uint64_t server_id = adhoc_storage.id(); + + // admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::in_job_shared, admire::adhoc_storage::access_type::write_only, 10, 10, false} + + admire::error_code rv = ADM_SUCCESS; + + out.retval = rv; + out.adhoc_storage = admire::api::convert(adhoc_storage).get(); + + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", id, + __FUNCTION__, rv, adhoc_storage); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab From 0a53bb0a28149a912a14fd55b2a1c3dc98908cb0 Mon Sep 17 00:00:00 2001 From: amanzano Date: Wed, 7 Sep 2022 10:08:45 +0200 Subject: [PATCH 02/17] More rebase changes done --- src/common/api/admire_types.hpp | 4 +++- src/common/api/convert.cpp | 4 ++-- src/lib/admire.cpp | 2 +- src/lib/admire.h | 1 + src/lib/admire.hpp | 7 +------ src/scord/rpc_handlers.cpp | 7 ++++--- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index d2918956..cc806d8d 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -303,6 +303,8 @@ struct adhoc_storage : public storage { adhoc_storage(const adhoc_storage& other) noexcept; /*adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, ADM_adhoc_context_t ctx); + //adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, + //ADM_adhoc_context_t ctx); adhoc_storage(enum storage::type type, std::string id, const admire::adhoc_storage::ctx& ctx);*/ @@ -608,7 +610,7 @@ struct fmt::formatter : formatter { const auto str = fmt::format("{{type: {}, id: {}, context: {}}}", s.type(), - std::quoted(s.id()), + std::quoted(std::to_string(s.id())), (pctx ? fmt::format("{}", *pctx) : "NULL")); return formatter::format(str, ctx); } diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 591b56e9..a16e604a 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -70,10 +70,10 @@ convert(const admire::adhoc_storage& st) { auto managed_ctx = convert(*std::static_pointer_cast( - st.context())); + st.context())); ADM_storage_t c_st = ADM_storage_create( - st.id().c_str(), static_cast(st.type()), + (std::to_string(st.id())).c_str(), static_cast(st.type()), managed_ctx.get()); return managed_ctype{c_st, std::move(managed_ctx)}; diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index be97b69f..a247cfc2 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -231,7 +231,7 @@ remove_job(const server& srv, const job& job) { return detail::remove_job(srv, job); } -ADM_return_t +admire::adhoc_storage register_adhoc_storage(const server& srv, const job& job, const std::string& id, const adhoc_storage::ctx& ctx) { diff --git a/src/lib/admire.h b/src/lib/admire.h index eb976c72..354b9d25 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -28,6 +28,7 @@ #include #include #include +#include #include "admire_types.h" #ifdef __cplusplus diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 02055360..65e934b9 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -56,14 +56,9 @@ update_job(const server& srv, const job&, const job_requirements& reqs); ADM_return_t remove_job(const server& srv, const job& job); -ADM_return_t -<<<<<<< HEAD -register_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, - ADM_storage_t* adhoc_storage); -======= +admire::adhoc_storage register_adhoc_storage(const server& srv, const job& job, const std::string& id, const adhoc_storage::ctx& ctx); ->>>>>>> 3eee4b5 (Files updated, some errors fixed) ADM_return_t update_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 2f2d476d..7cc78487 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -224,14 +224,15 @@ ADM_register_adhoc_storage(hg_handle_t h) { const admire::adhoc_storage::ctx ctx(in.ctx); // const admire::job_requirements reqs(&in.reqs); - const auto server_id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", server_id, __FUNCTION__, job); + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", rpc_id, __FUNCTION__, job); const auto adhoc_storage = admire::adhoc_storage( admire::adhoc_storage::type::gekkofs, id, ctx); //ctx.get() uint64_t server_id = adhoc_storage.id(); + LOGGER_INFO("RPC ID {} ({}) <= {{server_id: {{{}}}}}", rpc_id, __FUNCTION__, server_id); //new // admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::in_job_shared, admire::adhoc_storage::access_type::write_only, 10, 10, false} @@ -240,7 +241,7 @@ ADM_register_adhoc_storage(hg_handle_t h) { out.retval = rv; out.adhoc_storage = admire::api::convert(adhoc_storage).get(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", id, + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", rpc_id, __FUNCTION__, rv, adhoc_storage); ret = margo_respond(h, &out); -- GitLab From 7f583b6cce330b07e96b408074336fc8e0323424 Mon Sep 17 00:00:00 2001 From: amanzano Date: Wed, 14 Sep 2022 18:20:03 +0200 Subject: [PATCH 03/17] c examples updated --- examples/c/ADM_deploy_adhoc_storage.c | 26 ++++++++++++++++++++++++- examples/c/ADM_register_adhoc_storage.c | 13 ++++++++++++- examples/c/ADM_remove_adhoc_storage.c | 26 ++++++++++++++++++++++++- examples/c/ADM_update_adhoc_storage.c | 26 ++++++++++++++++++++++++- 4 files changed, 87 insertions(+), 4 deletions(-) diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 73a0e1ac..68386337 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -26,6 +26,10 @@ #include #include #include +#include "common.h" + +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -39,6 +43,12 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; + ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + assert(inputs); + ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + assert(outputs); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); assert(ctx); @@ -46,9 +56,23 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_requirements_t reqs = + ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); + assert(reqs); + + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } + + const char* id = "id"; + ADM_storage_t adhoc_storage; - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index c1740b7e..50ceaa25 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -43,6 +43,7 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); @@ -59,9 +60,19 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } + + const char* id = "id"; + ADM_storage_t adhoc_storage; - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index adb4a1f4..92fb7359 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -26,6 +26,10 @@ #include #include #include +#include "common.h" + +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -39,6 +43,12 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; + ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + assert(inputs); + ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + assert(outputs); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); assert(ctx); @@ -46,8 +56,22 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_requirements_t reqs = + ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); + assert(reqs); + + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } + + const char* id = "id"; + ADM_storage_t adhoc_storage; - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index 2b71046a..96af3778 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -26,6 +26,10 @@ #include #include #include +#include "common.h" + +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -39,6 +43,12 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; + ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + assert(inputs); + ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + assert(outputs); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); assert(ctx); @@ -46,8 +56,22 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_requirements_t reqs = + ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); + assert(reqs); + + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } + + const char* id = "id"; + ADM_storage_t adhoc_storage; - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, -- GitLab From 1eb52a0435d563a422f2214583340f972f8d95d4 Mon Sep 17 00:00:00 2001 From: amanzano Date: Fri, 16 Sep 2022 10:11:26 +0200 Subject: [PATCH 04/17] cxx examples udpated --- examples/cxx/ADM_register_adhoc_storage.cpp | 19 +++++++++++++++---- examples/cxx/ADM_register_job.cpp | 2 +- examples/cxx/ADM_transfer_datasets.cpp | 2 +- examples/cxx/ADM_update_job.cpp | 6 ++++-- 4 files changed, 21 insertions(+), 8 deletions(-) diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 2d2c88fa..0e59a1c9 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -24,6 +24,7 @@ #include #include +#include "common.hpp" #define NINPUTS 10 #define NOUTPUTS 5 @@ -41,14 +42,24 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_job_t job{}; + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + auto p = std::make_unique( + admire::storage::type::gekkofs, "foobar", 52, + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, 42, 100, false); + + admire::job_requirements reqs(inputs, outputs, std::move(p)); + std::string id; - ADM_adhoc_context_t ctx{}; - ADM_storage_t adhoc_storage{}; + //ADM_storage_t adhoc_storage{}; + const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false}; ADM_return_t ret = ADM_SUCCESS; try { - ret = admire::register_adhoc_storage(server, job, id, ctx, &adhoc_storage); + [[maybe_unused]] const auto job = admire::register_job(server, reqs); + const auto adhoc_storage = admire::register_adhoc_storage(server, job, id, adhoc_storage_ctx); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", e.what()); diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index bec2e5ed..cc49615e 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -44,7 +44,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", 52, admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 7d39e932..d8837536 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -52,7 +52,7 @@ main(int argc, char* argv[]) { const auto mapping = admire::transfer::mapping::n_to_n; auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", 42, admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index a1156911..1d24cb19 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -44,10 +44,12 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", 42, admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); + //const auto adhoc_storage = admire::register_adhoc_storage(server, job, "foobar", adhoc_storage_ctx); + admire::job_requirements reqs{inputs, outputs, std::move(p)}; @@ -56,7 +58,7 @@ main(int argc, char* argv[]) { prepare_datasets("output-new-dataset-{}", NOUTPUTS); auto p2 = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", 42, admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); -- GitLab From 9d032dbbb43eed6329d7d0ae0b717fb165fa8ef0 Mon Sep 17 00:00:00 2001 From: amanzano Date: Fri, 16 Sep 2022 10:13:08 +0200 Subject: [PATCH 05/17] type files updated --- src/common/api/admire_types.hpp | 20 ++++++----- src/common/api/types.cpp | 61 ++++++++++++++++++++------------ src/common/net/proto/rpc_types.h | 3 +- 3 files changed, 52 insertions(+), 32 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index cc806d8d..4652190f 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -236,7 +236,7 @@ struct storage { virtual ~ctx() = default; }; - storage(storage::type type, std::string id); + storage(storage::type type, std::string id, std::uint64_t server_id); virtual ~storage() = default; @@ -245,12 +245,14 @@ struct storage { type type() const; + virtual std::shared_ptr context() const = 0; protected: std::string m_id; enum type m_type; + std::uint64_t m_server_id; }; struct adhoc_storage : public storage { @@ -293,19 +295,19 @@ struct adhoc_storage : public storage { std::uint32_t m_walltime; bool m_should_flush; }; - - adhoc_storage(enum storage::type type, std::string id, + + adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush); - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, ADM_adhoc_context_t ctx); adhoc_storage(const adhoc_storage& other) noexcept; - /*adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, - ADM_adhoc_context_t ctx); + adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, + const admire::adhoc_storage::ctx& ctx); //adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, //ADM_adhoc_context_t ctx); - adhoc_storage(enum storage::type type, std::string id, + /*adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, const admire::adhoc_storage::ctx& ctx);*/ adhoc_storage(adhoc_storage&&) noexcept = default; @@ -342,9 +344,9 @@ struct pfs_storage : public storage { std::filesystem::path m_mount_point; }; - pfs_storage(enum storage::type type, std::string id, + pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, std::filesystem::path mount_point); - pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx); + pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, ADM_pfs_context_t ctx); pfs_storage(const pfs_storage& other) noexcept; pfs_storage(pfs_storage&&) noexcept = default; pfs_storage& diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 355c213b..48d6c6e0 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1059,8 +1059,9 @@ dataset::id() const { } -storage::storage(enum storage::type type, std::string id) - : m_id(std::move(id)), m_type(type) {} +storage::storage(enum storage::type type, std::string id, + std::uint64_t server_id) + : m_id(std::move(id)), m_type(type), m_server_id(server_id) {} std::string storage::id() const { @@ -1072,6 +1073,7 @@ storage::type() const { return m_type; } + adhoc_storage::ctx::ctx(adhoc_storage::execution_mode exec_mode, adhoc_storage::access_type access_type, std::uint32_t nodes, std::uint32_t walltime, @@ -1112,12 +1114,14 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { - static std::uint64_t generate_id() { + static std::uint64_t + generate_id() { return 42; } public: - explicit impl(adhoc_storage::ctx ctx) : m_id(generate_id()), m_ctx(std::move(ctx)) {} + explicit impl(adhoc_storage::ctx ctx) + : m_id(generate_id()), m_ctx(std::move(ctx)) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& @@ -1125,7 +1129,10 @@ public: impl& operator=(impl&&) noexcept = default; - std::uint64_t id() const { return m_id; } + std::uint64_t + id() const { + return m_id; + } adhoc_storage::ctx context() const { @@ -1139,20 +1146,33 @@ private: adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - execution_mode exec_mode, access_type access_type, - std::uint32_t nodes, std::uint32_t walltime, - bool should_flush) - : storage(type, std::move(id)), + std::uint64_t server_id, execution_mode exec_mode, + access_type access_type, std::uint32_t nodes, + std::uint32_t walltime, bool should_flush) + : storage(type, std::move(id), server_id), m_pimpl(std::make_unique(adhoc_storage::ctx{ exec_mode, access_type, nodes, walltime, should_flush})) {} adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - ADM_adhoc_context_t ctx) - : storage(type, std::move(id)), + std::uint64_t server_id, ADM_adhoc_context_t ctx) + : storage(type, std::move(id), server_id), m_pimpl(std::make_unique(adhoc_storage::ctx{ctx})) {} +/* +adhoc_storage::adhoc_storage(enum storage::type type, std::string id, + std::uint64_t server_id, + const adhoc_storage::ctx& ctx) + : storage(type, std::move(id), server_id), + m_pimpl(std::make_unique(ctx)) {}*/ + +adhoc_storage::adhoc_storage(enum storage::type type, std::string id, + std::uint64_t server_id, + const adhoc_storage::ctx& ctx) + : storage(type, std::move(id), server_id), + m_pimpl(std::make_unique(ctx)) {} // este es el nuevo añadido + adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept - : storage(other.m_type, other.m_id), + : storage(other.m_type, other.m_id, other.m_server_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} adhoc_storage& @@ -1161,10 +1181,6 @@ adhoc_storage::operator=(const adhoc_storage& other) noexcept { return *this; } -/*adhoc_storage::adhoc_storage(enum storage::type type, std::string id, const adhoc_storage::ctx& ctx) - : storage(type, std::move(id)), - m_pimpl(std::make_unique(ctx)) {}*/ - std::uint64_t adhoc_storage::id() const { return m_pimpl->id(); @@ -1184,7 +1200,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : storage(other.m_type, other.m_id), + : storage(other.m_type, other.m_id, other.m_server_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} pfs_storage& @@ -1218,15 +1234,15 @@ private: pfs_storage::ctx m_ctx; }; -pfs_storage::pfs_storage(enum storage::type type, std::string id, +pfs_storage::pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, std::filesystem::path mount_point) - : storage(type, std::move(id)), + : storage(type, std::move(id), server_id), m_pimpl(std::make_unique( pfs_storage::ctx{std::move(mount_point)})) {} -pfs_storage::pfs_storage(enum storage::type type, std::string id, +pfs_storage::pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, ADM_pfs_context_t ctx) - : storage(type, std::move(id)), + : storage(type, std::move(id), server_id), m_pimpl(std::make_unique(pfs_storage::ctx{ctx})) {} pfs_storage::~pfs_storage() = default; @@ -1274,6 +1290,7 @@ public: static_cast( reqs->r_storage->s_type), reqs->r_storage->s_id, + reqs->r_storage->s_server_id, reqs->r_storage->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: @@ -1281,7 +1298,7 @@ public: m_storage = std::make_unique( static_cast( reqs->r_storage->s_type), - reqs->r_storage->s_id, reqs->r_storage->s_pfs_ctx); + reqs->r_storage->s_id, reqs->r_storage->s_server_id, reqs->r_storage->s_pfs_ctx); break; } } diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index e37972ea..e05b83e1 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -179,6 +179,7 @@ MERCURY_GEN_STRUCT_PROC( typedef struct adm_storage { const char* s_id; ADM_storage_type_t s_type; + uint64_t s_server_id; union { ADM_adhoc_context_t s_adhoc_ctx; ADM_pfs_context_t s_pfs_ctx; @@ -298,7 +299,7 @@ MERCURY_GEN_PROC( /// ADM_register_adhoc_storage MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((ADM_job_t) (job))((hg_const_string_t) (id))((ADM_adhoc_context_t)(ctx))); -MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval))((ADM_storage_t)(adhoc_storage))); +MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval))); /// ADM_update_adhoc_storage MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) -- GitLab From 04e58352d85285d50484707be62337271cfdf564 Mon Sep 17 00:00:00 2001 From: amanzano Date: Fri, 16 Sep 2022 10:14:34 +0200 Subject: [PATCH 06/17] lib files updated --- src/lib/admire.cpp | 4 ++++ src/lib/admire.h | 3 +-- src/lib/c_wrapper.cpp | 8 ++++++-- src/lib/detail/impl.cpp | 13 +++++++------ 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index a247cfc2..cba6a03b 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -219,6 +219,7 @@ register_job(const server& srv, const job_requirements& reqs) { } return rv.value(); + } ADM_return_t @@ -234,6 +235,9 @@ remove_job(const server& srv, const job& job) { admire::adhoc_storage register_adhoc_storage(const server& srv, const job& job, const std::string& id, const adhoc_storage::ctx& ctx) { + //return detail::register_adhoc_storage(srv, job, id, ctx); + + const auto rv = detail::register_adhoc_storage(srv, job, id, ctx); diff --git a/src/lib/admire.h b/src/lib/admire.h index 354b9d25..b0061b29 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -28,7 +28,6 @@ #include #include #include -#include #include "admire_types.h" #ifdef __cplusplus @@ -95,7 +94,7 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); * successfully. */ ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, std::string id, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id, ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 03d73163..a37e7997 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -79,12 +79,16 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) { } ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, std::string id, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id, ADM_adhoc_context_t ctx, - ADM_storage_t* adhoc_storage) { + ADM_storage_t* adhoc_storage) { const admire::server srv{server}; + //return admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); + + + const auto rv = admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index bdb8bf88..eabb3703 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -325,17 +325,18 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id, ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_id, rpc_ctx.get()}; ADM_register_adhoc_storage_out_t out; - endp.call("ADM_register_adhoc_storage", &in, &out); + const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); if(out.retval < 0) { - const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); - return retval; + LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval); + return tl::make_unexpected(static_cast(out.retval)); } + const auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, id, 64, ctx}; + LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); - return ADM_SUCCESS; - + + return rpc_adhoc_storage; } tl::expected -- GitLab From fc6525ee8f2bc9a524967b4ffad66b8469b11f61 Mon Sep 17 00:00:00 2001 From: amanzano Date: Fri, 16 Sep 2022 10:16:42 +0200 Subject: [PATCH 07/17] rpc_handlers.cpp updated --- src/scord/rpc_handlers.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 7cc78487..f02d6245 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -228,7 +228,7 @@ ADM_register_adhoc_storage(hg_handle_t h) { LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", rpc_id, __FUNCTION__, job); const auto adhoc_storage = admire::adhoc_storage( - admire::adhoc_storage::type::gekkofs, id, ctx); //ctx.get() + admire::adhoc_storage::type::gekkofs, id, 64, ctx); //ctx.get() uint64_t server_id = adhoc_storage.id(); @@ -239,10 +239,13 @@ ADM_register_adhoc_storage(hg_handle_t h) { admire::error_code rv = ADM_SUCCESS; out.retval = rv; + /* out.adhoc_storage = admire::api::convert(adhoc_storage).get(); LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", rpc_id, - __FUNCTION__, rv, adhoc_storage); + __FUNCTION__, rv, adhoc_storage); */ + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}}}", rpc_id, + __FUNCTION__, rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab From 7c8e72c32bdb371dbbbf21ab80b2a8a1d31287c0 Mon Sep 17 00:00:00 2001 From: amanzano Date: Fri, 16 Sep 2022 15:50:42 +0200 Subject: [PATCH 08/17] Execution of tests fixed --- examples/c/ADM_deploy_adhoc_storage.c | 1 + examples/c/ADM_register_adhoc_storage.c | 2 ++ examples/c/ADM_remove_adhoc_storage.c | 1 + examples/c/ADM_update_adhoc_storage.c | 2 ++ examples/cxx/ADM_transfer_datasets.cpp | 2 +- src/lib/c_wrapper.cpp | 1 + src/scord/rpc_handlers.cpp | 6 +++++- 7 files changed, 13 insertions(+), 2 deletions(-) diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 68386337..de719dc7 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -73,6 +73,7 @@ main(int argc, char* argv[]) { ADM_storage_t adhoc_storage; ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); + //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index 50ceaa25..6b6eea60 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -73,6 +73,8 @@ main(int argc, char* argv[]) { ADM_storage_t adhoc_storage; ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); + //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); + if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index 92fb7359..9d1f89df 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -72,6 +72,7 @@ main(int argc, char* argv[]) { ADM_storage_t adhoc_storage; ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); + //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index 96af3778..df947cdb 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -66,12 +66,14 @@ main(int argc, char* argv[]) { fprintf(stdout, "ADM_register_job() remote procedure not completed " "successfully\n"); exit_status = EXIT_FAILURE; + goto cleanup; } const char* id = "id"; ADM_storage_t adhoc_storage; ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); + //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index d8837536..8b0a671e 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -52,7 +52,7 @@ main(int argc, char* argv[]) { const auto mapping = admire::transfer::mapping::n_to_n; auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", 42, + admire::storage::type::gekkofs, "foobar", 52, admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index a37e7997..983fb9b6 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -92,6 +92,7 @@ ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id, const auto rv = admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); + if(!rv) { return rv.error(); } diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index f02d6245..e8f6ae04 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -219,6 +219,9 @@ ADM_register_adhoc_storage(hg_handle_t h) { [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + const admire::job job(in.job); const std::string id(in.id); const admire::adhoc_storage::ctx ctx(in.ctx); @@ -232,7 +235,8 @@ ADM_register_adhoc_storage(hg_handle_t h) { uint64_t server_id = adhoc_storage.id(); - LOGGER_INFO("RPC ID {} ({}) <= {{server_id: {{{}}}}}", rpc_id, __FUNCTION__, server_id); //new + LOGGER_WARN("server_id: {}", server_id); + // LOGGER_INFO("RPC ID {} ({}) <= {{server_id: {{{}}}}}", rpc_id, __FUNCTION__, server_id); //new // admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::in_job_shared, admire::adhoc_storage::access_type::write_only, 10, 10, false} -- GitLab From c2ee36c1f1065ae0686cdb2e8ba77498681fb73a Mon Sep 17 00:00:00 2001 From: amanzano Date: Thu, 22 Sep 2022 12:55:51 +0200 Subject: [PATCH 09/17] cxx examples updated --- examples/cxx/ADM_register_adhoc_storage.cpp | 4 +++- examples/cxx/ADM_register_job.cpp | 2 +- examples/cxx/ADM_transfer_datasets.cpp | 2 +- examples/cxx/ADM_update_job.cpp | 4 ++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 0e59a1c9..f3583acd 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -45,8 +45,10 @@ main(int argc, char* argv[]) { const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + //const auto server_id = admire::adhoc_storage::impl::generate_id(); + auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", 52, + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index cc49615e..b0f98864 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -44,7 +44,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", 52, + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 8b0a671e..a6bf48a3 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -52,7 +52,7 @@ main(int argc, char* argv[]) { const auto mapping = admire::transfer::mapping::n_to_n; auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", 52, + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index 1d24cb19..789c69f9 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -44,7 +44,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", 42, + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); @@ -58,7 +58,7 @@ main(int argc, char* argv[]) { prepare_datasets("output-new-dataset-{}", NOUTPUTS); auto p2 = std::make_unique( - admire::storage::type::gekkofs, "foobar", 42, + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); -- GitLab From 73cdd8c2c0928f3d27bfc23a5783cd15443ed04b Mon Sep 17 00:00:00 2001 From: amanzano Date: Thu, 22 Sep 2022 12:59:02 +0200 Subject: [PATCH 10/17] types updated, declaration of server_id removed --- src/common/api/admire_types.hpp | 14 +++++----- src/common/api/types.cpp | 46 +++++++++++++++++--------------- src/common/net/proto/rpc_types.h | 2 +- src/lib/detail/impl.cpp | 2 +- src/scord/rpc_handlers.cpp | 5 ++-- 5 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 4652190f..7a4bd56d 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -236,7 +236,7 @@ struct storage { virtual ~ctx() = default; }; - storage(storage::type type, std::string id, std::uint64_t server_id); + storage(storage::type type, std::string id);//std::uint64_t server_id); virtual ~storage() = default; @@ -252,7 +252,7 @@ struct storage { protected: std::string m_id; enum type m_type; - std::uint64_t m_server_id; + //std::uint64_t m_server_id; }; struct adhoc_storage : public storage { @@ -296,14 +296,14 @@ struct adhoc_storage : public storage { bool m_should_flush; }; - adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, + adhoc_storage(enum storage::type type, std::string id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush); - adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, + adhoc_storage(enum storage::type type, std::string id, ADM_adhoc_context_t ctx); adhoc_storage(const adhoc_storage& other) noexcept; - adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, + adhoc_storage(enum storage::type type, std::string id, const admire::adhoc_storage::ctx& ctx); //adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, //ADM_adhoc_context_t ctx); @@ -344,9 +344,9 @@ struct pfs_storage : public storage { std::filesystem::path m_mount_point; }; - pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, + pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id, std::filesystem::path mount_point); - pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, ADM_pfs_context_t ctx); + pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx); //std::uint64_t server_id, ADM_pfs_context_t ctx); pfs_storage(const pfs_storage& other) noexcept; pfs_storage(pfs_storage&&) noexcept = default; pfs_storage& diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 48d6c6e0..8ed6bc60 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1059,9 +1059,9 @@ dataset::id() const { } -storage::storage(enum storage::type type, std::string id, - std::uint64_t server_id) - : m_id(std::move(id)), m_type(type), m_server_id(server_id) {} +storage::storage(enum storage::type type, std::string id) + //std::uint64_t server_id) + : m_id(std::move(id)), m_type(type) {}//, m_server_id(server_id) {} std::string storage::id() const { @@ -1114,13 +1114,15 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { + static std::uint64_t generate_id() { - return 42; + static std::atomic_uint64_t s_current_server_id = 0; + return s_current_server_id++; } public: - explicit impl(adhoc_storage::ctx ctx) + explicit impl(adhoc_storage::ctx ctx) //, std::uint64_t counter) : m_id(generate_id()), m_ctx(std::move(ctx)) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; @@ -1145,17 +1147,17 @@ private: }; -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - std::uint64_t server_id, execution_mode exec_mode, +adhoc_storage::adhoc_storage(enum storage::type type, std::string id, //std::uint64_t server_id, + execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush) - : storage(type, std::move(id), server_id), + : storage(type, std::move(id)),//, server_id), m_pimpl(std::make_unique(adhoc_storage::ctx{ exec_mode, access_type, nodes, walltime, should_flush})) {} -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - std::uint64_t server_id, ADM_adhoc_context_t ctx) - : storage(type, std::move(id), server_id), +adhoc_storage::adhoc_storage(enum storage::type type, std::string id, //std::uint64_t server_id, + ADM_adhoc_context_t ctx) + : storage(type, std::move(id)), //, server_id), m_pimpl(std::make_unique(adhoc_storage::ctx{ctx})) {} /* @@ -1166,13 +1168,13 @@ adhoc_storage::adhoc_storage(enum storage::type type, std::string id, m_pimpl(std::make_unique(ctx)) {}*/ adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - std::uint64_t server_id, + //std::uint64_t server_id, const adhoc_storage::ctx& ctx) - : storage(type, std::move(id), server_id), + : storage(type, std::move(id)),//, server_id), m_pimpl(std::make_unique(ctx)) {} // este es el nuevo añadido adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept - : storage(other.m_type, other.m_id, other.m_server_id), + : storage(other.m_type, other.m_id), //other.m_server_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} adhoc_storage& @@ -1200,7 +1202,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : storage(other.m_type, other.m_id, other.m_server_id), + : storage(other.m_type, other.m_id), //other.m_server_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} pfs_storage& @@ -1234,15 +1236,15 @@ private: pfs_storage::ctx m_ctx; }; -pfs_storage::pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, +pfs_storage::pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id, std::filesystem::path mount_point) - : storage(type, std::move(id), server_id), + : storage(type, std::move(id)), //server_id), m_pimpl(std::make_unique( pfs_storage::ctx{std::move(mount_point)})) {} -pfs_storage::pfs_storage(enum storage::type type, std::string id, std::uint64_t server_id, +pfs_storage::pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id, ADM_pfs_context_t ctx) - : storage(type, std::move(id), server_id), + : storage(type, std::move(id)), //server_id), m_pimpl(std::make_unique(pfs_storage::ctx{ctx})) {} pfs_storage::~pfs_storage() = default; @@ -1290,7 +1292,7 @@ public: static_cast( reqs->r_storage->s_type), reqs->r_storage->s_id, - reqs->r_storage->s_server_id, + //reqs->r_storage->s_server_id, reqs->r_storage->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: @@ -1298,7 +1300,9 @@ public: m_storage = std::make_unique( static_cast( reqs->r_storage->s_type), - reqs->r_storage->s_id, reqs->r_storage->s_server_id, reqs->r_storage->s_pfs_ctx); + reqs->r_storage->s_id, + //reqs->r_storage->s_server_id, + reqs->r_storage->s_pfs_ctx); break; } } diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index e05b83e1..8238e262 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -299,7 +299,7 @@ MERCURY_GEN_PROC( /// ADM_register_adhoc_storage MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((ADM_job_t) (job))((hg_const_string_t) (id))((ADM_adhoc_context_t)(ctx))); -MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval))); +MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval)) ((uint64_t)(server_id))); /// ADM_update_adhoc_storage MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index eabb3703..d3bc0e6f 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -332,7 +332,7 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id, return tl::make_unexpected(static_cast(out.retval)); } - const auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, id, 64, ctx}; + const auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, id, ctx}; LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index e8f6ae04..fa8125ea 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -231,7 +231,7 @@ ADM_register_adhoc_storage(hg_handle_t h) { LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", rpc_id, __FUNCTION__, job); const auto adhoc_storage = admire::adhoc_storage( - admire::adhoc_storage::type::gekkofs, id, 64, ctx); //ctx.get() + admire::adhoc_storage::type::gekkofs, id, ctx); //ctx.get() uint64_t server_id = adhoc_storage.id(); @@ -243,12 +243,13 @@ ADM_register_adhoc_storage(hg_handle_t h) { admire::error_code rv = ADM_SUCCESS; out.retval = rv; + out.server_id = adhoc_storage.id(); /* out.adhoc_storage = admire::api::convert(adhoc_storage).get(); LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", rpc_id, __FUNCTION__, rv, adhoc_storage); */ - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}}}", rpc_id, + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, server_id: {}}}", rpc_id, out.server_id, __FUNCTION__, rv); ret = margo_respond(h, &out); -- GitLab From 68b08afe08a018c9e483b685c73f2fb2f0b4f4bd Mon Sep 17 00:00:00 2001 From: amanzano Date: Thu, 22 Sep 2022 18:03:12 +0200 Subject: [PATCH 11/17] Formatting and cleanup --- examples/cxx/ADM_register_adhoc_storage.cpp | 10 ++-- examples/cxx/ADM_register_job.cpp | 2 +- examples/cxx/ADM_update_job.cpp | 6 +-- src/common/api/admire_types.hpp | 19 +++----- src/common/api/types.cpp | 52 +++++++++------------ src/common/net/proto/rpc_types.h | 15 ++++-- src/lib/admire.cpp | 8 ++-- src/lib/c_wrapper.cpp | 10 ++-- src/lib/detail/impl.cpp | 4 +- src/scord/rpc_handlers.cpp | 18 ++----- 10 files changed, 60 insertions(+), 84 deletions(-) diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index f3583acd..26cf5405 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -45,8 +45,6 @@ main(int argc, char* argv[]) { const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); - //const auto server_id = admire::adhoc_storage::impl::generate_id(); - auto p = std::make_unique( admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, @@ -55,13 +53,15 @@ main(int argc, char* argv[]) { admire::job_requirements reqs(inputs, outputs, std::move(p)); std::string id; - //ADM_storage_t adhoc_storage{}; - const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false}; + const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, 42, 100, false}; ADM_return_t ret = ADM_SUCCESS; try { [[maybe_unused]] const auto job = admire::register_job(server, reqs); - const auto adhoc_storage = admire::register_adhoc_storage(server, job, id, adhoc_storage_ctx); + const auto adhoc_storage = admire::register_adhoc_storage( + server, job, id, adhoc_storage_ctx); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", e.what()); diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index b0f98864..bec2e5ed 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -44,7 +44,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index 789c69f9..a1156911 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -44,12 +44,10 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); - //const auto adhoc_storage = admire::register_adhoc_storage(server, job, "foobar", adhoc_storage_ctx); - admire::job_requirements reqs{inputs, outputs, std::move(p)}; @@ -58,7 +56,7 @@ main(int argc, char* argv[]) { prepare_datasets("output-new-dataset-{}", NOUTPUTS); auto p2 = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 7a4bd56d..f9571dc9 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -236,7 +236,7 @@ struct storage { virtual ~ctx() = default; }; - storage(storage::type type, std::string id);//std::uint64_t server_id); + storage(storage::type type, std::string id); virtual ~storage() = default; @@ -252,7 +252,6 @@ struct storage { protected: std::string m_id; enum type m_type; - //std::uint64_t m_server_id; }; struct adhoc_storage : public storage { @@ -295,21 +294,17 @@ struct adhoc_storage : public storage { std::uint32_t m_walltime; bool m_should_flush; }; - - adhoc_storage(enum storage::type type, std::string id, + + adhoc_storage(enum storage::type type, std::string id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush); - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string id, ADM_adhoc_context_t ctx); adhoc_storage(const adhoc_storage& other) noexcept; adhoc_storage(enum storage::type type, std::string id, const admire::adhoc_storage::ctx& ctx); - //adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, - //ADM_adhoc_context_t ctx); - /*adhoc_storage(enum storage::type type, std::string id, std::uint64_t server_id, - const admire::adhoc_storage::ctx& ctx);*/ - + adhoc_storage(adhoc_storage&&) noexcept = default; adhoc_storage& operator=(const adhoc_storage&) noexcept; @@ -344,9 +339,9 @@ struct pfs_storage : public storage { std::filesystem::path m_mount_point; }; - pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id, + pfs_storage(enum storage::type type, std::string id, std::filesystem::path mount_point); - pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx); //std::uint64_t server_id, ADM_pfs_context_t ctx); + pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx); pfs_storage(const pfs_storage& other) noexcept; pfs_storage(pfs_storage&&) noexcept = default; pfs_storage& diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 8ed6bc60..2ce900f9 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1060,8 +1060,7 @@ dataset::id() const { storage::storage(enum storage::type type, std::string id) - //std::uint64_t server_id) - : m_id(std::move(id)), m_type(type) {}//, m_server_id(server_id) {} + : m_id(std::move(id)), m_type(type) {} std::string storage::id() const { @@ -1114,7 +1113,7 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { - + static std::uint64_t generate_id() { static std::atomic_uint64_t s_current_server_id = 0; @@ -1122,7 +1121,7 @@ class adhoc_storage::impl { } public: - explicit impl(adhoc_storage::ctx ctx) //, std::uint64_t counter) + explicit impl(adhoc_storage::ctx ctx) : m_id(generate_id()), m_ctx(std::move(ctx)) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; @@ -1147,34 +1146,25 @@ private: }; -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, //std::uint64_t server_id, - execution_mode exec_mode, - access_type access_type, std::uint32_t nodes, - std::uint32_t walltime, bool should_flush) - : storage(type, std::move(id)),//, server_id), +adhoc_storage::adhoc_storage(enum storage::type type, std::string id, + execution_mode exec_mode, access_type access_type, + std::uint32_t nodes, std::uint32_t walltime, + bool should_flush) + : storage(type, std::move(id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ exec_mode, access_type, nodes, walltime, should_flush})) {} -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, //std::uint64_t server_id, +adhoc_storage::adhoc_storage(enum storage::type type, std::string id, ADM_adhoc_context_t ctx) - : storage(type, std::move(id)), //, server_id), + : storage(type, std::move(id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ctx})) {} -/* adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - std::uint64_t server_id, const adhoc_storage::ctx& ctx) - : storage(type, std::move(id), server_id), - m_pimpl(std::make_unique(ctx)) {}*/ - -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, - //std::uint64_t server_id, - const adhoc_storage::ctx& ctx) - : storage(type, std::move(id)),//, server_id), - m_pimpl(std::make_unique(ctx)) {} // este es el nuevo añadido + : storage(type, std::move(id)), m_pimpl(std::make_unique(ctx)) {} adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept - : storage(other.m_type, other.m_id), //other.m_server_id), + : storage(other.m_type, other.m_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} adhoc_storage& @@ -1202,7 +1192,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : storage(other.m_type, other.m_id), //other.m_server_id), + : storage(other.m_type, other.m_id), // other.m_server_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} pfs_storage& @@ -1236,15 +1226,17 @@ private: pfs_storage::ctx m_ctx; }; -pfs_storage::pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id, +pfs_storage::pfs_storage(enum storage::type type, + std::string id, // std::uint64_t server_id, std::filesystem::path mount_point) - : storage(type, std::move(id)), //server_id), + : storage(type, std::move(id)), // server_id), m_pimpl(std::make_unique( pfs_storage::ctx{std::move(mount_point)})) {} -pfs_storage::pfs_storage(enum storage::type type, std::string id, //std::uint64_t server_id, +pfs_storage::pfs_storage(enum storage::type type, + std::string id, // std::uint64_t server_id, ADM_pfs_context_t ctx) - : storage(type, std::move(id)), //server_id), + : storage(type, std::move(id)), // server_id), m_pimpl(std::make_unique(pfs_storage::ctx{ctx})) {} pfs_storage::~pfs_storage() = default; @@ -1292,7 +1284,7 @@ public: static_cast( reqs->r_storage->s_type), reqs->r_storage->s_id, - //reqs->r_storage->s_server_id, + // reqs->r_storage->s_server_id, reqs->r_storage->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: @@ -1300,8 +1292,8 @@ public: m_storage = std::make_unique( static_cast( reqs->r_storage->s_type), - reqs->r_storage->s_id, - //reqs->r_storage->s_server_id, + reqs->r_storage->s_id, + // reqs->r_storage->s_server_id, reqs->r_storage->s_pfs_ctx); break; } diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 8238e262..9b61a07a 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -179,7 +179,7 @@ MERCURY_GEN_STRUCT_PROC( typedef struct adm_storage { const char* s_id; ADM_storage_type_t s_type; - uint64_t s_server_id; + uint64_t s_server_id; union { ADM_adhoc_context_t s_adhoc_ctx; ADM_pfs_context_t s_pfs_ctx; @@ -297,9 +297,18 @@ MERCURY_GEN_PROC( ); /// ADM_register_adhoc_storage -MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((ADM_job_t) (job))((hg_const_string_t) (id))((ADM_adhoc_context_t)(ctx))); +MERCURY_GEN_PROC( + ADM_register_adhoc_storage_in_t, + ((ADM_job_t) (job)) + ((hg_const_string_t) (id)) + ((ADM_adhoc_context_t)(ctx)) +); -MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (retval)) ((uint64_t)(server_id))); +MERCURY_GEN_PROC( + ADM_register_adhoc_storage_out_t, + ((int32_t) (retval)) + ((uint64_t)(server_id)) +); /// ADM_update_adhoc_storage MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index cba6a03b..0ee2f373 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -235,15 +235,13 @@ remove_job(const server& srv, const job& job) { admire::adhoc_storage register_adhoc_storage(const server& srv, const job& job, const std::string& id, const adhoc_storage::ctx& ctx) { - //return detail::register_adhoc_storage(srv, job, id, ctx); - - const auto rv = detail::register_adhoc_storage(srv, job, id, ctx); if(!rv) { - throw std::runtime_error(fmt::format("ADM_register_adhoc_storage() error: {}", - ADM_strerror(rv.error()))); + throw std::runtime_error( + fmt::format("ADM_register_adhoc_storage() error: {}", + ADM_strerror(rv.error()))); } return rv.value(); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 983fb9b6..300c9364 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -81,18 +81,14 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) { ADM_return_t ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id, ADM_adhoc_context_t ctx, - ADM_storage_t* adhoc_storage) { + ADM_storage_t* adhoc_storage) { const admire::server srv{server}; - //return admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); + const auto rv = admire::detail::register_adhoc_storage( + srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); - - const auto rv = - admire::detail::register_adhoc_storage(srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); - - if(!rv) { return rv.error(); } diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index d3bc0e6f..616ebfb8 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -321,7 +321,6 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id, const auto rpc_id = id.c_str(); const auto rpc_ctx = api::convert(ctx); - //ADM_register_adhoc_storage_in_t in{rpc_job.get()}; ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_id, rpc_ctx.get()}; ADM_register_adhoc_storage_out_t out; @@ -332,7 +331,8 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id, return tl::make_unexpected(static_cast(out.retval)); } - const auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, id, ctx}; + const auto rpc_adhoc_storage = + admire::adhoc_storage{admire::storage::type::gekkofs, id, ctx}; LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index fa8125ea..3f87d589 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -225,32 +225,20 @@ ADM_register_adhoc_storage(hg_handle_t h) { const admire::job job(in.job); const std::string id(in.id); const admire::adhoc_storage::ctx ctx(in.ctx); - // const admire::job_requirements reqs(&in.reqs); const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", rpc_id, __FUNCTION__, job); const auto adhoc_storage = admire::adhoc_storage( - admire::adhoc_storage::type::gekkofs, id, ctx); //ctx.get() - - - uint64_t server_id = adhoc_storage.id(); - LOGGER_WARN("server_id: {}", server_id); - // LOGGER_INFO("RPC ID {} ({}) <= {{server_id: {{{}}}}}", rpc_id, __FUNCTION__, server_id); //new - - // admire::adhoc_storage::ctx{admire::adhoc_storage::execution_mode::in_job_shared, admire::adhoc_storage::access_type::write_only, 10, 10, false} + admire::adhoc_storage::type::gekkofs, id, ctx); admire::error_code rv = ADM_SUCCESS; out.retval = rv; out.server_id = adhoc_storage.id(); - /* - out.adhoc_storage = admire::api::convert(adhoc_storage).get(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, adhoc_storage: {{{}}}}}", rpc_id, - __FUNCTION__, rv, adhoc_storage); */ - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, server_id: {}}}", rpc_id, out.server_id, - __FUNCTION__, rv); + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, server_id: {}}}", rpc_id, + out.server_id, __FUNCTION__, rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab From bc26d35d6419e55770d45054f90ce151f9e43995 Mon Sep 17 00:00:00 2001 From: amanzano Date: Tue, 27 Sep 2022 11:51:30 +0200 Subject: [PATCH 12/17] Rename adhoc_storage id to user_id in examples ADM_register_adhoc_storage.cpp var id renamed to user_id --- examples/c/ADM_deploy_adhoc_storage.c | 5 ++--- examples/c/ADM_register_adhoc_storage.c | 5 ++--- examples/c/ADM_remove_adhoc_storage.c | 7 +++---- examples/c/ADM_update_adhoc_storage.c | 5 ++--- examples/cxx/ADM_register_adhoc_storage.cpp | 5 ++--- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index de719dc7..274a9e0b 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -68,12 +68,11 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - const char* id = "id"; + const char* user_id = "adhoc_storage_42"; ADM_storage_t adhoc_storage; - ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); - //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index 6b6eea60..af9034a7 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -68,12 +68,11 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - const char* id = "id"; + const char* user_id = "adhoc_storage_42"; ADM_storage_t adhoc_storage; - ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); - //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index 9d1f89df..544ded6a 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -56,7 +56,7 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); - ADM_job_requirements_t reqs = + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); @@ -68,11 +68,10 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - const char* id = "id"; + const char* user_id = "adhoc_storage_42"; ADM_storage_t adhoc_storage; - ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); - //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index df947cdb..45a90d54 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -69,11 +69,10 @@ main(int argc, char* argv[]) { goto cleanup; } - const char* id = "id"; + const char* user_id = "adhoc_storage_42"; ADM_storage_t adhoc_storage; - ret = ADM_register_adhoc_storage(server, job, id, ctx, &adhoc_storage); - //adhoc_storage = ADM_register_adhoc_storage(server, job, id, ctx); + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 26cf5405..b097a34e 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -29,7 +29,6 @@ #define NINPUTS 10 #define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -52,7 +51,7 @@ main(int argc, char* argv[]) { admire::job_requirements reqs(inputs, outputs, std::move(p)); - std::string id; + std::string user_id = "adhoc_storage_42"; const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false}; @@ -61,7 +60,7 @@ main(int argc, char* argv[]) { try { [[maybe_unused]] const auto job = admire::register_job(server, reqs); const auto adhoc_storage = admire::register_adhoc_storage( - server, job, id, adhoc_storage_ctx); + server, job, user_id, adhoc_storage_ctx); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", e.what()); -- GitLab From cd5df8699327105a95daba6021c0242902bdfab9 Mon Sep 17 00:00:00 2001 From: amanzano Date: Tue, 27 Sep 2022 11:57:03 +0200 Subject: [PATCH 13/17] Code cleanup --- examples/c/ADM_deploy_adhoc_storage.c | 1 - examples/c/ADM_register_adhoc_storage.c | 2 -- examples/cxx/ADM_register_adhoc_storage.cpp | 2 +- examples/cxx/ADM_transfer_datasets.cpp | 2 +- src/common/api/admire_types.hpp | 1 - src/common/api/convert.cpp | 7 +++---- src/common/api/types.cpp | 20 ++++++-------------- 7 files changed, 11 insertions(+), 24 deletions(-) diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 274a9e0b..f3c32447 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -71,7 +71,6 @@ main(int argc, char* argv[]) { const char* user_id = "adhoc_storage_42"; ADM_storage_t adhoc_storage; - ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index af9034a7..6667bd74 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -71,10 +71,8 @@ main(int argc, char* argv[]) { const char* user_id = "adhoc_storage_42"; ADM_storage_t adhoc_storage; - ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); - if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_adhoc_storage() remote procedure not completed " diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index b097a34e..7cae76be 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -58,7 +58,7 @@ main(int argc, char* argv[]) { ADM_return_t ret = ADM_SUCCESS; try { - [[maybe_unused]] const auto job = admire::register_job(server, reqs); + const auto job = admire::register_job(server, reqs); const auto adhoc_storage = admire::register_adhoc_storage( server, job, user_id, adhoc_storage_ctx); } catch(const std::exception& e) { diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index a6bf48a3..7d39e932 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -52,7 +52,7 @@ main(int argc, char* argv[]) { const auto mapping = admire::transfer::mapping::n_to_n; auto p = std::make_unique( - admire::storage::type::gekkofs, "foobar", + admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, admire::adhoc_storage::access_type::read_write, 42, 100, false); diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index f9571dc9..cd63fbe3 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -245,7 +245,6 @@ struct storage { type type() const; - virtual std::shared_ptr context() const = 0; diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index a16e604a..09231d8d 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -70,11 +70,10 @@ convert(const admire::adhoc_storage& st) { auto managed_ctx = convert(*std::static_pointer_cast( - st.context())); - + st.context())); ADM_storage_t c_st = ADM_storage_create( - (std::to_string(st.id())).c_str(), static_cast(st.type()), - managed_ctx.get()); + (std::to_string(st.id())).c_str(), + static_cast(st.type()), managed_ctx.get()); return managed_ctype{c_st, std::move(managed_ctx)}; } diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 2ce900f9..a8e8e824 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1072,7 +1072,6 @@ storage::type() const { return m_type; } - adhoc_storage::ctx::ctx(adhoc_storage::execution_mode exec_mode, adhoc_storage::access_type access_type, std::uint32_t nodes, std::uint32_t walltime, @@ -1113,7 +1112,6 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { - static std::uint64_t generate_id() { static std::atomic_uint64_t s_current_server_id = 0; @@ -1178,7 +1176,6 @@ adhoc_storage::id() const { return m_pimpl->id(); } - std::shared_ptr adhoc_storage::context() const { return std::make_shared(m_pimpl->context()); @@ -1192,7 +1189,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : storage(other.m_type, other.m_id), // other.m_server_id), + : storage(other.m_type, other.m_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} pfs_storage& @@ -1226,17 +1223,15 @@ private: pfs_storage::ctx m_ctx; }; -pfs_storage::pfs_storage(enum storage::type type, - std::string id, // std::uint64_t server_id, +pfs_storage::pfs_storage(enum storage::type type, std::string id, std::filesystem::path mount_point) - : storage(type, std::move(id)), // server_id), + : storage(type, std::move(id)), m_pimpl(std::make_unique( pfs_storage::ctx{std::move(mount_point)})) {} -pfs_storage::pfs_storage(enum storage::type type, - std::string id, // std::uint64_t server_id, +pfs_storage::pfs_storage(enum storage::type type, std::string id, ADM_pfs_context_t ctx) - : storage(type, std::move(id)), // server_id), + : storage(type, std::move(id)), m_pimpl(std::make_unique(pfs_storage::ctx{ctx})) {} pfs_storage::~pfs_storage() = default; @@ -1284,7 +1279,6 @@ public: static_cast( reqs->r_storage->s_type), reqs->r_storage->s_id, - // reqs->r_storage->s_server_id, reqs->r_storage->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: @@ -1292,9 +1286,7 @@ public: m_storage = std::make_unique( static_cast( reqs->r_storage->s_type), - reqs->r_storage->s_id, - // reqs->r_storage->s_server_id, - reqs->r_storage->s_pfs_ctx); + reqs->r_storage->s_id, reqs->r_storage->s_pfs_ctx); break; } } -- GitLab From c66b64f27c114973e26977e7be4565182e9a6f70 Mon Sep 17 00:00:00 2001 From: amanzano Date: Wed, 28 Sep 2022 16:56:56 +0200 Subject: [PATCH 14/17] Rename id to user_id in public APIs of register_adhoc_storage --- src/common/api/admire_types.hpp | 6 +++--- src/common/api/convert.cpp | 4 ++-- src/common/api/types.cpp | 25 ++++++++++++------------- src/lib/admire.cpp | 5 +++-- src/lib/c_wrapper.cpp | 6 +++--- src/lib/detail/impl.cpp | 10 ++++++---- 6 files changed, 29 insertions(+), 27 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index cd63fbe3..8ece894a 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -236,12 +236,12 @@ struct storage { virtual ~ctx() = default; }; - storage(storage::type type, std::string id); + storage(storage::type type, std::string user_id); virtual ~storage() = default; std::string - id() const; + user_id() const; type type() const; @@ -249,7 +249,7 @@ struct storage { context() const = 0; protected: - std::string m_id; + std::string m_user_id; enum type m_type; }; diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 09231d8d..c9873ee6 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -72,8 +72,8 @@ convert(const admire::adhoc_storage& st) { convert(*std::static_pointer_cast( st.context())); ADM_storage_t c_st = ADM_storage_create( - (std::to_string(st.id())).c_str(), - static_cast(st.type()), managed_ctx.get()); + st.user_id().c_str(), static_cast(st.type()), + managed_ctx.get()); return managed_ctype{c_st, std::move(managed_ctx)}; } diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index a8e8e824..07815c6f 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1058,13 +1058,12 @@ dataset::id() const { return m_pimpl->id(); } - -storage::storage(enum storage::type type, std::string id) - : m_id(std::move(id)), m_type(type) {} +storage::storage(enum storage::type type, std::string user_id) + : m_user_id(std::move(user_id)), m_type(type) {} std::string -storage::id() const { - return m_id; +storage::user_id() const { + return m_user_id; } enum storage::type @@ -1144,25 +1143,25 @@ private: }; -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, +adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush) - : storage(type, std::move(id)), + : storage(type, std::move(user_id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ exec_mode, access_type, nodes, walltime, should_flush})) {} -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, +adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, ADM_adhoc_context_t ctx) - : storage(type, std::move(id)), + : storage(type, std::move(user_id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ctx})) {} -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, +adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, const adhoc_storage::ctx& ctx) - : storage(type, std::move(id)), m_pimpl(std::make_unique(ctx)) {} + : storage(type, std::move(user_id)), m_pimpl(std::make_unique(ctx)) {} adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept - : storage(other.m_type, other.m_id), + : storage(other.m_type, other.m_user_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} adhoc_storage& @@ -1189,7 +1188,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : storage(other.m_type, other.m_id), + : storage(other.m_type, other.m_user_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} pfs_storage& diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 0ee2f373..8b2af228 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -233,10 +233,11 @@ remove_job(const server& srv, const job& job) { } admire::adhoc_storage -register_adhoc_storage(const server& srv, const job& job, const std::string& id, +register_adhoc_storage(const server& srv, const job& job, + const std::string& user_id, const adhoc_storage::ctx& ctx) { - const auto rv = detail::register_adhoc_storage(srv, job, id, ctx); + const auto rv = detail::register_adhoc_storage(srv, job, user_id, ctx); if(!rv) { throw std::runtime_error( diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 300c9364..5a36a08d 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -79,14 +79,14 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) { } ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id, - ADM_adhoc_context_t ctx, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, + const char* user_id, ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage) { const admire::server srv{server}; const auto rv = admire::detail::register_adhoc_storage( - srv, admire::job{job}, id, admire::adhoc_storage::ctx{ctx}); + srv, admire::job{job}, user_id, admire::adhoc_storage::ctx{ctx}); if(!rv) { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 616ebfb8..34b1c482 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -308,7 +308,8 @@ remove_job(const server& srv, const job& job) { } tl::expected -register_adhoc_storage(const server& srv, const job& job, const std::string& id, +register_adhoc_storage(const server& srv, const job& job, + const std::string& user_id, const adhoc_storage::ctx& ctx) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -318,10 +319,11 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id, LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job); const auto rpc_job = api::convert(job); - const auto rpc_id = id.c_str(); + const auto rpc_user_id = user_id.c_str(); const auto rpc_ctx = api::convert(ctx); - ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_id, rpc_ctx.get()}; + ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_user_id, + rpc_ctx.get()}; ADM_register_adhoc_storage_out_t out; const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); @@ -332,7 +334,7 @@ register_adhoc_storage(const server& srv, const job& job, const std::string& id, } const auto rpc_adhoc_storage = - admire::adhoc_storage{admire::storage::type::gekkofs, id, ctx}; + admire::adhoc_storage{admire::storage::type::gekkofs, user_id, ctx}; LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); -- GitLab From 719eeed401c0eb7372650e2bce75ecc3cba8adc7 Mon Sep 17 00:00:00 2001 From: amanzano Date: Thu, 29 Sep 2022 17:22:08 +0200 Subject: [PATCH 15/17] Update header files for register_adhoc_storage --- src/lib/admire.h | 6 ++++-- src/lib/admire.hpp | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/lib/admire.h b/src/lib/admire.h index b0061b29..972a8d03 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -87,6 +87,8 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); * Register an adhoc storage system. * * @param[in] server The server to which the request is directed + * @param[in] job An ADM_JOB identifying the originating job. + * @param[in] user_id The desired user id for the adhoc storage system. * @param[in] ctx The EXECUTION_CONTEXT for the adhoc storage system. * @param[out] adhoc_storage An ADM_STORAGE referring to the newly-created * adhoc storage instance. @@ -94,8 +96,8 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); * successfully. */ ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, const char* id, - ADM_adhoc_context_t ctx, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, + const char* user_id, ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 65e934b9..4e3c59c9 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -57,7 +57,8 @@ ADM_return_t remove_job(const server& srv, const job& job); admire::adhoc_storage -register_adhoc_storage(const server& srv, const job& job, const std::string& id, +register_adhoc_storage(const server& srv, const job& job, + const std::string& user_id, const adhoc_storage::ctx& ctx); ADM_return_t -- GitLab From 28fe772e4b2b8d0757dff338d1e348bcefaaebd9 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 30 Sep 2022 11:19:50 +0200 Subject: [PATCH 16/17] Update logging for register_adhoc_storage to new style --- src/common/net/proto/rpc_types.h | 7 ++++--- src/lib/detail/impl.cpp | 19 +++++++++++++++---- src/scord/rpc_handlers.cpp | 12 +++++++++--- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 9b61a07a..e8b44482 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -298,15 +298,16 @@ MERCURY_GEN_PROC( /// ADM_register_adhoc_storage MERCURY_GEN_PROC( - ADM_register_adhoc_storage_in_t, + ADM_register_adhoc_storage_in_t, ((ADM_job_t) (job)) ((hg_const_string_t) (id)) ((ADM_adhoc_context_t)(ctx)) ); MERCURY_GEN_PROC( - ADM_register_adhoc_storage_out_t, - ((int32_t) (retval)) + ADM_register_adhoc_storage_out_t, + ((hg_uint64_t) (op_id)) + ((int32_t) (retval)) ((uint64_t)(server_id)) ); diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 34b1c482..e0662609 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -314,9 +314,13 @@ register_adhoc_storage(const server& srv, const job& job, scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job); + LOGGER_INFO("rpc id: name: {} from: {} => " + "body: {{job: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), job); const auto rpc_job = api::convert(job); const auto rpc_user_id = user_id.c_str(); @@ -329,14 +333,21 @@ register_adhoc_storage(const server& srv, const job& job, const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); if(out.retval < 0) { - LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval); - return tl::make_unexpected(static_cast(out.retval)); + const auto retval = static_cast(out.retval); + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), retval, + out.op_id); + return tl::make_unexpected(retval); } const auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, user_id, ctx}; - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), ADM_SUCCESS, + out.op_id); return rpc_adhoc_storage; } diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 3f87d589..c096848b 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -227,18 +227,24 @@ ADM_register_adhoc_storage(hg_handle_t h) { const admire::adhoc_storage::ctx ctx(in.ctx); const auto rpc_id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}}}", rpc_id, __FUNCTION__, job); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + job); const auto adhoc_storage = admire::adhoc_storage( admire::adhoc_storage::type::gekkofs, id, ctx); admire::error_code rv = ADM_SUCCESS; + out.op_id = rpc_id; out.retval = rv; out.server_id = adhoc_storage.id(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, server_id: {}}}", rpc_id, - out.server_id, __FUNCTION__, rv); + LOGGER_INFO("rpc id: {} name: {} to: {} => " + "body: {{retval: {}, server_id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + rv, out.server_id); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab From 5c9f1805488257af2e15f83327181d22d1fb2208 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Sat, 1 Oct 2022 13:00:41 +0200 Subject: [PATCH 17/17] IDs for adhoc_storage instances no longer assigned automatically The previous automatic ID generation caused problems with the usage of adhoc_storage constructors outside RPC handler code (e.g. during conversions). It makes more sense for IDs to be assigned by a server-centric adhoc_storage_manager. --- src/common/api/admire_types.hpp | 35 +++++++++++++++++++++++---------- src/common/api/types.cpp | 32 +++++++++++++++++++----------- src/lib/detail/impl.cpp | 4 +++- src/scord/rpc_handlers.cpp | 16 +++++++++++++-- 4 files changed, 62 insertions(+), 25 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 8ece894a..28293cd3 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -294,27 +294,29 @@ struct adhoc_storage : public storage { bool m_should_flush; }; - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string user_id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush); - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string user_id, ADM_adhoc_context_t ctx); - adhoc_storage(const adhoc_storage& other) noexcept; - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string user_id, const admire::adhoc_storage::ctx& ctx); - adhoc_storage(adhoc_storage&&) noexcept = default; + adhoc_storage(const adhoc_storage& other) noexcept; + adhoc_storage(adhoc_storage&&) noexcept; adhoc_storage& operator=(const adhoc_storage&) noexcept; adhoc_storage& - operator=(adhoc_storage&&) noexcept = default; + operator=(adhoc_storage&&) noexcept; ~adhoc_storage() override; - - std::uint64_t + const std::optional& id() const; + std::optional& + id(); + std::shared_ptr context() const final; @@ -594,6 +596,19 @@ struct fmt::formatter> } }; +template <> +struct fmt::formatter> + : formatter { + + // parse is inherited from formatter. + template + auto + format(const std::optional& v, FormatContext& ctx) const { + return formatter::format( + v ? std::to_string(v.value()) : "none", ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. @@ -605,8 +620,8 @@ struct fmt::formatter : formatter { s.context()); const auto str = - fmt::format("{{type: {}, id: {}, context: {}}}", s.type(), - std::quoted(std::to_string(s.id())), + fmt::format("{{type: {}, id: {}, user_id: {}, context: {}}}", + s.type(), s.id(), std::quoted(s.user_id()), (pctx ? fmt::format("{}", *pctx) : "NULL")); return formatter::format(str, ctx); } diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 07815c6f..f778737d 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1111,34 +1111,33 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { - static std::uint64_t - generate_id() { - static std::atomic_uint64_t s_current_server_id = 0; - return s_current_server_id++; - } - public: - explicit impl(adhoc_storage::ctx ctx) - : m_id(generate_id()), m_ctx(std::move(ctx)) {} + explicit impl(adhoc_storage::ctx ctx) : m_id(), m_ctx(std::move(ctx)) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& operator=(const impl& other) noexcept = default; impl& operator=(impl&&) noexcept = default; + ~impl() = default; - std::uint64_t + const std::optional& id() const { return m_id; } + std::optional& + id() { + return m_id; + } + adhoc_storage::ctx context() const { return m_ctx; } private: - std::uint64_t m_id; + std::optional m_id; adhoc_storage::ctx m_ctx; }; @@ -1164,22 +1163,31 @@ adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept : storage(other.m_type, other.m_user_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} +adhoc_storage::adhoc_storage(adhoc_storage&&) noexcept = default; + adhoc_storage& adhoc_storage::operator=(const adhoc_storage& other) noexcept { this->m_pimpl = std::make_unique(*other.m_pimpl); return *this; } -std::uint64_t +adhoc_storage& +adhoc_storage::operator=(adhoc_storage&&) noexcept = default; + +const std::optional& adhoc_storage::id() const { return m_pimpl->id(); } +std::optional& +adhoc_storage::id() { + return m_pimpl->id(); +} + std::shared_ptr adhoc_storage::context() const { return std::make_shared(m_pimpl->context()); } - adhoc_storage::~adhoc_storage() = default; pfs_storage::ctx::ctx(std::filesystem::path mount_point) diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index e0662609..49ce4597 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -341,9 +341,11 @@ register_adhoc_storage(const server& srv, const job& job, return tl::make_unexpected(retval); } - const auto rpc_adhoc_storage = + auto rpc_adhoc_storage = admire::adhoc_storage{admire::storage::type::gekkofs, user_id, ctx}; + rpc_adhoc_storage.id() = out.server_id; + LOGGER_INFO("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), ADM_SUCCESS, diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index c096848b..73c380f6 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -37,6 +37,18 @@ struct remote_procedure { } }; +struct adhoc_storage_manager { + + template + static admire::adhoc_storage + create(Args&&... args) { + static std::atomic_uint64_t current_id; + auto adhoc_storage = admire::adhoc_storage(std::forward(args)...); + adhoc_storage.id() = current_id++; + return adhoc_storage; + } +}; + static void ADM_ping(hg_handle_t h) { @@ -232,14 +244,14 @@ ADM_register_adhoc_storage(hg_handle_t h) { rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job); - const auto adhoc_storage = admire::adhoc_storage( + const auto adhoc_storage = adhoc_storage_manager::create( admire::adhoc_storage::type::gekkofs, id, ctx); admire::error_code rv = ADM_SUCCESS; out.op_id = rpc_id; out.retval = rv; - out.server_id = adhoc_storage.id(); + out.server_id = *adhoc_storage.id(); LOGGER_INFO("rpc id: {} name: {} to: {} => " "body: {{retval: {}, server_id: {}}}", -- GitLab