diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 73a0e1accf4d5b147c663762ea4920efa438e4a2..f3c324478016d4ed1ed2fd1237f12626ab39e58b 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -26,6 +26,10 @@ #include #include #include +#include "common.h" + +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -39,6 +43,12 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; + ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + assert(inputs); + ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + assert(outputs); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); assert(ctx); @@ -46,9 +56,22 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); - ADM_storage_t adhoc_storage; + ADM_job_requirements_t reqs = + ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); + assert(reqs); + + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + const char* user_id = "adhoc_storage_42"; + + ADM_storage_t adhoc_storage; + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index c1740b7edf19bbd9633693c2644db451cf949175..6667bd74f2d976dd3ffeb1005a9033a02927de72 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -43,6 +43,7 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); @@ -59,9 +60,18 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_storage_t adhoc_storage; + ADM_return_t ret = ADM_register_job(server, reqs, &job); - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } + + const char* user_id = "adhoc_storage_42"; + + ADM_storage_t adhoc_storage; + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index adb4a1f419e09ab89edb9c041eab19bccce23437..544ded6afffa1ea3b7298a0f5efb24f9f2afe291 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -26,6 +26,10 @@ #include #include #include +#include "common.h" + +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -39,6 +43,12 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; + ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + assert(inputs); + ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + assert(outputs); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); assert(ctx); @@ -46,8 +56,22 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_requirements_t reqs = + ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); + assert(reqs); + + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + } + + const char* user_id = "adhoc_storage_42"; + ADM_storage_t adhoc_storage; - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index 2b71046acc5e4d1ccf032591d470616d9407e452..45a90d542cd840406cc46d8b37fbab47db9ea659 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -26,6 +26,10 @@ #include #include #include +#include "common.h" + +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -39,6 +43,12 @@ main(int argc, char* argv[]) { int exit_status = EXIT_SUCCESS; ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_job_t job; + ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + assert(inputs); + ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + assert(outputs); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); assert(ctx); @@ -46,8 +56,23 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_requirements_t reqs = + ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); + assert(reqs); + + ADM_return_t ret = ADM_register_job(server, reqs, &job); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit_status = EXIT_FAILURE; + goto cleanup; + } + + const char* user_id = "adhoc_storage_42"; + ADM_storage_t adhoc_storage; - ADM_return_t ret = ADM_register_adhoc_storage(server, ctx, &adhoc_storage); + ret = ADM_register_adhoc_storage(server, job, user_id, ctx, &adhoc_storage); if(ret != ADM_SUCCESS) { fprintf(stdout, diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 50fe30bdb4a498f188922c5cc958fc1cc77b2e82..7cae76be45c927a9c075d6a851ac095eb33cb3be 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -24,7 +24,10 @@ #include #include +#include "common.hpp" +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -38,12 +41,26 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_adhoc_context_t ctx{}; - ADM_storage_t adhoc_storage{}; + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + auto p = std::make_unique( + admire::storage::type::gekkofs, "foobar", + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, 42, 100, false); + + admire::job_requirements reqs(inputs, outputs, std::move(p)); + + std::string user_id = "adhoc_storage_42"; + const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, 42, 100, false}; ADM_return_t ret = ADM_SUCCESS; try { - ret = admire::register_adhoc_storage(server, ctx, &adhoc_storage); + const auto job = admire::register_job(server, reqs); + const auto adhoc_storage = admire::register_adhoc_storage( + server, job, user_id, adhoc_storage_ctx); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", e.what()); diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 396006a8cf80348520c5daa755a821281731b0bd..28293cd3ff1da08cfd33207871f2d39a5f992107 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -236,12 +236,12 @@ struct storage { virtual ~ctx() = default; }; - storage(storage::type type, std::string id); + storage(storage::type type, std::string user_id); virtual ~storage() = default; std::string - id() const; + user_id() const; type type() const; @@ -249,7 +249,7 @@ struct storage { context() const = 0; protected: - std::string m_id; + std::string m_user_id; enum type m_type; }; @@ -294,20 +294,29 @@ struct adhoc_storage : public storage { bool m_should_flush; }; - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string user_id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush); - adhoc_storage(enum storage::type type, std::string id, + adhoc_storage(enum storage::type type, std::string user_id, ADM_adhoc_context_t ctx); + adhoc_storage(enum storage::type type, std::string user_id, + const admire::adhoc_storage::ctx& ctx); + adhoc_storage(const adhoc_storage& other) noexcept; - adhoc_storage(adhoc_storage&&) noexcept = default; + adhoc_storage(adhoc_storage&&) noexcept; adhoc_storage& operator=(const adhoc_storage&) noexcept; adhoc_storage& - operator=(adhoc_storage&&) noexcept = default; + operator=(adhoc_storage&&) noexcept; ~adhoc_storage() override; + const std::optional& + id() const; + + std::optional& + id(); + std::shared_ptr context() const final; @@ -587,6 +596,19 @@ struct fmt::formatter> } }; +template <> +struct fmt::formatter> + : formatter { + + // parse is inherited from formatter. + template + auto + format(const std::optional& v, FormatContext& ctx) const { + return formatter::format( + v ? std::to_string(v.value()) : "none", ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. @@ -598,8 +620,8 @@ struct fmt::formatter : formatter { s.context()); const auto str = - fmt::format("{{type: {}, id: {}, context: {}}}", s.type(), - std::quoted(s.id()), + fmt::format("{{type: {}, id: {}, user_id: {}, context: {}}}", + s.type(), s.id(), std::quoted(s.user_id()), (pctx ? fmt::format("{}", *pctx) : "NULL")); return formatter::format(str, ctx); } diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 591b56e9148e38eef113a24dc3487477d00042d5..c9873ee61556fe85babcb61bcc78a5371e402339 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -71,9 +71,8 @@ convert(const admire::adhoc_storage& st) { auto managed_ctx = convert(*std::static_pointer_cast( st.context())); - ADM_storage_t c_st = ADM_storage_create( - st.id().c_str(), static_cast(st.type()), + st.user_id().c_str(), static_cast(st.type()), managed_ctx.get()); return managed_ctype{c_st, std::move(managed_ctx)}; diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 85636c77e31bc561cbd70c0a43bf9823626d2ccf..f778737d94e2a9e93c5094c3820ec1bf22dda709 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1058,13 +1058,12 @@ dataset::id() const { return m_pimpl->id(); } - -storage::storage(enum storage::type type, std::string id) - : m_id(std::move(id)), m_type(type) {} +storage::storage(enum storage::type type, std::string user_id) + : m_user_id(std::move(user_id)), m_type(type) {} std::string -storage::id() const { - return m_id; +storage::user_id() const { + return m_user_id; } enum storage::type @@ -1113,13 +1112,24 @@ adhoc_storage::ctx::should_flush() const { class adhoc_storage::impl { public: - explicit impl(adhoc_storage::ctx ctx) : m_ctx(std::move(ctx)) {} + explicit impl(adhoc_storage::ctx ctx) : m_id(), m_ctx(std::move(ctx)) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& operator=(const impl& other) noexcept = default; impl& operator=(impl&&) noexcept = default; + ~impl() = default; + + const std::optional& + id() const { + return m_id; + } + + std::optional& + id() { + return m_id; + } adhoc_storage::ctx context() const { @@ -1127,38 +1137,57 @@ public: } private: + std::optional m_id; adhoc_storage::ctx m_ctx; }; -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, +adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, execution_mode exec_mode, access_type access_type, std::uint32_t nodes, std::uint32_t walltime, bool should_flush) - : storage(type, std::move(id)), + : storage(type, std::move(user_id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ exec_mode, access_type, nodes, walltime, should_flush})) {} -adhoc_storage::adhoc_storage(enum storage::type type, std::string id, +adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, ADM_adhoc_context_t ctx) - : storage(type, std::move(id)), + : storage(type, std::move(user_id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ctx})) {} +adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, + const adhoc_storage::ctx& ctx) + : storage(type, std::move(user_id)), m_pimpl(std::make_unique(ctx)) {} + adhoc_storage::adhoc_storage(const adhoc_storage& other) noexcept - : storage(other.m_type, other.m_id), + : storage(other.m_type, other.m_user_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} +adhoc_storage::adhoc_storage(adhoc_storage&&) noexcept = default; + adhoc_storage& adhoc_storage::operator=(const adhoc_storage& other) noexcept { this->m_pimpl = std::make_unique(*other.m_pimpl); return *this; } +adhoc_storage& +adhoc_storage::operator=(adhoc_storage&&) noexcept = default; + +const std::optional& +adhoc_storage::id() const { + return m_pimpl->id(); +} + +std::optional& +adhoc_storage::id() { + return m_pimpl->id(); +} + std::shared_ptr adhoc_storage::context() const { return std::make_shared(m_pimpl->context()); } - adhoc_storage::~adhoc_storage() = default; pfs_storage::ctx::ctx(std::filesystem::path mount_point) @@ -1167,7 +1196,7 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : storage(other.m_type, other.m_id), + : storage(other.m_type, other.m_user_id), m_pimpl(std::make_unique(*other.m_pimpl)) {} pfs_storage& diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 3f60cc22e157198bdd7d617e8dc497a5e27395b0..e8b44482dddca218d83f8594134a38b4d43dd38d 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -179,6 +179,7 @@ MERCURY_GEN_STRUCT_PROC( typedef struct adm_storage { const char* s_id; ADM_storage_type_t s_type; + uint64_t s_server_id; union { ADM_adhoc_context_t s_adhoc_ctx; ADM_pfs_context_t s_pfs_ctx; @@ -296,9 +297,19 @@ MERCURY_GEN_PROC( ); /// ADM_register_adhoc_storage -MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC( + ADM_register_adhoc_storage_in_t, + ((ADM_job_t) (job)) + ((hg_const_string_t) (id)) + ((ADM_adhoc_context_t)(ctx)) +); -MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC( + ADM_register_adhoc_storage_out_t, + ((hg_uint64_t) (op_id)) + ((int32_t) (retval)) + ((uint64_t)(server_id)) +); /// ADM_update_adhoc_storage MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 78e2007be51625a03a658f70be6ecdd02419a90d..8b2af22831daa3195997ec685cfd09ebc30c186c 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -219,6 +219,7 @@ register_job(const server& srv, const job_requirements& reqs) { } return rv.value(); + } ADM_return_t @@ -231,32 +232,20 @@ remove_job(const server& srv, const job& job) { return detail::remove_job(srv, job); } -ADM_return_t -register_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, - ADM_storage_t* adhoc_storage) { - - (void) srv; - (void) ctx; - (void) adhoc_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); +admire::adhoc_storage +register_adhoc_storage(const server& srv, const job& job, + const std::string& user_id, + const adhoc_storage::ctx& ctx) { - LOGGER_INFO("ADM_register_adhoc_storage(...)"); + const auto rv = detail::register_adhoc_storage(srv, job, user_id, ctx); - ADM_register_adhoc_storage_in_t in{}; - ADM_register_adhoc_storage_out_t out; - - const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_register_adhoc_storage() = {}", out.ret); - return static_cast(out.ret); + if(!rv) { + throw std::runtime_error( + fmt::format("ADM_register_adhoc_storage() error: {}", + ADM_strerror(rv.error()))); } - LOGGER_INFO("ADM_register_adhoc_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + return rv.value(); } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index ea9008d78c945e9af1847129dedcbc7195763a00..972a8d030c1f266f793d1c2c36ffe3d699715044 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -87,6 +87,8 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); * Register an adhoc storage system. * * @param[in] server The server to which the request is directed + * @param[in] job An ADM_JOB identifying the originating job. + * @param[in] user_id The desired user id for the adhoc storage system. * @param[in] ctx The EXECUTION_CONTEXT for the adhoc storage system. * @param[out] adhoc_storage An ADM_STORAGE referring to the newly-created * adhoc storage instance. @@ -94,7 +96,8 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job); * successfully. */ ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_adhoc_context_t ctx, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, + const char* user_id, ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 52ffb1229480aa7001f340197d1618965e47210a..4e3c59c9d730aa37bf9301b429b0ffea3da8ceb0 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -56,9 +56,10 @@ update_job(const server& srv, const job&, const job_requirements& reqs); ADM_return_t remove_job(const server& srv, const job& job); -ADM_return_t -register_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, - ADM_storage_t* adhoc_storage); +admire::adhoc_storage +register_adhoc_storage(const server& srv, const job& job, + const std::string& user_id, + const adhoc_storage::ctx& ctx); ADM_return_t update_adhoc_storage(const server& srv, ADM_adhoc_context_t ctx, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index cd58fbc4fa44099b3979d3c2a01ea99f583b84f5..5a36a08d4d977f7c908acac8e1b4e84515106747 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -79,12 +79,23 @@ ADM_remove_job(ADM_server_t server, ADM_job_t job) { } ADM_return_t -ADM_register_adhoc_storage(ADM_server_t server, ADM_adhoc_context_t ctx, +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_t job, + const char* user_id, ADM_adhoc_context_t ctx, ADM_storage_t* adhoc_storage) { const admire::server srv{server}; - return admire::register_adhoc_storage(srv, ctx, adhoc_storage); + const auto rv = admire::detail::register_adhoc_storage( + srv, admire::job{job}, user_id, admire::adhoc_storage::ctx{ctx}); + + + if(!rv) { + return rv.error(); + } + + *adhoc_storage = admire::api::convert(*rv).release(); + + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 07ba498cd8ec2919cd0771aea49df43c88d399fc..49ce4597537d673ddf3f238efe20230c26c8345f 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -307,6 +307,53 @@ remove_job(const server& srv, const job& job) { return ADM_SUCCESS; } +tl::expected +register_adhoc_storage(const server& srv, const job& job, + const std::string& user_id, + const adhoc_storage::ctx& ctx) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: name: {} from: {} => " + "body: {{job: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), job); + + const auto rpc_job = api::convert(job); + const auto rpc_user_id = user_id.c_str(); + const auto rpc_ctx = api::convert(ctx); + + ADM_register_adhoc_storage_in_t in{rpc_job.get(), rpc_user_id, + rpc_ctx.get()}; + ADM_register_adhoc_storage_out_t out; + + const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); + + if(out.retval < 0) { + const auto retval = static_cast(out.retval); + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), retval, + out.op_id); + return tl::make_unexpected(retval); + } + + auto rpc_adhoc_storage = + admire::adhoc_storage{admire::storage::type::gekkofs, user_id, ctx}; + + rpc_adhoc_storage.id() = out.server_id; + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), ADM_SUCCESS, + out.op_id); + + return rpc_adhoc_storage; +} + tl::expected transfer_datasets(const server& srv, const job& job, const std::vector& sources, @@ -356,5 +403,4 @@ transfer_datasets(const server& srv, const job& job, return tx; } - } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index bf524d8933b83389907281804090834299242952..662f4e4778ac5f57ca573d0a0eb50674edfc6d42 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -50,6 +50,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +tl::expected +register_adhoc_storage(const server& srv, const job& job, const std::string& id, + const adhoc_storage::ctx& ctx); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 12efe2b50e1ee71d90dd37ee1807e29564b969e1..73c380f6894cc38db5e52f0affb51979aadaf508 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -37,6 +37,18 @@ struct remote_procedure { } }; +struct adhoc_storage_manager { + + template + static admire::adhoc_storage + create(Args&&... args) { + static std::atomic_uint64_t current_id; + auto adhoc_storage = admire::adhoc_storage(std::forward(args)...); + adhoc_storage.id() = current_id++; + return adhoc_storage; + } +}; + static void ADM_ping(hg_handle_t h) { @@ -222,11 +234,29 @@ ADM_register_adhoc_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; + const admire::job job(in.job); + const std::string id(in.id); + const admire::adhoc_storage::ctx ctx(in.ctx); + + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + job); - LOGGER_INFO("ADM_register_adhoc_storage()"); + const auto adhoc_storage = adhoc_storage_manager::create( + admire::adhoc_storage::type::gekkofs, id, ctx); - out.ret = 0; + admire::error_code rv = ADM_SUCCESS; + + out.op_id = rpc_id; + out.retval = rv; + out.server_id = *adhoc_storage.id(); + + LOGGER_INFO("rpc id: {} name: {} to: {} => " + "body: {{retval: {}, server_id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + rv, out.server_id); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS);