From 98de93134807a1d5d3608bc593310ca351ac4a4d Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 18 Nov 2022 10:12:35 +0100 Subject: [PATCH] API: Refactor ADM_register_pfs_storage() --- examples/c/ADM_register_pfs_storage.c | 61 +++++++--- examples/c/ADM_remove_pfs_storage.c | 61 ++++++---- examples/c/ADM_update_pfs_storage.c | 79 +++++++++---- examples/cxx/ADM_register_pfs_storage.cpp | 16 +-- src/common/api/admire_types.h | 8 +- src/common/api/admire_types.hpp | 13 ++- src/common/api/convert.cpp | 27 +++++ src/common/api/convert.hpp | 52 +++++++++ src/common/api/internal_types.hpp | 19 ++++ src/common/api/types.cpp | 52 +++++++-- src/common/net/proto/rpc_types.h | 16 ++- src/lib/admire.cpp | 29 ++--- src/lib/admire.h | 5 +- src/lib/admire.hpp | 7 +- src/lib/c_wrapper.cpp | 16 ++- src/lib/detail/impl.cpp | 48 +++++++- src/lib/detail/impl.hpp | 4 + src/scord/CMakeLists.txt | 2 +- src/scord/pfs_storage_manager.hpp | 131 ++++++++++++++++++++++ src/scord/rpc_handlers.cpp | 35 +++++- 20 files changed, 558 insertions(+), 123 deletions(-) create mode 100644 src/scord/pfs_storage_manager.hpp diff --git a/examples/c/ADM_register_pfs_storage.c b/examples/c/ADM_register_pfs_storage.c index 7b9a806f..5b65a411 100644 --- a/examples/c/ADM_register_pfs_storage.c +++ b/examples/c/ADM_register_pfs_storage.c @@ -25,11 +25,6 @@ #include #include #include -#include -#include "common.h" - -#define NINPUTS 10 -#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -41,28 +36,60 @@ main(int argc, char* argv[]) { } int exit_status = EXIT_SUCCESS; - ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; + + // pfs information + const char* pfs_name = "gpfs_scratch"; + const char* pfs_mount = "/gpfs/scratch"; + ADM_pfs_context_t pfs_ctx = NULL; + ADM_pfs_storage_t pfs_storage = NULL; - ADM_pfs_storage_t pfs_storage; + // Let's prepare all the information required by the API calls. + // ADM_register_pfs_storage() requires a set of nodes for the PFS + // storage to use and an appropriate execution context defining how it + // should behave: - ADM_pfs_context_t ctx = ADM_pfs_context_create("/gpfs"); - assert(ctx); + // 1. define the PFS execution context + pfs_ctx = ADM_pfs_context_create(pfs_mount); - ADM_return_t ret = ADM_register_pfs_storage(server, ctx, &pfs_storage); + if(pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing PFS context\n"); + goto cleanup; + } + + // All the information required by the ADM_register_pfs_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); + goto cleanup; + } - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully: %s\n", + // 2. Register the adhoc storage + if(ADM_register_pfs_storage(server, pfs_name, ADM_PFS_STORAGE_GPFS, pfs_ctx, + &pfs_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_register_pfs_storage() remote procedure completed " - "successfully\n"); + // The PFS storage is now registered into the system :) + exit_status = EXIT_SUCCESS; + + // Once the PFS storage is no longer required we need to notify the server + if((ret = ADM_remove_pfs_storage(server, pfs_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_pfs_storage() failed: %s\n", + ADM_strerror(ret)); + pfs_storage = NULL; + exit_status = EXIT_FAILURE; + // intentionally fall through... + } cleanup: ADM_server_destroy(server); + ADM_pfs_context_destroy(pfs_ctx); + exit(exit_status); } diff --git a/examples/c/ADM_remove_pfs_storage.c b/examples/c/ADM_remove_pfs_storage.c index e5a81c1a..8f0ab302 100644 --- a/examples/c/ADM_remove_pfs_storage.c +++ b/examples/c/ADM_remove_pfs_storage.c @@ -37,41 +37,60 @@ main(int argc, char* argv[]) { } int exit_status = EXIT_SUCCESS; - ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; - ADM_pfs_context_t ctx = ADM_pfs_context_create("/gpfs"); - assert(ctx); + // pfs information + const char* pfs_name = "gpfs_scratch"; + const char* pfs_mount = "/gpfs/scratch"; + ADM_pfs_context_t pfs_ctx = NULL; + ADM_pfs_storage_t pfs_storage = NULL; - ADM_pfs_storage_t pfs_storage; - ADM_return_t ret = ADM_register_pfs_storage(server, ctx, &pfs_storage); + // Let's prepare all the information required by the API calls. + // ADM_remove_pfs_storage() obviously requires a PFS storage to have + // been registered onto the system, so let's prepare first the data required + // to call ADM_register_pfs_storage(): - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully: %s\n", - ADM_strerror(ret)); - exit_status = EXIT_FAILURE; + // 1. define the PFS execution context + pfs_ctx = ADM_pfs_context_create(pfs_mount); + + if(pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing PFS context\n"); + goto cleanup; + } + + // All the information required by the ADM_register_pfs_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); goto cleanup; } - fprintf(stdout, "ADM_register_pfs_storage() remote procedure completed " - "successfully\n"); + // 2. Register the adhoc storage + if(ADM_register_pfs_storage(server, pfs_name, ADM_PFS_STORAGE_GPFS, pfs_ctx, + &pfs_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_pfs_storage() failed: %s\n", + ADM_strerror(ret)); + goto cleanup; + } - ret = ADM_remove_pfs_storage(server, pfs_storage); + // Now that we have an existing PFS storage registered into the system + // we can try to remove it... - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_remove_pfs_storage() remote procedure not completed " - "successfully: %s\n", + if((ret = ADM_remove_pfs_storage(server, pfs_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_remove_pfs_storage() remote procedure completed " - "successfully\n"); + // Everything is fine now... + exit_status = EXIT_SUCCESS; cleanup: ADM_server_destroy(server); + ADM_pfs_context_destroy(pfs_ctx); + exit(exit_status); } diff --git a/examples/c/ADM_update_pfs_storage.c b/examples/c/ADM_update_pfs_storage.c index eb159575..f67f610f 100644 --- a/examples/c/ADM_update_pfs_storage.c +++ b/examples/c/ADM_update_pfs_storage.c @@ -37,41 +37,80 @@ main(int argc, char* argv[]) { } int exit_status = EXIT_SUCCESS; - ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; - ADM_pfs_context_t ctx = ADM_pfs_context_create("/gpfs"); - assert(ctx); + // pfs information + const char* pfs_name = "gpfs_scratch"; + const char* pfs_mount = "/gpfs/scratch"; + const char* new_pfs_mount = "/gpfs/scratch2"; + ADM_pfs_context_t pfs_ctx = NULL; + ADM_pfs_context_t new_pfs_ctx = NULL; + ADM_pfs_storage_t pfs_storage = NULL; - ADM_pfs_storage_t pfs_storage; - ADM_return_t ret = ADM_register_pfs_storage(server, ctx, &pfs_storage); + // Let's prepare all the information required by the API calls. + // ADM_register_pfs_storage() requires a set of nodes for the PFS + // storage to use and an appropriate execution context defining how it + // should behave: - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully: %s\n", + // 1. define the PFS execution context + pfs_ctx = ADM_pfs_context_create(pfs_mount); + + if(pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing PFS context\n"); + goto cleanup; + } + + // All the information required by the ADM_register_pfs_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); + goto cleanup; + } + + // 2. Register the adhoc storage + if(ADM_register_pfs_storage(server, pfs_name, ADM_PFS_STORAGE_GPFS, pfs_ctx, + &pfs_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_register_pfs_storage() remote procedure completed " - "successfully\n"); + // Now that we have an existing PFS storage registered into the + // system, let's prepare a new execution context for the PFS + // storage system + new_pfs_ctx = ADM_pfs_context_create(new_pfs_mount); - ret = ADM_update_pfs_storage(server, ctx, pfs_storage); + if(new_pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing new PFS context\n"); + goto cleanup; + } - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_update_pfs_storage() remote procedure not completed " - "successfully: %s\n", + // We can now request the update to the server + if((ret = ADM_update_pfs_storage(server, new_pfs_ctx, pfs_storage)) != + ADM_SUCCESS) { + fprintf(stderr, "ADM_update_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_update_pfs_storage() remote procedure completed " - "successfully\n"); + // At this point, the PFS storage has been updated... + exit_status = EXIT_SUCCESS; + + // Once the PFS storage is no longer required we need to notify the server + if((ret = ADM_remove_pfs_storage(server, pfs_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_pfs_storage() failed: %s\n", + ADM_strerror(ret)); + pfs_storage = NULL; + exit_status = EXIT_FAILURE; + // intentionally fall through... + } cleanup: ADM_server_destroy(server); + ADM_pfs_context_destroy(pfs_ctx); + exit(exit_status); } diff --git a/examples/cxx/ADM_register_pfs_storage.cpp b/examples/cxx/ADM_register_pfs_storage.cpp index 4ff9c553..b3ad5e47 100644 --- a/examples/cxx/ADM_register_pfs_storage.cpp +++ b/examples/cxx/ADM_register_pfs_storage.cpp @@ -38,25 +38,19 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_pfs_context_t ctx{}; - ADM_pfs_storage_t pfs_storage{}; - ADM_return_t ret = ADM_SUCCESS; + std::string pfs_name = "gpfs_scratch"; + std::string pfs_mount = "/gpfs/scratch"; try { - ret = admire::register_pfs_storage(server, ctx, &pfs_storage); + admire::register_pfs_storage(server, pfs_name, + admire::pfs_storage::type::gpfs, + admire::pfs_storage::ctx{pfs_mount}); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_pfs_storage() failed: {}\n", e.what()); exit(EXIT_FAILURE); } - if(ret != ADM_SUCCESS) { - fmt::print(stdout, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully\n"); - exit(EXIT_FAILURE); - } - fmt::print(stdout, "ADM_register_pfs_storage() remote procedure completed " "successfully\n"); } diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index 727fc841..197780d7 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -510,11 +510,11 @@ ADM_adhoc_context_destroy(ADM_adhoc_context_t ctx); * @param[in] pfs_ctx Some specific context information for the storage * tier or NULL if none is required. For instance, an adhoc storage system may * find it useful to provide an ADM_adhoc_context_t describing the instance. - * @return A valid ADM_ADHOC_STORAGE if successful, or NULL in case of failure. + * @return A valid ADM_PFS_STORAGE if successful, or NULL in case of failure. */ -ADM_adhoc_storage_t -ADM_pfs_storage_create(const char* name, ADM_adhoc_storage_type_t type, - uint64_t id, ADM_adhoc_context_t adhoc_ctx); +ADM_pfs_storage_t +ADM_pfs_storage_create(const char* name, ADM_pfs_storage_type_t type, + uint64_t id, ADM_pfs_context_t pfs_ctx); /** * Destroy an ADM_ADHOC_STORAGE created by ADM_adhoc_storage_destroy(). diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index a6613afa..49eed8ec 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -425,12 +425,18 @@ struct pfs_storage { pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id, std::filesystem::path mount_point); + + pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id, + const pfs_storage::ctx& pfs_ctx); + + explicit pfs_storage(ADM_pfs_storage_t storage); + pfs_storage(const pfs_storage& other) noexcept; - pfs_storage(pfs_storage&&) noexcept = default; + pfs_storage(pfs_storage&&) noexcept; pfs_storage& operator=(const pfs_storage& other) noexcept; pfs_storage& - operator=(pfs_storage&&) noexcept = default; + operator=(pfs_storage&&) noexcept; ~pfs_storage(); std::string @@ -442,6 +448,9 @@ struct pfs_storage { pfs_storage::ctx context() const; + void + update(admire::pfs_storage::ctx new_ctx); + private: class impl; std::unique_ptr m_pimpl; diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 3dc82ce3..1822695f 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -117,6 +117,33 @@ convert(const admire::adhoc_storage& st) { return managed_ctype{c_st, std::move(managed_ctx)}; } +managed_ctype +convert(const pfs_storage::ctx& ctx) { + return managed_ctype{ + ADM_pfs_context_create(ctx.mount_point().c_str())}; +} + +managed_ctype +convert(const std::optional& pfs_storage) { + + if(!pfs_storage) { + return managed_ctype{}; + } + + return convert(pfs_storage.value()); +} + +managed_ctype +convert(const admire::pfs_storage& st) { + + auto managed_ctx = convert(st.context()); + ADM_pfs_storage_t c_st = ADM_pfs_storage_create( + st.name().c_str(), static_cast(st.type()), + st.id(), managed_ctx.get()); + + return managed_ctype{c_st, std::move(managed_ctx)}; +} + managed_ctype convert(const admire::dataset& dataset) { return managed_ctype( diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 1104baba..7c95b640 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -51,6 +51,12 @@ convert(const adhoc_storage::ctx& ctx); managed_ctype convert(const admire::adhoc_storage& st); +managed_ctype +convert(const pfs_storage::ctx& ctx); + +managed_ctype +convert(const admire::pfs_storage& st); + managed_ctype convert(const admire::dataset& dataset); @@ -224,6 +230,52 @@ struct admire::api::managed_ctype { managed_ctype m_ctx; }; +template <> +struct admire::api::managed_ctype { + + managed_ctype() = default; + + explicit managed_ctype(ADM_pfs_context_t ctx) : m_pfs_context(ctx) {} + + ADM_pfs_context_t + get() const { + return m_pfs_context.get(); + } + + ADM_pfs_context_t + release() { + return m_pfs_context.release(); + } + + scord::utils::ctype_ptr + m_pfs_context; +}; + +template <> +struct admire::api::managed_ctype { + + managed_ctype() = default; + + explicit managed_ctype(ADM_pfs_storage_t st, + managed_ctype&& ctx) + : m_storage(st), m_ctx(std::move(ctx)) {} + + ADM_pfs_storage_t + get() const { + return m_storage.get(); + } + + ADM_pfs_storage_t + release() { + std::ignore = m_ctx.release(); + return m_storage.release(); + } + + scord::utils::ctype_ptr + m_storage; + managed_ctype m_ctx; +}; + template <> struct admire::api::managed_ctype { diff --git a/src/common/api/internal_types.hpp b/src/common/api/internal_types.hpp index 5aa9e412..131b2eed 100644 --- a/src/common/api/internal_types.hpp +++ b/src/common/api/internal_types.hpp @@ -111,6 +111,25 @@ struct adhoc_storage_info { mutable scord::abt::shared_mutex m_info_mutex; }; +struct pfs_storage_info { + + explicit pfs_storage_info(admire::pfs_storage pfs_storage) + : m_pfs_storage(std::move(pfs_storage)) {} + + admire::pfs_storage + pfs_storage() const { + return m_pfs_storage; + } + + void + update(admire::pfs_storage::ctx pfs_context) { + m_pfs_storage.update(std::move(pfs_context)); + } + + admire::pfs_storage m_pfs_storage; + std::shared_ptr m_client_info; +}; + } // namespace admire::internal #endif // SCORD_INTERNAL_TYPES_HPP diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 56a41359..98074ca6 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -719,7 +719,11 @@ ADM_pfs_context_create(const char* mountpoint) { return NULL; } - adm_pfs_context->c_mount = mountpoint; + if(mountpoint) { + size_t n = strlen(mountpoint); + adm_pfs_context->c_mount = (const char*) calloc(n + 1, sizeof(char)); + strcpy((char*) adm_pfs_context->c_mount, mountpoint); + } return adm_pfs_context; } @@ -1488,21 +1492,13 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} -pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : m_pimpl(std::make_unique(*other.m_pimpl)) {} - -pfs_storage& -pfs_storage::operator=(const pfs_storage& other) noexcept { - this->m_pimpl = std::make_unique(*other.m_pimpl); - return *this; -} - std::filesystem::path pfs_storage::ctx::mount_point() const { return m_mount_point; } class pfs_storage::impl { + public: explicit impl(enum pfs_storage::type type, std::string name, std::uint64_t id, pfs_storage::ctx ctx) @@ -1514,6 +1510,7 @@ public: operator=(const impl& other) noexcept = default; impl& operator=(impl&&) noexcept = default; + ~impl() = default; enum type type() const { @@ -1535,6 +1532,11 @@ public: return m_ctx; } + void + update(pfs_storage::ctx new_ctx) { + m_ctx = std::move(new_ctx); + } + private: enum type m_type; std::string m_name; @@ -1548,6 +1550,29 @@ pfs_storage::pfs_storage(enum pfs_storage::type type, std::string name, type, std::move(name), id, pfs_storage::ctx{std::move(mount_point)})) {} +pfs_storage::pfs_storage(enum pfs_storage::type type, std::string name, + std::uint64_t id, const pfs_storage::ctx& pfs_ctx) + : m_pimpl(std::make_unique(type, std::move(name), id, pfs_ctx)) {} + +pfs_storage::pfs_storage(ADM_pfs_storage_t st) + : m_pimpl(std::make_unique( + static_cast(st->s_type), st->s_name, + st->s_id, pfs_storage::ctx{st->s_pfs_ctx})) {} + +pfs_storage::pfs_storage(const pfs_storage& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +pfs_storage::pfs_storage(pfs_storage&&) noexcept = default; + +pfs_storage& +pfs_storage::operator=(const pfs_storage& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +pfs_storage& +pfs_storage::operator=(pfs_storage&&) noexcept = default; + pfs_storage::~pfs_storage() = default; std::string @@ -1570,7 +1595,13 @@ pfs_storage::context() const { return m_pimpl->context(); } +void +pfs_storage::update(admire::pfs_storage::ctx new_ctx) { + return m_pimpl->update(std::move(new_ctx)); +} + class job_requirements::impl { + public: impl(std::vector inputs, std::vector outputs) @@ -1675,6 +1706,7 @@ job_requirements::adhoc_storage() const { } namespace qos { + class entity::impl { public: template diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 07a8c029..b7c503db 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -382,15 +382,25 @@ MERCURY_GEN_PROC( /// ADM_deploy_adhoc_storage MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((hg_uint64_t) (id))) -MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((hg_uint64_t) (op_id)) ((hg_int32_t) (retval)) ); /// ADM_register_pfs_storage -MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC( + ADM_register_pfs_storage_in_t, + ((hg_const_string_t) (name)) + ((ADM_pfs_storage_type_t) (type)) + ((ADM_pfs_context_t) (ctx)) +); -MERCURY_GEN_PROC(ADM_register_pfs_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC( + ADM_register_pfs_storage_out_t, + ((hg_uint64_t) (op_id)) + ((hg_int32_t) (retval)) + ((hg_uint64_t) (id)) +); /// ADM_update_pfs_storage MERCURY_GEN_PROC(ADM_update_pfs_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 3e0a9648..c0b5cbd7 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -294,31 +294,18 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } } -ADM_return_t -register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, - ADM_pfs_storage_t* pfs_storage) { - (void) srv; - (void) ctx; - (void) pfs_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); +admire::pfs_storage +register_pfs_storage(const server& srv, const std::string& name, + enum pfs_storage::type type, const pfs_storage::ctx& ctx) { - LOGGER_INFO("ADM_register_pfs_storage(...)"); + const auto rv = detail::register_pfs_storage(srv, name, type, ctx); - ADM_register_pfs_storage_in_t in{}; - ADM_register_pfs_storage_out_t out; - - const auto rpc = endp.call("ADM_register_pfs_storage", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_register_pfs_storage() = {}", out.ret); - return static_cast(out.ret); + if(!rv) { + throw std::runtime_error(fmt::format( + "ADM_register_pfs_storage() error: {}", rv.error().message())); } - LOGGER_INFO("ADM_register_pfs_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + return rv.value(); } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index 87b62349..a7ba56ee 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -162,6 +162,8 @@ ADM_deploy_adhoc_storage(ADM_server_t server, * Register a PFS storage tier. * * @param[in] server The server to which the request is directed + * @param[in] name The desired name for the PFS. + * @param[in] type The desired type for the PFS. * @param[in] ctx The EXECUTION_CONTEXT for the PFS. * @param[out] adhoc_storage An ADM_STORAGE referring to the newly-created * PFS instance. @@ -169,7 +171,8 @@ ADM_deploy_adhoc_storage(ADM_server_t server, * successfully. */ ADM_return_t -ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, +ADM_register_pfs_storage(ADM_server_t server, const char* name, + ADM_pfs_storage_type_t type, ADM_pfs_context_t ctx, ADM_pfs_storage_t* pfs_storage); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 2980244f..054d5f45 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -72,9 +72,10 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); void deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); -ADM_return_t -register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, - ADM_pfs_storage_t* pfs_storage); +admire::pfs_storage +register_pfs_storage(const server& srv, const std::string& name, + enum admire::pfs_storage::type type, + const admire::pfs_storage::ctx& ctx); ADM_return_t update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index be03a197..8d7d7139 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -133,12 +133,22 @@ ADM_deploy_adhoc_storage(ADM_server_t server, } ADM_return_t -ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, +ADM_register_pfs_storage(ADM_server_t server, const char* name, + ADM_pfs_storage_type_t type, ADM_pfs_context_t ctx, ADM_pfs_storage_t* pfs_storage) { - const admire::server srv{server}; + const auto rv = admire::detail::register_pfs_storage( + admire::server{server}, name, + static_cast(type), + admire::pfs_storage::ctx{ctx}); - return admire::register_pfs_storage(srv, ctx, pfs_storage); + if(!rv) { + return rv.error(); + } + + *pfs_storage = admire::api::convert(rv.value()).release(); + + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 40f80930..b582115f 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -358,8 +358,7 @@ register_adhoc_storage(const server& srv, const std::string& name, } admire::error_code -deploy_adhoc_storage(const server& srv, - const adhoc_storage& adhoc_storage) { +deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -373,7 +372,7 @@ deploy_adhoc_storage(const server& srv, ADM_deploy_adhoc_storage_in_t in{adhoc_storage.id()}; ADM_deploy_adhoc_storage_out_t out; - + const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out); if(const auto rv = admire::error_code{out.retval}; !rv) { @@ -514,4 +513,47 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { out.op_id); return admire::error_code::success; } + +tl::expected +register_pfs_storage(const server& srv, const std::string& name, + enum pfs_storage::type type, const pfs_storage::ctx& ctx) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{name: {}, type: {}, pfs_ctx: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), name, type, ctx); + + const auto rpc_name = name.c_str(); + const auto rpc_type = static_cast(type); + const auto rpc_ctx = api::convert(ctx); + + ADM_register_pfs_storage_in_t in{rpc_name, rpc_type, rpc_ctx.get()}; + ADM_register_pfs_storage_out_t out; + + const auto rpc = endp.call("ADM_register_pfs_storage", &in, &out); + + if(const auto rv = admire::error_code{out.retval}; !rv) { + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), rv, out.op_id); + return tl::make_unexpected(rv); + } + + auto rpc_pfs_storage = admire::pfs_storage{type, name, out.id, ctx}; + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}, id: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), + admire::error_code::success, out.id, out.op_id); + + return rpc_pfs_storage; +} + } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 51aa88e9..98909dcc 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -69,6 +69,10 @@ admire::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +tl::expected +register_pfs_storage(const server& srv, const std::string& name, + enum pfs_storage::type type, const pfs_storage::ctx& ctx); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 61d63df6..8278ec4c 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -26,7 +26,7 @@ add_executable(scord) target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp - env.hpp job_manager.hpp adhoc_storage_manager.hpp) + env.hpp job_manager.hpp adhoc_storage_manager.hpp pfs_storage_manager.hpp) target_include_directories( scord diff --git a/src/scord/pfs_storage_manager.hpp b/src/scord/pfs_storage_manager.hpp new file mode 100644 index 00000000..48b31037 --- /dev/null +++ b/src/scord/pfs_storage_manager.hpp @@ -0,0 +1,131 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_PFS_STORAGE_MANAGER_HPP +#define SCORD_PFS_STORAGE_MANAGER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace scord { + +struct pfs_storage_manager : scord::utils::singleton { + + tl::expected, + admire::error_code> + create(enum admire::pfs_storage::type type, const std::string& name, + const admire::pfs_storage::ctx& ctx) { + + static std::atomic_uint64_t current_id; + std::uint64_t id = current_id++; + + abt::unique_lock lock(m_pfs_storages_mutex); + + if(const auto it = m_pfs_storages.find(id); + it == m_pfs_storages.end()) { + const auto& [it_pfs, inserted] = m_pfs_storages.emplace( + id, std::make_shared( + admire::pfs_storage{type, name, id, ctx})); + + if(!inserted) { + LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); + return tl::make_unexpected(admire::error_code::snafu); + } + + return it_pfs->second; + } + + LOGGER_ERROR("{}: PFS storage '{}' already exists", __FUNCTION__, id); + return tl::make_unexpected(admire::error_code::entity_exists); + } + + admire::error_code + update(std::uint64_t id, admire::pfs_storage::ctx new_ctx) { + + abt::unique_lock lock(m_pfs_storages_mutex); + + if(const auto it = m_pfs_storages.find(id); + it != m_pfs_storages.end()) { + const auto current_pfs_info = it->second; + current_pfs_info->update(std::move(new_ctx)); + return admire::error_code::success; + } + + LOGGER_ERROR("{}: Adhoc storage '{}' does not exist", __FUNCTION__, id); + return admire::error_code::no_such_entity; + } + + tl::expected, + admire::error_code> + find(std::uint64_t id) { + + abt::shared_lock lock(m_pfs_storages_mutex); + + if(auto it = m_pfs_storages.find(id); it != m_pfs_storages.end()) { + return it->second; + } + + LOGGER_ERROR("PFS storage '{}' was not registered or was already " + "deleted", + id); + return tl::make_unexpected(admire::error_code::no_such_entity); + } + + admire::error_code + remove(std::uint64_t id) { + + abt::unique_lock lock(m_pfs_storages_mutex); + + if(m_pfs_storages.count(id) != 0) { + m_pfs_storages.erase(id); + return admire::error_code::success; + } + + LOGGER_ERROR("PFS storage '{}' was not registered or was already " + "deleted", + id); + + return admire::error_code::no_such_entity; + } + +private: + friend class scord::utils::singleton; + pfs_storage_manager() = default; + + mutable abt::shared_mutex m_pfs_storages_mutex; + std::unordered_map> + m_pfs_storages; +}; + +} // namespace scord + +#endif // SCORD_PFS_STORAGE_MANAGER_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 515caed7..47371427 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -30,6 +30,7 @@ #include "rpc_handlers.hpp" #include "job_manager.hpp" #include "adhoc_storage_manager.hpp" +#include "pfs_storage_manager.hpp" // Process running #include @@ -549,6 +550,7 @@ DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage); static void ADM_register_pfs_storage(hg_handle_t h) { + using admire::pfs_storage; using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; @@ -561,11 +563,38 @@ ADM_register_pfs_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; + const std::string pfs_name{in.name}; + const auto pfs_type = static_cast(in.type); + const pfs_storage::ctx pfs_ctx{in.ctx}; - LOGGER_INFO("ADM_register_pfs_storage()"); + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{name: {}, type: {}, pfs_ctx: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + pfs_name, pfs_type, pfs_ctx); - out.ret = 0; + admire::error_code ec; + std::uint64_t out_pfs_id = 0; + auto& pfs_manager = scord::pfs_storage_manager::instance(); + + if(const auto pm_result = pfs_manager.create(pfs_type, pfs_name, pfs_ctx); + pm_result.has_value()) { + const auto& adhoc_storage_info = pm_result.value(); + out_pfs_id = adhoc_storage_info->pfs_storage().id(); + } else { + LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"", + rpc_id, pm_result.error()); + ec = pm_result.error(); + } + + out.op_id = rpc_id; + out.retval = ec; + out.id = out_pfs_id; + + LOGGER_INFO("rpc id: {} name: {} to: {} => " + "body: {{retval: {}, id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec, out.id); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab