diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 9669e7a02ee6710825a3a27cf929e33fef757a85..8feb1a2953bbbd6047267f654751d667fb5a9712 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -68,16 +68,21 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); + if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index ca088cf44bd0cc2d5c19f623811595f2c08f6938..8529a2f32f7834f2c40ac7e5ead997247d2540f8 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -68,17 +68,21 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index cd070ef8261ad20f2109d95e34a1a505d13b67c5..476cf605275291ccbc4b571309f7615c5cad11bb 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -72,17 +72,23 @@ main(int argc, char* argv[]) { ADM_data_operation_t op; const char* path = "/tmpxxxxx"; ADM_define_data_operation(server, job, path, &op); - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); + if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index 5d08b7c80ae8efb8b90c503866fb8954ce762962..bdd45e70467f07713b0af3d2047415431cd5b554 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -68,13 +68,24 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = NULL; + size_t sources_len = 0; + ADM_dataset_t* targets = NULL; + size_t targets_len = 0; + ADM_qos_limit_t* limits = NULL; + size_t limits_len = 0; ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_datasets(server, job, sources, sources_len, targets, + targets_len, limits, limits_len, mapping, &tx); + + if(ret != ADM_SUCCESS) { + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); + exit_status = EXIT_FAILURE; + goto cleanup; + } int incr = 42; ret = ADM_set_transfer_priority(server, job, tx, incr); diff --git a/examples/c/ADM_transfer_dataset.c b/examples/c/ADM_transfer_datasets.c similarity index 77% rename from examples/c/ADM_transfer_dataset.c rename to examples/c/ADM_transfer_datasets.c index 434878a4b616983141b7dff989715dfc190f1635..cd5ad062cd935f8043f042be80ef376711abc631 100644 --- a/examples/c/ADM_transfer_dataset.c +++ b/examples/c/ADM_transfer_datasets.c @@ -30,13 +30,16 @@ #define NINPUTS 10 #define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 3 int main(int argc, char* argv[]) { if(argc != 2) { fprintf(stderr, "ERROR: no location provided\n"); - fprintf(stderr, "Usage: ADM_transfer_dataset \n"); + fprintf(stderr, "Usage: ADM_transfer_datasets \n"); exit(EXIT_FAILURE); } @@ -68,22 +71,26 @@ main(int argc, char* argv[]) { exit_status = EXIT_FAILURE; } - ADM_dataset_t** sources = NULL; - ADM_dataset_t** targets = NULL; - ADM_qos_limit_t** limits = NULL; + ADM_dataset_t* sources = prepare_datasets("source-dataset-%d", NSOURCES); + assert(sources); + ADM_dataset_t* targets = prepare_datasets("target-dataset-%d", NTARGETS); + assert(targets); + ADM_qos_limit_t* limits = prepare_qos_limits(NLIMITS); + assert(limits); ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; ADM_transfer_t tx; - ret = ADM_transfer_dataset(server, job, sources, targets, limits, mapping, - &tx); + + ret = ADM_transfer_datasets(server, job, sources, NSOURCES, targets, + NTARGETS, limits, NLIMITS, mapping, &tx); if(ret != ADM_SUCCESS) { - fprintf(stdout, "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); + fprintf(stdout, "ADM_transfer_datasets() remote procedure not " + "completed successfully\n"); exit_status = EXIT_FAILURE; goto cleanup; } - fprintf(stdout, "ADM_transfer_dataset() remote procedure completed " + fprintf(stdout, "ADM_transfer_datasets() remote procedure completed " "successfully\n"); cleanup: diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 6106826097f9e79cd1ce6fc5d8f99df9ba888c54..0c78bbf59f47e3ff6ac6cc70ccbc854f0fa159e2 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -33,7 +33,7 @@ list(APPEND examples_c # pfs storage ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage # transfers - ADM_transfer_dataset ADM_get_transfer_priority ADM_set_transfer_priority + ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers # qos ADM_set_qos_constraints ADM_get_qos_constraints diff --git a/examples/c/common.c b/examples/c/common.c index 1247ff2fac9493c347f6eeb5bb0790338348d16b..1e47a17019c6d948aab8af4ff50c8a7e0a3c82c0 100644 --- a/examples/c/common.c +++ b/examples/c/common.c @@ -15,11 +15,13 @@ prepare_datasets(const char* pattern, size_t n) { } for(size_t i = 0; i < n; ++i) { - // const char* pattern = "input-dataset-%d"; size_t len = snprintf(NULL, 0, pattern, i); char* id = (char*) alloca(len + 1); snprintf(id, len + 1, pattern, i); datasets[i] = ADM_dataset_create(id); + if(!datasets[i]) { + return NULL; + } } return datasets; @@ -37,4 +39,36 @@ destroy_datasets(ADM_dataset_t datasets[], size_t n) { free(datasets); } +ADM_qos_limit_t* +prepare_qos_limits(size_t n) { + + ADM_qos_limit_t* limits = calloc(n, sizeof(ADM_qos_limit_t)); + + if(!limits) { + return NULL; + } + + for(size_t i = 0; i < n; ++i) { + limits[i] = ADM_qos_limit_create(NULL, ADM_QOS_CLASS_BANDWIDTH, 50); + if(!limits[i]) { + return NULL; + } + } + + return limits; +} + +void +destroy_qos_limits(ADM_qos_limit_t* limits, size_t n) { + + for(size_t i = 0; i < n; ++i) { + if(limits[i]) { + ADM_qos_limit_destroy(limits[i]); + } + } + + free(limits); +} + + #endif // SCORD_COMMON_H diff --git a/examples/c/common.h b/examples/c/common.h index 3d3333534febab7c26f064447e284a8167a7ff23..16b383dc465e9c0f3df7489ada0cf29ea640949c 100644 --- a/examples/c/common.h +++ b/examples/c/common.h @@ -9,4 +9,10 @@ prepare_datasets(const char* pattern, size_t n); void destroy_datasets(ADM_dataset_t datasets[], size_t n); +ADM_qos_limit_t* +prepare_qos_limits(size_t n); + +void +destroy_qos_limits(ADM_qos_limit_t limits[], size_t n); + #endif // SCORD_C_EXAMPLES_COMMON_H diff --git a/examples/cxx/ADM_transfer_dataset.cpp b/examples/cxx/ADM_transfer_dataset.cpp deleted file mode 100644 index 1eb3f2c56b4ba9d51f708ee102668a3d34036966..0000000000000000000000000000000000000000 --- a/examples/cxx/ADM_transfer_dataset.cpp +++ /dev/null @@ -1,66 +0,0 @@ -/****************************************************************************** - * Copyright 2021, Barcelona Supercomputing Center (BSC), Spain - * - * This software was partially supported by the EuroHPC-funded project ADMIRE - * (Project ID: 956748, https://www.admire-eurohpc.eu). - * - * This file is part of scord. - * - * scord is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * scord is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with scord. If not, see . - * - * SPDX-License-Identifier: GPL-3.0-or-later - *****************************************************************************/ - -#include -#include - - -int -main(int argc, char* argv[]) { - - if(argc != 2) { - fmt::print(stderr, "ERROR: no location provided\n"); - fmt::print(stderr, "Usage: ADM_transfer_dataset \n"); - exit(EXIT_FAILURE); - } - - admire::server server{"tcp", argv[1]}; - - ADM_job_t job{}; - ADM_dataset_t** sources = nullptr; - ADM_dataset_t** targets = nullptr; - ADM_qos_limit_t** limits = nullptr; - ADM_transfer_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; - ADM_transfer_t tx{}; - ADM_return_t ret = ADM_SUCCESS; - - try { - ret = admire::transfer_dataset(server, job, sources, targets, limits, - mapping, &tx); - } catch(const std::exception& e) { - fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", - e.what()); - exit(EXIT_FAILURE); - } - - if(ret != ADM_SUCCESS) { - fmt::print(stdout, - "ADM_transfer_dataset() remote procedure not completed " - "successfully\n"); - exit(EXIT_FAILURE); - } else { - fmt::print(stdout, "ADM_transfer_dataset() remote procedure completed " - "successfully\n"); - } -} diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7d39e932b47fa9ac97051401fa5250d9ef44341b --- /dev/null +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -0,0 +1,74 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "common.hpp" + +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 4 + +int +main(int argc, char* argv[]) { + + if(argc != 2) { + fmt::print(stderr, "ERROR: no server address provided\n"); + fmt::print(stderr, "Usage: ADM_transfer_datasets \n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); + const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); + const auto qos_limits = prepare_qos_limits(NLIMITS); + const auto mapping = admire::transfer::mapping::n_to_n; + + auto p = std::make_unique( + admire::storage::type::gekkofs, "foobar", + admire::adhoc_storage::execution_mode::separate_new, + admire::adhoc_storage::access_type::read_write, 42, 100, false); + + admire::job_requirements reqs(inputs, outputs, std::move(p)); + + try { + const auto job = admire::register_job(server, reqs); + const auto transfer = admire::transfer_datasets( + server, job, sources, targets, qos_limits, mapping); + + fmt::print(stdout, "ADM_transfer_datasets() remote procedure completed " + "successfully\n"); + exit(EXIT_SUCCESS); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_transfer_datasets() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } +} diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 18bcaa91cfac242f0de224201a36eeb2c1f92de8..86a81f2520bb6b6181d7dc15c2b5e8a70e3850a4 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -33,7 +33,7 @@ list(APPEND examples_cxx # pfs storage ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage # transfers - ADM_transfer_dataset ADM_get_transfer_priority ADM_set_transfer_priority + ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers # qos ADM_set_qos_constraints ADM_get_qos_constraints diff --git a/examples/cxx/common.cpp b/examples/cxx/common.cpp index 83dc666ceeec90b54a89ee14734ee40f5c634249..5461521bec7d0e0d14b8e01a109f58e969a754d7 100644 --- a/examples/cxx/common.cpp +++ b/examples/cxx/common.cpp @@ -10,3 +10,16 @@ prepare_datasets(const std::string& pattern, size_t n) { return datasets; } + +std::vector +prepare_qos_limits(size_t n) { + + std::vector limits; + limits.reserve(n); + + for(size_t i = 0; i < n; ++i) { + limits.emplace_back(admire::qos::subclass::bandwidth, 50); + } + + return limits; +} diff --git a/examples/cxx/common.hpp b/examples/cxx/common.hpp index b64a32e77270336897f7fedcb50c6ca9977d91d5..f3618be58a907e046b4999471b5d447046b981c7 100644 --- a/examples/cxx/common.hpp +++ b/examples/cxx/common.hpp @@ -7,4 +7,7 @@ std::vector prepare_datasets(const std::string& pattern, size_t n); +std::vector +prepare_qos_limits(size_t n); + #endif // SCORD_CXX_EXAMPLES_COMMON_HPP diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index 13b43bfec1f4407f892c9518bed36c57cf50c516..0cb5db799bc79bf84ee789bfb3ec994cc3673ac3 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -91,6 +91,9 @@ typedef struct adm_dataset_info* ADM_dataset_info_t; /** A list of datasets */ typedef struct adm_dataset_list* ADM_dataset_list_t; +/** A list of QoS limits */ +typedef struct adm_qos_limit_list* ADM_qos_limit_list_t; + /* ----------------------------------------------------- */ /* Storage tiers */ @@ -145,7 +148,8 @@ typedef struct adm_pfs_context* ADM_pfs_context_t; typedef enum { ADM_QOS_SCOPE_DATASET, ADM_QOS_SCOPE_NODE, - ADM_QOS_SCOPE_JOB + ADM_QOS_SCOPE_JOB, + ADM_QOS_SCOPE_TRANSFER } ADM_qos_scope_t; /** The class of QoS limit applied to a scope */ @@ -456,13 +460,13 @@ ADM_pfs_context_destroy(ADM_pfs_context_t ctx); * * @param[in] scope The scope of the entity, i.e. ADM_QOS_SCOPE_DATASET, * ADM_QOS_SCOPE_NODE, or ADM_QOS_SCOPE_JOB. - * @param[in] ... A single argument with data from either a ADM_dataset_t, + * @param[in] data A single argument with data from either a ADM_dataset_t, * ADM_node_t, or ADM_job_t variable. The argument must correspond properly * to the scope provided. * @return A valid ADM_qos_entity_t if successful or NULL in case of failure. */ ADM_qos_entity_t -ADM_qos_entity_create(ADM_qos_scope_t scope, ...); +ADM_qos_entity_create(ADM_qos_scope_t scope, void* data); /** * Destroy a QoS entity created by ADM_qos_entity_create(). @@ -496,6 +500,30 @@ ADM_qos_limit_create(ADM_qos_entity_t entity, ADM_qos_class_t cls, ADM_return_t ADM_qos_limit_destroy(ADM_qos_limit_t limit); +/** + * Create a list of QoS limits from an array of ADM_QOS_LIMITs and its + * length. + * + * @remark QoS limit lists need to be freed by calling + * ADM_qos_limit_list_destroy(). + * + * @param[in] limits The array of QoS limits. + * @param[in] len The length of the array. + * @return A valid ADM_qos_limit_list_t if successful or NULL in case of + * failure. + */ +ADM_qos_limit_list_t +ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t len); + +/** + * Destroy a QoS limit list created by ADM_qos_limit_list_create(). + * + * @param[in] list A valid ADM_qos_limit_list_t + * @return ADM_SUCCESS or corresponding ADM error code + */ +ADM_return_t +ADM_qos_limit_list_destroy(ADM_qos_limit_list_t list); + /* ----------------------------------------------------- */ /* Data operations */ diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index ca712bcc0139e19ddfc1aeefeeece024fa378ffb..396006a8cf80348520c5daa755a821281731b0bd 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include "admire_types.h" namespace admire { @@ -38,6 +39,7 @@ namespace admire { using error_code = ADM_return_t; using job_id = std::uint64_t; +using transfer_id = std::uint64_t; struct server { @@ -58,6 +60,26 @@ private: std::unique_ptr m_pimpl; }; +struct node { + + explicit node(std::string hostname); + explicit node(const ADM_node_t& srv); + node(const node&) noexcept; + node(node&&) noexcept; + node& + operator=(const node&) noexcept; + node& + operator=(node&&) noexcept; + ~node(); + + std::string + hostname() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + struct job { explicit job(job_id id); @@ -78,6 +100,108 @@ private: std::unique_ptr m_pimpl; }; +struct transfer { + + enum class mapping : std::underlying_type::type { + one_to_one = ADM_MAPPING_ONE_TO_ONE, + one_to_n = ADM_MAPPING_ONE_TO_N, + n_to_n = ADM_MAPPING_N_TO_N + }; + + explicit transfer(transfer_id id); + explicit transfer(ADM_transfer_t transfer); + + transfer(const transfer&) noexcept; + transfer(transfer&&) noexcept; + transfer& + operator=(const transfer&) noexcept; + transfer& + operator=(transfer&&) noexcept; + + ~transfer(); + + transfer_id + id() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + +namespace qos { + +enum class subclass : std::underlying_type::type { + bandwidth = ADM_QOS_CLASS_BANDWIDTH, + iops = ADM_QOS_CLASS_IOPS, +}; + +enum class scope : std::underlying_type::type { + dataset = ADM_QOS_SCOPE_DATASET, + node = ADM_QOS_SCOPE_NODE, + job = ADM_QOS_SCOPE_JOB, + transfer = ADM_QOS_SCOPE_TRANSFER +}; + +struct entity { + + template + entity(admire::qos::scope s, T&& data); + explicit entity(ADM_qos_entity_t entity); + + entity(const entity&) noexcept; + entity(entity&&) noexcept; + entity& + operator=(const entity&) noexcept; + entity& + operator=(entity&&) noexcept; + + ~entity(); + + admire::qos::scope + scope() const; + + template + T + data() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + +struct limit { + + limit(admire::qos::subclass cls, uint64_t value); + limit(admire::qos::subclass cls, uint64_t value, + const admire::qos::entity& e); + explicit limit(ADM_qos_limit_t l); + + limit(const limit&) noexcept; + limit(limit&&) noexcept; + limit& + operator=(const limit&) noexcept; + limit& + operator=(limit&&) noexcept; + + ~limit(); + + std::optional + entity() const; + + admire::qos::subclass + subclass() const; + + uint64_t + value() const; + +private: + class impl; + std::unique_ptr m_pimpl; +}; + +} // namespace qos + + struct dataset { explicit dataset(std::string id); explicit dataset(ADM_dataset_t dataset); @@ -304,7 +428,7 @@ struct fmt::formatter : formatter { auto format(const admire::job& j, FormatContext& ctx) const { return formatter::format( - fmt::format("id: {}", j.id()), ctx); + fmt::format("{{id: {}}}", j.id()), ctx); } }; @@ -314,7 +438,20 @@ struct fmt::formatter : formatter { template auto format(const admire::dataset& d, FormatContext& ctx) const { - return formatter::format("\"" + d.id() + "\"", ctx); + const auto str = fmt::format("{{id: {}}}", std::quoted(d.id())); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::node& n, FormatContext& ctx) const { + const auto str = + fmt::format("{{hostname: {}}}", std::quoted(n.hostname())); + return formatter::format(str, ctx); } }; @@ -520,11 +657,174 @@ struct fmt::formatter : formatter { auto format(const admire::job_requirements& r, FormatContext& ctx) const { return formatter::format( - fmt::format("inputs: [{}], outputs: [{}], storage: {}", - fmt::join(r.inputs(), ", "), - fmt::join(r.outputs(), ", "), r.storage()), + fmt::format("{{inputs: {}, outputs: {}, storage: {}}}", + r.inputs(), r.outputs(), r.storage()), ctx); } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::qos::scope& s, FormatContext& ctx) const { + + using scope = admire::qos::scope; + + std::string_view name = "unknown"; + + switch(s) { + case scope::dataset: + name = "ADM_QOS_SCOPE_DATASET"; + break; + case scope::node: + name = "ADM_QOS_SCOPE_NODE"; + break; + case scope::job: + name = "ADM_QOS_SCOPE_JOB"; + break; + case scope::transfer: + name = "ADM_QOS_SCOPE_TRANSFER"; + break; + } + + return formatter::format(name, ctx); + } +}; + +template <> +struct fmt::formatter> + : formatter { + // parse is inherited from formatter. + template + auto + format(const std::optional& e, + FormatContext& ctx) const { + + if(!e) { + return formatter::format("none", ctx); + } + + std::string_view data = "unknown"; + + switch(e->scope()) { + case admire::qos::scope::dataset: + data = fmt::format("{}", e->data()); + break; + case admire::qos::scope::node: + data = fmt::format("{}", e->data()); + break; + case admire::qos::scope::job: + data = fmt::format("{}", e->data()); + break; + case admire::qos::scope::transfer: + data = fmt::format("{}", e->data()); + break; + } + + const auto str = + fmt::format("{{scope: {}, data: {}}}", e->scope(), data); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::qos::subclass& sc, FormatContext& ctx) const { + + using subclass = admire::qos::subclass; + + std::string_view name = "unknown"; + + switch(sc) { + case subclass::bandwidth: + name = "ADM_QOS_CLASS_BANDWIDTH"; + break; + case subclass::iops: + name = "ADM_QOS_CLASS_IOPS"; + break; + } + + return formatter::format(name, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::qos::limit& l, FormatContext& ctx) const { + const auto str = fmt::format("{{entity: {}, subclass: {}, value: {}}}", + l.entity(), l.subclass(), l.value()); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::transfer::mapping& m, FormatContext& ctx) const { + + using mapping = admire::transfer::mapping; + + std::string_view name = "unknown"; + + switch(m) { + case mapping::one_to_one: + name = "ADM_MAPPING_ONE_TO_ONE"; + break; + case mapping::one_to_n: + name = "ADM_MAPPING_ONE_TO_N"; + break; + case mapping::n_to_n: + name = "ADM_MAPPING_N_TO_N"; + break; + } + + return formatter::format(name, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::transfer& tx, FormatContext& ctx) const { + const auto str = fmt::format("{{id: {}}}", tx.id()); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter> + : formatter { + // parse is inherited from formatter. + template + auto + format(const std::vector& v, FormatContext& ctx) const { + const auto str = fmt::format("[{}]", fmt::join(v, ", ")); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter> + : formatter { + // parse is inherited from formatter. + template + auto + format(const std::vector& l, FormatContext& ctx) const { + const auto str = fmt::format("[{}]", fmt::join(l, ", ")); + return formatter::format(str, ctx); + } +}; + #endif // SCORD_ADMIRE_TYPES_HPP diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index a6d05b3b6a0ef89d6fedcb40e123a5bceb725a4c..591b56e9148e38eef113a24dc3487477d00042d5 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -23,14 +23,40 @@ *****************************************************************************/ #include +#include #include "convert.hpp" // forward declarations ADM_job_t ADM_job_create(uint64_t id); +ADM_transfer_t +ADM_transfer_create(uint64_t id); + +namespace { + +admire::api::managed_ctype_array +as_ctype_array(const std::vector& datasets) { + + std::vector tmp; + + std::transform(datasets.cbegin(), datasets.cend(), std::back_inserter(tmp), + [](const admire::dataset& d) { + return ADM_dataset_create(d.id().c_str()); + }); + + return admire::api::managed_ctype_array{std::move(tmp)}; +} + +} // namespace + namespace admire::api { +managed_ctype +convert(const node& node) { + return managed_ctype(ADM_node_create(node.hostname().c_str())); +} + managed_ctype convert(const adhoc_storage::ctx& ctx) { return managed_ctype{ADM_adhoc_context_create( @@ -53,7 +79,13 @@ convert(const admire::adhoc_storage& st) { return managed_ctype{c_st, std::move(managed_ctx)}; } -managed_ctype_array +managed_ctype +convert(const admire::dataset& dataset) { + return managed_ctype( + ADM_dataset_create(dataset.id().c_str())); +} + +managed_ctype convert(const std::vector& datasets) { std::vector tmp; @@ -63,7 +95,39 @@ convert(const std::vector& datasets) { return ADM_dataset_create(d.id().c_str()); }); - return managed_ctype_array{std::move(tmp)}; + auto rv = managed_ctype{ + ADM_dataset_list_create(tmp.data(), tmp.size())}; + + std::for_each(tmp.cbegin(), tmp.cend(), + [](ADM_dataset_t d) { ADM_dataset_destroy(d); }); + + return rv; +} + +std::vector +convert(ADM_dataset_t datasets[], size_t datasets_len) { + + std::vector rv; + rv.reserve(datasets_len); + + for(size_t i = 0; i < datasets_len; ++i) { + rv.emplace_back(datasets[i]); + } + + return rv; +} + +std::vector +convert(ADM_dataset_list_t list) { + + std::vector rv; + rv.reserve(list->l_length); + + for(size_t i = 0; i < list->l_length; ++i) { + rv.emplace_back(&list->l_datasets[i]); + } + + return rv; } managed_ctype @@ -73,8 +137,8 @@ convert(const admire::job_requirements& reqs) { *std::dynamic_pointer_cast(reqs.storage()); auto managed_storage = convert(adhoc_storage); - auto managed_inputs = convert(reqs.inputs()); - auto managed_outputs = convert(reqs.outputs()); + auto managed_inputs = as_ctype_array(reqs.inputs()); + auto managed_outputs = as_ctype_array(reqs.outputs()); ADM_job_requirements_t c_reqs = ADM_job_requirements_create( managed_inputs.data(), managed_inputs.size(), @@ -97,4 +161,103 @@ convert(ADM_job_t j) { return admire::job{j}; } +managed_ctype +convert(const transfer& tx) { + return managed_ctype(ADM_transfer_create(tx.id())); +} + +transfer +convert(ADM_transfer_t tx) { + return transfer{tx}; +} + +managed_ctype +convert(const std::vector& limits) { + + std::vector tmp; + + std::transform( + limits.cbegin(), limits.cend(), std::back_inserter(tmp), + [](const admire::qos::limit& l) { + ADM_qos_entity_t e = nullptr; + + if(l.entity()) { + + switch(const auto s = l.entity()->scope()) { + case qos::scope::dataset: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity()->data()) + .release()); + break; + } + + case qos::scope::node: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity()->data()) + .release()); + break; + } + + case qos::scope::job: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity()->data()) + .release()); + break; + } + + case qos::scope::transfer: { + e = ADM_qos_entity_create( + static_cast(s), + convert(l.entity() + ->data()) + .release()); + break; + } + } + } + + return ADM_qos_limit_create( + e, static_cast(l.subclass()), + l.value()); + }); + + auto rv = managed_ctype{ + ADM_qos_limit_list_create(tmp.data(), tmp.size())}; + + std::for_each(tmp.cbegin(), tmp.cend(), + [](ADM_qos_limit_t l) { ADM_qos_limit_destroy_all(l); }); + + return rv; +} + +std::vector +convert(ADM_qos_limit_t limits[], size_t limits_len) { + + std::vector rv; + rv.reserve(limits_len); + + for(size_t i = 0; i < limits_len; ++i) { + rv.emplace_back(limits[i]); + } + + return rv; +} + +std::vector +convert(ADM_qos_limit_list_t list) { + + std::vector rv; + rv.reserve(list->l_length); + + for(size_t i = 0; i < list->l_length; ++i) { + rv.emplace_back(&list->l_limits[i]); + } + + return rv; +} + + } // namespace admire::api diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 8ac73ca100dc07c734f73de661f82b84e158cf84..2b5697910a22a980c896a0cd0aa7c1bc3c73600e 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -39,15 +39,27 @@ struct managed_ctype_array; // conversion functions between C API and CXX API types +managed_ctype +convert(const node& node); + managed_ctype convert(const adhoc_storage::ctx& ctx); managed_ctype convert(const admire::adhoc_storage& st); -managed_ctype_array +managed_ctype +convert(const admire::dataset& dataset); + +managed_ctype convert(const std::vector& datasets); +std::vector +convert(ADM_dataset_t datasets[], size_t datasets_len); + +std::vector +convert(ADM_dataset_list_t list); + managed_ctype convert(const admire::job_requirements& reqs); @@ -57,6 +69,20 @@ convert(const job& j); job convert(ADM_job_t j); +managed_ctype +convert(const transfer& t); + +transfer +convert(ADM_transfer_t j); + +managed_ctype +convert(const std::vector& limits); + +std::vector +convert(ADM_qos_limit_t limits[], size_t limits_len); + +std::vector +convert(ADM_qos_limit_list_t list); } // namespace admire::api @@ -65,6 +91,24 @@ convert(ADM_job_t j); // Specializations for conversion types //////////////////////////////////////////////////////////////////////////////// +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_node_t node) : m_node(node) {} + + ADM_node_t + get() const { + return m_node.get(); + } + + ADM_node_t + release() { + return m_node.release(); + } + + scord::utils::ctype_ptr m_node; +}; + template <> struct admire::api::managed_ctype { @@ -105,6 +149,24 @@ struct admire::api::managed_ctype { managed_ctype m_ctx; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_dataset_t dataset) : m_dataset(dataset) {} + + ADM_dataset_t + get() const { + return m_dataset.get(); + } + + ADM_dataset_t + release() { + return m_dataset.release(); + } + + scord::utils::ctype_ptr m_dataset; +}; + template <> struct admire::api::managed_ctype_array { @@ -138,6 +200,25 @@ struct admire::api::managed_ctype_array { m_datasets; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_dataset_list_t list) : m_list(list) {} + + ADM_dataset_list_t + get() const { + return m_list.get(); + } + + ADM_dataset_list_t + release() { + return m_list.release(); + } + + scord::utils::ctype_ptr + m_list; +}; + template <> struct admire::api::managed_ctype { @@ -172,6 +253,9 @@ struct admire::api::managed_ctype { ADM_return_t ADM_job_destroy(ADM_job_t job); +ADM_return_t +ADM_transfer_destroy(ADM_transfer_t tx); + template <> struct admire::api::managed_ctype { @@ -190,4 +274,77 @@ struct admire::api::managed_ctype { scord::utils::ctype_ptr m_job; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_transfer_t tx) : m_transfer(tx) {} + + ADM_transfer_t + get() const { + return m_transfer.get(); + } + + ADM_transfer_t + release() { + return m_transfer.release(); + } + + scord::utils::ctype_ptr m_transfer; +}; + +ADM_return_t +ADM_qos_limit_destroy_all(ADM_qos_limit_t l); + +template <> +struct admire::api::managed_ctype_array { + + explicit managed_ctype_array(ADM_qos_limit_t data[], size_t size) + : m_qos_limits(data, size) {} + + explicit managed_ctype_array(std::vector&& v) + : m_qos_limits(v.data(), v.size()) {} + + constexpr size_t + size() const { + return m_qos_limits.size(); + } + + constexpr const ADM_qos_limit_t* + data() const noexcept { + return m_qos_limits.data(); + } + + constexpr ADM_qos_limit_t* + data() noexcept { + return m_qos_limits.data(); + } + + constexpr ADM_qos_limit_t* + release() noexcept { + return m_qos_limits.release(); + } + + scord::utils::ctype_ptr_vector + m_qos_limits; +}; + +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_qos_limit_list_t list) : m_list(list) {} + + ADM_qos_limit_list_t + get() const { + return m_list.get(); + } + + ADM_qos_limit_list_t + release() { + return m_list.release(); + } + + scord::utils::ctype_ptr + m_list; +}; + #endif // SCORD_CONVERT_HPP diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index c4178299d2958650ce2663492173249928c141c1..85636c77e31bc561cbd70c0a43bf9823626d2ccf 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -28,6 +28,8 @@ #include #include #include +#include +#include #include "admire_types.hpp" /******************************************************************************/ @@ -158,7 +160,7 @@ ADM_dataset_destroy(ADM_dataset_t dataset) { } ADM_qos_entity_t -ADM_qos_entity_create(ADM_qos_scope_t scope, ...) { +ADM_qos_entity_create(ADM_qos_scope_t scope, void* data) { struct adm_qos_entity* adm_qos_entity = (struct adm_qos_entity*) malloc(sizeof(struct adm_qos_entity)); @@ -170,21 +172,19 @@ ADM_qos_entity_create(ADM_qos_scope_t scope, ...) { adm_qos_entity->e_scope = scope; - va_list ap; - va_start(ap, scope); - switch(scope) { case ADM_QOS_SCOPE_NODE: - adm_qos_entity->e_node = va_arg(ap, ADM_node_t); + adm_qos_entity->e_node = (ADM_node_t) data; break; case ADM_QOS_SCOPE_JOB: - adm_qos_entity->e_job = va_arg(ap, ADM_job_t); + adm_qos_entity->e_job = (ADM_job_t) data; break; case ADM_QOS_SCOPE_DATASET: - adm_qos_entity->e_dataset = va_arg(ap, ADM_dataset_t); + adm_qos_entity->e_dataset = (ADM_dataset_t) data; break; + case ADM_QOS_SCOPE_TRANSFER: + adm_qos_entity->e_transfer = (ADM_transfer_t) data; } - va_end(ap); return adm_qos_entity; } @@ -235,6 +235,23 @@ ADM_qos_limit_destroy(ADM_qos_limit_t limit) { return ret; } +ADM_return_t +ADM_qos_limit_destroy_all(ADM_qos_limit_t limit) { + ADM_return_t ret = ADM_SUCCESS; + + if(!limit) { + LOGGER_ERROR("Invalid ADM_qos_limit_t") + return ADM_EBADARGS; + } + + if(limit->l_entity) { + ADM_qos_entity_destroy(limit->l_entity); + } + + free(limit); + return ret; +} + ADM_dataset_info_t ADM_dataset_info_create() { @@ -692,6 +709,122 @@ ADM_job_destroy(ADM_job_t job) { return ret; } +/** + * Initialize a transfer handle that can be used by clients to refer to a + * transfer. + * + * @remark This function is not actually part of the public API, but it is + * useful to have for internal purposes + * + * @param [in] id The identifier for this transfer + * @return A valid TRANSFER HANDLE or NULL in case of failure. + */ +ADM_transfer_t +ADM_transfer_create(uint64_t id) { + + struct adm_transfer* adm_transfer = + (struct adm_transfer*) malloc(sizeof(struct adm_transfer)); + + if(!adm_transfer) { + LOGGER_ERROR("Could not allocate ADM_transfer_t") + return NULL; + } + + adm_transfer->t_id = id; + + return adm_transfer; +} + +/** + * Destroy a ADM_transfer_t created by ADM_transfer_create(). + * + * @remark This function is not actually part of the public API, but it is + * useful to have for internal purposes + * + * @param[in] tx The ADM_transfer_t to destroy. + * @return ADM_SUCCESS or corresponding error code. + */ +ADM_return_t +ADM_transfer_destroy(ADM_transfer_t tx) { + ADM_return_t ret = ADM_SUCCESS; + + if(!tx) { + LOGGER_ERROR("Invalid ADM_transfer_t") + return ADM_EBADARGS; + } + + free(tx); + return ret; +} + +ADM_qos_limit_list_t +ADM_qos_limit_list_create(ADM_qos_limit_t limits[], size_t length) { + + ADM_qos_limit_list_t p = (ADM_qos_limit_list_t) malloc(sizeof(*p)); + + if(!p) { + LOGGER_ERROR("Could not allocate ADM_qos_limit_list_t") + return NULL; + } + + const char* error_msg = NULL; + + p->l_length = length; + p->l_limits = (struct adm_qos_limit*) calloc(length, sizeof(adm_qos_limit)); + + if(!p->l_limits) { + error_msg = "Could not allocate ADM_qos_limit_list_t"; + goto cleanup_on_error; + } + + for(size_t i = 0; i < length; ++i) { + memcpy(&p->l_limits[i], limits[i], sizeof(adm_qos_limit)); + } + + return p; + +cleanup_on_error: + if(p->l_limits) { + free(p->l_limits); + } + free(p); + + if(error_msg) { + LOGGER_ERROR(error_msg); + } + + return NULL; +} + +ADM_return_t +ADM_qos_limit_list_destroy(ADM_qos_limit_list_t list) { + + ADM_return_t ret = ADM_SUCCESS; + + if(!list) { + LOGGER_ERROR("Invalid ADM_qos_limit_list_t") + return ADM_EBADARGS; + } + + // We cannot call ADM_qos_limit_destroy here because adm_limits + // are stored as a consecutive array in memory. Thus, we free + // the entities themselves and then the array. + if(list->l_limits) { + for(size_t i = 0; i < list->l_length; ++i) { + + ADM_qos_entity_t entity = list->l_limits[i].l_entity; + + if(entity) { + ADM_qos_entity_destroy(entity); + } + } + free(list->l_limits); + } + + free(list); + return ret; +} + /******************************************************************************/ /* C++ Type definitions and related functions */ @@ -744,6 +877,45 @@ server::address() const { return m_pimpl->address(); } +class node::impl { + +public: + explicit impl(std::string hostname) : m_hostname(std::move(hostname)) {} + + std::string + hostname() const { + return m_hostname; + } + +private: + std::string m_hostname; +}; + +node::node(std::string hostname) + : m_pimpl(std::make_unique(std::move(hostname))) {} + +node::node(const ADM_node_t& node) : node::node(node->n_hostname) {} + +node::node(const node& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +node::node(node&&) noexcept = default; + +node& +node::operator=(const node& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +node& +node::operator=(node&&) noexcept = default; + +node::~node() = default; + +std::string +node::hostname() const { + return m_pimpl->hostname(); +} class job::impl { @@ -791,6 +963,54 @@ job::id() const { return m_pimpl->id(); } +class transfer::impl { + +public: + explicit impl(transfer_id id) : m_id(id) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + transfer_id + id() const { + return m_id; + } + +private: + transfer_id m_id; +}; + +transfer::transfer(transfer_id id) + : m_pimpl(std::make_unique(id)) {} + +transfer::transfer(ADM_transfer_t transfer) + : transfer::transfer(transfer->t_id) {} + +transfer::transfer(transfer&&) noexcept = default; + +transfer& +transfer::operator=(transfer&&) noexcept = default; + +transfer::transfer(const transfer& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +transfer& +transfer::operator=(const transfer& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +transfer::~transfer() = default; + +transfer_id +transfer::id() const { + return m_pimpl->id(); +} + class dataset::impl { public: explicit impl(std::string id) : m_id(std::move(id)) {} @@ -1123,5 +1343,195 @@ job_requirements::storage() const { return m_pimpl->storage(); } +namespace qos { + +class entity::impl { +public: + template + impl(const admire::qos::scope& s, T&& data) : m_scope(s), m_data(data) {} + + explicit impl(ADM_qos_entity_t entity) + : m_scope(static_cast(entity->e_scope)), + m_data(init_helper(entity)) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + admire::qos::scope + scope() const { + return m_scope; + } + + template + T + data() const { + return std::get(m_data); + } + +private: + static std::variant + init_helper(ADM_qos_entity_t entity) { + switch(entity->e_scope) { + case ADM_QOS_SCOPE_DATASET: + return admire::dataset(entity->e_dataset); + case ADM_QOS_SCOPE_NODE: + return admire::node(entity->e_node); + case ADM_QOS_SCOPE_JOB: + return admire::job(entity->e_job); + case ADM_QOS_SCOPE_TRANSFER: + return admire::transfer(entity->e_transfer); + default: + throw std::runtime_error(fmt::format( + "Unexpected scope value: {}", entity->e_scope)); + } + } + + +private: + admire::qos::scope m_scope; + std::variant m_data; +}; + +template +entity::entity(admire::qos::scope s, T&& data) + : m_pimpl(std::make_unique(s, std::forward(data))) {} + +entity::entity(ADM_qos_entity_t entity) + : m_pimpl(std::make_unique(entity)) {} + +entity::entity(const entity& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +entity::entity(entity&&) noexcept = default; + +entity& +entity::operator=(const entity& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +entity& +entity::operator=(entity&&) noexcept = default; + +entity::~entity() = default; + +admire::qos::scope +entity::scope() const { + return m_pimpl->scope(); +} + +template <> +admire::node +entity::data() const { + return m_pimpl->data(); +} + +template <> +admire::job +entity::data() const { + return m_pimpl->data(); +} + +template <> +admire::dataset +entity::data() const { + return m_pimpl->data(); +} + +template <> +admire::transfer +entity::data() const { + return m_pimpl->data(); +} + + +class limit::impl { + +public: + impl(admire::qos::subclass cls, uint64_t value, admire::qos::entity e) + : m_subclass(cls), m_value(value), m_entity(std::move(e)) {} + + impl(admire::qos::subclass cls, uint64_t value) + : m_subclass(cls), m_value(value) {} + + explicit impl(ADM_qos_limit_t l) + : m_subclass(static_cast(l->l_class)), + m_value(l->l_value), + m_entity(l->l_entity ? std::optional(admire::qos::entity(l->l_entity)) + : std::nullopt) {} + + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + + std::optional + entity() const { + return m_entity; + } + + admire::qos::subclass + subclass() const { + return m_subclass; + } + + uint64_t + value() const { + return m_value; + } + +private: + admire::qos::subclass m_subclass; + uint64_t m_value; + std::optional m_entity; +}; + +limit::limit(admire::qos::subclass cls, uint64_t value) + : m_pimpl(std::make_unique(cls, value)) {} + +limit::limit(admire::qos::subclass cls, uint64_t value, + const admire::qos::entity& e) + : m_pimpl(std::make_unique(cls, value, e)) {} + +limit::limit(ADM_qos_limit_t l) : m_pimpl(std::make_unique(l)) {} + +limit::limit(const limit& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +limit::limit(limit&&) noexcept = default; + +limit& +limit::operator=(const limit& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +limit& +limit::operator=(limit&&) noexcept = default; + +limit::~limit() = default; + +std::optional +limit::entity() const { + return m_pimpl->entity(); +} + +admire::qos::subclass +limit::subclass() const { + return m_pimpl->subclass(); +} + +uint64_t +limit::value() const { + return m_pimpl->value(); +} + +} // namespace qos } // namespace admire diff --git a/src/common/net/proto/rpc_types.c b/src/common/net/proto/rpc_types.c index d23339e39029b8f3b60b2baceb48983acd7818eb..193787d990e1b7cee6c6150d1e19cbca1b6a1c08 100644 --- a/src/common/net/proto/rpc_types.c +++ b/src/common/net/proto/rpc_types.c @@ -24,6 +24,151 @@ #include "rpc_types.h" +hg_return_t (*hg_proc_ADM_qos_scope_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t; +hg_return_t (*hg_proc_ADM_qos_class_t)(hg_proc_t, void*) = hg_proc_hg_uint32_t; + +hg_return_t +hg_proc_ADM_node_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_node_t* node = (ADM_node_t*) data; + ADM_node_t tmp = NULL; + hg_size_t node_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_node object we need to send + node_length = *node ? sizeof(adm_node) : 0; + ret = hg_proc_hg_size_t(proc, &node_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!node_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_node object, + // write it to the mercury buffer + tmp = *node; + + ret = hg_proc_adm_node(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_node object + ret = hg_proc_hg_size_t(proc, &node_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!node_length) { + *node = NULL; + break; + } + + // if the received adm_node object was not NULL, read each of + // its fields from the mercury buffer + tmp = (adm_node*) calloc(1, sizeof(adm_node)); + + ret = hg_proc_adm_node(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *node = tmp; + break; + + case HG_FREE: + tmp = *node; + free(tmp); + break; + } + + return ret; +} + +hg_return_t +hg_proc_ADM_dataset_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_dataset_t* dataset = (ADM_dataset_t*) data; + ADM_dataset_t tmp = NULL; + hg_size_t dataset_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_dataset object we need to send + dataset_length = *dataset ? sizeof(adm_node) : 0; + ret = hg_proc_hg_size_t(proc, &dataset_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!dataset_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_dataset object, + // write it to the mercury buffer + tmp = *dataset; + + ret = hg_proc_adm_dataset(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_dataset object + ret = hg_proc_hg_size_t(proc, &dataset_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!dataset_length) { + *dataset = NULL; + break; + } + + // if the received adm_dataset object was not NULL, read each of + // its fields from the mercury buffer + tmp = (adm_dataset*) calloc(1, sizeof(adm_dataset)); + + ret = hg_proc_adm_dataset(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *dataset = tmp; + break; + + case HG_FREE: + tmp = *dataset; + free(tmp); + break; + } + + return ret; +} + hg_return_t hg_proc_ADM_job_t(hg_proc_t proc, void* data) { @@ -95,8 +240,80 @@ hg_proc_ADM_job_t(hg_proc_t proc, void* data) { return ret; } +hg_return_t +hg_proc_ADM_transfer_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_transfer_t* transfer = (ADM_transfer_t*) data; + ADM_transfer_t tmp = NULL; + hg_size_t transfer_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_transfer object we need to send + transfer_length = *transfer ? sizeof(adm_transfer) : 0; + ret = hg_proc_hg_size_t(proc, &transfer_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!transfer_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_transfer object, + // write it to the mercury buffer + tmp = *transfer; + + ret = hg_proc_adm_transfer(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_transfer object + ret = hg_proc_hg_size_t(proc, &transfer_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!transfer_length) { + *transfer = NULL; + break; + } + + // if the received adm_transfer object was not NULL, read each of + // its fields from the mercury buffer + tmp = (adm_transfer*) calloc(1, sizeof(adm_transfer)); + + ret = hg_proc_adm_transfer(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *transfer = tmp; + break; + + case HG_FREE: + tmp = *transfer; + free(tmp); + break; + } + + return ret; +} + hg_return_t hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) { + hg_return_t ret = HG_SUCCESS; ADM_dataset_list_t* list = (ADM_dataset_list_t*) data; ADM_dataset_list_t tmp = NULL; @@ -424,3 +641,183 @@ hg_proc_ADM_pfs_context_t(hg_proc_t proc, void* data) { return ret; } + + +hg_return_t +hg_proc_ADM_qos_limit_list_t(hg_proc_t proc, void* data) { + hg_return_t ret = HG_SUCCESS; + ADM_qos_limit_list_t* list = (ADM_qos_limit_list_t*) data; + ADM_qos_limit_list_t tmp = NULL; + + hg_size_t length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + tmp = *list; + // write the length of the list + length = tmp->l_length; + ret = hg_proc_hg_size_t(proc, &tmp->l_length); + + if(ret != HG_SUCCESS) { + break; + } + + // write the list + for(size_t i = 0; i < length; ++i) { + ret = hg_proc_adm_qos_limit(proc, &tmp->l_limits[i]); + + if(ret != HG_SUCCESS) { + break; + } + } + break; + + case HG_DECODE: { + + // find out the length of the list + ret = hg_proc_hg_size_t(proc, &length); + + if(ret != HG_SUCCESS) { + break; + } + + // loop and create list elements + tmp = (ADM_qos_limit_list_t) calloc( + 1, sizeof(struct adm_qos_limit_list)); + tmp->l_length = length; + tmp->l_limits = + (adm_qos_limit*) calloc(length, sizeof(adm_qos_limit)); + + for(size_t i = 0; i < length; ++i) { + ret = hg_proc_adm_qos_limit(proc, &tmp->l_limits[i]); + + if(ret != HG_SUCCESS) { + break; + } + } + + // return the newly-created list + *list = tmp; + + break; + } + + case HG_FREE: + tmp = *list; + free(tmp->l_limits); + free(tmp); + ret = HG_SUCCESS; + break; + } + + return ret; +} + +hg_return_t +hg_proc_ADM_qos_entity_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_qos_entity_t* qos_entity = (ADM_qos_entity_t*) data; + ADM_qos_entity_t tmp = NULL; + hg_size_t qos_entity_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_qos_entity object we need to send + qos_entity_length = *qos_entity ? sizeof(adm_qos_entity) : 0; + ret = hg_proc_hg_size_t(proc, &qos_entity_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!qos_entity_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_qos_entity object, + // write each of its fields to the mercury buffer + + // 1. the QoS scope + ret = hg_proc_ADM_qos_scope_t(proc, &tmp->e_scope); + + if(ret != HG_SUCCESS) { + break; + } + + // 2. the appropriate related data depending on the scope (i.e. an + // ADM_node_t, ADM_job_t, ADM_dataset_t, or ADM_transfer_t) + switch(tmp->e_scope) { + + case ADM_QOS_SCOPE_DATASET: + ret = hg_proc_ADM_dataset_t(proc, &tmp->e_dataset); + break; + case ADM_QOS_SCOPE_NODE: + ret = hg_proc_ADM_node_t(proc, &tmp->e_node); + break; + case ADM_QOS_SCOPE_JOB: + ret = hg_proc_ADM_job_t(proc, &tmp->e_job); + break; + case ADM_QOS_SCOPE_TRANSFER: + ret = hg_proc_ADM_transfer_t(proc, &tmp->e_transfer); + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_qos_entity object + ret = hg_proc_hg_size_t(proc, &qos_entity_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!qos_entity_length) { + *qos_entity = NULL; + break; + } + + // if the received adm_qos_entity object was not NULL, read each + // of its fields from the mercury buffer + tmp = (adm_qos_entity*) calloc(1, sizeof(adm_qos_entity)); + + // 1. the QoS scope + ret = hg_proc_ADM_qos_scope_t(proc, &tmp->e_scope); + + if(ret != HG_SUCCESS) { + break; + } + + // 2. the appropriate related data depending on the scope (i.e. an + // ADM_node_t, ADM_job_t, ADM_dataset_t, or ADM_transfer_t) + switch(tmp->e_scope) { + + case ADM_QOS_SCOPE_DATASET: + ret = hg_proc_ADM_dataset_t(proc, &tmp->e_dataset); + break; + case ADM_QOS_SCOPE_NODE: + ret = hg_proc_ADM_node_t(proc, &tmp->e_node); + break; + case ADM_QOS_SCOPE_JOB: + ret = hg_proc_ADM_job_t(proc, &tmp->e_job); + break; + case ADM_QOS_SCOPE_TRANSFER: + ret = hg_proc_ADM_transfer_t(proc, &tmp->e_transfer); + break; + } + + // return the newly-created entity + *qos_entity = tmp; + break; + + case HG_FREE: + tmp = *qos_entity; + free(tmp); + break; + } + + return ret; +} diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index 5dc1c1595df16547a29f4e00bacdc9dba1f15e23..d07727d0fd07eed296b540615b2920447a789e52 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -44,6 +44,9 @@ typedef struct adm_node { const char* n_hostname; } adm_node; +hg_return_t +hg_proc_ADM_node_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_node, // NOLINT @@ -55,6 +58,9 @@ typedef struct adm_dataset { const char* d_id; } adm_dataset; +hg_return_t +hg_proc_ADM_dataset_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_dataset, // NOLINT @@ -76,7 +82,7 @@ MERCURY_GEN_STRUCT_PROC( hg_return_t hg_proc_ADM_job_t(hg_proc_t proc, void* data); -struct adm_qos_entity { +typedef struct adm_qos_entity { ADM_qos_scope_t e_scope; union { ADM_node_t e_node; @@ -84,27 +90,41 @@ struct adm_qos_entity { ADM_dataset_t e_dataset; ADM_transfer_t e_transfer; }; -}; +} adm_qos_entity; + +extern hg_return_t (*hg_proc_ADM_qos_scope_t)(hg_proc_t, void*); -// TODO: encoder/decoder +hg_return_t +hg_proc_ADM_qos_entity_t(hg_proc_t proc, void* data); -struct adm_qos_limit { +typedef struct adm_qos_limit { ADM_qos_entity_t l_entity; ADM_qos_class_t l_class; - uint64_t l_value; -}; + hg_uint64_t l_value; +} adm_qos_limit; -// TODO: encoder/decoder +extern hg_return_t (*hg_proc_ADM_qos_class_t)(hg_proc_t, void*); + +// clang-format off +MERCURY_GEN_STRUCT_PROC( + adm_qos_limit, // NOLINT + ((ADM_qos_entity_t) (l_entity)) + ((ADM_qos_class_t) (l_class)) + ((hg_uint64_t) (l_value)) +) +// clang-format on typedef struct adm_transfer { - // TODO: undefined for now - int32_t placeholder; + uint64_t t_id; } adm_transfer; +hg_return_t +hg_proc_ADM_transfer_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_transfer, // NOLINT - ((hg_int32_t) (placeholder)) + ((hg_uint64_t) (t_id)) ); // clang-format on @@ -378,17 +398,32 @@ MERCURY_GEN_PROC(ADM_in_transit_ops_in_t, ((hg_const_string_t) (in_transit))) MERCURY_GEN_PROC(ADM_in_transit_ops_out_t, ((int32_t) (ret))) +struct adm_qos_limit_list { + /** An array of QoS limits */ + adm_qos_limit* l_limits; + /** The length of the array */ + size_t l_length; +}; -/// ADM_transfer_dataset +hg_return_t +hg_proc_ADM_qos_limit_list_t(hg_proc_t proc, void* data); + +/// ADM_transfer_datasets + +MERCURY_GEN_PROC( + ADM_transfer_datasets_in_t, + ((ADM_job_t) (job)) + ((ADM_dataset_list_t) (sources)) + ((ADM_dataset_list_t) (targets)) + ((ADM_qos_limit_list_t) (qos_limits)) + ((hg_int32_t) (mapping)) +) MERCURY_GEN_PROC( - ADM_transfer_dataset_in_t, - ((hg_const_string_t) (source))((hg_const_string_t) (destination))( - (hg_const_string_t) (qos_constraints))( - (hg_const_string_t) (distribution))((int32_t) (job_id))) + ADM_transfer_datasets_out_t, + ((hg_int32_t) (retval)) + ((ADM_transfer_t) (tx))) -MERCURY_GEN_PROC(ADM_transfer_dataset_out_t, - ((int32_t) (ret))((hg_const_string_t) (transfer_handle))) /// ADM_set_dataset_information diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index c5b168552201a9e94bc37e0be8898269f0e7b550..2b3757aa68caf8499a333bb3560727b82bf55307 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -122,8 +122,8 @@ rpc_registration_cb(scord::network::rpc_client* client) { REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, ADM_in_transit_ops_out_t, NULL, true); - REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, NULL, true); + REGISTER_RPC(client, "ADM_transfer_datasets", ADM_transfer_datasets_in_t, + ADM_transfer_datasets_out_t, NULL, true); REGISTER_RPC(client, "ADM_set_dataset_information", ADM_set_dataset_information_in_t, @@ -400,41 +400,22 @@ remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage) { return ADM_SUCCESS; } -ADM_return_t -transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources, - ADM_dataset_t** targets, ADM_qos_limit_t** limits, - ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer) { - (void) srv; - (void) job; - (void) sources; - (void) targets; - (void) limits; - (void) mapping; - (void) transfer; - - scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; - - auto endp = rpc_client.lookup(srv.address()); - - LOGGER_INFO("ADM_transfer_dataset(...)"); +admire::transfer +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping) { - ADM_transfer_dataset_in_t in{}; - ADM_transfer_dataset_out_t out; + const auto rv = detail::transfer_datasets(srv, job, sources, targets, + limits, mapping); - in.source = "/tmp"; - in.destination = "/tmp"; - in.qos_constraints = "constraints"; - in.distribution = "distribution"; - - const auto rpc = endp.call("ADM_transfer_dataset", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_transfer_dataset() = {}", out.ret); - return static_cast(out.ret); + if(!rv) { + throw std::runtime_error(fmt::format( + "ADM_transfer_datasets() error: {}", ADM_strerror(rv.error()))); } - LOGGER_INFO("ADM_transfer_dataset() = {}", ADM_SUCCESS); - return ADM_SUCCESS; + return rv.value(); } ADM_return_t @@ -778,9 +759,9 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job, (void) srv; (void) job; (void) op; + (void) transfer; (void) should_stream; (void) args; - (void) transfer; scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; diff --git a/src/lib/admire.h b/src/lib/admire.h index 1404ec4d3338cc4700809421346d0caab6e34503..ea9008d78c945e9af1847129dedcbc7195763a00 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -185,13 +185,16 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage); * * @param[in] server The server to which the request is directed * @param[in] job An ADM_JOB identifying the originating job. - * @param[in] sources A list of DATASETs identifying the source dataset/s + * @param[in] sources An array of DATASETs identifying the source dataset/s * to be transferred. - * @param[in] targets A list of DATASETs identifying the destination + * @param[in] sources_len The number of DATASETs stored in sources. + * @param[in] targets An array of DATASETs identifying the destination * dataset/s and its/their desired locations in a storage tier. - * @param[in] limits A list of QOS_CONSTRAINTS that must be applied to + * @param[in] targets_len The number of DATASETs stored in targets. + * @param[in] limits An array of QOS_CONSTRAINTS that must be applied to * the transfer. These may not exceed the global ones set at node, application, * or resource level. + * @param[in] limits_len The number of QOS_CONSTRAINTS stored in limits. * @param[in] mapping A distribution strategy for the transfers (e.g. * ONE_TO_ONE, ONE_TO_MANY, MANY_TO_MANY) * @param[out] transfer A ADM_TRANSFER allowing clients to interact @@ -201,10 +204,11 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t adhoc_storage); * successfully or not. */ ADM_return_t -ADM_transfer_dataset(ADM_server_t server, ADM_job_t job, - ADM_dataset_t** sources, ADM_dataset_t** targets, - ADM_qos_limit_t** limits, ADM_transfer_mapping_t mapping, - ADM_transfer_t* transfer); +ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, + ADM_dataset_t sources[], size_t sources_len, + ADM_dataset_t targets[], size_t targets_len, + ADM_qos_limit_t limits[], size_t limits_len, + ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index a7e5a5171739ce858c7de70d155086d7ccb21820..52ffb1229480aa7001f340197d1618965e47210a 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -81,10 +81,12 @@ update_pfs_storage(const server& srv, ADM_pfs_context_t ctx, ADM_return_t remove_pfs_storage(const server& srv, ADM_storage_t pfs_storage); -ADM_return_t -transfer_dataset(const server& srv, ADM_job_t job, ADM_dataset_t** sources, - ADM_dataset_t** targets, ADM_qos_limit_t** limits, - ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); +admire::transfer +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping); ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index dba51d28c3850f6e3fe5c3ca6d9000c4931e94cb..cd58fbc4fa44099b3979d3c2a01ea99f583b84f5 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -139,15 +139,27 @@ ADM_remove_pfs_storage(ADM_server_t server, ADM_storage_t pfs_storage) { } ADM_return_t -ADM_transfer_dataset(ADM_server_t server, ADM_job_t job, - ADM_dataset_t** sources, ADM_dataset_t** targets, - ADM_qos_limit_t** limits, ADM_transfer_mapping_t mapping, - ADM_transfer_t* transfer) { +ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, + ADM_dataset_t sources[], size_t sources_len, + ADM_dataset_t targets[], size_t targets_len, + ADM_qos_limit_t limits[], size_t limits_len, + ADM_transfer_mapping_t mapping, + ADM_transfer_t* transfer) { + + const auto rv = admire::detail::transfer_datasets( + admire::server{server}, admire::job{job}, + admire::api::convert(sources, sources_len), + admire::api::convert(targets, targets_len), + admire::api::convert(limits, limits_len), + static_cast(mapping)); - const admire::server srv{server}; + if(!rv) { + return rv.error(); + } + + *transfer = admire::api::convert(*rv).release(); - return admire::transfer_dataset(srv, job, sources, targets, limits, mapping, - transfer); + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 803e6252330c0630332fa73567863ea34cbafab4..2130ed155a2ecf4779e9e20c3aac40097a58ac34 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -101,8 +101,8 @@ rpc_registration_cb(scord::network::rpc_client* client) { REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, ADM_in_transit_ops_out_t, NULL, true); - REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, NULL, true); + REGISTER_RPC(client, "ADM_transfer_datasets", ADM_transfer_datasets_in_t, + ADM_transfer_datasets_out_t, NULL, true); REGISTER_RPC(client, "ADM_set_dataset_information", ADM_set_dataset_information_in_t, @@ -177,8 +177,7 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job_requirements: {{{}}}}}", __FUNCTION__, - reqs); + LOGGER_INFO("RPC (ADM_{}) <= {{job_requirements: {}}}", __FUNCTION__, reqs); auto rpc_reqs = api::convert(reqs); @@ -188,13 +187,13 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { const auto rpc = endp.call("ADM_register_job", &in, &out); if(out.retval < 0) { - LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval); + LOGGER_ERROR("RPC (ADM_{}) => {}", __FUNCTION__, out.retval); return tl::make_unexpected(static_cast(out.retval)); } const admire::job job = api::convert(out.job); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, job: {{{}}}}}", __FUNCTION__, + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}, job: {}}}", __FUNCTION__, ADM_SUCCESS, job.id()); return job; @@ -207,8 +206,8 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC ({}): {{job: {{{}}}, job_requirements: {{{}}}}}", - "ADM_update_job", job, reqs); + LOGGER_INFO("RPC (ADM_{}): {{job: {}, job_requirements: {}}}", __FUNCTION__, + job, reqs); const auto rpc_job = api::convert(job); const auto rpc_reqs = api::convert(reqs); @@ -221,11 +220,11 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { if(out.retval < 0) { const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, retval); return retval; } - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); return ADM_SUCCESS; } @@ -236,7 +235,7 @@ remove_job(const server& srv, const job& job) { auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job); + LOGGER_INFO("RPC (ADM_{}) <= {{job: {}}}", __FUNCTION__, job); const auto rpc_job = api::convert(job); @@ -247,12 +246,54 @@ remove_job(const server& srv, const job& job) { if(out.retval < 0) { const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, retval); return retval; } - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); return ADM_SUCCESS; } +tl::expected +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping) { + + scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + + auto endp = rpc_client.lookup(srv.address()); + + LOGGER_INFO("RPC (ADM_{}) <= {{job: {}, sources: {}, targets: {}, " + "limits: {}, mapping: {}}}", + __FUNCTION__, job, sources, targets, limits, mapping); + + const auto rpc_job = api::convert(job); + const auto rpc_sources = api::convert(sources); + const auto rpc_targets = api::convert(targets); + const auto rpc_qos_limits = api::convert(limits); + + ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(), + rpc_targets.get(), rpc_qos_limits.get(), + static_cast(mapping)}; + ADM_transfer_datasets_out_t out; + + [[maybe_unused]] const auto rpc = + endp.call("ADM_transfer_datasets", &in, &out); + + if(out.retval < 0) { + LOGGER_ERROR("RPC (ADM_{}) => {{retval: {}}}", __FUNCTION__, + out.retval); + return tl::make_unexpected(static_cast(out.retval)); + } + + const admire::transfer tx = api::convert(out.tx); + + LOGGER_INFO("RPC (ADM_{}) => {{retval: {}, transfer: {}}}", __FUNCTION__, + ADM_SUCCESS, tx); + return tx; +} + + } // namespace admire::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index a047192b77149e2c19dd8a0d3ed5bc16aeaf44cd..bf524d8933b83389907281804090834299242952 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -43,6 +43,13 @@ update_job(const server& srv, const job& job, const job_requirements& reqs); admire::error_code remove_job(const server& srv, const job& job); +tl::expected +transfer_datasets(const server& srv, const job& job, + const std::vector& sources, + const std::vector& targets, + const std::vector& limits, + transfer::mapping mapping); + } // namespace admire::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index a12d812d48d452bd4f4522841f705cba8793d297..e38f7b8435d9ab001582d931155ae6a00e59b9a4 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -67,8 +67,8 @@ ADM_register_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job_requirements: {{{}}}}}", id, - __FUNCTION__, reqs); + LOGGER_INFO("RPC ID {} ({}) => {{job_requirements: {}}}", id, __FUNCTION__, + reqs); const auto job = admire::job{42}; @@ -77,8 +77,8 @@ ADM_register_job(hg_handle_t h) { out.retval = rv; out.job = admire::api::convert(job).release(); - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}, job: {{{}}}}}", id, - __FUNCTION__, rv, job); + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, job: {}}}", id, __FUNCTION__, + rv, job); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -110,13 +110,13 @@ ADM_update_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}, job_requirements: {{{}}}}}", - id, __FUNCTION__, job, reqs); + LOGGER_INFO("RPC ID {} ({}) => {{job: {}, job_requirements: {}}}", id, + __FUNCTION__, job, reqs); admire::error_code rv = ADM_SUCCESS; out.retval = rv; - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}}}", id, __FUNCTION__, rv); + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -147,12 +147,12 @@ ADM_remove_job(hg_handle_t h) { const admire::job job(in.job); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) <= {{job: {{{}}}", id, __FUNCTION__, job); + LOGGER_INFO("RPC ID {} ({}) => {{job: {}}}", id, __FUNCTION__, job); admire::error_code rv = ADM_SUCCESS; out.retval = rv; - LOGGER_INFO("RPC ID {} ({}) => {{retval: {}}}", id, __FUNCTION__, rv); + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -968,40 +968,41 @@ DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops) * successfully or not. */ static void -ADM_transfer_dataset(hg_handle_t h) { +ADM_transfer_datasets(hg_handle_t h) { [[maybe_unused]] hg_return_t ret; - ADM_transfer_dataset_in_t in; - ADM_transfer_dataset_out_t out; + ADM_transfer_datasets_in_t in; + ADM_transfer_datasets_out_t out; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - out.ret = -1; - out.transfer_handle = "fail"; + const admire::job job{in.job}; + const std::vector sources = + admire::api::convert(in.sources); + const std::vector targets = + admire::api::convert(in.targets); + const std::vector limits = + admire::api::convert(in.qos_limits); + const auto mapping = static_cast(in.mapping); - if(in.source == nullptr) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid source (nullptr)"); - } else if(in.destination == nullptr) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid destination (nullptr)"); - } else if(in.qos_constraints == nullptr) { - LOGGER_ERROR( - "ADM_transfer_dataset(): invalid qos_constraints (nullptr)"); - } else if(in.distribution == nullptr) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid distribution (nullptr)"); - } else if(in.job_id < 0) { - LOGGER_ERROR("ADM_transfer_dataset(): invalid job_id (< 0)"); - } else { - LOGGER_INFO("ADM_transfer_dataset({},{},{},{},{})", in.source, - in.destination, in.qos_constraints, in.distribution, - in.job_id); - out.ret = 0; - out.transfer_handle = "ok"; - } + const auto id = remote_procedure::new_id(); + LOGGER_INFO("RPC ID {} ({}) => {{job: {}, sources: {}, targets: {}, " + "limits: {}, mapping: {}}}", + id, __FUNCTION__, job, sources, targets, limits, mapping); + + admire::error_code rv = ADM_SUCCESS; + + const auto transfer = admire::transfer{42}; + + out.retval = rv; + out.tx = admire::api::convert(transfer).release(); + LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, transfer: {}}}", id, + __FUNCTION__, rv, transfer); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -1013,7 +1014,7 @@ ADM_transfer_dataset(hg_handle_t h) { assert(ret == HG_SUCCESS); } -DEFINE_MARGO_RPC_HANDLER(ADM_transfer_dataset) +DEFINE_MARGO_RPC_HANDLER(ADM_transfer_datasets) /** * Sets information for the dataset identified by resource_id. diff --git a/src/scord/rpc_handlers.hpp b/src/scord/rpc_handlers.hpp index ee63174ee9c0b483a09c766bde1ef1fb8cfc9de0..85dbc1d7a8f10454c3bfdfbca8a7ab45934dee85 100644 --- a/src/scord/rpc_handlers.hpp +++ b/src/scord/rpc_handlers.hpp @@ -106,8 +106,8 @@ DECLARE_MARGO_RPC_HANDLER(ADM_in_situ_ops); DECLARE_MARGO_RPC_HANDLER(ADM_in_transit_ops); -/// ADM_transfer_dataset -DECLARE_MARGO_RPC_HANDLER(ADM_transfer_dataset); +/// ADM_transfer_datasets +DECLARE_MARGO_RPC_HANDLER(ADM_transfer_datasets); /// ADM_set_dataset_information DECLARE_MARGO_RPC_HANDLER(ADM_set_dataset_information); diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp index 44e8f7aca7dc523c7551fed6732ac81a9137727e..3710382fd921e4c3ccf750906f58ca3930784b78 100644 --- a/src/scord/scord.cpp +++ b/src/scord/scord.cpp @@ -237,9 +237,9 @@ main(int argc, char* argv[]) { REGISTER_RPC(ctx, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); - REGISTER_RPC(ctx, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, ADM_transfer_dataset, - true); + REGISTER_RPC( + ctx, "ADM_transfer_datasets", ADM_transfer_datasets_in_t, + ADM_transfer_datasets_out_t, ADM_transfer_datasets, true); REGISTER_RPC(ctx, "ADM_set_dataset_information", ADM_set_dataset_information_in_t,