diff --git a/examples/c/ADM_register_pfs_storage.c b/examples/c/ADM_register_pfs_storage.c index 7b9a806f5d6133fab1efb4c956ce3905862c6597..5b65a411c90761c9b4bc3fe577d6b5e9ea584805 100644 --- a/examples/c/ADM_register_pfs_storage.c +++ b/examples/c/ADM_register_pfs_storage.c @@ -25,11 +25,6 @@ #include #include #include -#include -#include "common.h" - -#define NINPUTS 10 -#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -41,28 +36,60 @@ main(int argc, char* argv[]) { } int exit_status = EXIT_SUCCESS; - ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; + + // pfs information + const char* pfs_name = "gpfs_scratch"; + const char* pfs_mount = "/gpfs/scratch"; + ADM_pfs_context_t pfs_ctx = NULL; + ADM_pfs_storage_t pfs_storage = NULL; - ADM_pfs_storage_t pfs_storage; + // Let's prepare all the information required by the API calls. + // ADM_register_pfs_storage() requires a set of nodes for the PFS + // storage to use and an appropriate execution context defining how it + // should behave: - ADM_pfs_context_t ctx = ADM_pfs_context_create("/gpfs"); - assert(ctx); + // 1. define the PFS execution context + pfs_ctx = ADM_pfs_context_create(pfs_mount); - ADM_return_t ret = ADM_register_pfs_storage(server, ctx, &pfs_storage); + if(pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing PFS context\n"); + goto cleanup; + } + + // All the information required by the ADM_register_pfs_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); + goto cleanup; + } - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully: %s\n", + // 2. Register the adhoc storage + if(ADM_register_pfs_storage(server, pfs_name, ADM_PFS_STORAGE_GPFS, pfs_ctx, + &pfs_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_register_pfs_storage() remote procedure completed " - "successfully\n"); + // The PFS storage is now registered into the system :) + exit_status = EXIT_SUCCESS; + + // Once the PFS storage is no longer required we need to notify the server + if((ret = ADM_remove_pfs_storage(server, pfs_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_pfs_storage() failed: %s\n", + ADM_strerror(ret)); + pfs_storage = NULL; + exit_status = EXIT_FAILURE; + // intentionally fall through... + } cleanup: ADM_server_destroy(server); + ADM_pfs_context_destroy(pfs_ctx); + exit(exit_status); } diff --git a/examples/c/ADM_remove_pfs_storage.c b/examples/c/ADM_remove_pfs_storage.c index e5a81c1adcf8c8e8d907c6b361ad480aa9c63680..8f0ab302e1128ae72b44ff49c4febca99f97c454 100644 --- a/examples/c/ADM_remove_pfs_storage.c +++ b/examples/c/ADM_remove_pfs_storage.c @@ -37,41 +37,60 @@ main(int argc, char* argv[]) { } int exit_status = EXIT_SUCCESS; - ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; - ADM_pfs_context_t ctx = ADM_pfs_context_create("/gpfs"); - assert(ctx); + // pfs information + const char* pfs_name = "gpfs_scratch"; + const char* pfs_mount = "/gpfs/scratch"; + ADM_pfs_context_t pfs_ctx = NULL; + ADM_pfs_storage_t pfs_storage = NULL; - ADM_pfs_storage_t pfs_storage; - ADM_return_t ret = ADM_register_pfs_storage(server, ctx, &pfs_storage); + // Let's prepare all the information required by the API calls. + // ADM_remove_pfs_storage() obviously requires a PFS storage to have + // been registered onto the system, so let's prepare first the data required + // to call ADM_register_pfs_storage(): - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully: %s\n", - ADM_strerror(ret)); - exit_status = EXIT_FAILURE; + // 1. define the PFS execution context + pfs_ctx = ADM_pfs_context_create(pfs_mount); + + if(pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing PFS context\n"); + goto cleanup; + } + + // All the information required by the ADM_register_pfs_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); goto cleanup; } - fprintf(stdout, "ADM_register_pfs_storage() remote procedure completed " - "successfully\n"); + // 2. Register the adhoc storage + if(ADM_register_pfs_storage(server, pfs_name, ADM_PFS_STORAGE_GPFS, pfs_ctx, + &pfs_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_pfs_storage() failed: %s\n", + ADM_strerror(ret)); + goto cleanup; + } - ret = ADM_remove_pfs_storage(server, pfs_storage); + // Now that we have an existing PFS storage registered into the system + // we can try to remove it... - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_remove_pfs_storage() remote procedure not completed " - "successfully: %s\n", + if((ret = ADM_remove_pfs_storage(server, pfs_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_remove_pfs_storage() remote procedure completed " - "successfully\n"); + // Everything is fine now... + exit_status = EXIT_SUCCESS; cleanup: ADM_server_destroy(server); + ADM_pfs_context_destroy(pfs_ctx); + exit(exit_status); } diff --git a/examples/c/ADM_update_pfs_storage.c b/examples/c/ADM_update_pfs_storage.c index eb15957562dfa425c011b86566983fc043e7511a..f67f610f3c989245219e549cfc3e74d39372e2d3 100644 --- a/examples/c/ADM_update_pfs_storage.c +++ b/examples/c/ADM_update_pfs_storage.c @@ -37,41 +37,80 @@ main(int argc, char* argv[]) { } int exit_status = EXIT_SUCCESS; - ADM_server_t server = ADM_server_create("tcp", argv[1]); + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; - ADM_pfs_context_t ctx = ADM_pfs_context_create("/gpfs"); - assert(ctx); + // pfs information + const char* pfs_name = "gpfs_scratch"; + const char* pfs_mount = "/gpfs/scratch"; + const char* new_pfs_mount = "/gpfs/scratch2"; + ADM_pfs_context_t pfs_ctx = NULL; + ADM_pfs_context_t new_pfs_ctx = NULL; + ADM_pfs_storage_t pfs_storage = NULL; - ADM_pfs_storage_t pfs_storage; - ADM_return_t ret = ADM_register_pfs_storage(server, ctx, &pfs_storage); + // Let's prepare all the information required by the API calls. + // ADM_register_pfs_storage() requires a set of nodes for the PFS + // storage to use and an appropriate execution context defining how it + // should behave: - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully: %s\n", + // 1. define the PFS execution context + pfs_ctx = ADM_pfs_context_create(pfs_mount); + + if(pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing PFS context\n"); + goto cleanup; + } + + // All the information required by the ADM_register_pfs_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); + goto cleanup; + } + + // 2. Register the adhoc storage + if(ADM_register_pfs_storage(server, pfs_name, ADM_PFS_STORAGE_GPFS, pfs_ctx, + &pfs_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_register_pfs_storage() remote procedure completed " - "successfully\n"); + // Now that we have an existing PFS storage registered into the + // system, let's prepare a new execution context for the PFS + // storage system + new_pfs_ctx = ADM_pfs_context_create(new_pfs_mount); - ret = ADM_update_pfs_storage(server, ctx, pfs_storage); + if(new_pfs_ctx == NULL) { + fprintf(stderr, "Fatal error preparing new PFS context\n"); + goto cleanup; + } - if(ret != ADM_SUCCESS) { - fprintf(stderr, - "ADM_update_pfs_storage() remote procedure not completed " - "successfully: %s\n", + // We can now request the update to the server + if((ret = ADM_update_pfs_storage(server, new_pfs_ctx, pfs_storage)) != + ADM_SUCCESS) { + fprintf(stderr, "ADM_update_pfs_storage() failed: %s\n", ADM_strerror(ret)); - exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_update_pfs_storage() remote procedure completed " - "successfully\n"); + // At this point, the PFS storage has been updated... + exit_status = EXIT_SUCCESS; + + // Once the PFS storage is no longer required we need to notify the server + if((ret = ADM_remove_pfs_storage(server, pfs_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_pfs_storage() failed: %s\n", + ADM_strerror(ret)); + pfs_storage = NULL; + exit_status = EXIT_FAILURE; + // intentionally fall through... + } cleanup: ADM_server_destroy(server); + ADM_pfs_context_destroy(pfs_ctx); + exit(exit_status); } diff --git a/examples/cxx/ADM_register_pfs_storage.cpp b/examples/cxx/ADM_register_pfs_storage.cpp index 4ff9c5536e64e1d1cbdaf5b083d7745da135522f..b3ad5e47419ccbd3e01442b0378c0f3ab4bd720c 100644 --- a/examples/cxx/ADM_register_pfs_storage.cpp +++ b/examples/cxx/ADM_register_pfs_storage.cpp @@ -38,25 +38,19 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; - ADM_pfs_context_t ctx{}; - ADM_pfs_storage_t pfs_storage{}; - ADM_return_t ret = ADM_SUCCESS; + std::string pfs_name = "gpfs_scratch"; + std::string pfs_mount = "/gpfs/scratch"; try { - ret = admire::register_pfs_storage(server, ctx, &pfs_storage); + admire::register_pfs_storage(server, pfs_name, + admire::pfs_storage::type::gpfs, + admire::pfs_storage::ctx{pfs_mount}); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_register_pfs_storage() failed: {}\n", e.what()); exit(EXIT_FAILURE); } - if(ret != ADM_SUCCESS) { - fmt::print(stdout, - "ADM_register_pfs_storage() remote procedure not completed " - "successfully\n"); - exit(EXIT_FAILURE); - } - fmt::print(stdout, "ADM_register_pfs_storage() remote procedure completed " "successfully\n"); } diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index 727fc8419432866425ebaa627822f3550802a69b..197780d716b69c30a91569ab178f45c99004a477 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -510,11 +510,11 @@ ADM_adhoc_context_destroy(ADM_adhoc_context_t ctx); * @param[in] pfs_ctx Some specific context information for the storage * tier or NULL if none is required. For instance, an adhoc storage system may * find it useful to provide an ADM_adhoc_context_t describing the instance. - * @return A valid ADM_ADHOC_STORAGE if successful, or NULL in case of failure. + * @return A valid ADM_PFS_STORAGE if successful, or NULL in case of failure. */ -ADM_adhoc_storage_t -ADM_pfs_storage_create(const char* name, ADM_adhoc_storage_type_t type, - uint64_t id, ADM_adhoc_context_t adhoc_ctx); +ADM_pfs_storage_t +ADM_pfs_storage_create(const char* name, ADM_pfs_storage_type_t type, + uint64_t id, ADM_pfs_context_t pfs_ctx); /** * Destroy an ADM_ADHOC_STORAGE created by ADM_adhoc_storage_destroy(). diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index a6613afaa3cfe510dc3621aa9d4ea4a5d217d0d3..49eed8ec24c795ebda46c6ac3bc180d13681a6f8 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -425,12 +425,18 @@ struct pfs_storage { pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id, std::filesystem::path mount_point); + + pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id, + const pfs_storage::ctx& pfs_ctx); + + explicit pfs_storage(ADM_pfs_storage_t storage); + pfs_storage(const pfs_storage& other) noexcept; - pfs_storage(pfs_storage&&) noexcept = default; + pfs_storage(pfs_storage&&) noexcept; pfs_storage& operator=(const pfs_storage& other) noexcept; pfs_storage& - operator=(pfs_storage&&) noexcept = default; + operator=(pfs_storage&&) noexcept; ~pfs_storage(); std::string @@ -442,6 +448,9 @@ struct pfs_storage { pfs_storage::ctx context() const; + void + update(admire::pfs_storage::ctx new_ctx); + private: class impl; std::unique_ptr m_pimpl; diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 3dc82ce3ef9840c6e74db8c3ac33f3c47c73d9fa..1822695f337b86d951869f5055f79e1d93412f97 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -117,6 +117,33 @@ convert(const admire::adhoc_storage& st) { return managed_ctype{c_st, std::move(managed_ctx)}; } +managed_ctype +convert(const pfs_storage::ctx& ctx) { + return managed_ctype{ + ADM_pfs_context_create(ctx.mount_point().c_str())}; +} + +managed_ctype +convert(const std::optional& pfs_storage) { + + if(!pfs_storage) { + return managed_ctype{}; + } + + return convert(pfs_storage.value()); +} + +managed_ctype +convert(const admire::pfs_storage& st) { + + auto managed_ctx = convert(st.context()); + ADM_pfs_storage_t c_st = ADM_pfs_storage_create( + st.name().c_str(), static_cast(st.type()), + st.id(), managed_ctx.get()); + + return managed_ctype{c_st, std::move(managed_ctx)}; +} + managed_ctype convert(const admire::dataset& dataset) { return managed_ctype( diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 1104baba40ae52a66dfc80e13ba6f791fe701ac7..7c95b6402069a466130ea2e442ec2f3690c25cd7 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -51,6 +51,12 @@ convert(const adhoc_storage::ctx& ctx); managed_ctype convert(const admire::adhoc_storage& st); +managed_ctype +convert(const pfs_storage::ctx& ctx); + +managed_ctype +convert(const admire::pfs_storage& st); + managed_ctype convert(const admire::dataset& dataset); @@ -224,6 +230,52 @@ struct admire::api::managed_ctype { managed_ctype m_ctx; }; +template <> +struct admire::api::managed_ctype { + + managed_ctype() = default; + + explicit managed_ctype(ADM_pfs_context_t ctx) : m_pfs_context(ctx) {} + + ADM_pfs_context_t + get() const { + return m_pfs_context.get(); + } + + ADM_pfs_context_t + release() { + return m_pfs_context.release(); + } + + scord::utils::ctype_ptr + m_pfs_context; +}; + +template <> +struct admire::api::managed_ctype { + + managed_ctype() = default; + + explicit managed_ctype(ADM_pfs_storage_t st, + managed_ctype&& ctx) + : m_storage(st), m_ctx(std::move(ctx)) {} + + ADM_pfs_storage_t + get() const { + return m_storage.get(); + } + + ADM_pfs_storage_t + release() { + std::ignore = m_ctx.release(); + return m_storage.release(); + } + + scord::utils::ctype_ptr + m_storage; + managed_ctype m_ctx; +}; + template <> struct admire::api::managed_ctype { diff --git a/src/common/api/internal_types.hpp b/src/common/api/internal_types.hpp index 5aa9e412af40fa6d6d900fa1b542d6a3b294a424..131b2eed38b8a62b339c79ec24247b0d1ac8ab87 100644 --- a/src/common/api/internal_types.hpp +++ b/src/common/api/internal_types.hpp @@ -111,6 +111,25 @@ struct adhoc_storage_info { mutable scord::abt::shared_mutex m_info_mutex; }; +struct pfs_storage_info { + + explicit pfs_storage_info(admire::pfs_storage pfs_storage) + : m_pfs_storage(std::move(pfs_storage)) {} + + admire::pfs_storage + pfs_storage() const { + return m_pfs_storage; + } + + void + update(admire::pfs_storage::ctx pfs_context) { + m_pfs_storage.update(std::move(pfs_context)); + } + + admire::pfs_storage m_pfs_storage; + std::shared_ptr m_client_info; +}; + } // namespace admire::internal #endif // SCORD_INTERNAL_TYPES_HPP diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 56a4135943b192cfd8c3aee1bcc0297afc8da775..98074ca6e26bfb2ffbd64be2af786fe9d66a7c08 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -719,7 +719,11 @@ ADM_pfs_context_create(const char* mountpoint) { return NULL; } - adm_pfs_context->c_mount = mountpoint; + if(mountpoint) { + size_t n = strlen(mountpoint); + adm_pfs_context->c_mount = (const char*) calloc(n + 1, sizeof(char)); + strcpy((char*) adm_pfs_context->c_mount, mountpoint); + } return adm_pfs_context; } @@ -1488,21 +1492,13 @@ pfs_storage::ctx::ctx(std::filesystem::path mount_point) pfs_storage::ctx::ctx(ADM_pfs_context_t ctx) : pfs_storage::ctx(ctx->c_mount) {} -pfs_storage::pfs_storage(const pfs_storage& other) noexcept - : m_pimpl(std::make_unique(*other.m_pimpl)) {} - -pfs_storage& -pfs_storage::operator=(const pfs_storage& other) noexcept { - this->m_pimpl = std::make_unique(*other.m_pimpl); - return *this; -} - std::filesystem::path pfs_storage::ctx::mount_point() const { return m_mount_point; } class pfs_storage::impl { + public: explicit impl(enum pfs_storage::type type, std::string name, std::uint64_t id, pfs_storage::ctx ctx) @@ -1514,6 +1510,7 @@ public: operator=(const impl& other) noexcept = default; impl& operator=(impl&&) noexcept = default; + ~impl() = default; enum type type() const { @@ -1535,6 +1532,11 @@ public: return m_ctx; } + void + update(pfs_storage::ctx new_ctx) { + m_ctx = std::move(new_ctx); + } + private: enum type m_type; std::string m_name; @@ -1548,6 +1550,29 @@ pfs_storage::pfs_storage(enum pfs_storage::type type, std::string name, type, std::move(name), id, pfs_storage::ctx{std::move(mount_point)})) {} +pfs_storage::pfs_storage(enum pfs_storage::type type, std::string name, + std::uint64_t id, const pfs_storage::ctx& pfs_ctx) + : m_pimpl(std::make_unique(type, std::move(name), id, pfs_ctx)) {} + +pfs_storage::pfs_storage(ADM_pfs_storage_t st) + : m_pimpl(std::make_unique( + static_cast(st->s_type), st->s_name, + st->s_id, pfs_storage::ctx{st->s_pfs_ctx})) {} + +pfs_storage::pfs_storage(const pfs_storage& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +pfs_storage::pfs_storage(pfs_storage&&) noexcept = default; + +pfs_storage& +pfs_storage::operator=(const pfs_storage& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +pfs_storage& +pfs_storage::operator=(pfs_storage&&) noexcept = default; + pfs_storage::~pfs_storage() = default; std::string @@ -1570,7 +1595,13 @@ pfs_storage::context() const { return m_pimpl->context(); } +void +pfs_storage::update(admire::pfs_storage::ctx new_ctx) { + return m_pimpl->update(std::move(new_ctx)); +} + class job_requirements::impl { + public: impl(std::vector inputs, std::vector outputs) @@ -1675,6 +1706,7 @@ job_requirements::adhoc_storage() const { } namespace qos { + class entity::impl { public: template diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 07a8c02919564959f083c6abe21faaf7122cab8f..b7c503db726897b670f906a0f0ecdf62182ccc05 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -382,15 +382,25 @@ MERCURY_GEN_PROC( /// ADM_deploy_adhoc_storage MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((hg_uint64_t) (id))) -MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((hg_uint64_t) (op_id)) ((hg_int32_t) (retval)) ); /// ADM_register_pfs_storage -MERCURY_GEN_PROC(ADM_register_pfs_storage_in_t, ((int32_t) (reqs))) +MERCURY_GEN_PROC( + ADM_register_pfs_storage_in_t, + ((hg_const_string_t) (name)) + ((ADM_pfs_storage_type_t) (type)) + ((ADM_pfs_context_t) (ctx)) +); -MERCURY_GEN_PROC(ADM_register_pfs_storage_out_t, ((int32_t) (ret))) +MERCURY_GEN_PROC( + ADM_register_pfs_storage_out_t, + ((hg_uint64_t) (op_id)) + ((hg_int32_t) (retval)) + ((hg_uint64_t) (id)) +); /// ADM_update_pfs_storage MERCURY_GEN_PROC(ADM_update_pfs_storage_in_t, ((int32_t) (reqs))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 3e0a964899b790e86ae5ad0b2d70b4c204c4a002..c0b5cbd7c9a1dc597c834f8880653e7cd26cae5e 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -294,31 +294,18 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } } -ADM_return_t -register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, - ADM_pfs_storage_t* pfs_storage) { - (void) srv; - (void) ctx; - (void) pfs_storage; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); +admire::pfs_storage +register_pfs_storage(const server& srv, const std::string& name, + enum pfs_storage::type type, const pfs_storage::ctx& ctx) { - LOGGER_INFO("ADM_register_pfs_storage(...)"); + const auto rv = detail::register_pfs_storage(srv, name, type, ctx); - ADM_register_pfs_storage_in_t in{}; - ADM_register_pfs_storage_out_t out; - - const auto rpc = endp.call("ADM_register_pfs_storage", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_register_pfs_storage() = {}", out.ret); - return static_cast(out.ret); + if(!rv) { + throw std::runtime_error(fmt::format( + "ADM_register_pfs_storage() error: {}", rv.error().message())); } - LOGGER_INFO("ADM_register_pfs_storage() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + return rv.value(); } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index 87b623498dd8f83cfbfcbb19e9b3c7123553e84c..a7ba56ee9836cc644f9ec1400cf16e0c5dff497c 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -162,6 +162,8 @@ ADM_deploy_adhoc_storage(ADM_server_t server, * Register a PFS storage tier. * * @param[in] server The server to which the request is directed + * @param[in] name The desired name for the PFS. + * @param[in] type The desired type for the PFS. * @param[in] ctx The EXECUTION_CONTEXT for the PFS. * @param[out] adhoc_storage An ADM_STORAGE referring to the newly-created * PFS instance. @@ -169,7 +171,8 @@ ADM_deploy_adhoc_storage(ADM_server_t server, * successfully. */ ADM_return_t -ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, +ADM_register_pfs_storage(ADM_server_t server, const char* name, + ADM_pfs_storage_type_t type, ADM_pfs_context_t ctx, ADM_pfs_storage_t* pfs_storage); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 2980244f0d48efb34d2b743df9d04c55e9be23f9..054d5f45478bce98c1a1b950b8d9916943b3fa80 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -72,9 +72,10 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); void deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); -ADM_return_t -register_pfs_storage(const server& srv, ADM_pfs_context_t ctx, - ADM_pfs_storage_t* pfs_storage); +admire::pfs_storage +register_pfs_storage(const server& srv, const std::string& name, + enum admire::pfs_storage::type type, + const admire::pfs_storage::ctx& ctx); ADM_return_t update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index be03a19722ecf18d350ab582b52097f6842f6839..8d7d7139a534c89467003c0b2dfa42c2296c3af1 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -133,12 +133,22 @@ ADM_deploy_adhoc_storage(ADM_server_t server, } ADM_return_t -ADM_register_pfs_storage(ADM_server_t server, ADM_pfs_context_t ctx, +ADM_register_pfs_storage(ADM_server_t server, const char* name, + ADM_pfs_storage_type_t type, ADM_pfs_context_t ctx, ADM_pfs_storage_t* pfs_storage) { - const admire::server srv{server}; + const auto rv = admire::detail::register_pfs_storage( + admire::server{server}, name, + static_cast(type), + admire::pfs_storage::ctx{ctx}); - return admire::register_pfs_storage(srv, ctx, pfs_storage); + if(!rv) { + return rv.error(); + } + + *pfs_storage = admire::api::convert(rv.value()).release(); + + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 40f809308332c06a7adc40ba136e3b4abcdddba0..b582115f5ceca57501e7fba28f625f85973e2098 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -358,8 +358,7 @@ register_adhoc_storage(const server& srv, const std::string& name, } admire::error_code -deploy_adhoc_storage(const server& srv, - const adhoc_storage& adhoc_storage) { +deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -373,7 +372,7 @@ deploy_adhoc_storage(const server& srv, ADM_deploy_adhoc_storage_in_t in{adhoc_storage.id()}; ADM_deploy_adhoc_storage_out_t out; - + const auto rpc = endp.call("ADM_deploy_adhoc_storage", &in, &out); if(const auto rv = admire::error_code{out.retval}; !rv) { @@ -514,4 +513,47 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { out.op_id); return admire::error_code::success; } + +tl::expected +register_pfs_storage(const server& srv, const std::string& name, + enum pfs_storage::type type, const pfs_storage::ctx& ctx) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{name: {}, type: {}, pfs_ctx: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), name, type, ctx); + + const auto rpc_name = name.c_str(); + const auto rpc_type = static_cast(type); + const auto rpc_ctx = api::convert(ctx); + + ADM_register_pfs_storage_in_t in{rpc_name, rpc_type, rpc_ctx.get()}; + ADM_register_pfs_storage_out_t out; + + const auto rpc = endp.call("ADM_register_pfs_storage", &in, &out); + + if(const auto rv = admire::error_code{out.retval}; !rv) { + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), rv, out.op_id); + return tl::make_unexpected(rv); + } + + auto rpc_pfs_storage = admire::pfs_storage{type, name, out.id, ctx}; + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}, id: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), + admire::error_code::success, out.id, out.op_id); + + return rpc_pfs_storage; +} + } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 51aa88e981649703e0954eca699ce5717a534011..98909dcc594013cf02456a846031ed723577775d 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -69,6 +69,10 @@ admire::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +tl::expected +register_pfs_storage(const server& srv, const std::string& name, + enum pfs_storage::type type, const pfs_storage::ctx& ctx); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 61d63df6b5456241e4c81fb36f8446863ab49220..8278ec4ce2d455fcd758d37ae7b30575405c34f5 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -26,7 +26,7 @@ add_executable(scord) target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp - env.hpp job_manager.hpp adhoc_storage_manager.hpp) + env.hpp job_manager.hpp adhoc_storage_manager.hpp pfs_storage_manager.hpp) target_include_directories( scord diff --git a/src/scord/pfs_storage_manager.hpp b/src/scord/pfs_storage_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..48b31037c274a3bdb401fab1f2825ccab2610263 --- /dev/null +++ b/src/scord/pfs_storage_manager.hpp @@ -0,0 +1,131 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_PFS_STORAGE_MANAGER_HPP +#define SCORD_PFS_STORAGE_MANAGER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace scord { + +struct pfs_storage_manager : scord::utils::singleton { + + tl::expected, + admire::error_code> + create(enum admire::pfs_storage::type type, const std::string& name, + const admire::pfs_storage::ctx& ctx) { + + static std::atomic_uint64_t current_id; + std::uint64_t id = current_id++; + + abt::unique_lock lock(m_pfs_storages_mutex); + + if(const auto it = m_pfs_storages.find(id); + it == m_pfs_storages.end()) { + const auto& [it_pfs, inserted] = m_pfs_storages.emplace( + id, std::make_shared( + admire::pfs_storage{type, name, id, ctx})); + + if(!inserted) { + LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); + return tl::make_unexpected(admire::error_code::snafu); + } + + return it_pfs->second; + } + + LOGGER_ERROR("{}: PFS storage '{}' already exists", __FUNCTION__, id); + return tl::make_unexpected(admire::error_code::entity_exists); + } + + admire::error_code + update(std::uint64_t id, admire::pfs_storage::ctx new_ctx) { + + abt::unique_lock lock(m_pfs_storages_mutex); + + if(const auto it = m_pfs_storages.find(id); + it != m_pfs_storages.end()) { + const auto current_pfs_info = it->second; + current_pfs_info->update(std::move(new_ctx)); + return admire::error_code::success; + } + + LOGGER_ERROR("{}: Adhoc storage '{}' does not exist", __FUNCTION__, id); + return admire::error_code::no_such_entity; + } + + tl::expected, + admire::error_code> + find(std::uint64_t id) { + + abt::shared_lock lock(m_pfs_storages_mutex); + + if(auto it = m_pfs_storages.find(id); it != m_pfs_storages.end()) { + return it->second; + } + + LOGGER_ERROR("PFS storage '{}' was not registered or was already " + "deleted", + id); + return tl::make_unexpected(admire::error_code::no_such_entity); + } + + admire::error_code + remove(std::uint64_t id) { + + abt::unique_lock lock(m_pfs_storages_mutex); + + if(m_pfs_storages.count(id) != 0) { + m_pfs_storages.erase(id); + return admire::error_code::success; + } + + LOGGER_ERROR("PFS storage '{}' was not registered or was already " + "deleted", + id); + + return admire::error_code::no_such_entity; + } + +private: + friend class scord::utils::singleton; + pfs_storage_manager() = default; + + mutable abt::shared_mutex m_pfs_storages_mutex; + std::unordered_map> + m_pfs_storages; +}; + +} // namespace scord + +#endif // SCORD_PFS_STORAGE_MANAGER_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 515caed7dcb14a600f78c23957860a93180d10a2..473714272b9d083190d146310976ff2f0a6af617 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -30,6 +30,7 @@ #include "rpc_handlers.hpp" #include "job_manager.hpp" #include "adhoc_storage_manager.hpp" +#include "pfs_storage_manager.hpp" // Process running #include @@ -549,6 +550,7 @@ DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage); static void ADM_register_pfs_storage(hg_handle_t h) { + using admire::pfs_storage; using scord::network::utils::get_address; [[maybe_unused]] hg_return_t ret; @@ -561,11 +563,38 @@ ADM_register_pfs_storage(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; + const std::string pfs_name{in.name}; + const auto pfs_type = static_cast(in.type); + const pfs_storage::ctx pfs_ctx{in.ctx}; - LOGGER_INFO("ADM_register_pfs_storage()"); + const auto rpc_id = remote_procedure::new_id(); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{name: {}, type: {}, pfs_ctx: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + pfs_name, pfs_type, pfs_ctx); - out.ret = 0; + admire::error_code ec; + std::uint64_t out_pfs_id = 0; + auto& pfs_manager = scord::pfs_storage_manager::instance(); + + if(const auto pm_result = pfs_manager.create(pfs_type, pfs_name, pfs_ctx); + pm_result.has_value()) { + const auto& adhoc_storage_info = pm_result.value(); + out_pfs_id = adhoc_storage_info->pfs_storage().id(); + } else { + LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"", + rpc_id, pm_result.error()); + ec = pm_result.error(); + } + + out.op_id = rpc_id; + out.retval = ec; + out.id = out_pfs_id; + + LOGGER_INFO("rpc id: {} name: {} to: {} => " + "body: {{retval: {}, id: {}}}", + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec, out.id); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS);