From 23c76d4a4bbc023884dc743fc7e25409d2f06768 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 16:16:43 +0200 Subject: [PATCH 01/34] Add ADM_QOS_SCOPE_TRANSFER Fixes #28 --- src/common/api/admire_types.h | 3 ++- src/common/api/types.cpp | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index 13b43bfe..ddc47a6e 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -145,7 +145,8 @@ typedef struct adm_pfs_context* ADM_pfs_context_t; typedef enum { ADM_QOS_SCOPE_DATASET, ADM_QOS_SCOPE_NODE, - ADM_QOS_SCOPE_JOB + ADM_QOS_SCOPE_JOB, + ADM_QOS_SCOPE_TRANSFER } ADM_qos_scope_t; /** The class of QoS limit applied to a scope */ diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index c4178299..9646f4c6 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -183,6 +183,8 @@ ADM_qos_entity_create(ADM_qos_scope_t scope, ...) { case ADM_QOS_SCOPE_DATASET: adm_qos_entity->e_dataset = va_arg(ap, ADM_dataset_t); break; + case ADM_QOS_SCOPE_TRANSFER: + adm_qos_entity->e_transfer = va_arg(ap, ADM_transfer_t); } va_end(ap); -- GitLab From 4ed3938c96f8494003ee955bae68791d305278e7 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 16:27:04 +0200 Subject: [PATCH 02/34] Add admire::transfer type to CXX API --- src/common/api/admire_types.hpp | 30 +++++++++++++++++++++ src/common/api/types.cpp | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index ca712bcc..6fbd335b 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -38,6 +38,7 @@ namespace admire { using error_code = ADM_return_t; using job_id = std::uint64_t; +using transfer_id = std::uint64_t; struct server { @@ -78,6 +79,35 @@ private: std::unique_ptr m_pimpl; }; +struct transfer { + + enum class mapping : std::underlying_type::type { + one_to_one = ADM_MAPPING_ONE_TO_ONE, + one_to_n = ADM_MAPPING_ONE_TO_N, + n_to_n = ADM_MAPPING_N_TO_N + }; + + explicit transfer(transfer_id id); + explicit transfer(ADM_transfer_t transfer); + + transfer(const transfer&) noexcept; + transfer(transfer&&) noexcept; + transfer& + operator=(const transfer&) noexcept; + transfer& + operator=(transfer&&) noexcept; + + ~transfer(); + + transfer_id + id() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + + struct dataset { explicit dataset(std::string id); explicit dataset(ADM_dataset_t dataset); diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 9646f4c6..804d944d 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -793,6 +793,54 @@ job::id() const { return m_pimpl->id(); } +class transfer::impl { + +public: + explicit impl(transfer_id id) : m_id(id) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + transfer_id + id() const { + return m_id; + } + +private: + transfer_id m_id; +}; + +transfer::transfer(transfer_id id) + : m_pimpl(std::make_unique(id)) {} + +transfer::transfer(ADM_transfer_t transfer) + : transfer::transfer(transfer->t_id) {} + +transfer::transfer(transfer&&) noexcept = default; + +transfer& +transfer::operator=(transfer&&) noexcept = default; + +transfer::transfer(const transfer& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +transfer& +transfer::operator=(const transfer& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +transfer::~transfer() = default; + +transfer_id +transfer::id() const { + return m_pimpl->id(); +} + class dataset::impl { public: explicit impl(std::string id) : m_id(std::move(id)) {} -- GitLab From dbe7e798418a249cc64bee2f5c1755e68399ef38 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 16:28:18 +0200 Subject: [PATCH 03/34] Add admire::node type to CXX API --- src/common/api/admire_types.hpp | 20 +++++++++++++++++ src/common/api/types.cpp | 39 +++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 6fbd335b..3898ee37 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -59,6 +59,26 @@ private: std::unique_ptr m_pimpl; }; +struct node { + + explicit node(std::string hostname); + explicit node(const ADM_node_t& srv); + node(const node&) noexcept; + node(node&&) noexcept; + node& + operator=(const node&) noexcept; + node& + operator=(node&&) noexcept; + ~node(); + + std::string + hostname() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + struct job { explicit job(job_id id); diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 804d944d..18fbf9cb 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -746,6 +746,45 @@ server::address() const { return m_pimpl->address(); } +class node::impl { + +public: + explicit impl(std::string hostname) : m_hostname(std::move(hostname)) {} + + std::string + hostname() const { + return m_hostname; + } + +private: + std::string m_hostname; +}; + +node::node(std::string hostname) + : m_pimpl(std::make_unique(std::move(hostname))) {} + +node::node(const ADM_node_t& node) : node::node(node->n_hostname) {} + +node::node(const node& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +node::node(node&&) noexcept = default; + +node& +node::operator=(const node& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +node& +node::operator=(node&&) noexcept = default; + +node::~node() = default; + +std::string +node::hostname() const { + return m_pimpl->hostname(); +} class job::impl { -- GitLab From 12abdac0d5557fbbd3bd412cd24187d79895d7d2 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 16:31:15 +0200 Subject: [PATCH 04/34] Add admire::qos::entity type to CXX API --- src/common/api/admire_types.hpp | 42 ++++++++++++++++++++++ src/common/api/types.cpp | 63 +++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 3898ee37..ff75c598 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -127,6 +127,48 @@ private: std::unique_ptr m_pimpl; }; +namespace qos { + +enum class subclass : std::underlying_type::type { + bandwidth = ADM_QOS_CLASS_BANDWIDTH, + iops = ADM_QOS_CLASS_IOPS, +}; + +enum class scope : std::underlying_type::type { + dataset = ADM_QOS_SCOPE_DATASET, + node = ADM_QOS_SCOPE_NODE, + job = ADM_QOS_SCOPE_JOB, + transfer = ADM_QOS_SCOPE_TRANSFER +}; + +struct entity { + + template + entity(admire::qos::scope s, T&& data); + + entity(const entity&) noexcept; + entity(entity&&) noexcept; + entity& + operator=(const entity&) noexcept; + entity& + operator=(entity&&) noexcept; + + ~entity(); + + admire::qos::scope + scope() const; + + template + T + data() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + +} // namespace qos + struct dataset { explicit dataset(std::string id); diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 18fbf9cb..d2e7053b 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include "admire_types.hpp" /******************************************************************************/ @@ -1212,5 +1213,67 @@ job_requirements::storage() const { return m_pimpl->storage(); } +namespace qos { + +class entity::impl { +public: + template + impl(const admire::qos::scope& s, T&& data) : m_scope(s), m_data(data) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + admire::qos::scope + scope() const { + return m_scope; + } + + template + T + data() const { + return std::get(m_data); + } + +private: + admire::qos::scope m_scope; + std::variant m_data; +}; + +template +entity::entity(admire::qos::scope s, T&& data) + : m_pimpl(std::make_unique(s, std::forward(data))) {} + +entity::entity(const entity& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +entity::entity(entity&&) noexcept = default; + +entity& +entity::operator=(const entity& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +entity& +entity::operator=(entity&&) noexcept = default; + +entity::~entity() = default; + +admire::qos::scope +entity::scope() const { + return m_pimpl->scope(); +} + +template +T +entity::data() const { + return m_pimpl->data(); +} + +} // namespace qos } // namespace admire -- GitLab From 663cb25539c77dc45a02d0db0df08b13696fa328 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 16:32:14 +0200 Subject: [PATCH 05/34] Add admire::qos::limit type to CXX API --- src/common/api/admire_types.hpp | 28 +++++++++++++ src/common/api/types.cpp | 69 +++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index ff75c598..2a160116 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -167,6 +167,34 @@ private: std::unique_ptr m_pimpl; }; +struct limit { + + limit(const admire::qos::entity& e, admire::qos::subclass cls, + uint64_t value); + + limit(const limit&) noexcept; + limit(limit&&) noexcept; + limit& + operator=(const limit&) noexcept; + limit& + operator=(limit&&) noexcept; + + ~limit(); + + admire::qos::entity + entity() const; + + admire::qos::subclass + subclass() const; + + uint64_t + value() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + } // namespace qos diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index d2e7053b..7c455a22 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1274,6 +1274,75 @@ entity::data() const { return m_pimpl->data(); } +class limit::impl { + +public: + impl(admire::qos::entity e, admire::qos::subclass cls, uint64_t value) + : m_entity(std::move(e)), m_subclass(cls), m_value(value) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + admire::qos::entity + entity() const { + return m_entity; + } + + admire::qos::subclass + subclass() const { + return m_subclass; + } + + uint64_t + value() const { + return m_value; + } + +private: + admire::qos::entity m_entity; + admire::qos::subclass m_subclass; + uint64_t m_value; +}; + +limit::limit(const admire::qos::entity& e, admire::qos::subclass cls, + uint64_t value) + : m_pimpl(std::make_unique(e, cls, value)) {} + +limit::limit(const limit& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +limit::limit(limit&&) noexcept = default; + +limit& +limit::operator=(const limit& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +limit& +limit::operator=(limit&&) noexcept = default; + +limit::~limit() = default; + +admire::qos::entity +limit::entity() const { + return m_pimpl->entity(); +} + +admire::qos::subclass +limit::subclass() const { + return m_pimpl->subclass(); +} + +uint64_t +limit::value() const { + return m_pimpl->value(); +} + } // namespace qos } // namespace admire -- GitLab From 5f90b915aedd4f8318e9085bee68cff500885099 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 16:32:55 +0200 Subject: [PATCH 06/34] Redefine adm_transfer RPC type --- src/common/net/proto/rpc_types.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 5dc1c159..b1075ffe 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -97,14 +97,13 @@ struct adm_qos_limit { // TODO: encoder/decoder typedef struct adm_transfer { - // TODO: undefined for now - int32_t placeholder; + uint64_t t_id; } adm_transfer; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_transfer, // NOLINT - ((hg_int32_t) (placeholder)) + ((hg_uint64_t) (t_id)) ); // clang-format on -- GitLab From 55644ec69d879120cef0e6012cf7ca4536bde53e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 17:22:17 +0200 Subject: [PATCH 07/34] Add conversion for C array of ADM_dataset_t --- src/common/api/convert.cpp | 13 +++++++++++++ src/common/api/convert.hpp | 3 +++ 2 files changed, 16 insertions(+) diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index a6d05b3b..45048a86 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -66,6 +66,19 @@ convert(const std::vector& datasets) { return managed_ctype_array{std::move(tmp)}; } +std::vector +convert(ADM_dataset_t datasets[], size_t datasets_len) { + + std::vector rv; + rv.reserve(datasets_len); + + for(size_t i = 0; i < datasets_len; ++i) { + rv.emplace_back(datasets[i]); + } + + return rv; +} + managed_ctype convert(const admire::job_requirements& reqs) { diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 8ac73ca1..100f686d 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -48,6 +48,9 @@ convert(const admire::adhoc_storage& st); managed_ctype_array convert(const std::vector& datasets); +std::vector +convert(ADM_dataset_t datasets[], size_t datasets_len); + managed_ctype convert(const admire::job_requirements& reqs); -- GitLab From 905ce57f849ea856d98581a055e4cd1061244150 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 17:24:36 +0200 Subject: [PATCH 08/34] Add ADM_transfer_create|destroy() --- src/common/api/types.cpp | 47 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 7c455a22..29906c63 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -695,6 +695,53 @@ ADM_job_destroy(ADM_job_t job) { return ret; } +/** + * Initialize a transfer handle that can be used by clients to refer to a + * transfer. + * + * @remark This function is not actually part of the public API, but it is + * useful to have for internal purposes + * + * @param [in] id The identifier for this transfer + * @return A valid TRANSFER HANDLE or NULL in case of failure. + */ +ADM_transfer_t +ADM_transfer_create(uint64_t id) { + + struct adm_transfer* adm_transfer = + (struct adm_transfer*) malloc(sizeof(struct adm_transfer)); + + if(!adm_transfer) { + LOGGER_ERROR("Could not allocate ADM_transfer_t") + return NULL; + } + + adm_transfer->t_id = id; + + return adm_transfer; +} + +/** + * Destroy a ADM_transfer_t created by ADM_transfer_create(). + * + * @remark This function is not actually part of the public API, but it is + * useful to have for internal purposes + * + * @param[in] tx The ADM_transfer_t to destroy. + * @return ADM_SUCCESS or corresponding error code. + */ +ADM_return_t +ADM_transfer_destroy(ADM_transfer_t tx) { + ADM_return_t ret = ADM_SUCCESS; + + if(!tx) { + LOGGER_ERROR("Invalid ADM_transfer_t") + return ADM_EBADARGS; + } + + free(tx); + return ret; +} /******************************************************************************/ /* C++ Type definitions and related functions */ -- GitLab From 1a8d1d0e85ac772487a7d65cea607a391318317f Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 Aug 2022 17:32:31 +0200 Subject: [PATCH 09/34] Add conversion function for admire::transfer --- src/common/api/convert.cpp | 8 ++++++++ src/common/api/convert.hpp | 25 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 45048a86..186347aa 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -29,6 +29,9 @@ ADM_job_t ADM_job_create(uint64_t id); +ADM_transfer_t +ADM_transfer_create(uint64_t id); + namespace admire::api { managed_ctype @@ -110,4 +113,9 @@ convert(ADM_job_t j) { return admire::job{j}; } +managed_ctype +convert(const transfer& tx) { + return managed_ctype(ADM_transfer_create(tx.id())); +} + } // namespace admire::api diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 100f686d..1dda9422 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -60,6 +60,9 @@ convert(const job& j); job convert(ADM_job_t j); +managed_ctype +convert(const transfer& t); + } // namespace admire::api @@ -175,6 +178,9 @@ struct admire::api::managed_ctype { ADM_return_t ADM_job_destroy(ADM_job_t job); +ADM_return_t +ADM_transfer_destroy(ADM_transfer_t tx); + template <> struct admire::api::managed_ctype { @@ -193,4 +199,23 @@ struct admire::api::managed_ctype { scord::utils::ctype_ptr m_job; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_transfer_t tx) : m_transfer(tx) {} + + ADM_transfer_t + get() const { + return m_transfer.get(); + } + + ADM_transfer_t + release() { + return m_transfer.release(); + } + + scord::utils::ctype_ptr m_transfer; +}; + + #endif // SCORD_CONVERT_HPP -- GitLab From ce832455ab9e5388a22cb555431c69889ab4b850 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 09:10:18 +0200 Subject: [PATCH 10/34] Add ADM_qos_limit_list_t plus creation and destruction functions --- src/common/api/admire_types.h | 27 +++++++++++++ src/common/api/types.cpp | 69 ++++++++++++++++++++++++++++++++ src/common/net/proto/rpc_types.h | 4 +- 3 files changed, 98 insertions(+), 2 deletions(-) diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index ddc47a6e..fd8c7282 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -91,6 +91,9 @@ typedef struct adm_dataset_info* ADM_dataset_info_t; /** A list of datasets */ typedef struct adm_dataset_list* ADM_dataset_list_t; +/** A list of QoS limits */ +typedef struct adm_qos_limit_list* ADM_qos_limit_list_t; + /* ----------------------------------------------------- */ /* Storage tiers */ @@ -497,6 +500,30 @@ ADM_qos_limit_create(ADM_qos_entity_t entity, ADM_qos_class_t cls, ADM_return_t ADM_qos_limit_destroy(ADM_qos_limit_t limit); +/** + * Create a list of QoS limits from an array of ADM_QOS_LIMITs and its + * length. + * + * @remark QoS limit lists need to be freed by calling + * ADM_qos_limit_list_destroy(). + * + * @param[in] limits The array of QoS limits. + * @param[in] len The length of the array. + * @return A valid ADM_qos_limit_list_t if successful or NULL in case of + * failure. + */ +ADM_qos_limit_list_t +ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t len); + +/** + * Destroy a QoS limit list created by ADM_qos_limit_list_create(). + * + * @param[in] list A valid ADM_qos_limit_list_t + * @return ADM_SUCCESS or corresponding ADM error code + */ +ADM_return_t +ADM_qos_limit_list_destroy(ADM_qos_limit_list_t list); + /* ----------------------------------------------------- */ /* Data operations */ diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 29906c63..317ee0a9 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -743,6 +743,75 @@ ADM_transfer_destroy(ADM_transfer_t tx) { return ret; } +ADM_qos_limit_list_t +ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t length) { + + ADM_qos_limit_list_t p = (ADM_qos_limit_list_t) malloc(sizeof(*p)); + + if(!p) { + LOGGER_ERROR("Could not allocate ADM_qos_limit_list_t") + return NULL; + } + + const char* error_msg = NULL; + + p->l_length = length; + p->l_limits = (struct adm_qos_limit*) calloc(length, sizeof(adm_qos_limit)); + + if(!p->l_limits) { + error_msg = "Could not allocate ADM_qos_limit_list_t"; + goto cleanup_on_error; + } + + for(size_t i = 0; i < length; ++i) { + memcpy(&p->l_limits[i], limits[i], sizeof(adm_qos_limit)); + } + + return p; + +cleanup_on_error: + if(p->l_limits) { + free(p->l_limits); + } + free(p); + + if(error_msg) { + LOGGER_ERROR(error_msg); + } + + return NULL; +} + +ADM_return_t +ADM_qos_limit_list_destroy(ADM_qos_limit_list_t list) { + + ADM_return_t ret = ADM_SUCCESS; + + if(!list) { + LOGGER_ERROR("Invalid ADM_qos_limit_list_t") + return ADM_EBADARGS; + } + + // We cannot call ADM_qos_limit_destroy here because adm_limits + // are stored as a consecutive array in memory. Thus, we free + // the entities themselves and then the array. + if(list->l_limits) { + for(size_t i = 0; i < list->l_length; ++i) { + + ADM_qos_entity_t entity = list->l_limits[i].l_entity; + + if(entity) { + ADM_qos_entity_destroy(entity); + } + } + free(list->l_limits); + } + + free(list); + return ret; +} + + /******************************************************************************/ /* C++ Type definitions and related functions */ /******************************************************************************/ diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index b1075ffe..3fda297d 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -88,11 +88,11 @@ struct adm_qos_entity { // TODO: encoder/decoder -struct adm_qos_limit { +typedef struct adm_qos_limit { ADM_qos_entity_t l_entity; ADM_qos_class_t l_class; uint64_t l_value; -}; +} adm_qos_limit; // TODO: encoder/decoder -- GitLab From f9bc8ceb886f2df42d7efe2e333469c361388f68 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 09:14:35 +0200 Subject: [PATCH 11/34] Redefine RPC types for ADM_transfer_dataset --- src/common/net/proto/rpc_types.h | 43 +++++++++++++++++++++++++------- src/scord/rpc_handlers.cpp | 23 ++--------------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 3fda297d..ae27de5a 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -76,7 +76,7 @@ MERCURY_GEN_STRUCT_PROC( hg_return_t hg_proc_ADM_job_t(hg_proc_t proc, void* data); -struct adm_qos_entity { +typedef struct adm_qos_entity { ADM_qos_scope_t e_scope; union { ADM_node_t e_node; @@ -84,17 +84,27 @@ struct adm_qos_entity { ADM_dataset_t e_dataset; ADM_transfer_t e_transfer; }; -}; +} adm_qos_entity; // TODO: encoder/decoder +hg_return_t +hg_proc_ADM_qos_entity_t(hg_proc_t proc, void* data); typedef struct adm_qos_limit { ADM_qos_entity_t l_entity; ADM_qos_class_t l_class; - uint64_t l_value; + hg_uint64_t l_value; } adm_qos_limit; // TODO: encoder/decoder +// clang-format off +MERCURY_GEN_STRUCT_PROC( + adm_qos_limit, // NOLINT + ((ADM_qos_entity_t) (l_entity)) + ((ADM_qos_class_t) (l_class)) + ((hg_uint64_t) (l_value)) +) +// clang-format on typedef struct adm_transfer { uint64_t t_id; @@ -377,17 +387,32 @@ MERCURY_GEN_PROC(ADM_in_transit_ops_in_t, ((hg_const_string_t) (in_transit))) MERCURY_GEN_PROC(ADM_in_transit_ops_out_t, ((int32_t) (ret))) +struct adm_qos_limit_list { + /** An array of QoS limits */ + adm_qos_limit* l_limits; + /** The length of the array */ + size_t l_length; +}; + +hg_return_t +hg_proc_ADM_qos_limit_list_t(hg_proc_t proc, void* data); /// ADM_transfer_dataset MERCURY_GEN_PROC( - ADM_transfer_dataset_in_t, - ((hg_const_string_t) (source))((hg_const_string_t) (destination))( - (hg_const_string_t) (qos_constraints))( - (hg_const_string_t) (distribution))((int32_t) (job_id))) + ADM_transfer_dataset_in_t, + ((ADM_job_t) (job_id)) + ((ADM_dataset_list_t) (sources)) + ((ADM_dataset_list_t) (targets)) + ((ADM_qos_limit_list_t) (qos_limits)) + ((hg_int32_t) (distribution)) +) + +MERCURY_GEN_PROC( + ADM_transfer_dataset_out_t, + ((int32_t) (ret)) +((hg_const_string_t) (transfer_handle))) -MERCURY_GEN_PROC(ADM_transfer_dataset_out_t, - ((int32_t) (ret))((hg_const_string_t) (transfer_handle))) /// ADM_set_dataset_information diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index a12d812d..74c787dd 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -980,28 +980,9 @@ ADM_transfer_dataset(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; - out.transfer_handle = "fail"; - - if(in.source == nullptr) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid source (nullptr)"); - } else if(in.destination == nullptr) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid destination (nullptr)"); - } else if(in.qos_constraints == nullptr) { - LOGGER_ERROR( - "ADM_transfer_dataset(): invalid qos_constraints (nullptr)"); - } else if(in.distribution == nullptr) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid distribution (nullptr)"); - } else if(in.job_id < 0) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid job_id (< 0)"); - } else { - LOGGER_INFO("ADM_transfer_dataset({},{},{},{},{})", in.source, - in.destination, in.qos_constraints, in.distribution, - in.job_id); - out.ret = 0; - out.transfer_handle = "ok"; - } + // TODO + out.ret = 0; ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab From 765dc7729409fcbbb21308c535de5266995f76f4 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 09:15:44 +0200 Subject: [PATCH 12/34] Add encoding functions for ADM_qos_*_t types --- src/common/net/proto/rpc_types.c | 397 +++++++++++++++++++++++++++++++ src/common/net/proto/rpc_types.h | 15 +- 2 files changed, 410 insertions(+), 2 deletions(-) diff --git a/src/common/net/proto/rpc_types.c b/src/common/net/proto/rpc_types.c index d23339e3..193787d9 100644 --- a/src/common/net/proto/rpc_types.c +++ b/src/common/net/proto/rpc_types.c @@ -24,6 +24,151 @@ #include "rpc_types.h" +hg_return_t (*hg_proc_ADM_qos_scope_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t; +hg_return_t (*hg_proc_ADM_qos_class_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t; + +hg_return_t +hg_proc_ADM_node_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_node_t* node = (ADM_node_t*) data; + ADM_node_t tmp = NULL; + hg_size_t node_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_node object we need to send + node_length = *node ? sizeof(adm_node) : 0; + ret = hg_proc_hg_size_t(proc, &node_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!node_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_node object, + // write it to the mercury buffer + tmp = *node; + + ret = hg_proc_adm_node(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_node object + ret = hg_proc_hg_size_t(proc, &node_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!node_length) { + *node = NULL; + break; + } + + // if the received adm_node object was not NULL, read each of + // its fields from the mercury buffer + tmp = (adm_node*) calloc(1, sizeof(adm_node)); + + ret = hg_proc_adm_node(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *node = tmp; + break; + + case HG_FREE: + tmp = *node; + free(tmp); + break; + } + + return ret; +} + +hg_return_t +hg_proc_ADM_dataset_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_dataset_t* dataset = (ADM_dataset_t*) data; + ADM_dataset_t tmp = NULL; + hg_size_t dataset_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_dataset object we need to send + dataset_length = *dataset ? sizeof(adm_node) : 0; + ret = hg_proc_hg_size_t(proc, &dataset_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!dataset_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_dataset object, + // write it to the mercury buffer + tmp = *dataset; + + ret = hg_proc_adm_dataset(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_dataset object + ret = hg_proc_hg_size_t(proc, &dataset_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!dataset_length) { + *dataset = NULL; + break; + } + + // if the received adm_dataset object was not NULL, read each of + // its fields from the mercury buffer + tmp = (adm_dataset*) calloc(1, sizeof(adm_dataset)); + + ret = hg_proc_adm_dataset(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *dataset = tmp; + break; + + case HG_FREE: + tmp = *dataset; + free(tmp); + break; + } + + return ret; +} + hg_return_t hg_proc_ADM_job_t(hg_proc_t proc, void* data) { @@ -95,8 +240,80 @@ hg_proc_ADM_job_t(hg_proc_t proc, void* data) { return ret; } +hg_return_t +hg_proc_ADM_transfer_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_transfer_t* transfer = (ADM_transfer_t*) data; + ADM_transfer_t tmp = NULL; + hg_size_t transfer_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_transfer object we need to send + transfer_length = *transfer ? sizeof(adm_transfer) : 0; + ret = hg_proc_hg_size_t(proc, &transfer_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!transfer_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_transfer object, + // write it to the mercury buffer + tmp = *transfer; + + ret = hg_proc_adm_transfer(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_transfer object + ret = hg_proc_hg_size_t(proc, &transfer_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!transfer_length) { + *transfer = NULL; + break; + } + + // if the received adm_transfer object was not NULL, read each of + // its fields from the mercury buffer + tmp = (adm_transfer*) calloc(1, sizeof(adm_transfer)); + + ret = hg_proc_adm_transfer(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *transfer = tmp; + break; + + case HG_FREE: + tmp = *transfer; + free(tmp); + break; + } + + return ret; +} + hg_return_t hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) { + hg_return_t ret = HG_SUCCESS; ADM_dataset_list_t* list = (ADM_dataset_list_t*) data; ADM_dataset_list_t tmp = NULL; @@ -424,3 +641,183 @@ hg_proc_ADM_pfs_context_t(hg_proc_t proc, void* data) { return ret; } + + +hg_return_t +hg_proc_ADM_qos_limit_list_t(hg_proc_t proc, void* data) { + hg_return_t ret = HG_SUCCESS; + ADM_qos_limit_list_t* list = (ADM_qos_limit_list_t*) data; + ADM_qos_limit_list_t tmp = NULL; + + hg_size_t length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + tmp = *list; + // write the length of the list + length = tmp->l_length; + ret = hg_proc_hg_size_t(proc, &tmp->l_length); + + if(ret != HG_SUCCESS) { + break; + } + + // write the list + for(size_t i = 0; i < length; ++i) { + ret = hg_proc_adm_qos_limit(proc, &tmp->l_limits[i]); + + if(ret != HG_SUCCESS) { + break; + } + } + break; + + case HG_DECODE: { + + // find out the length of the list + ret = hg_proc_hg_size_t(proc, &length); + + if(ret != HG_SUCCESS) { + break; + } + + // loop and create list elements + tmp = (ADM_qos_limit_list_t) calloc( + 1, sizeof(struct adm_qos_limit_list)); + tmp->l_length = length; + tmp->l_limits = + (adm_qos_limit*) calloc(length, sizeof(adm_qos_limit)); + + for(size_t i = 0; i < length; ++i) { + ret = hg_proc_adm_qos_limit(proc, &tmp->l_limits[i]); + + if(ret != HG_SUCCESS) { + break; + } + } + + // return the newly-created list + *list = tmp; + + break; + } + + case HG_FREE: + tmp = *list; + free(tmp->l_limits); + free(tmp); + ret = HG_SUCCESS; + break; + } + + return ret; +} + +hg_return_t +hg_proc_ADM_qos_entity_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_qos_entity_t* qos_entity = (ADM_qos_entity_t*) data; + ADM_qos_entity_t tmp = NULL; + hg_size_t qos_entity_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_qos_entity object we need to send + qos_entity_length = *qos_entity ? sizeof(adm_qos_entity) : 0; + ret = hg_proc_hg_size_t(proc, &qos_entity_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!qos_entity_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_qos_entity object, + // write each of its fields to the mercury buffer + + // 1. the QoS scope + ret = hg_proc_ADM_qos_scope_t(proc, &tmp->e_scope); + + if(ret != HG_SUCCESS) { + break; + } + + // 2. the appropriate related data depending on the scope (i.e. an + // ADM_node_t, ADM_job_t, ADM_dataset_t, or ADM_transfer_t) + switch(tmp->e_scope) { + + case ADM_QOS_SCOPE_DATASET: + ret = hg_proc_ADM_dataset_t(proc, &tmp->e_dataset); + break; + case ADM_QOS_SCOPE_NODE: + ret = hg_proc_ADM_node_t(proc, &tmp->e_node); + break; + case ADM_QOS_SCOPE_JOB: + ret = hg_proc_ADM_job_t(proc, &tmp->e_job); + break; + case ADM_QOS_SCOPE_TRANSFER: + ret = hg_proc_ADM_transfer_t(proc, &tmp->e_transfer); + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_qos_entity object + ret = hg_proc_hg_size_t(proc, &qos_entity_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!qos_entity_length) { + *qos_entity = NULL; + break; + } + + // if the received adm_qos_entity object was not NULL, read each + // of its fields from the mercury buffer + tmp = (adm_qos_entity*) calloc(1, sizeof(adm_qos_entity)); + + // 1. the QoS scope + ret = hg_proc_ADM_qos_scope_t(proc, &tmp->e_scope); + + if(ret != HG_SUCCESS) { + break; + } + + // 2. the appropriate related data depending on the scope (i.e. an + // ADM_node_t, ADM_job_t, ADM_dataset_t, or ADM_transfer_t) + switch(tmp->e_scope) { + + case ADM_QOS_SCOPE_DATASET: + ret = hg_proc_ADM_dataset_t(proc, &tmp->e_dataset); + break; + case ADM_QOS_SCOPE_NODE: + ret = hg_proc_ADM_node_t(proc, &tmp->e_node); + break; + case ADM_QOS_SCOPE_JOB: + ret = hg_proc_ADM_job_t(proc, &tmp->e_job); + break; + case ADM_QOS_SCOPE_TRANSFER: + ret = hg_proc_ADM_transfer_t(proc, &tmp->e_transfer); + break; + } + + // return the newly-created entity + *qos_entity = tmp; + break; + + case HG_FREE: + tmp = *qos_entity; + free(tmp); + break; + } + + return ret; +} diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index ae27de5a..bddb7395 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -44,6 +44,9 @@ typedef struct adm_node { const char* n_hostname; } adm_node; +hg_return_t +hg_proc_ADM_node_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_node, // NOLINT @@ -55,6 +58,9 @@ typedef struct adm_dataset { const char* d_id; } adm_dataset; +hg_return_t +hg_proc_ADM_dataset_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_dataset, // NOLINT @@ -86,7 +92,8 @@ typedef struct adm_qos_entity { }; } adm_qos_entity; -// TODO: encoder/decoder +extern hg_return_t (*hg_proc_ADM_qos_scope_t)(hg_proc_t, void*); + hg_return_t hg_proc_ADM_qos_entity_t(hg_proc_t proc, void* data); @@ -96,7 +103,8 @@ typedef struct adm_qos_limit { hg_uint64_t l_value; } adm_qos_limit; -// TODO: encoder/decoder +extern hg_return_t (*hg_proc_ADM_qos_class_t)(hg_proc_t, void*); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_qos_limit, // NOLINT @@ -110,6 +118,9 @@ typedef struct adm_transfer { uint64_t t_id; } adm_transfer; +hg_return_t +hg_proc_ADM_transfer_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_transfer, // NOLINT -- GitLab From 68041f588ebd1e578ca5ae736a2045521f00d8ee Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 13:39:24 +0200 Subject: [PATCH 13/34] ADM_qos_entity_create() now accepts a void* rather than ... --- src/common/api/admire_types.h | 4 ++-- src/common/api/types.cpp | 14 +++++--------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index fd8c7282..0cb5db79 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -460,13 +460,13 @@ ADM_pfs_context_destroy(ADM_pfs_context_t ctx); * * @param[in] scope The scope of the entity, i.e. ADM_QOS_SCOPE_DATASET, * ADM_QOS_SCOPE_NODE, or ADM_QOS_SCOPE_JOB. - * @param[in] ... A single argument with data from either a ADM_dataset_t, + * @param[in] data A single argument with data from either a ADM_dataset_t, * ADM_node_t, or ADM_job_t variable. The argument must correspond properly * to the scope provided. * @return A valid ADM_qos_entity_t if successful or NULL in case of failure. */ ADM_qos_entity_t -ADM_qos_entity_create(ADM_qos_scope_t scope, ...); +ADM_qos_entity_create(ADM_qos_scope_t scope, void* data); /** * Destroy a QoS entity created by ADM_qos_entity_create(). diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 317ee0a9..2f535949 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -159,7 +159,7 @@ ADM_dataset_destroy(ADM_dataset_t dataset) { } ADM_qos_entity_t -ADM_qos_entity_create(ADM_qos_scope_t scope, ...) { +ADM_qos_entity_create(ADM_qos_scope_t scope, void* data) { struct adm_qos_entity* adm_qos_entity = (struct adm_qos_entity*) malloc(sizeof(struct adm_qos_entity)); @@ -171,23 +171,19 @@ ADM_qos_entity_create(ADM_qos_scope_t scope, ...) { adm_qos_entity->e_scope = scope; - va_list ap; - va_start(ap, scope); - switch(scope) { case ADM_QOS_SCOPE_NODE: - adm_qos_entity->e_node = va_arg(ap, ADM_node_t); + adm_qos_entity->e_node = (ADM_node_t) data; break; case ADM_QOS_SCOPE_JOB: - adm_qos_entity->e_job = va_arg(ap, ADM_job_t); + adm_qos_entity->e_job = (ADM_job_t) data; break; case ADM_QOS_SCOPE_DATASET: - adm_qos_entity->e_dataset = va_arg(ap, ADM_dataset_t); + adm_qos_entity->e_dataset = (ADM_dataset_t) data; break; case ADM_QOS_SCOPE_TRANSFER: - adm_qos_entity->e_transfer = va_arg(ap, ADM_transfer_t); + adm_qos_entity->e_transfer = (ADM_transfer_t) data; } - va_end(ap); return adm_qos_entity; } -- GitLab From c7a6b1ead7b04bf07cf2b370fe55bb34cd40a59a Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 13:40:42 +0200 Subject: [PATCH 14/34] Add ADM_qos_limit_destroy_all() --- src/common/api/types.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 2f535949..3fd08fc8 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -234,6 +234,23 @@ ADM_qos_limit_destroy(ADM_qos_limit_t limit) { return ret; } +ADM_return_t +ADM_qos_limit_destroy_all(ADM_qos_limit_t limit) { + ADM_return_t ret = ADM_SUCCESS; + + if(!limit) { + LOGGER_ERROR("Invalid ADM_qos_limit_t") + return ADM_EBADARGS; + } + + if(limit->l_entity) { + ADM_qos_entity_destroy(limit->l_entity); + } + + free(limit); + return ret; +} + ADM_dataset_info_t ADM_dataset_info_create() { -- GitLab From 77beb5cf41a1f1581976504dab07f41358f7e70f Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 13:55:26 +0200 Subject: [PATCH 15/34] Add conversion functions for several API types Conversion functions have been added for: - admire::node - admire::dataset - std::vector - ADM_transfer_t - ADM_qos_limit_list_t - ADM_dataset_list_t Specializations for managed_ctype and managed_ctype_array have been added for: - ADM_node_t - ADM_dataset_t - ADM_qos_limit_t - ADM_transfer_t - ADM_qos_limit_list_t - ADM_dataset_list_t --- src/common/api/admire_types.hpp | 2 + src/common/api/convert.cpp | 148 +++++++++++++++++++++++++++++++- src/common/api/convert.hpp | 131 +++++++++++++++++++++++++++- src/common/api/types.cpp | 33 +++++++ 4 files changed, 309 insertions(+), 5 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 2a160116..04d11eda 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -145,6 +145,7 @@ struct entity { template entity(admire::qos::scope s, T&& data); + explicit entity(ADM_qos_entity_t entity); entity(const entity&) noexcept; entity(entity&&) noexcept; @@ -171,6 +172,7 @@ struct limit { limit(const admire::qos::entity& e, admire::qos::subclass cls, uint64_t value); + explicit limit(ADM_qos_limit_t l); limit(const limit&) noexcept; limit(limit&&) noexcept; diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index 186347aa..aeee7453 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -23,6 +23,7 @@ *****************************************************************************/ #include +#include #include "convert.hpp" // forward declarations @@ -32,8 +33,30 @@ ADM_job_create(uint64_t id); ADM_transfer_t ADM_transfer_create(uint64_t id); +namespace { + +admire::api::managed_ctype_array +as_ctype_array(const std::vector& datasets) { + + std::vector tmp; + + std::transform(datasets.cbegin(), datasets.cend(), std::back_inserter(tmp), + [](const admire::dataset& d) { + return ADM_dataset_create(d.id().c_str()); + }); + + return admire::api::managed_ctype_array{std::move(tmp)}; +} + +} // namespace + namespace admire::api { +managed_ctype +convert(const node& node) { + return managed_ctype(ADM_node_create(node.hostname().c_str())); +} + managed_ctype convert(const adhoc_storage::ctx& ctx) { return managed_ctype{ADM_adhoc_context_create( @@ -56,7 +79,13 @@ convert(const admire::adhoc_storage& st) { return managed_ctype{c_st, std::move(managed_ctx)}; } -managed_ctype_array +managed_ctype +convert(const admire::dataset& dataset) { + return managed_ctype( + ADM_dataset_create(dataset.id().c_str())); +} + +managed_ctype convert(const std::vector& datasets) { std::vector tmp; @@ -66,7 +95,13 @@ convert(const std::vector& datasets) { return ADM_dataset_create(d.id().c_str()); }); - return managed_ctype_array{std::move(tmp)}; + auto rv = managed_ctype{ + ADM_dataset_list_create(tmp.data(), tmp.size())}; + + std::for_each(tmp.cbegin(), tmp.cend(), + [](ADM_dataset_t d) { ADM_dataset_destroy(d); }); + + return rv; } std::vector @@ -82,6 +117,19 @@ convert(ADM_dataset_t datasets[], size_t datasets_len) { return rv; } +std::vector +convert(ADM_dataset_list_t list) { + + std::vector rv; + rv.reserve(list->l_length); + + for(size_t i = 0; i < list->l_length; ++i) { + rv.emplace_back(&list->l_datasets[i]); + } + + return rv; +} + managed_ctype convert(const admire::job_requirements& reqs) { @@ -89,8 +137,8 @@ convert(const admire::job_requirements& reqs) { *std::dynamic_pointer_cast(reqs.storage()); auto managed_storage = convert(adhoc_storage); - auto managed_inputs = convert(reqs.inputs()); - auto managed_outputs = convert(reqs.outputs()); + auto managed_inputs = as_ctype_array(reqs.inputs()); + auto managed_outputs = as_ctype_array(reqs.outputs()); ADM_job_requirements_t c_reqs = ADM_job_requirements_create( managed_inputs.data(), managed_inputs.size(), @@ -118,4 +166,96 @@ convert(const transfer& tx) { return managed_ctype(ADM_transfer_create(tx.id())); } +transfer +convert(ADM_transfer_t tx) { + return transfer{tx}; +} + +managed_ctype +convert(const std::vector& limits) { + + std::vector tmp; + + std::transform( + limits.cbegin(), limits.cend(), std::back_inserter(tmp), + [](const admire::qos::limit& l) { + ADM_qos_entity_t e = nullptr; + + switch(l.entity().scope()) { + case qos::scope::dataset: { + e = ADM_qos_entity_create( + static_cast( + qos::scope::dataset), + convert(l.entity().data()) + .release()); + break; + } + + case qos::scope::node: { + e = ADM_qos_entity_create( + static_cast(qos::scope::node), + convert(l.entity().data()) + .release()); + break; + } + + case qos::scope::job: { + e = ADM_qos_entity_create( + static_cast(qos::scope::job), + convert(l.entity().data()) + .release()); + break; + } + + case qos::scope::transfer: { + e = ADM_qos_entity_create( + static_cast( + qos::scope::transfer), + convert(l.entity().data()) + .release()); + break; + } + } + + return ADM_qos_limit_create( + e, static_cast(l.subclass()), + l.value()); + }); + + auto rv = managed_ctype{ + ADM_qos_limit_list_create(tmp.data(), tmp.size())}; + + std::for_each(tmp.cbegin(), tmp.cend(), + [](ADM_qos_limit_t l) { ADM_qos_limit_destroy_all(l); }); + + return rv; +} + +std::vector +convert(ADM_qos_limit_t limits[], size_t limits_len) { + + std::vector rv; + rv.reserve(limits_len); + + for(size_t i = 0; i < limits_len; ++i) { + rv.emplace_back(limits[i]); + } + + return rv; +} + +std::vector +convert(ADM_qos_limit_list_t list) { + + std::vector rv; + rv.reserve(list->l_length); + + for(size_t i = 0; i < list->l_length; ++i) { + rv.emplace_back(&list->l_limits[i]); + } + + return rv; +} + + } // namespace admire::api diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 1dda9422..2b569791 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -39,18 +39,27 @@ struct managed_ctype_array; // conversion functions between C API and CXX API types +managed_ctype +convert(const node& node); + managed_ctype convert(const adhoc_storage::ctx& ctx); managed_ctype convert(const admire::adhoc_storage& st); -managed_ctype_array +managed_ctype +convert(const admire::dataset& dataset); + +managed_ctype convert(const std::vector& datasets); std::vector convert(ADM_dataset_t datasets[], size_t datasets_len); +std::vector +convert(ADM_dataset_list_t list); + managed_ctype convert(const admire::job_requirements& reqs); @@ -63,6 +72,17 @@ convert(ADM_job_t j); managed_ctype convert(const transfer& t); +transfer +convert(ADM_transfer_t j); + +managed_ctype +convert(const std::vector& limits); + +std::vector +convert(ADM_qos_limit_t limits[], size_t limits_len); + +std::vector +convert(ADM_qos_limit_list_t list); } // namespace admire::api @@ -71,6 +91,24 @@ convert(const transfer& t); // Specializations for conversion types //////////////////////////////////////////////////////////////////////////////// +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_node_t node) : m_node(node) {} + + ADM_node_t + get() const { + return m_node.get(); + } + + ADM_node_t + release() { + return m_node.release(); + } + + scord::utils::ctype_ptr m_node; +}; + template <> struct admire::api::managed_ctype { @@ -111,6 +149,24 @@ struct admire::api::managed_ctype { managed_ctype m_ctx; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_dataset_t dataset) : m_dataset(dataset) {} + + ADM_dataset_t + get() const { + return m_dataset.get(); + } + + ADM_dataset_t + release() { + return m_dataset.release(); + } + + scord::utils::ctype_ptr m_dataset; +}; + template <> struct admire::api::managed_ctype_array { @@ -144,6 +200,25 @@ struct admire::api::managed_ctype_array { m_datasets; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_dataset_list_t list) : m_list(list) {} + + ADM_dataset_list_t + get() const { + return m_list.get(); + } + + ADM_dataset_list_t + release() { + return m_list.release(); + } + + scord::utils::ctype_ptr + m_list; +}; + template <> struct admire::api::managed_ctype { @@ -217,5 +292,59 @@ struct admire::api::managed_ctype { scord::utils::ctype_ptr m_transfer; }; +ADM_return_t +ADM_qos_limit_destroy_all(ADM_qos_limit_t l); + +template <> +struct admire::api::managed_ctype_array { + + explicit managed_ctype_array(ADM_qos_limit_t data[], size_t size) + : m_qos_limits(data, size) {} + + explicit managed_ctype_array(std::vector&& v) + : m_qos_limits(v.data(), v.size()) {} + + constexpr size_t + size() const { + return m_qos_limits.size(); + } + + constexpr const ADM_qos_limit_t* + data() const noexcept { + return m_qos_limits.data(); + } + + constexpr ADM_qos_limit_t* + data() noexcept { + return m_qos_limits.data(); + } + + constexpr ADM_qos_limit_t* + release() noexcept { + return m_qos_limits.release(); + } + + scord::utils::ctype_ptr_vector + m_qos_limits; +}; + +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_qos_limit_list_t list) : m_list(list) {} + + ADM_qos_limit_list_t + get() const { + return m_list.get(); + } + + ADM_qos_limit_list_t + release() { + return m_list.release(); + } + + scord::utils::ctype_ptr + m_list; +}; #endif // SCORD_CONVERT_HPP diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 3fd08fc8..6c34f2b3 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1349,6 +1349,10 @@ public: template impl(const admire::qos::scope& s, T&& data) : m_scope(s), m_data(data) {} + explicit impl(ADM_qos_entity_t entity) + : m_scope(static_cast(entity->e_scope)), + m_data(init_helper(entity)) {} + impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& @@ -1367,6 +1371,25 @@ public: return std::get(m_data); } +private: + static std::variant + init_helper(ADM_qos_entity_t entity) { + switch(entity->e_scope) { + case ADM_QOS_SCOPE_DATASET: + return admire::dataset(entity->e_dataset); + case ADM_QOS_SCOPE_NODE: + return admire::node(entity->e_node); + case ADM_QOS_SCOPE_JOB: + return admire::job(entity->e_job); + case ADM_QOS_SCOPE_TRANSFER: + return admire::transfer(entity->e_transfer); + default: + throw std::runtime_error(fmt::format( + "Unexpected scope value: {}", entity->e_scope)); + } + } + + private: admire::qos::scope m_scope; std::variant m_data; @@ -1376,6 +1399,9 @@ template entity::entity(admire::qos::scope s, T&& data) : m_pimpl(std::make_unique(s, std::forward(data))) {} +entity::entity(ADM_qos_entity_t entity) + : m_pimpl(std::make_unique(entity)) {} + entity::entity(const entity& other) noexcept : m_pimpl(std::make_unique(*other.m_pimpl)) {} @@ -1409,6 +1435,11 @@ public: impl(admire::qos::entity e, admire::qos::subclass cls, uint64_t value) : m_entity(std::move(e)), m_subclass(cls), m_value(value) {} + explicit impl(ADM_qos_limit_t l) + : m_entity(l->l_entity), + m_subclass(static_cast(l->l_class)), + m_value(l->l_value) {} + impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& @@ -1441,6 +1472,8 @@ limit::limit(const admire::qos::entity& e, admire::qos::subclass cls, uint64_t value) : m_pimpl(std::make_unique(e, cls, value)) {} +limit::limit(ADM_qos_limit_t l) : m_pimpl(std::make_unique(l)) {} + limit::limit(const limit& other) noexcept : m_pimpl(std::make_unique(*other.m_pimpl)) {} -- GitLab From acf84f22ae906c35f4f8b741051f658fbe5959ad Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 1 Sep 2022 13:55:49 +0200 Subject: [PATCH 16/34] Provide template specialization for undefined references --- src/common/api/types.cpp | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 6c34f2b3..ba4920d8 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1423,12 +1423,31 @@ entity::scope() const { return m_pimpl->scope(); } -template -T -entity::data() const { - return m_pimpl->data(); +template <> +admire::node +entity::data() const { + return m_pimpl->data(); +} + +template <> +admire::job +entity::data() const { + return m_pimpl->data(); +} + +template <> +admire::dataset +entity::data() const { + return m_pimpl->data(); +} + +template <> +admire::transfer +entity::data() const { + return m_pimpl->data(); } + class limit::impl { public: -- GitLab From 6287b09db90b06d8e5b3aa6080a10efbddc3554e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 11:34:52 +0200 Subject: [PATCH 17/34] Redefine ADM_transfer_dataset_{in|out}_t --- src/common/net/proto/rpc_types.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index bddb7395..ff8f08f8 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -412,17 +412,17 @@ hg_proc_ADM_qos_limit_list_t(hg_proc_t proc, void* data); MERCURY_GEN_PROC( ADM_transfer_dataset_in_t, - ((ADM_job_t) (job_id)) + ((ADM_job_t) (job)) ((ADM_dataset_list_t) (sources)) ((ADM_dataset_list_t) (targets)) ((ADM_qos_limit_list_t) (qos_limits)) - ((hg_int32_t) (distribution)) + ((hg_int32_t) (mapping)) ) MERCURY_GEN_PROC( ADM_transfer_dataset_out_t, - ((int32_t) (ret)) -((hg_const_string_t) (transfer_handle))) + ((hg_int32_t) (retval)) + ((ADM_transfer_t) (tx))) /// ADM_set_dataset_information -- GitLab From 36126af5ee78d4a9639257825a27779b56f6dd3e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 11:37:10 +0200 Subject: [PATCH 18/34] Reimplement ADM_transfer_dataset stubs --- src/lib/admire.cpp | 44 +++++++++++--------------------------- src/lib/admire.h | 16 ++++++++------ src/lib/admire.hpp | 10 +++++---- src/lib/c_wrapper.cpp | 23 ++++++++++++++------ src/lib/detail/impl.cpp | 42 ++++++++++++++++++++++++++++++++++++ src/lib/detail/impl.hpp | 7 ++++++ src/scord/rpc_handlers.cpp | 24 +++++++++++++++++++-- 7 files changed, 116 insertions(+), 50 deletions(-) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index c5b16855..5e1ab7f3 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -400,41 +400,22 @@ remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage) { return ADM_SUCCESS; } -ADM_return_t -transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources, - ADM_dataset_t** targets, ADM_qos_limit_t** limits, - ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer) { - (void) srv; - (void) job; - (void) sources; - (void) targets; - (void) limits; - (void) mapping; - (void) transfer; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_transfer_dataset(...)"); +admire::transfer +transfer_dataset(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping) { - ADM_transfer_dataset_in_t in{}; - ADM_transfer_dataset_out_t out; + const auto rv = detail::transfer_dataset(srv, job, sources, targets, limits, + mapping); - in.source = "/tmp"; - in.destination = "/tmp"; - in.qos_constraints = "constraints"; - in.distribution = "distribution"; - - const auto rpc = endp.call("ADM_transfer_dataset", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_transfer_dataset() = {}", out.ret); - return static_cast(out.ret); + if(!rv) { + throw std::runtime_error(fmt::format("ADM_transfer_dataset() error: {}", + ADM_strerror(rv.error()))); } - LOGGER_INFO("ADM_transfer_dataset() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + return rv.value(); } ADM_return_t @@ -780,7 +761,6 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job, (void) op; (void) should_stream; (void) args; - (void) transfer; scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; diff --git a/src/lib/admire.h b/src/lib/admire.h index 1404ec4d..f91a1525 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -185,13 +185,16 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage); * * @param[in] server The server to which the request is directed * @param[in] job An ADM_JOB identifying the originating job. - * @param[in] sources A list of DATASETs identifying the source dataset/s + * @param[in] sources An array of DATASETs identifying the source dataset/s * to be transferred. - * @param[in] targets A list of DATASETs identifying the destination + * @param[in] sources_len The number of DATASETs stored in sources. + * @param[in] targets An array of DATASETs identifying the destination * dataset/s and its/their desired locations in a storage tier. - * @param[in] limits A list of QOS_CONSTRAINTS that must be applied to + * @param[in] targets_len The number of DATASETs stored in targets. + * @param[in] limits An array of QOS_CONSTRAINTS that must be applied to * the transfer. These may not exceed the global ones set at node, application, * or resource level. + * @param[in] limits_len The number of QOS_CONSTRAINTS stored in limits. * @param[in] mapping A distribution strategy for the transfers (e.g. * ONE_TO_ONE, ONE_TO_MANY, MANY_TO_MANY) * @param[out] transfer A ADM_TRANSFER allowing clients to interact @@ -202,9 +205,10 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage); */ ADM_return_t ADM_transfer_dataset(ADM_server_t server, ADM_job_t job, - ADM_dataset_t** sources, ADM_dataset_t** targets, - ADM_qos_limit_t** limits, ADM_transfer_mapping_t mapping, - ADM_transfer_t* transfer); + ADM_dataset_t sources[], size_t sources_len, + ADM_dataset_t targets[], size_t targets_len, + ADM_qos_limit_t limits[], size_t limits_len, + ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index a7e5a517..0835f740 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -81,10 +81,12 @@ update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, ADM_return_t remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage); -ADM_return_t -transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources, - ADM_dataset_t** targets, ADM_qos_limit_t** limits, - ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); +admire::transfer +transfer_dataset(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping); ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index dba51d28..3cd4f854 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -140,14 +140,25 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t pfs_storage) { ADM_return_t ADM_transfer_dataset(ADM_server_t server, ADM_job_t job, - ADM_dataset_t** sources, ADM_dataset_t** targets, - ADM_qos_limit_t** limits, ADM_transfer_mapping_t mapping, - ADM_transfer_t* transfer) { + ADM_dataset_t sources[], size_t sources_len, + ADM_dataset_t targets[], size_t targets_len, + ADM_qos_limit_t limits[], size_t limits_len, + ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer) { + + const auto rv = admire::detail::transfer_dataset( + admire::server{server}, admire::job{job}, + admire::api::convert(sources, sources_len), + admire::api::convert(targets, targets_len), + admire::api::convert(limits, limits_len), + static_cast(mapping)); - const admire::server srv{server}; + if(!rv) { + return rv.error(); + } + + *transfer = admire::api::convert(*rv).release(); - return admire::transfer_dataset(srv, job, sources, targets, limits, mapping, - transfer); + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 803e6252..8ccc3260 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -255,4 +255,46 @@ remove_job(const server& srv, const job& job) { return ADM_SUCCESS; } +tl::expected +transfer_dataset(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("RPC (ADM_{}) => {{job: {{{}}}, sources: {}, targets: {}, " + "limits: {}, mapping: {}}}", + __FUNCTION__, job, sources, targets, limits, mapping); + + const auto rpc_job = api::convert(job); + const auto rpc_sources = api::convert(sources); + const auto rpc_targets = api::convert(targets); + const auto rpc_qos_limits = api::convert(limits); + + ADM_transfer_dataset_in_t in{rpc_job.get(), rpc_sources.get(), + rpc_targets.get(), rpc_qos_limits.get(), + static_cast(mapping)}; + ADM_transfer_dataset_out_t out; + + [[maybe_unused]] const auto rpc = + endp.call("ADM_transfer_dataset", &in, &out); + + if(out.retval < 0) { + LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, + out.retval); + return tl::make_unexpected(static_cast(out.retval)); + } + + const admire::transfer tx = api::convert(out.tx); + + LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {{{}}}}}", + __FUNCTION__, ADM_SUCCESS, tx); + return tx; +} + + } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index a047192b..266dac3a 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -43,6 +43,13 @@ update_job(const server& srv, const job& job, const job_requirements& reqs); admire::error_code remove_job(const server& srv, const job& job); +tl::expected +transfer_dataset(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 74c787dd..bf2d5de1 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -980,9 +980,29 @@ ADM_transfer_dataset(hg_handle_t h) { ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - // TODO + const admire::job job{in.job}; + const std::vector sources = + admire::api::convert(in.sources); + const std::vector targets = + admire::api::convert(in.targets); + const std::vector limits = + admire::api::convert(in.qos_limits); + const auto mapping = static_cast(in.mapping); - out.ret = 0; + const auto id = remote_procedure::new_id(); + LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}, sources: {}, targets: {}, " + "limits: {}, mapping: {}}}", + id, __FUNCTION__, job, sources, targets, limits, mapping); + + admire::error_code rv = ADM_SUCCESS; + + const auto transfer = admire::transfer{42}; + + out.retval = rv; + out.tx = admire::api::convert(transfer).release(); + + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, transfer: {{{}}}}}", id, + __FUNCTION__, rv, transfer); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab From 9ae8a9e393243563a7b288dfd3bcb9eecf1d0a85 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 12:08:07 +0200 Subject: [PATCH 19/34] Add formatting function for admire::qos::limit --- src/common/api/admire_types.hpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 04d11eda..f7e44990 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -649,4 +649,16 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::qos::limit& l, FormatContext& ctx) const { + const auto str = fmt::format("{{entity: {}, subclass: {}, value: {}}}", + l.entity(), l.subclass(), l.value()); + return formatter::format(str, ctx); + } +}; + #endif // SCORD_ADMIRE_TYPES_HPP -- GitLab From a3eafb34df51d8bd75bde195abe26895a16e1152 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 12:08:45 +0200 Subject: [PATCH 20/34] Add formatting function for admire::transfer::mapping --- src/common/api/admire_types.hpp | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index f7e44990..933d5ae2 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -661,4 +661,31 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::transfer::mapping& m, FormatContext& ctx) const { + + using mapping = admire::transfer::mapping; + + std::string_view name = "unknown"; + + switch(m) { + case mapping::one_to_one: + name = "ADM_MAPPING_ONE_TO_ONE"; + break; + case mapping::one_to_n: + name = "ADM_MAPPING_ONE_TO_N"; + break; + case mapping::n_to_n: + name = "ADM_MAPPING_N_TO_N"; + break; + } + + return formatter::format(name, ctx); + } +}; + #endif // SCORD_ADMIRE_TYPES_HPP -- GitLab From 8ec06c9afcef01a65c981c8a1681e9badb445b1e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 12:09:19 +0200 Subject: [PATCH 21/34] Add formatting function for admire::transfer --- src/common/api/admire_types.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 933d5ae2..d7fad23a 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -688,4 +688,15 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::transfer& tx, FormatContext& ctx) const { + const auto str = fmt::format("id: {}", tx.id()); + return formatter::format(str, ctx); + } +}; + #endif // SCORD_ADMIRE_TYPES_HPP -- GitLab From d04bd88c3176b3246708dfe6a2ad170e54f3daae Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 12:30:36 +0200 Subject: [PATCH 22/34] Add formatting function for std::vector --- src/common/api/admire_types.hpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index d7fad23a..e5482fb3 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -699,4 +699,16 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter> + : formatter { + // parse is inherited from formatter. + template + auto + format(const std::vector& v, FormatContext& ctx) const { + const auto str = fmt::format("[{}]", fmt::join(v, ", ")); + return formatter::format(str, ctx); + } +}; + #endif // SCORD_ADMIRE_TYPES_HPP -- GitLab From 107551820dbdfc404f15b012c12051f363082805 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 12:31:31 +0200 Subject: [PATCH 23/34] Add formatting function for std::vector --- src/common/api/admire_types.hpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index e5482fb3..28c42e6d 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -711,4 +711,16 @@ struct fmt::formatter> } }; +template <> +struct fmt::formatter> + : formatter { + // parse is inherited from formatter. + template + auto + format(const std::vector& l, FormatContext& ctx) const { + const auto str = fmt::format("[{}]", fmt::join(l, ", ")); + return formatter::format(str, ctx); + } +}; + #endif // SCORD_ADMIRE_TYPES_HPP -- GitLab From a266c39ead89dface6793c2e0e432f242b979724 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 2 Sep 2022 13:25:46 +0200 Subject: [PATCH 24/34] Update tests involving ADM_transfer_dataset RPC --- examples/c/ADM_cancel_transfer.c | 15 ++++-- examples/c/ADM_get_transfer_priority.c | 14 ++++-- .../c/ADM_link_transfer_to_data_operation.c | 16 ++++-- examples/c/ADM_set_transfer_priority.c | 14 ++++-- examples/c/ADM_transfer_dataset.c | 14 ++++-- examples/cxx/ADM_transfer_dataset.cpp | 50 +++++++++++-------- examples/cxx/common.cpp | 13 +++++ examples/cxx/common.hpp | 3 ++ 8 files changed, 93 insertions(+), 46 deletions(-) diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 9669e7a0..aaf8d589 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -68,13 +68,18 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); + if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " "successfully\n"); diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index ca088cf4..2627564f 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -68,13 +68,17 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index cd070ef8..0a6f9546 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -72,13 +72,19 @@ main(int argc, char* argv[]) { ADM_data_operation_t op; const char* path = "/tmpxxxxx"; ADM_define_data_operation(server, job, path, &op); - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); + if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index 5d08b7c8..ed4bef18 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -68,13 +68,17 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); int incr = 42; ret = ADM_set_transfer_priority(server, job, tx, incr); diff --git a/examples/c/ADM_transfer_dataset.c b/examples/c/ADM_transfer_dataset.c index 434878a4..6d9ed99f 100644 --- a/examples/c/ADM_transfer_dataset.c +++ b/examples/c/ADM_transfer_dataset.c @@ -68,13 +68,17 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " diff --git a/examples/cxx/ADM_transfer_dataset.cpp b/examples/cxx/ADM_transfer_dataset.cpp index 1eb3f2c5..a2161d49 100644 --- a/examples/cxx/ADM_transfer_dataset.cpp +++ b/examples/cxx/ADM_transfer_dataset.cpp @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2021, Barcelona Supercomputing Center (BSC), Spain + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). @@ -24,43 +24,51 @@ #include #include +#include "common.hpp" +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 4 int main(int argc, char* argv[]) { if(argc != 2) { - fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "ERROR: no server address provided\n"); fmt::print(stderr, "Usage: ADM_transfer_dataset \n"); exit(EXIT_FAILURE); } admire::server server{"tcp", argv[1]}; - ADM_job_t job{}; - ADM_dataset_t** sources = nullptr; - ADM_dataset_t** targets = nullptr; - ADM_qos_limit_t** limits = nullptr; - ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; - ADM_transfer_t tx{}; - ADM_return_t ret = ADM_SUCCESS; + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); + const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); + const auto qos_limits = prepare_qos_limits(NLIMITS); + const auto mapping = admire::transfer::mapping::n_to_n; + + auto p = std::make_unique( + admire::storage::type::gekkofs, "foobar", + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, 42, 100, false); + + admire::job_requirements reqs(inputs, outputs, std::move(p)); try { - ret = admire::transfer_dataset(server, job, sources, targets, limits, - mapping, &tx); + const auto job = admire::register_job(server, reqs); + const auto transfer = admire::transfer_dataset( + server, job, sources, targets, qos_limits, mapping); + + fmt::print(stdout, "ADM_transfer_dataset() remote procedure completed " + "successfully\n"); + exit(EXIT_SUCCESS); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", e.what()); exit(EXIT_FAILURE); } - - if(ret != ADM_SUCCESS) { - fmt::print(stdout, - "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); - exit(EXIT_FAILURE); - } else { - fmt::print(stdout, "ADM_transfer_dataset() remote procedure completed " - "successfully\n"); - } } diff --git a/examples/cxx/common.cpp b/examples/cxx/common.cpp index 83dc666c..5461521b 100644 --- a/examples/cxx/common.cpp +++ b/examples/cxx/common.cpp @@ -10,3 +10,16 @@ prepare_datasets(const std::string& pattern, size_t n) { return datasets; } + +std::vector +prepare_qos_limits(size_t n) { + + std::vector limits; + limits.reserve(n); + + for(size_t i = 0; i < n; ++i) { + limits.emplace_back(admire::qos::subclass::bandwidth, 50); + } + + return limits; +} diff --git a/examples/cxx/common.hpp b/examples/cxx/common.hpp index b64a32e7..f3618be5 100644 --- a/examples/cxx/common.hpp +++ b/examples/cxx/common.hpp @@ -7,4 +7,7 @@ std::vector prepare_datasets(const std::string& pattern, size_t n); +std::vector +prepare_qos_limits(size_t n); + #endif // SCORD_CXX_EXAMPLES_COMMON_HPP -- GitLab From d3fe6c48e4884e5cdfa25c0116c53b3ca9e3fee5 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Mon, 5 Sep 2022 13:35:20 +0200 Subject: [PATCH 25/34] Fix warning in admire::link_transfer_to_data_operation() --- src/lib/admire.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 5e1ab7f3..69b69667 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -759,6 +759,7 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job, (void) srv; (void) job; (void) op; + (void) transfer; (void) should_stream; (void) args; -- GitLab From 7f248631c3eda5f343ca32f04e6e44c57cff77e6 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 7 Sep 2022 11:48:06 +0200 Subject: [PATCH 26/34] Support optional entities in admire::qos::limit --- src/common/api/admire_types.hpp | 8 ++-- src/common/api/convert.cpp | 68 +++++++++++++++++---------------- src/common/api/types.cpp | 30 +++++++++------ 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 28c42e6d..cce00240 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include "admire_types.h" namespace admire { @@ -170,8 +171,9 @@ private: struct limit { - limit(const admire::qos::entity& e, admire::qos::subclass cls, - uint64_t value); + limit(admire::qos::subclass cls, uint64_t value); + limit(admire::qos::subclass cls, uint64_t value, + const admire::qos::entity& e); explicit limit(ADM_qos_limit_t l); limit(const limit&) noexcept; @@ -183,7 +185,7 @@ struct limit { ~limit(); - admire::qos::entity + std::optional entity() const; admire::qos::subclass diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index aeee7453..591b56e9 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -181,39 +181,41 @@ convert(const std::vector& limits) { [](const admire::qos::limit& l) { ADM_qos_entity_t e = nullptr; - switch(l.entity().scope()) { - case qos::scope::dataset: { - e = ADM_qos_entity_create( - static_cast( - qos::scope::dataset), - convert(l.entity().data()) - .release()); - break; - } - - case qos::scope::node: { - e = ADM_qos_entity_create( - static_cast(qos::scope::node), - convert(l.entity().data()) - .release()); - break; - } - - case qos::scope::job: { - e = ADM_qos_entity_create( - static_cast(qos::scope::job), - convert(l.entity().data()) - .release()); - break; - } - - case qos::scope::transfer: { - e = ADM_qos_entity_create( - static_cast( - qos::scope::transfer), - convert(l.entity().data()) - .release()); - break; + if(l.entity()) { + + switch(const auto s = l.entity()->scope()) { + case qos::scope::dataset: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity()->data()) + .release()); + break; + } + + case qos::scope::node: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity()->data()) + .release()); + break; + } + + case qos::scope::job: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity()->data()) + .release()); + break; + } + + case qos::scope::transfer: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity() + ->data()) + .release()); + break; + } } } diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index ba4920d8..85636c77 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "admire_types.hpp" /******************************************************************************/ @@ -1451,13 +1452,17 @@ entity::data() const { class limit::impl { public: - impl(admire::qos::entity e, admire::qos::subclass cls, uint64_t value) - : m_entity(std::move(e)), m_subclass(cls), m_value(value) {} + impl(admire::qos::subclass cls, uint64_t value, admire::qos::entity e) + : m_subclass(cls), m_value(value), m_entity(std::move(e)) {} + + impl(admire::qos::subclass cls, uint64_t value) + : m_subclass(cls), m_value(value) {} explicit impl(ADM_qos_limit_t l) - : m_entity(l->l_entity), - m_subclass(static_cast(l->l_class)), - m_value(l->l_value) {} + : m_subclass(static_cast(l->l_class)), + m_value(l->l_value), + m_entity(l->l_entity ? std::optional(admire::qos::entity(l->l_entity)) + : std::nullopt) {} impl(const impl& rhs) = default; impl(impl&& rhs) = default; @@ -1466,7 +1471,7 @@ public: impl& operator=(impl&&) noexcept = default; - admire::qos::entity + std::optional entity() const { return m_entity; } @@ -1482,14 +1487,17 @@ public: } private: - admire::qos::entity m_entity; admire::qos::subclass m_subclass; uint64_t m_value; + std::optional m_entity; }; -limit::limit(const admire::qos::entity& e, admire::qos::subclass cls, - uint64_t value) - : m_pimpl(std::make_unique(e, cls, value)) {} +limit::limit(admire::qos::subclass cls, uint64_t value) + : m_pimpl(std::make_unique(cls, value)) {} + +limit::limit(admire::qos::subclass cls, uint64_t value, + const admire::qos::entity& e) + : m_pimpl(std::make_unique(cls, value, e)) {} limit::limit(ADM_qos_limit_t l) : m_pimpl(std::make_unique(l)) {} @@ -1509,7 +1517,7 @@ limit::operator=(limit&&) noexcept = default; limit::~limit() = default; -admire::qos::entity +std::optional limit::entity() const { return m_pimpl->entity(); } -- GitLab From 4e9d7ed8d3583acbc55a42f1bad329c5d64b682a Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 10:23:18 +0200 Subject: [PATCH 27/34] Add formatting function for admire::node --- src/common/api/admire_types.hpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index cce00240..8cde9f08 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -442,6 +442,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::node& n, FormatContext& ctx) const { + const auto str = fmt::format("hostname: {}", std::quoted(n.hostname())); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter : formatter { -- GitLab From 2e58979e213132c506dbe3ec2fe2fe3a2daf5d25 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 11:50:05 +0200 Subject: [PATCH 28/34] Add formatting function for admire::qos::scope type --- src/common/api/admire_types.hpp | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 8cde9f08..428b6ff1 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -662,6 +662,36 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::qos::scope& s, FormatContext& ctx) const { + + using scope = admire::qos::scope; + + std::string_view name = "unknown"; + + switch(s) { + case scope::dataset: + name = "ADM_QOS_SCOPE_DATASET"; + break; + case scope::node: + name = "ADM_QOS_SCOPE_NODE"; + break; + case scope::job: + name = "ADM_QOS_SCOPE_JOB"; + break; + case scope::transfer: + name = "ADM_QOS_SCOPE_TRANSFER"; + break; + } + + return formatter::format(name, ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. -- GitLab From 3fb99054e8b95e03b96ace1411f0933ce0e1e193 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 11:51:18 +0200 Subject: [PATCH 29/34] Add formatting function for admire::qos::entity --- src/common/api/admire_types.hpp | 35 +++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 428b6ff1..01998868 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -692,6 +692,41 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter> + : formatter { + // parse is inherited from formatter. + template + auto + format(const std::optional& e, + FormatContext& ctx) const { + + if(!e) { + return formatter::format("none", ctx); + } + + std::string_view data = "unknown"; + + switch(e->scope()) { + case admire::qos::scope::dataset: + data = fmt::format("{}", e->data()); + break; + case admire::qos::scope::node: + data = fmt::format("{}", e->data()); + break; + case admire::qos::scope::job: + data = fmt::format("{}", e->data()); + break; + case admire::qos::scope::transfer: + data = fmt::format("{}", e->data()); + break; + } + + const auto str = fmt::format("scope: {}, data: {}", e->scope(), data); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. -- GitLab From 8f07b13a50925ecc33077675f9ae85ac83cab2e3 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 11:51:40 +0200 Subject: [PATCH 30/34] Add formatting function for admire::qos::subclass --- src/common/api/admire_types.hpp | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 01998868..9746f1dd 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -727,6 +727,30 @@ struct fmt::formatter> } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::qos::subclass& sc, FormatContext& ctx) const { + + using subclass = admire::qos::subclass; + + std::string_view name = "unknown"; + + switch(sc) { + case subclass::bandwidth: + name = "ADM_QOS_CLASS_BANDWIDTH"; + break; + case subclass::iops: + name = "ADM_QOS_CLASS_IOPS"; + break; + } + + return formatter::format(name, ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. -- GitLab From dde43a0767557f4c5485143de629fc07ae9edb77 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 12:06:55 +0200 Subject: [PATCH 31/34] Fix inconsistent formatting --- src/common/api/admire_types.hpp | 18 ++++++++++-------- src/lib/detail/impl.cpp | 15 +++++++-------- src/scord/rpc_handlers.cpp | 18 +++++++++--------- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 9746f1dd..396006a8 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -428,7 +428,7 @@ struct fmt::formatter : formatter { auto format(const admire::job& j, FormatContext& ctx) const { return formatter::format( - fmt::format("id: {}", j.id()), ctx); + fmt::format("{{id: {}}}", j.id()), ctx); } }; @@ -438,7 +438,8 @@ struct fmt::formatter : formatter { template auto format(const admire::dataset& d, FormatContext& ctx) const { - return formatter::format("\"" + d.id() + "\"", ctx); + const auto str = fmt::format("{{id: {}}}", std::quoted(d.id())); + return formatter::format(str, ctx); } }; @@ -448,7 +449,8 @@ struct fmt::formatter : formatter { template auto format(const admire::node& n, FormatContext& ctx) const { - const auto str = fmt::format("hostname: {}", std::quoted(n.hostname())); + const auto str = + fmt::format("{{hostname: {}}}", std::quoted(n.hostname())); return formatter::format(str, ctx); } }; @@ -655,9 +657,8 @@ struct fmt::formatter : formatter { auto format(const admire::job_requirements& r, FormatContext& ctx) const { return formatter::format( - fmt::format("inputs: [{}], outputs: [{}], storage: {}", - fmt::join(r.inputs(), ", "), - fmt::join(r.outputs(), ", "), r.storage()), + fmt::format("{{inputs: {}, outputs: {}, storage: {}}}", + r.inputs(), r.outputs(), r.storage()), ctx); } }; @@ -722,7 +723,8 @@ struct fmt::formatter> break; } - const auto str = fmt::format("scope: {}, data: {}", e->scope(), data); + const auto str = + fmt::format("{{scope: {}, data: {}}}", e->scope(), data); return formatter::format(str, ctx); } }; @@ -796,7 +798,7 @@ struct fmt::formatter : formatter { template auto format(const admire::transfer& tx, FormatContext& ctx) const { - const auto str = fmt::format("id: {}", tx.id()); + const auto str = fmt::format("{{id: {}}}", tx.id()); return formatter::format(str, ctx); } }; diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 8ccc3260..21eb7398 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -177,8 +177,7 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job_requirements: {{{}}}}}", __FUNCTION__, - reqs); + LOGGER_INFO("RPC (ADM_{}) => {{job_requirements: {}}}", __FUNCTION__, reqs); auto rpc_reqs = api::convert(reqs); @@ -194,7 +193,7 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { const admire::job job = api::convert(out.job); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, job: {{{}}}}}", __FUNCTION__, + LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, job: {}}}", __FUNCTION__, ADM_SUCCESS, job.id()); return job; @@ -207,8 +206,8 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC ({}): {{job: {{{}}}, job_requirements: {{{}}}}}", - "ADM_update_job", job, reqs); + LOGGER_INFO("RPC (ADM_{}): {{job: {}, job_requirements: {}}}", __FUNCTION__, + job, reqs); const auto rpc_job = api::convert(job); const auto rpc_reqs = api::convert(reqs); @@ -266,7 +265,7 @@ transfer_dataset(const server& srv, const job& job, auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {{{}}}, sources: {}, targets: {}, " + LOGGER_INFO("RPC (ADM_{}) => {{job: {}, sources: {}, targets: {}, " "limits: {}, mapping: {}}}", __FUNCTION__, job, sources, targets, limits, mapping); @@ -291,8 +290,8 @@ transfer_dataset(const server& srv, const job& job, const admire::transfer tx = api::convert(out.tx); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {{{}}}}}", - __FUNCTION__, ADM_SUCCESS, tx); + LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {}}}", __FUNCTION__, + ADM_SUCCESS, tx); return tx; } diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index bf2d5de1..4ccc7342 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -67,8 +67,8 @@ ADM_register_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job_requirements: {{{}}}}}", id, - __FUNCTION__, reqs); + LOGGER_INFO("RPC ID {} ({}) <= {{job_requirements: {}}}", id, __FUNCTION__, + reqs); const auto job = admire::job{42}; @@ -77,8 +77,8 @@ ADM_register_job(hg_handle_t h) { out.retval = rv; out.job = admire::api::convert(job).release(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, job: {{{}}}}}", id, - __FUNCTION__, rv, job); + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, job: {}}}", id, __FUNCTION__, + rv, job); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -110,8 +110,8 @@ ADM_update_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}, job_requirements: {{{}}}}}", - id, __FUNCTION__, job, reqs); + LOGGER_INFO("RPC ID {} ({}) <= {{job: {}, job_requirements: {}}}", id, + __FUNCTION__, job, reqs); admire::error_code rv = ADM_SUCCESS; out.retval = rv; @@ -147,7 +147,7 @@ ADM_remove_job(hg_handle_t h) { const admire::job job(in.job); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}", id, __FUNCTION__, job); + LOGGER_INFO("RPC ID {} ({}) <= {{job: {}}}", id, __FUNCTION__, job); admire::error_code rv = ADM_SUCCESS; out.retval = rv; @@ -990,7 +990,7 @@ ADM_transfer_dataset(hg_handle_t h) { const auto mapping = static_cast(in.mapping); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}, sources: {}, targets: {}, " + LOGGER_INFO("RPC ID {} ({}) <= {{job: {}, sources: {}, targets: {}, " "limits: {}, mapping: {}}}", id, __FUNCTION__, job, sources, targets, limits, mapping); @@ -1001,7 +1001,7 @@ ADM_transfer_dataset(hg_handle_t h) { out.retval = rv; out.tx = admire::api::convert(transfer).release(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, transfer: {{{}}}}}", id, + LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, transfer: {}}}", id, __FUNCTION__, rv, transfer); ret = margo_respond(h, &out); -- GitLab From ea6c0382e3dbbbb5d8b4c08233825905c7b6f161 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 12:09:37 +0200 Subject: [PATCH 32/34] Minor reformat to log messages --- src/lib/detail/impl.cpp | 22 +++++++++++----------- src/scord/rpc_handlers.cpp | 16 ++++++++-------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 21eb7398..2822d07b 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -177,7 +177,7 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job_requirements: {}}}", __FUNCTION__, reqs); + LOGGER_INFO("RPC (ADM_{}) <= {{job_requirements: {}}}", __FUNCTION__, reqs); auto rpc_reqs = api::convert(reqs); @@ -187,13 +187,13 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { const auto rpc = endp.call("ADM_register_job", &in, &out); if(out.retval < 0) { - LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval); + LOGGER_ERROR("RPC (ADM_{}) => {}", __FUNCTION__, out.retval); return tl::make_unexpected(static_cast(out.retval)); } const admire::job job = api::convert(out.job); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, job: {}}}", __FUNCTION__, + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}, job: {}}}", __FUNCTION__, ADM_SUCCESS, job.id()); return job; @@ -220,11 +220,11 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { if(out.retval < 0) { const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, retval); return retval; } - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); return ADM_SUCCESS; } @@ -235,7 +235,7 @@ remove_job(const server& srv, const job& job) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job); + LOGGER_INFO("RPC (ADM_{}) <= {{job: {}}}", __FUNCTION__, job); const auto rpc_job = api::convert(job); @@ -246,11 +246,11 @@ remove_job(const server& srv, const job& job) { if(out.retval < 0) { const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, retval); return retval; } - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); return ADM_SUCCESS; } @@ -265,7 +265,7 @@ transfer_dataset(const server& srv, const job& job, auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}, sources: {}, targets: {}, " + LOGGER_INFO("RPC (ADM_{}) <= {{job: {}, sources: {}, targets: {}, " "limits: {}, mapping: {}}}", __FUNCTION__, job, sources, targets, limits, mapping); @@ -283,14 +283,14 @@ transfer_dataset(const server& srv, const job& job, endp.call("ADM_transfer_dataset", &in, &out); if(out.retval < 0) { - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, + LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, out.retval); return tl::make_unexpected(static_cast(out.retval)); } const admire::transfer tx = api::convert(out.tx); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {}}}", __FUNCTION__, + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}, transfer: {}}}", __FUNCTION__, ADM_SUCCESS, tx); return tx; } diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 4ccc7342..9e8b9a00 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -67,7 +67,7 @@ ADM_register_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job_requirements: {}}}", id, __FUNCTION__, + LOGGER_INFO("RPC ID {} ({}) => {{job_requirements: {}}}", id, __FUNCTION__, reqs); const auto job = admire::job{42}; @@ -77,7 +77,7 @@ ADM_register_job(hg_handle_t h) { out.retval = rv; out.job = admire::api::convert(job).release(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, job: {}}}", id, __FUNCTION__, + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, job: {}}}", id, __FUNCTION__, rv, job); ret = margo_respond(h, &out); @@ -110,13 +110,13 @@ ADM_update_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {}, job_requirements: {}}}", id, + LOGGER_INFO("RPC ID {} ({}) => {{job: {}, job_requirements: {}}}", id, __FUNCTION__, job, reqs); admire::error_code rv = ADM_SUCCESS; out.retval = rv; - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}}}", id, __FUNCTION__, rv); + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -147,12 +147,12 @@ ADM_remove_job(hg_handle_t h) { const admire::job job(in.job); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {}}}", id, __FUNCTION__, job); + LOGGER_INFO("RPC ID {} ({}) => {{job: {}}}", id, __FUNCTION__, job); admire::error_code rv = ADM_SUCCESS; out.retval = rv; - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}}}", id, __FUNCTION__, rv); + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -990,7 +990,7 @@ ADM_transfer_dataset(hg_handle_t h) { const auto mapping = static_cast(in.mapping); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {}, sources: {}, targets: {}, " + LOGGER_INFO("RPC ID {} ({}) => {{job: {}, sources: {}, targets: {}, " "limits: {}, mapping: {}}}", id, __FUNCTION__, job, sources, targets, limits, mapping); @@ -1001,7 +1001,7 @@ ADM_transfer_dataset(hg_handle_t h) { out.retval = rv; out.tx = admire::api::convert(transfer).release(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, transfer: {}}}", id, + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, transfer: {}}}", id, __FUNCTION__, rv, transfer); ret = margo_respond(h, &out); -- GitLab From fe27ccbbfd47d4805dd132969f9bcf21387283bb Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 12:25:13 +0200 Subject: [PATCH 33/34] Rename API function ADM_transfer_dataset to ADM_transfer_datasets --- examples/c/ADM_cancel_transfer.c | 8 +++---- examples/c/ADM_get_transfer_priority.c | 8 +++---- .../c/ADM_link_transfer_to_data_operation.c | 8 +++---- examples/c/ADM_set_transfer_priority.c | 11 +++++++-- ...sfer_dataset.c => ADM_transfer_datasets.c} | 12 +++++----- examples/c/CMakeLists.txt | 2 +- ..._dataset.cpp => ADM_transfer_datasets.cpp} | 8 +++---- examples/cxx/CMakeLists.txt | 2 +- src/common/net/proto/rpc_types.h | 6 ++--- src/lib/admire.cpp | 22 ++++++++--------- src/lib/admire.h | 10 ++++---- src/lib/admire.hpp | 10 ++++---- src/lib/c_wrapper.cpp | 15 ++++++------ src/lib/detail/impl.cpp | 24 +++++++++---------- src/lib/detail/impl.hpp | 10 ++++---- src/scord/rpc_handlers.cpp | 8 +++---- src/scord/rpc_handlers.hpp | 4 ++-- src/scord/scord.cpp | 6 ++--- 18 files changed, 91 insertions(+), 83 deletions(-) rename examples/c/{ADM_transfer_dataset.c => ADM_transfer_datasets.c} (86%) rename examples/cxx/{ADM_transfer_dataset.cpp => ADM_transfer_datasets.cpp} (89%) diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index aaf8d589..8feb1a29 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -77,12 +77,12 @@ main(int argc, char* argv[]) { ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, - targets_len, limits, limits_len, mapping, &tx); + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index 2627564f..8529a2f3 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -77,12 +77,12 @@ main(int argc, char* argv[]) { ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, - targets_len, limits, limits_len, mapping, &tx); + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index 0a6f9546..476cf605 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -82,13 +82,13 @@ main(int argc, char* argv[]) { ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, - targets_len, limits, limits_len, mapping, &tx); + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index ed4bef18..bdd45e70 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -77,8 +77,15 @@ main(int argc, char* argv[]) { ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, - targets_len, limits, limits_len, mapping, &tx); + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); + exit_status = EXIT_FAILURE; + goto cleanup; + } int incr = 42; ret = ADM_set_transfer_priority(server, job, tx, incr); diff --git a/examples/c/ADM_transfer_dataset.c b/examples/c/ADM_transfer_datasets.c similarity index 86% rename from examples/c/ADM_transfer_dataset.c rename to examples/c/ADM_transfer_datasets.c index 6d9ed99f..eeb0b8ec 100644 --- a/examples/c/ADM_transfer_dataset.c +++ b/examples/c/ADM_transfer_datasets.c @@ -36,7 +36,7 @@ main(int argc, char* argv[]) { if(argc != 2) { fprintf(stderr, "ERROR: no location provided\n"); - fprintf(stderr, "Usage: ADM_transfer_dataset \n"); + fprintf(stderr, "Usage: ADM_transfer_datasets \n"); exit(EXIT_FAILURE); } @@ -77,17 +77,17 @@ main(int argc, char* argv[]) { ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, sources_len, targets, - targets_len, limits, limits_len, mapping, &tx); + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_transfer_dataset() remote procedure completed " + fprintf(stdout, "ADM_transfer_datasets() remote procedure completed " "successfully\n"); cleanup: diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 61068260..0c78bbf5 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -33,7 +33,7 @@ list(APPEND examples_c # pfs storage ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage # transfers - ADM_transfer_dataset ADM_get_transfer_priority ADM_set_transfer_priority + ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers # qos ADM_set_qos_constraints ADM_get_qos_constraints diff --git a/examples/cxx/ADM_transfer_dataset.cpp b/examples/cxx/ADM_transfer_datasets.cpp similarity index 89% rename from examples/cxx/ADM_transfer_dataset.cpp rename to examples/cxx/ADM_transfer_datasets.cpp index a2161d49..7d39e932 100644 --- a/examples/cxx/ADM_transfer_dataset.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -37,7 +37,7 @@ main(int argc, char* argv[]) { if(argc != 2) { fmt::print(stderr, "ERROR: no server address provided\n"); - fmt::print(stderr, "Usage: ADM_transfer_dataset \n"); + fmt::print(stderr, "Usage: ADM_transfer_datasets \n"); exit(EXIT_FAILURE); } @@ -60,14 +60,14 @@ main(int argc, char* argv[]) { try { const auto job = admire::register_job(server, reqs); - const auto transfer = admire::transfer_dataset( + const auto transfer = admire::transfer_datasets( server, job, sources, targets, qos_limits, mapping); - fmt::print(stdout, "ADM_transfer_dataset() remote procedure completed " + fmt::print(stdout, "ADM_transfer_datasets() remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); } catch(const std::exception& e) { - fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + fmt::print(stderr, "FATAL: ADM_transfer_datasets() failed: {}\n", e.what()); exit(EXIT_FAILURE); } diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 18bcaa91..86a81f25 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -33,7 +33,7 @@ list(APPEND examples_cxx # pfs storage ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage # transfers - ADM_transfer_dataset ADM_get_transfer_priority ADM_set_transfer_priority + ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers # qos ADM_set_qos_constraints ADM_get_qos_constraints diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index ff8f08f8..d07727d0 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -408,10 +408,10 @@ struct adm_qos_limit_list { hg_return_t hg_proc_ADM_qos_limit_list_t(hg_proc_t proc, void* data); -/// ADM_transfer_dataset +/// ADM_transfer_datasets MERCURY_GEN_PROC( - ADM_transfer_dataset_in_t, + ADM_transfer_datasets_in_t, ((ADM_job_t) (job)) ((ADM_dataset_list_t) (sources)) ((ADM_dataset_list_t) (targets)) @@ -420,7 +420,7 @@ MERCURY_GEN_PROC( ) MERCURY_GEN_PROC( - ADM_transfer_dataset_out_t, + ADM_transfer_datasets_out_t, ((hg_int32_t) (retval)) ((ADM_transfer_t) (tx))) diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 69b69667..2b3757aa 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -122,8 +122,8 @@ rpc_registration_cb(scord::network::rpc_client* client) { REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, ADM_in_transit_ops_out_t, NULL, true); - REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, NULL, true); + REGISTER_RPC(client, "ADM_transfer_datasets", ADM_transfer_datasets_in_t, + ADM_transfer_datasets_out_t, NULL, true); REGISTER_RPC(client, "ADM_set_dataset_information", ADM_set_dataset_information_in_t, @@ -401,18 +401,18 @@ remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage) { } admire::transfer -transfer_dataset(const server& srv, const job& job, - const std::vector& sources, - const std::vector& targets, - const std::vector& limits, - transfer::mapping mapping) { +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping) { - const auto rv = detail::transfer_dataset(srv, job, sources, targets, limits, - mapping); + const auto rv = detail::transfer_datasets(srv, job, sources, targets, + limits, mapping); if(!rv) { - throw std::runtime_error(fmt::format("ADM_transfer_dataset() error: {}", - ADM_strerror(rv.error()))); + throw std::runtime_error(fmt::format( + "ADM_transfer_datasets() error: {}", ADM_strerror(rv.error()))); } return rv.value(); diff --git a/src/lib/admire.h b/src/lib/admire.h index f91a1525..ea9008d7 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -204,11 +204,11 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage); * successfully or not. */ ADM_return_t -ADM_transfer_dataset(ADM_server_t server, ADM_job_t job, - ADM_dataset_t sources[], size_t sources_len, - ADM_dataset_t targets[], size_t targets_len, - ADM_qos_limit_t limits[], size_t limits_len, - ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); +ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, + ADM_dataset_t sources[], size_t sources_len, + ADM_dataset_t targets[], size_t targets_len, + ADM_qos_limit_t limits[], size_t limits_len, + ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 0835f740..52ffb122 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -82,11 +82,11 @@ ADM_return_t remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage); admire::transfer -transfer_dataset(const server& srv, const job& job, - const std::vector& sources, - const std::vector& targets, - const std::vector& limits, - transfer::mapping mapping); +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping); ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 3cd4f854..cd58fbc4 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -139,13 +139,14 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t pfs_storage) { } ADM_return_t -ADM_transfer_dataset(ADM_server_t server, ADM_job_t job, - ADM_dataset_t sources[], size_t sources_len, - ADM_dataset_t targets[], size_t targets_len, - ADM_qos_limit_t limits[], size_t limits_len, - ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer) { - - const auto rv = admire::detail::transfer_dataset( +ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, + ADM_dataset_t sources[], size_t sources_len, + ADM_dataset_t targets[], size_t targets_len, + ADM_qos_limit_t limits[], size_t limits_len, + ADM_transfer_mapping_t mapping, + ADM_transfer_t* transfer) { + + const auto rv = admire::detail::transfer_datasets( admire::server{server}, admire::job{job}, admire::api::convert(sources, sources_len), admire::api::convert(targets, targets_len), diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 2822d07b..2130ed15 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -101,8 +101,8 @@ rpc_registration_cb(scord::network::rpc_client* client) { REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, ADM_in_transit_ops_out_t, NULL, true); - REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, NULL, true); + REGISTER_RPC(client, "ADM_transfer_datasets", ADM_transfer_datasets_in_t, + ADM_transfer_datasets_out_t, NULL, true); REGISTER_RPC(client, "ADM_set_dataset_information", ADM_set_dataset_information_in_t, @@ -255,11 +255,11 @@ remove_job(const server& srv, const job& job) { } tl::expected -transfer_dataset(const server& srv, const job& job, - const std::vector& sources, - const std::vector& targets, - const std::vector& limits, - transfer::mapping mapping) { +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -274,13 +274,13 @@ transfer_dataset(const server& srv, const job& job, const auto rpc_targets = api::convert(targets); const auto rpc_qos_limits = api::convert(limits); - ADM_transfer_dataset_in_t in{rpc_job.get(), rpc_sources.get(), - rpc_targets.get(), rpc_qos_limits.get(), - static_cast(mapping)}; - ADM_transfer_dataset_out_t out; + ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(), + rpc_targets.get(), rpc_qos_limits.get(), + static_cast(mapping)}; + ADM_transfer_datasets_out_t out; [[maybe_unused]] const auto rpc = - endp.call("ADM_transfer_dataset", &in, &out); + endp.call("ADM_transfer_datasets", &in, &out); if(out.retval < 0) { LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 266dac3a..bf524d89 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -44,11 +44,11 @@ admire::error_code remove_job(const server& srv, const job& job); tl::expected -transfer_dataset(const server& srv, const job& job, - const std::vector& sources, - const std::vector& targets, - const std::vector& limits, - transfer::mapping mapping); +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping); } // namespace admire::detail diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 9e8b9a00..e38f7b84 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -968,12 +968,12 @@ DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops) * successfully or not. */ static void -ADM_transfer_dataset(hg_handle_t h) { +ADM_transfer_datasets(hg_handle_t h) { [[maybe_unused]] hg_return_t ret; - ADM_transfer_dataset_in_t in; - ADM_transfer_dataset_out_t out; + ADM_transfer_datasets_in_t in; + ADM_transfer_datasets_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); @@ -1014,7 +1014,7 @@ ADM_transfer_dataset(hg_handle_t h) { assert(ret == HG_SUCCESS); } -DEFINE_MARGO_RPC_HANDLER(ADM_transfer_dataset) +DEFINE_MARGO_RPC_HANDLER(ADM_transfer_datasets) /** * Sets information for the dataset identified by resource_id. diff --git a/src/scord/rpc_handlers.hpp b/src/scord/rpc_handlers.hpp index ee63174e..85dbc1d7 100644 --- a/src/scord/rpc_handlers.hpp +++ b/src/scord/rpc_handlers.hpp @@ -106,8 +106,8 @@ DECLARE_MARGO_RPC_HANDLER(ADM_in_situ_ops); DECLARE_MARGO_RPC_HANDLER(ADM_in_transit_ops); -/// ADM_transfer_dataset -DECLARE_MARGO_RPC_HANDLER(ADM_transfer_dataset); +/// ADM_transfer_datasets +DECLARE_MARGO_RPC_HANDLER(ADM_transfer_datasets); /// ADM_set_dataset_information DECLARE_MARGO_RPC_HANDLER(ADM_set_dataset_information); diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp index 44e8f7ac..3710382f 100644 --- a/src/scord/scord.cpp +++ b/src/scord/scord.cpp @@ -237,9 +237,9 @@ main(int argc, char* argv[]) { REGISTER_RPC(ctx, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); - REGISTER_RPC(ctx, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, ADM_transfer_dataset, - true); + REGISTER_RPC( + ctx, "ADM_transfer_datasets", ADM_transfer_datasets_in_t, + ADM_transfer_datasets_out_t, ADM_transfer_datasets, true); REGISTER_RPC(ctx, "ADM_set_dataset_information", ADM_set_dataset_information_in_t, -- GitLab From 199fe4614cdc340337e111f0cce5d1eb5e85ee21 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 13 Sep 2022 13:09:01 +0200 Subject: [PATCH 34/34] Fix ADM_transfer_datasets C example --- examples/c/ADM_transfer_datasets.c | 19 +++++++++------- examples/c/common.c | 36 +++++++++++++++++++++++++++++- examples/c/common.h | 6 +++++ 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c index eeb0b8ec..cd5ad062 100644 --- a/examples/c/ADM_transfer_datasets.c +++ b/examples/c/ADM_transfer_datasets.c @@ -30,6 +30,9 @@ #define NINPUTS 10 #define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 3 int main(int argc, char* argv[]) { @@ -68,17 +71,17 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t* sources = NULL; - size_t sources_len = 0; - ADM_dataset_t* targets = NULL; - size_t targets_len = 0; - ADM_qos_limit_t* limits = NULL; - size_t limits_len = 0; + ADM_dataset_t* sources = prepare_datasets("source-dataset-%d", NSOURCES); + assert(sources); + ADM_dataset_t* targets = prepare_datasets("target-dataset-%d", NTARGETS); + assert(targets); + ADM_qos_limit_t* limits = prepare_qos_limits(NLIMITS); + assert(limits); ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, - targets_len, limits, limits_len, mapping, &tx); + ret = ADM_transfer_datasets(server, job, sources, NSOURCES, targets, + NTARGETS, limits, NLIMITS, mapping, &tx); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_transfer_datasets() remote procedure not " diff --git a/examples/c/common.c b/examples/c/common.c index 1247ff2f..1e47a170 100644 --- a/examples/c/common.c +++ b/examples/c/common.c @@ -15,11 +15,13 @@ prepare_datasets(const char* pattern, size_t n) { } for(size_t i = 0; i < n; ++i) { - // const char* pattern = "input-dataset-%d"; size_t len = snprintf(NULL, 0, pattern, i); char* id = (char*) alloca(len + 1); snprintf(id, len + 1, pattern, i); datasets[i] = ADM_dataset_create(id); + if(!datasets[i]) { + return NULL; + } } return datasets; @@ -37,4 +39,36 @@ destroy_datasets(ADM_dataset_t datasets[], size_t n) { free(datasets); } +ADM_qos_limit_t* +prepare_qos_limits(size_t n) { + + ADM_qos_limit_t* limits = calloc(n, sizeof(ADM_qos_limit_t)); + + if(!limits) { + return NULL; + } + + for(size_t i = 0; i < n; ++i) { + limits[i] = ADM_qos_limit_create(NULL, ADM_QOS_CLASS_BANDWIDTH, 50); + if(!limits[i]) { + return NULL; + } + } + + return limits; +} + +void +destroy_qos_limits(ADM_qos_limit_t* limits, size_t n) { + + for(size_t i = 0; i < n; ++i) { + if(limits[i]) { + ADM_qos_limit_destroy(limits[i]); + } + } + + free(limits); +} + + #endif // SCORD_COMMON_H diff --git a/examples/c/common.h b/examples/c/common.h index 3d333353..16b383dc 100644 --- a/examples/c/common.h +++ b/examples/c/common.h @@ -9,4 +9,10 @@ prepare_datasets(const char* pattern, size_t n); void destroy_datasets(ADM_dataset_t datasets[], size_t n); +ADM_qos_limit_t* +prepare_qos_limits(size_t n); + +void +destroy_qos_limits(ADM_qos_limit_t limits[], size_t n); + #endif // SCORD_C_EXAMPLES_COMMON_H -- GitLab