From fc670f6bcc3ad331769e27185a0f268a5bd0b5f3 Mon Sep 17 00:00:00 2001 From: rnou Date: Wed, 12 Jul 2023 11:58:19 +0200 Subject: [PATCH 1/7] WIP --- src/scord/rpc_server.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index f86a60c8..5638e63a 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -105,6 +105,10 @@ private: const std::vector& limits, enum scord::transfer::mapping mapping); + void + transfer_status(const network::request& req, scord::job_id job_id, + std::uint64_t cargo_id, float BW, float QoS); + job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; -- GitLab From b09c8d8bdd52446c688bd0b643de898c4377e891 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Jul 2023 12:06:36 +0200 Subject: [PATCH 2/7] WIP --- src/scord/internal_types.cpp | 35 ++++++++++++++++++++++++++++++++++ src/scord/transfer_manager.hpp | 2 +- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index 2f723ef6..cb276698 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -131,4 +131,39 @@ pfs_storage_metadata::update(scord::pfs_storage::ctx pfs_context) { m_pfs_storage.update(std::move(pfs_context)); } + +transfer_info::transfer_info(scord::transfer transfer, float qos, + std::string contact_point, float obtained_bw) + : m_transfer(transfer), m_qos(qos), + m_contact_point(std::move(contact_point)), m_obtained_bw(obtained_bw) {} + +void +transfer_info::update(float obtained_bw) { + m_obtained_bw = obtained_bw; +} + +scord::transfer +transfer_info::transfer() const { + return m_transfer; +} + +std::string const& +transfer_info::contact_point() const { + return m_contact_point; +} + +float +transfer_info::qos() const { + return m_qos; +} + +float +transfer_info::obtained_bw() const { + return m_obtained_bw; +} + +void +transfer_info::obtained_bw(float const obtained_bw) { + m_obtained_bw = obtained_bw; +} } // namespace scord::internal diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp index 6abb74f2..82921b60 100644 --- a/src/scord/transfer_manager.hpp +++ b/src/scord/transfer_manager.hpp @@ -49,7 +49,7 @@ struct transfer_manager { abt::unique_lock lock(m_transfer_mutex); - if(const auto it = m_transfer.find(id); it == m_transfer.end()) { + if(const auto it = m_transfer.find(tx_id); it == m_transfer.end()) { const auto& [it_transfer, inserted] = m_transfer.emplace( id, std::make_shared< internal::transfer_metadata>( -- GitLab From 75f9cde33c919f200bb7cd33c16200332e9e7abd Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 25 Jul 2023 13:10:03 +0200 Subject: [PATCH 3/7] WIP tests --- examples/cxx/ADM_transfer_update.cpp | 89 ++++++++++++++++++++++++++++ examples/cxx/CMakeLists.txt | 2 +- src/lib/c_wrapper.cpp | 9 +++ src/lib/detail/impl.cpp | 32 ++++++++++ src/lib/detail/impl.hpp | 4 ++ src/lib/libscord.cpp | 14 +++++ src/lib/scord/scord.h | 13 ++++ src/lib/scord/scord.hpp | 3 + src/scord/internal_types.cpp | 5 +- src/scord/rpc_server.cpp | 52 ++++++++++++++++ src/scord/rpc_server.hpp | 4 +- 11 files changed, 222 insertions(+), 5 deletions(-) create mode 100644 examples/cxx/ADM_transfer_update.cpp diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp new file mode 100644 index 00000000..01a16646 --- /dev/null +++ b/examples/cxx/ADM_transfer_update.cpp @@ -0,0 +1,89 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "common.hpp" + +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 4 + +int +main(int argc, char* argv[]) { + + test_info test_info{ + .name = TESTNAME, + .requires_server = true, + .requires_controller = true, + }; + + const auto cli_args = process_args(argc, argv, test_info); + + scord::server server{"tcp", cli_args.server_address}; + + const auto job_nodes = prepare_nodes(NJOB_NODES); + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); + const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); + const auto qos_limits = prepare_qos_limits(NLIMITS); + const auto mapping = scord::transfer::mapping::n_to_n; + + std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ + cli_args.controller_address, + scord::adhoc_storage::execution_mode::separate_new, + scord::adhoc_storage::access_type::read_write, 100, false}; + const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; + + try { + const auto adhoc_storage = scord::register_adhoc_storage( + server, name, scord::adhoc_storage::type::gekkofs, + adhoc_storage_ctx, adhoc_resources); + + scord::job::requirements reqs(inputs, outputs, adhoc_storage); + + const auto job = scord::register_job( + server, scord::job::resources{job_nodes}, reqs, 0); + const auto transfer = scord::transfer_datasets( + server, job, sources, targets, qos_limits, mapping); + + + scord::transfer_update(server, transfer, 10.0f); + fmt::print(stdout, "ADM_transfer_update remote procedure completed " + "successfully\n"); + exit(EXIT_SUCCESS); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_transfer_update() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } +} diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 86580f1f..38cdeac4 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority - ADM_cancel_transfer ADM_get_pending_transfers + ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update # qos ADM_set_qos_constraints ADM_get_qos_constraints # data operations diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 1b9f797a..fb49f0b1 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -245,6 +245,15 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, return ADM_SUCCESS; } +ADM_return_t +ADM_transfer_update(ADM_server_t server, ADM_transfer_t transfer, + float obtained_bw) { + + return scord::detail::transfer_update( + scord::server{server}, scord::transfer{transfer}, obtained_bw); +} + + ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 17c8c2c5..2316f3e6 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -530,4 +530,36 @@ transfer_datasets(const server& srv, const job& job, return tl::make_unexpected(scord::error_code::other); } + +scord::error_code +transfer_update(const server& srv, scord::transfer transfer, + float obtained_bw) { + + network::client rpc_client{srv.protocol()}; + + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); + + if(const auto& lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{transfer_id: {}}}", rpc, transfer.id()); + + if(const auto& call_rv = endp.call(rpc.name(), transfer, obtained_bw); + call_rv.has_value()) { + + const network::generic_response resp{call_rv.value()}; + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); + + return resp.error_code(); + } + } + + LOGGER_ERROR("rpc call failed"); + return scord::error_code::other; +} + } // namespace scord::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 31529bce..6e5deacc 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +scord::error_code +transfer_update(const server& srv, scord::transfer transfer, float obtained_bw); + + } // namespace scord::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index 86866369..ae5e20e8 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -377,6 +377,20 @@ transfer_datasets(const server& srv, const job& job, return rv.value(); } + +void +transfer_update(const server& srv, scord::transfer transfer, + float obtained_bw) { + + const auto ec = detail::transfer_update(srv, transfer, obtained_bw); + + if(!ec) { + throw std::runtime_error( + fmt::format("ADM_transfer_update() error: {}", ec.message())); + } +} + + ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h index dc6527af..da410dbb 100644 --- a/src/lib/scord/scord.h +++ b/src/lib/scord/scord.h @@ -251,6 +251,19 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, ADM_qos_limit_t limits[], size_t limits_len, ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); +/** + * Sets the obtained bw for the transfer operation + * + * @param[in] server The server to which the request is directed + * @param[in] transfer An ADM_TRANSFER identifying the originating transfer. + * @param[in] obtained_bw a float indicating the obtained bandwidth + * + * @return Returns if the remote procedure has been completed + * successfully or not. + */ +ADM_return_t +ADM_transfer_update(ADM_server_t server, ADM_transfer_t transfer, + float obtained_bw); /** * Sets information for the dataset identified by resource_id. diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index 469d7d41..f1dfaf9f 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -98,6 +98,9 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +void +transfer_update(const server& srv, scord::transfer transfer, float obtained_bw); + ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info); diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index cb276698..22f4e47f 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -132,7 +132,8 @@ pfs_storage_metadata::update(scord::pfs_storage::ctx pfs_context) { } -transfer_info::transfer_info(scord::transfer transfer, float qos, +transfer_info::transfer_info(scord::transfer transfer, + std::vector qos, std::string contact_point, float obtained_bw) : m_transfer(transfer), m_qos(qos), m_contact_point(std::move(contact_point)), m_obtained_bw(obtained_bw) {} @@ -152,7 +153,7 @@ transfer_info::contact_point() const { return m_contact_point; } -float +std::vector transfer_info::qos() const { return m_qos; } diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 1448ff1f..b523fed5 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -755,7 +755,59 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, resp.error_code(), resp.value_or_none()); + // TODO: create a transfer in transfer manager + // We need the contact point, and different qos + + if(const auto transfer_result = + m_transfer_manager.create(tx_id.value(), contact_point, limits); + !transfer_result.has_value()) { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error creating transfer_storage: {}\"", + rpc.id(), transfer_result.error()); + ec = transfer_result.error(); + } + + req.respond(resp); } + +void +rpc_server::transfer_update(const network::request& req, + scord::transfer transfer, float obtained_bw) { + + using network::get_address; + using network::response_with_id; + using network::rpc_info; + + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, + transfer.id(), obtained_bw); + + scord::error_code ec; + + // TODO: generate a global ID for the transfer and contact Cargo to + // actually request it + + const auto resp = response_with_id{rpc.id(), ec, transfer.id()}; + + LOGGER_INFO("rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, ec, + transfer.id()); + + // TODO: create a transfer in transfer manager + // We need the contact point, and different qos + + ec = m_transfer_manager.update(transfer.id(), obtained_bw); + if(ec.no_such_entity) { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error updating transfer_storage\"", + rpc.id()); + } + + + req.respond(resp); +} + + } // namespace scord diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 5638e63a..727917d5 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -106,8 +106,8 @@ private: enum scord::transfer::mapping mapping); void - transfer_status(const network::request& req, scord::job_id job_id, - std::uint64_t cargo_id, float BW, float QoS); + transfer_update(const network::request& req, scord::transfer transfer, + float obtained_bw); job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; -- GitLab From 081f02e7de3f64ba747dd3fe65c1fe6834847279 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 26 Jul 2023 07:25:02 +0200 Subject: [PATCH 4/7] added RPC --- ci/check_rpcs.py | 2 +- examples/cxx/ADM_transfer_update.cpp | 2 +- src/lib/c_wrapper.cpp | 6 +++--- src/lib/detail/impl.cpp | 8 ++++---- src/lib/detail/impl.hpp | 2 +- src/lib/libscord.cpp | 5 ++--- src/lib/scord/scord.h | 2 +- src/lib/scord/scord.hpp | 2 +- src/scord/rpc_server.cpp | 13 +++++++------ src/scord/rpc_server.hpp | 2 +- 10 files changed, 22 insertions(+), 22 deletions(-) diff --git a/ci/check_rpcs.py b/ci/check_rpcs.py index 444bf155..4ab96fb9 100755 --- a/ci/check_rpcs.py +++ b/ci/check_rpcs.py @@ -19,7 +19,7 @@ RPC_NAMES = { 'ADM_remove_pfs_storage', 'ADM_transfer_datasets', 'ADM_get_transfer_priority', 'ADM_set_transfer_priority', 'ADM_cancel_transfer', - 'ADM_get_pending_transfers', + 'ADM_get_pending_transfers', 'ADM_transfer_update', 'ADM_set_qos_constraints', 'ADM_get_qos_constraints', 'ADM_define_data_operation', 'ADM_connect_data_operation', 'ADM_finalize_data_operation', diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index 01a16646..d92c0af0 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -77,7 +77,7 @@ main(int argc, char* argv[]) { server, job, sources, targets, qos_limits, mapping); - scord::transfer_update(server, transfer, 10.0f); + scord::transfer_update(server, transfer.id(), 10.0f); fmt::print(stdout, "ADM_transfer_update remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index fb49f0b1..71f03dc5 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -246,11 +246,11 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, } ADM_return_t -ADM_transfer_update(ADM_server_t server, ADM_transfer_t transfer, +ADM_transfer_update(ADM_server_t server, uint64_t transfer_id, float obtained_bw) { - return scord::detail::transfer_update( - scord::server{server}, scord::transfer{transfer}, obtained_bw); + return scord::detail::transfer_update(scord::server{server}, transfer_id, + obtained_bw); } diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 2316f3e6..36d937d5 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -532,8 +532,7 @@ transfer_datasets(const server& srv, const job& job, scord::error_code -transfer_update(const server& srv, scord::transfer transfer, - float obtained_bw) { +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { network::client rpc_client{srv.protocol()}; @@ -543,9 +542,10 @@ transfer_update(const server& srv, scord::transfer transfer, lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc {:<} body: {{transfer_id: {}}}", rpc, transfer.id()); + LOGGER_INFO("rpc {:<} body: {{transfer_id: {}}}", rpc, transfer_id); - if(const auto& call_rv = endp.call(rpc.name(), transfer, obtained_bw); + if(const auto& call_rv = + endp.call(rpc.name(), transfer_id, obtained_bw); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 6e5deacc..d26575c1 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -87,7 +87,7 @@ transfer_datasets(const server& srv, const job& job, transfer::mapping mapping); scord::error_code -transfer_update(const server& srv, scord::transfer transfer, float obtained_bw); +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); } // namespace scord::detail diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index ae5e20e8..4b341d64 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -379,10 +379,9 @@ transfer_datasets(const server& srv, const job& job, void -transfer_update(const server& srv, scord::transfer transfer, - float obtained_bw) { +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { - const auto ec = detail::transfer_update(srv, transfer, obtained_bw); + const auto ec = detail::transfer_update(srv, transfer_id, obtained_bw); if(!ec) { throw std::runtime_error( diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h index da410dbb..d3c75164 100644 --- a/src/lib/scord/scord.h +++ b/src/lib/scord/scord.h @@ -262,7 +262,7 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, * successfully or not. */ ADM_return_t -ADM_transfer_update(ADM_server_t server, ADM_transfer_t transfer, +ADM_transfer_update(ADM_server_t server, uint64_t transfer_id, float obtained_bw); /** diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index f1dfaf9f..045bdf28 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -99,7 +99,7 @@ transfer_datasets(const server& srv, const job& job, transfer::mapping mapping); void -transfer_update(const server& srv, scord::transfer transfer, float obtained_bw); +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index b523fed5..ef9e0bf4 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -65,6 +65,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(update_pfs_storage)); provider::define(EXPAND(remove_pfs_storage)); provider::define(EXPAND(transfer_datasets)); + provider::define(EXPAND(transfer_update)); #undef EXPAND } @@ -773,8 +774,8 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, void -rpc_server::transfer_update(const network::request& req, - scord::transfer transfer, float obtained_bw) { +rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, + float obtained_bw) { using network::get_address; using network::response_with_id; @@ -783,22 +784,22 @@ rpc_server::transfer_update(const network::request& req, const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, - transfer.id(), obtained_bw); + transfer_id, obtained_bw); scord::error_code ec; // TODO: generate a global ID for the transfer and contact Cargo to // actually request it - const auto resp = response_with_id{rpc.id(), ec, transfer.id()}; + const auto resp = response_with_id{rpc.id(), ec, transfer_id}; LOGGER_INFO("rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, ec, - transfer.id()); + transfer_id); // TODO: create a transfer in transfer manager // We need the contact point, and different qos - ec = m_transfer_manager.update(transfer.id(), obtained_bw); + ec = m_transfer_manager.update(transfer_id, obtained_bw); if(ec.no_such_entity) { LOGGER_ERROR( "rpc id: {} error_msg: \"Error updating transfer_storage\"", diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 727917d5..ef6b066c 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -106,7 +106,7 @@ private: enum scord::transfer::mapping mapping); void - transfer_update(const network::request& req, scord::transfer transfer, + transfer_update(const network::request& req, uint64_t transfer_id, float obtained_bw); job_manager m_job_manager; -- GitLab From e795d1a55603027ab336acdb492e1a68b49d7db0 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 26 Jul 2023 09:54:30 +0200 Subject: [PATCH 5/7] Updated tests --- examples/cxx/ADM_transfer_update.cpp | 2 +- src/lib/detail/impl.cpp | 3 ++- src/scord/rpc_server.cpp | 3 +-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index d92c0af0..13c49a4e 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -78,7 +78,7 @@ main(int argc, char* argv[]) { scord::transfer_update(server, transfer.id(), 10.0f); - fmt::print(stdout, "ADM_transfer_update remote procedure completed " + fmt::print(stdout, "ADM_transfer_update() remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); } catch(const std::exception& e) { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 36d937d5..cc37fb09 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -542,7 +542,8 @@ transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc {:<} body: {{transfer_id: {}}}", rpc, transfer_id); + LOGGER_INFO("rpc {:<} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, + transfer_id, obtained_bw); if(const auto& call_rv = endp.call(rpc.name(), transfer_id, obtained_bw); diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index ef9e0bf4..4d61e01e 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -793,8 +793,7 @@ rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, const auto resp = response_with_id{rpc.id(), ec, transfer_id}; - LOGGER_INFO("rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, ec, - transfer_id); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); // TODO: create a transfer in transfer manager // We need the contact point, and different qos -- GitLab From 1fbeabe59d0330f33f90ddee78e293d4d4ef62cc Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 13 Nov 2023 20:30:01 +0100 Subject: [PATCH 6/7] Merge and update fmt - spdlog - catch2 versions --- CMakeLists.txt | 6 +-- src/common/net/server.cpp | 1 + src/common/net/utilities.hpp | 16 +++---- src/lib/scord/types.hpp | 78 +++++++++++++++++----------------- src/lib/types.cpp | 2 +- src/scord-ctl/config_file.cpp | 9 ++-- src/scord-ctl/rpc_server.cpp | 20 ++++----- src/scord/internal_types.cpp | 35 --------------- src/scord/rpc_server.cpp | 33 +++++--------- src/scord/transfer_manager.hpp | 2 +- 10 files changed, 79 insertions(+), 123 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 622c1c16..f2b5c05e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -213,7 +213,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building {fmt}") FetchContent_Declare( fmt GIT_REPOSITORY https://github.com/fmtlib/fmt - GIT_TAG d141cdbeb0fb422a3fb7173b285fd38e0d1772dc # v8.0.1 + GIT_TAG a33701196adfad74917046096bf5a2aa0ab0bb50 # v9.1.0 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -226,7 +226,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building spdlog") FetchContent_Declare( spdlog GIT_REPOSITORY https://github.com/gabime/spdlog - GIT_TAG eb3220622e73a4889eee355ffa37972b3cac3df5 # v1.9.2 + GIT_TAG 7e635fca68d014934b4af8a1cf874f63989352b7 # v1.12.0 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -280,7 +280,7 @@ if (SCORD_BUILD_TESTS) FetchContent_Declare( Catch2 GIT_REPOSITORY https://github.com/catchorg/Catch2.git - GIT_TAG 605a34765aa5d5ecbf476b4598a862ada971b0cc # v3.0.1 + GIT_TAG 6e79e682b726f524310d55dec8ddac4e9c52fb5f # v3.4.0 GIT_SHALLOW ON GIT_PROGRESS ON ) diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp index 30723ada..464a02ea 100644 --- a/src/common/net/server.cpp +++ b/src/common/net/server.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #ifdef SCORD_DEBUG_BUILD diff --git a/src/common/net/utilities.hpp b/src/common/net/utilities.hpp index cc1dda42..374405a1 100644 --- a/src/common/net/utilities.hpp +++ b/src/common/net/utilities.hpp @@ -120,15 +120,15 @@ struct fmt::formatter { } template - constexpr auto - format(const network::rpc_info& rpc, FormatContext& ctx) const { - format_to(ctx.out(), "{}{} id: {} name: {} ", m_outbound ? "<=" : "=>", + auto + format(const network::rpc_info& rpc, FormatContext& ctx) const -> format_context::iterator { + format_to(ctx.out(), "{}{} id: {} name: {:?} ", m_outbound ? "<=" : "=>", rpc.pid() ? fmt::format(" pid: {}", *rpc.pid()) : "", - rpc.id(), std::quoted(rpc.name())); - return m_outbound ? format_to(ctx.out(), "to: {}", - std::quoted(rpc.address())) - : format_to(ctx.out(), "from: {}", - std::quoted(rpc.address())); + rpc.id(), rpc.name()); + return m_outbound ? format_to(ctx.out(), "to: {:?}", + rpc.address()) + : format_to(ctx.out(), "from: {:?}", + rpc.address()); } }; diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index d6e771d1..6ce72e1e 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -29,7 +29,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -712,7 +714,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::error_code& ec, FormatContext& ctx) const { + format(const scord::error_code& ec, FormatContext& ctx) const -> format_context::iterator { return formatter::format(ec.name(), ctx); } }; @@ -722,7 +724,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job_info& ji, FormatContext& ctx) const { + format(const scord::job_info& ji, FormatContext& ctx) const -> format_context::iterator { return format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}", ji.adhoc_controller_address(), ji.io_procs()); } @@ -733,7 +735,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job& j, FormatContext& ctx) const { + format(const scord::job& j, FormatContext& ctx) const -> format_context::iterator { return formatter::format( fmt::format("{{id: {}, slurm_id: {}}}", j.id(), j.slurm_id()), ctx); @@ -745,8 +747,8 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::dataset& d, FormatContext& ctx) const { - const auto str = fmt::format("{{id: {}}}", std::quoted(d.id())); + format(const scord::dataset& d, FormatContext& ctx) const -> format_context::iterator { + const auto str = fmt::format("{{id: {:?}}}", d.id()); return formatter::format(str, ctx); } }; @@ -757,7 +759,7 @@ struct fmt::formatter> // parse is inherited from formatter. template auto - format(const std::vector& v, FormatContext& ctx) const { + format(const std::vector& v, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(v, ", ")); return formatter::format(str, ctx); } @@ -768,7 +770,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::dataset_route& r, FormatContext& ctx) const { + format(const scord::dataset_route& r, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{src: {}, dst: {}}}", r.source(), r.destination()); return formatter::format(str, ctx); @@ -782,7 +784,7 @@ struct fmt::formatter> template auto format(const std::vector& v, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(v, ", ")); return formatter::format(str, ctx); } @@ -793,7 +795,7 @@ struct fmt::formatter : fmt::formatter { // parse is inherited from formatter. template auto - format(const scord::node::type& t, FormatContext& ctx) const { + format(const scord::node::type& t, FormatContext& ctx) const -> format_context::iterator { using scord::node; std::string_view name = "unknown"; @@ -817,9 +819,9 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::node& n, FormatContext& ctx) const { - const auto str = fmt::format("{{hostname: {}, type: {}}}", - std::quoted(n.hostname()), n.get_type()); + format(const scord::node& n, FormatContext& ctx) const -> format_context::iterator { + const auto str = fmt::format("{{hostname: {:?}, type: {}}}", + n.hostname(), n.get_type()); return formatter::format(str, ctx); } }; @@ -830,7 +832,7 @@ struct fmt::formatter> // parse is inherited from formatter. template auto - format(const std::vector& v, FormatContext& ctx) const { + format(const std::vector& v, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(v, ", ")); return formatter::format(str, ctx); } @@ -841,7 +843,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::transfer::mapping& m, FormatContext& ctx) const { + format(const scord::transfer::mapping& m, FormatContext& ctx) const -> format_context::iterator { using mapping = scord::transfer::mapping; @@ -868,7 +870,7 @@ struct fmt::formatter : fmt::formatter { // parse is inherited from formatter. template auto - format(const scord::transfer& tx, FormatContext& ctx) const { + format(const scord::transfer& tx, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{id: {}}}", tx.id()); return formatter::format(str, ctx); } @@ -897,7 +899,7 @@ struct fmt::formatter { template auto - format(const enum scord::adhoc_storage::type& t, FormatContext& ctx) const { + format(const enum scord::adhoc_storage::type& t, FormatContext& ctx) const -> format_context::iterator { using scord::adhoc_storage; std::string_view name = "unknown"; @@ -932,7 +934,7 @@ struct fmt::formatter template auto format(const scord::adhoc_storage::execution_mode& exec_mode, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { using execution_mode = scord::adhoc_storage::execution_mode; @@ -964,7 +966,7 @@ struct fmt::formatter template auto format(const scord::adhoc_storage::access_type& type, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { using access_type = scord::adhoc_storage::access_type; @@ -991,13 +993,13 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const { + format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator { return format_to( ctx.out(), - "{{controller: {}, data_stager: {}, execution_mode: {}, " + "{{controller: {:?}, data_stager: {:?}, execution_mode: {}, " "access_type: {}, walltime: {}, should_flush: {}}}", - std::quoted(c.controller_address()), - std::quoted(c.data_stager_address()), c.exec_mode(), + c.controller_address(), + c.data_stager_address(), c.exec_mode(), c.access_type(), c.walltime(), c.should_flush()); } }; @@ -1008,7 +1010,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const std::nullopt_t& /*t*/, FormatContext& ctx) const { + format(const std::nullopt_t& /*t*/, FormatContext& ctx) const -> format_context::iterator { return formatter::format("none", ctx); } }; @@ -1019,7 +1021,7 @@ struct fmt::formatter> : formatter { // parse is inherited from formatter. template auto - format(const std::optional& v, FormatContext& ctx) const { + format(const std::optional& v, FormatContext& ctx) const -> format_context::iterator { return formatter::format( v ? fmt::format("{}", v.value()) : "none", ctx); } @@ -1030,10 +1032,10 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::adhoc_storage& s, FormatContext& ctx) const { + format(const scord::adhoc_storage& s, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format( - "{{type: {}, id: {}, name: {}, context: {}}}", s.type(), s.id(), - std::quoted(s.name()), s.context()); + "{{type: {}, id: {}, name: {:?}, context: {}}}", s.type(), s.id(), + s.name(), s.context()); return formatter::format(str, ctx); } }; @@ -1044,7 +1046,7 @@ struct fmt::formatter // parse is inherited from formatter. template auto - format(const scord::adhoc_storage::resources& r, FormatContext& ctx) const { + format(const scord::adhoc_storage::resources& r, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{nodes: {}}}", r.nodes()); @@ -1058,7 +1060,7 @@ struct fmt::formatter // parse is inherited from formatter. template auto - format(const enum scord::pfs_storage::type& t, FormatContext& ctx) const { + format(const enum scord::pfs_storage::type& t, FormatContext& ctx) const -> format_context::iterator { using scord::pfs_storage; std::string_view name = "unknown"; @@ -1081,7 +1083,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::pfs_storage::ctx& c, FormatContext& ctx) const { + format(const scord::pfs_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{mount_point: {}}}", c.mount_point()); return formatter::format(str, ctx); } @@ -1092,7 +1094,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::pfs_storage& s, FormatContext& ctx) const { + format(const scord::pfs_storage& s, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{context: {}}}", s.context()); return formatter::format(str, ctx); } @@ -1103,7 +1105,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job::resources& r, FormatContext& ctx) const { + format(const scord::job::resources& r, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{nodes: {}}}", r.nodes()); return formatter::format(str, ctx); } @@ -1114,7 +1116,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job::requirements& r, FormatContext& ctx) const { + format(const scord::job::requirements& r, FormatContext& ctx) const -> format_context::iterator { return formatter::format( fmt::format("{{inputs: {}, outputs: {}, " "expected_outputs: {}, adhoc_storage: {}}}", @@ -1129,7 +1131,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::qos::scope& s, FormatContext& ctx) const { + format(const scord::qos::scope& s, FormatContext& ctx) const -> format_context::iterator { using scope = scord::qos::scope; @@ -1161,7 +1163,7 @@ struct fmt::formatter> template auto format(const std::optional& e, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { if(!e) { return formatter::format("none", ctx); @@ -1195,7 +1197,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::qos::subclass& sc, FormatContext& ctx) const { + format(const scord::qos::subclass& sc, FormatContext& ctx) const -> format_context::iterator { using subclass = scord::qos::subclass; @@ -1219,7 +1221,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::qos::limit& l, FormatContext& ctx) const { + format(const scord::qos::limit& l, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{entity: {}, subclass: {}, value: {}}}", l.entity(), l.subclass(), l.value()); return formatter::format(str, ctx); @@ -1232,7 +1234,7 @@ struct fmt::formatter> // parse is inherited from formatter. template auto - format(const std::vector& l, FormatContext& ctx) const { + format(const std::vector& l, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(l, ", ")); return formatter::format(str, ctx); } diff --git a/src/lib/types.cpp b/src/lib/types.cpp index e51c2a08..a889d336 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -1150,7 +1150,7 @@ private: return scord::transfer(entity->e_transfer); default: throw std::runtime_error(fmt::format( - "Unexpected scope value: {}", entity->e_scope)); + "Unexpected scope value: {}", (int)entity->e_scope)); } } diff --git a/src/scord-ctl/config_file.cpp b/src/scord-ctl/config_file.cpp index bb53f653..a28b0f5a 100644 --- a/src/scord-ctl/config_file.cpp +++ b/src/scord-ctl/config_file.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include "config_file.hpp" @@ -87,7 +88,7 @@ to_adhoc_storage_type(const ryml::csubstr& type) { throw std::runtime_error{ fmt::format("Unsupported adhoc storage type '{}' in " "configuration file", - type)}; + type.data())}; } return valid_types.at(type); @@ -155,7 +156,7 @@ parse_command_node(const ryml::ConstNodeRef& node) { ::validate_command(cmdline); } else { fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n", - child.key()); + child.key().data()); } } @@ -221,7 +222,7 @@ parse_adhoc_config_node(const ryml::ConstNodeRef& node) { shrink_command = ::parse_command_node(child); } else { fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n", - child.key()); + child.key().data()); } } @@ -297,7 +298,7 @@ parse_config_node(const ryml::ConstNodeRef& node) { adhoc_configs = ::parse_adhoc_storage_node(child); } else { fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n", - child.key()); + child.key().data()); } } diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index edff7198..13e6a938 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -70,12 +70,12 @@ rpc_server::print_configuration() const { if(const auto& env = command.env(); env.has_value()) { for(const auto& [k, v] : *env) { - LOGGER_INFO(" - {} = {}", k, std::quoted(v)); + LOGGER_INFO(" - {} = {:?}", k, (v)); } } LOGGER_INFO(" - command:"); - LOGGER_INFO(" {}", std::quoted(command.cmdline())); + LOGGER_INFO(" {:?}", (command.cmdline())); }; LOGGER_INFO(" - adhoc storage configurations:"); @@ -126,8 +126,8 @@ rpc_server::deploy_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc, + (adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; @@ -211,8 +211,8 @@ rpc_server::expand_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc, + (adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; @@ -272,8 +272,8 @@ rpc_server::shrink_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc, + (adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; @@ -331,8 +331,8 @@ rpc_server::terminate_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}}}", rpc, (adhoc_uuid), + adhoc_type); auto ec = scord::error_code::success; diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index 22f4e47f..63f6ccc7 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -132,39 +132,4 @@ pfs_storage_metadata::update(scord::pfs_storage::ctx pfs_context) { } -transfer_info::transfer_info(scord::transfer transfer, - std::vector qos, - std::string contact_point, float obtained_bw) - : m_transfer(transfer), m_qos(qos), - m_contact_point(std::move(contact_point)), m_obtained_bw(obtained_bw) {} - -void -transfer_info::update(float obtained_bw) { - m_obtained_bw = obtained_bw; -} - -scord::transfer -transfer_info::transfer() const { - return m_transfer; -} - -std::string const& -transfer_info::contact_point() const { - return m_contact_point; -} - -std::vector -transfer_info::qos() const { - return m_qos; -} - -float -transfer_info::obtained_bw() const { - return m_obtained_bw; -} - -void -transfer_info::obtained_bw(float const obtained_bw) { - m_obtained_bw = obtained_bw; -} } // namespace scord::internal diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 4d61e01e..8ec74eb6 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -275,9 +275,9 @@ rpc_server::register_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, adhoc_ctx: {}, " + LOGGER_INFO("rpc {:>} body: {{name: {:?}, type: {}, adhoc_ctx: {}, " "adhoc_resources: {}}}", - rpc, std::quoted(name), type, ctx, resources); + rpc, name, type, ctx, resources); scord::error_code ec; std::optional adhoc_id; @@ -371,8 +371,8 @@ rpc_server::update_adhoc_storage( const auto child_rpc = rpc_info::create( name, adhoc_storage.context().controller_address()); - LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", - child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), + LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), adhoc_storage.get_resources()); if(const auto call_rv = endp->call( @@ -468,8 +468,8 @@ rpc_server::deploy_adhoc_storage(const network::request& req, const auto child_rpc = rpc.add_child(adhoc_storage.context().controller_address()); - LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", - child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), + LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), adhoc_storage.get_resources()); if(const auto call_rv = endp->call( @@ -547,8 +547,8 @@ rpc_server::terminate_adhoc_storage(const network::request& req, const auto child_rpc = rpc.add_child(adhoc_storage.context().controller_address()); - LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc, - std::quoted(adhoc_metadata_ptr->uuid()), + LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}}}", child_rpc, + adhoc_metadata_ptr->uuid(), adhoc_storage.type()); if(const auto call_rv = @@ -595,8 +595,8 @@ rpc_server::register_pfs_storage(const network::request& req, const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, - std::quoted(name), type, ctx); + LOGGER_INFO("rpc {:>} body: {{name: {:?}, type: {}, pfs_ctx: {}}}", rpc, + name, type, ctx); scord::error_code ec; std::optional pfs_id = 0; @@ -756,19 +756,6 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, resp.error_code(), resp.value_or_none()); - // TODO: create a transfer in transfer manager - // We need the contact point, and different qos - - if(const auto transfer_result = - m_transfer_manager.create(tx_id.value(), contact_point, limits); - !transfer_result.has_value()) { - LOGGER_ERROR( - "rpc id: {} error_msg: \"Error creating transfer_storage: {}\"", - rpc.id(), transfer_result.error()); - ec = transfer_result.error(); - } - - req.respond(resp); } diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp index 82921b60..6abb74f2 100644 --- a/src/scord/transfer_manager.hpp +++ b/src/scord/transfer_manager.hpp @@ -49,7 +49,7 @@ struct transfer_manager { abt::unique_lock lock(m_transfer_mutex); - if(const auto it = m_transfer.find(tx_id); it == m_transfer.end()) { + if(const auto it = m_transfer.find(id); it == m_transfer.end()) { const auto& [it_transfer, inserted] = m_transfer.emplace( id, std::make_shared< internal::transfer_metadata>( -- GitLab From 924e62f14f69efd0cb834f6e12681bc86856445f Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 14 Nov 2023 17:07:50 +0100 Subject: [PATCH 7/7] Updated transfer_update test, spdlog linking, fmt nspace --- examples/c/CMakeLists.txt | 2 +- examples/cxx/ADM_transfer_update.cpp | 16 +++++++++++----- examples/cxx/CMakeLists.txt | 2 +- src/common/net/utilities.hpp | 6 +++--- src/lib/scord/types.hpp | 6 +++--- src/scord-ctl/config_file.cpp | 2 ++ 6 files changed, 21 insertions(+), 13 deletions(-) diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 070a5713..1be1d938 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -52,7 +52,7 @@ list(APPEND c_examples_without_controller add_library(c_examples_common STATIC) target_sources(c_examples_common PUBLIC common.h PRIVATE common.c) -target_link_libraries(c_examples_common libscord_c_types) +target_link_libraries(c_examples_common libscord_c_types spdlog::spdlog fmt::fmt) foreach(example IN LISTS c_examples_with_controller c_examples_without_controller) add_executable(${example}_c) diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index 13c49a4e..060633b1 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -41,6 +41,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -49,8 +50,10 @@ main(int argc, char* argv[]) { const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); - const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); - const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS); + const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS); + const auto expected_outputs = + prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); @@ -60,8 +63,11 @@ main(int argc, char* argv[]) { std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { @@ -69,14 +75,14 @@ main(int argc, char* argv[]) { server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); - scord::job::requirements reqs(inputs, outputs, adhoc_storage); + scord::job::requirements reqs(inputs, outputs, expected_outputs, + adhoc_storage); const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); const auto transfer = scord::transfer_datasets( server, job, sources, targets, qos_limits, mapping); - scord::transfer_update(server, transfer.id(), 10.0f); fmt::print(stdout, "ADM_transfer_update() remote procedure completed " "successfully\n"); diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 38cdeac4..67bd704d 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -56,7 +56,7 @@ foreach(example IN LISTS cxx_examples_with_controller cxx_examples_without_contr add_executable(${example}_cxx) target_sources(${example}_cxx PRIVATE ${example}.cpp) target_link_libraries(${example}_cxx - PUBLIC fmt::fmt libscord cxx_examples_common) + PUBLIC fmt::fmt spdlog::spdlog libscord cxx_examples_common) set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example}) endforeach() diff --git a/src/common/net/utilities.hpp b/src/common/net/utilities.hpp index 374405a1..b45c2121 100644 --- a/src/common/net/utilities.hpp +++ b/src/common/net/utilities.hpp @@ -122,12 +122,12 @@ struct fmt::formatter { template auto format(const network::rpc_info& rpc, FormatContext& ctx) const -> format_context::iterator { - format_to(ctx.out(), "{}{} id: {} name: {:?} ", m_outbound ? "<=" : "=>", + fmt::format_to(ctx.out(), "{}{} id: {} name: {:?} ", m_outbound ? "<=" : "=>", rpc.pid() ? fmt::format(" pid: {}", *rpc.pid()) : "", rpc.id(), rpc.name()); - return m_outbound ? format_to(ctx.out(), "to: {:?}", + return m_outbound ? fmt::format_to(ctx.out(), "to: {:?}", rpc.address()) - : format_to(ctx.out(), "from: {:?}", + : fmt::format_to(ctx.out(), "from: {:?}", rpc.address()); } }; diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index 6ce72e1e..ec767ccb 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -725,7 +725,7 @@ struct fmt::formatter : formatter { template auto format(const scord::job_info& ji, FormatContext& ctx) const -> format_context::iterator { - return format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}", + return fmt::format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}", ji.adhoc_controller_address(), ji.io_procs()); } }; @@ -923,7 +923,7 @@ struct fmt::formatter { break; } - return format_to(ctx.out(), "{}", name); + return fmt::format_to(ctx.out(), "{}", name); } }; @@ -994,7 +994,7 @@ struct fmt::formatter : formatter { template auto format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator { - return format_to( + return fmt::format_to( ctx.out(), "{{controller: {:?}, data_stager: {:?}, execution_mode: {}, " "access_type: {}, walltime: {}, should_flush: {}}}", diff --git a/src/scord-ctl/config_file.cpp b/src/scord-ctl/config_file.cpp index a28b0f5a..2cbb3b86 100644 --- a/src/scord-ctl/config_file.cpp +++ b/src/scord-ctl/config_file.cpp @@ -24,6 +24,8 @@ #include #include +#include +#include #include #include #include -- GitLab