diff --git a/CMakeLists.txt b/CMakeLists.txt index 622c1c165b2b729c2ff805db6b219e749c1d939c..f2b5c05ee0fba3c79a6e711314e5705af1161a92 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -213,7 +213,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building {fmt}") FetchContent_Declare( fmt GIT_REPOSITORY https://github.com/fmtlib/fmt - GIT_TAG d141cdbeb0fb422a3fb7173b285fd38e0d1772dc # v8.0.1 + GIT_TAG a33701196adfad74917046096bf5a2aa0ab0bb50 # v9.1.0 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -226,7 +226,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building spdlog") FetchContent_Declare( spdlog GIT_REPOSITORY https://github.com/gabime/spdlog - GIT_TAG eb3220622e73a4889eee355ffa37972b3cac3df5 # v1.9.2 + GIT_TAG 7e635fca68d014934b4af8a1cf874f63989352b7 # v1.12.0 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -280,7 +280,7 @@ if (SCORD_BUILD_TESTS) FetchContent_Declare( Catch2 GIT_REPOSITORY https://github.com/catchorg/Catch2.git - GIT_TAG 605a34765aa5d5ecbf476b4598a862ada971b0cc # v3.0.1 + GIT_TAG 6e79e682b726f524310d55dec8ddac4e9c52fb5f # v3.4.0 GIT_SHALLOW ON GIT_PROGRESS ON ) diff --git a/ci/check_rpcs.py b/ci/check_rpcs.py index 444bf155f48fbb659c57a9316a76eb74e71ae6ac..4ab96fb9e5d4bb36ef8f028d3da32020c8b6164f 100755 --- a/ci/check_rpcs.py +++ b/ci/check_rpcs.py @@ -19,7 +19,7 @@ RPC_NAMES = { 'ADM_remove_pfs_storage', 'ADM_transfer_datasets', 'ADM_get_transfer_priority', 'ADM_set_transfer_priority', 'ADM_cancel_transfer', - 'ADM_get_pending_transfers', + 'ADM_get_pending_transfers', 'ADM_transfer_update', 'ADM_set_qos_constraints', 'ADM_get_qos_constraints', 'ADM_define_data_operation', 'ADM_connect_data_operation', 'ADM_finalize_data_operation', diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 070a57133acdc04b2eeb69c469a7eefc9c2ed398..1be1d9388bd74c9ae644e3d927c34779e459851f 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -52,7 +52,7 @@ list(APPEND c_examples_without_controller add_library(c_examples_common STATIC) target_sources(c_examples_common PUBLIC common.h PRIVATE common.c) -target_link_libraries(c_examples_common libscord_c_types) +target_link_libraries(c_examples_common libscord_c_types spdlog::spdlog fmt::fmt) foreach(example IN LISTS c_examples_with_controller c_examples_without_controller) add_executable(${example}_c) diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp new file mode 100644 index 0000000000000000000000000000000000000000..060633b1ba345c3f6885ef05ea5e37a60fcb21ed --- /dev/null +++ b/examples/cxx/ADM_transfer_update.cpp @@ -0,0 +1,95 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "common.hpp" + +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 4 + +int +main(int argc, char* argv[]) { + + test_info test_info{ + .name = TESTNAME, + .requires_server = true, + .requires_controller = true, + .requires_data_stager = true, + }; + + const auto cli_args = process_args(argc, argv, test_info); + + scord::server server{"tcp", cli_args.server_address}; + + const auto job_nodes = prepare_nodes(NJOB_NODES); + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); + const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS); + const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS); + const auto expected_outputs = + prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); + + const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); + const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); + const auto qos_limits = prepare_qos_limits(NLIMITS); + const auto mapping = scord::transfer::mapping::n_to_n; + + std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ + cli_args.controller_address, + cli_args.data_stager_address, + scord::adhoc_storage::execution_mode::separate_new, + scord::adhoc_storage::access_type::read_write, + 100, + false}; + const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; + + try { + const auto adhoc_storage = scord::register_adhoc_storage( + server, name, scord::adhoc_storage::type::gekkofs, + adhoc_storage_ctx, adhoc_resources); + + scord::job::requirements reqs(inputs, outputs, expected_outputs, + adhoc_storage); + + const auto job = scord::register_job( + server, scord::job::resources{job_nodes}, reqs, 0); + const auto transfer = scord::transfer_datasets( + server, job, sources, targets, qos_limits, mapping); + + scord::transfer_update(server, transfer.id(), 10.0f); + fmt::print(stdout, "ADM_transfer_update() remote procedure completed " + "successfully\n"); + exit(EXIT_SUCCESS); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_transfer_update() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } +} diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 86580f1f903e150e16b8ff55e332fe26a85879fd..67bd704db4d63d0eb59a569bd5809aafb66b3b0f 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority - ADM_cancel_transfer ADM_get_pending_transfers + ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update # qos ADM_set_qos_constraints ADM_get_qos_constraints # data operations @@ -56,7 +56,7 @@ foreach(example IN LISTS cxx_examples_with_controller cxx_examples_without_contr add_executable(${example}_cxx) target_sources(${example}_cxx PRIVATE ${example}.cpp) target_link_libraries(${example}_cxx - PUBLIC fmt::fmt libscord cxx_examples_common) + PUBLIC fmt::fmt spdlog::spdlog libscord cxx_examples_common) set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example}) endforeach() diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp index 30723ada3bedafa02a05c01d0a6253a9edc3aa4e..464a02ea2abf97a3ad3309343944b94d34d4c592 100644 --- a/src/common/net/server.cpp +++ b/src/common/net/server.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #ifdef SCORD_DEBUG_BUILD diff --git a/src/common/net/utilities.hpp b/src/common/net/utilities.hpp index cc1dda421a4df64194e88428fdfa9f204f11fa9a..b45c21213d932de18ad5b7dcad4e01c3fb9ce485 100644 --- a/src/common/net/utilities.hpp +++ b/src/common/net/utilities.hpp @@ -120,15 +120,15 @@ struct fmt::formatter { } template - constexpr auto - format(const network::rpc_info& rpc, FormatContext& ctx) const { - format_to(ctx.out(), "{}{} id: {} name: {} ", m_outbound ? "<=" : "=>", + auto + format(const network::rpc_info& rpc, FormatContext& ctx) const -> format_context::iterator { + fmt::format_to(ctx.out(), "{}{} id: {} name: {:?} ", m_outbound ? "<=" : "=>", rpc.pid() ? fmt::format(" pid: {}", *rpc.pid()) : "", - rpc.id(), std::quoted(rpc.name())); - return m_outbound ? format_to(ctx.out(), "to: {}", - std::quoted(rpc.address())) - : format_to(ctx.out(), "from: {}", - std::quoted(rpc.address())); + rpc.id(), rpc.name()); + return m_outbound ? fmt::format_to(ctx.out(), "to: {:?}", + rpc.address()) + : fmt::format_to(ctx.out(), "from: {:?}", + rpc.address()); } }; diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 1b9f797ae3586f46ea313a8af49bf576c3b99c1d..71f03dc54e591aa4dbcf1706262a2ad6fa6942ab 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -245,6 +245,15 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, return ADM_SUCCESS; } +ADM_return_t +ADM_transfer_update(ADM_server_t server, uint64_t transfer_id, + float obtained_bw) { + + return scord::detail::transfer_update(scord::server{server}, transfer_id, + obtained_bw); +} + + ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 17c8c2c5e5b20e1d33e6387c68a316687b9b1708..cc37fb09fc60a6301e8aad218aae8c532196ecb9 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -530,4 +530,37 @@ transfer_datasets(const server& srv, const job& job, return tl::make_unexpected(scord::error_code::other); } + +scord::error_code +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { + + network::client rpc_client{srv.protocol()}; + + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); + + if(const auto& lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, + transfer_id, obtained_bw); + + if(const auto& call_rv = + endp.call(rpc.name(), transfer_id, obtained_bw); + call_rv.has_value()) { + + const network::generic_response resp{call_rv.value()}; + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); + + return resp.error_code(); + } + } + + LOGGER_ERROR("rpc call failed"); + return scord::error_code::other; +} + } // namespace scord::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 31529bcea64791e04c85b76fea48bc1ddddcdfb2..d26575c1800f758e0cfac52d38614b0a47f65034 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +scord::error_code +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); + + } // namespace scord::detail #endif // SCORD_ADMIRE_IMPL_HPP diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index 8686636971f809ee875ef60954d6366d7ee14f23..4b341d640d5f2859839a4e98de855c01c419a5de 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -377,6 +377,19 @@ transfer_datasets(const server& srv, const job& job, return rv.value(); } + +void +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { + + const auto ec = detail::transfer_update(srv, transfer_id, obtained_bw); + + if(!ec) { + throw std::runtime_error( + fmt::format("ADM_transfer_update() error: {}", ec.message())); + } +} + + ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h index dc6527af60341e56184483024fa80fd22c8f7a1c..d3c75164701b2a148945580707a885362e053579 100644 --- a/src/lib/scord/scord.h +++ b/src/lib/scord/scord.h @@ -251,6 +251,19 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, ADM_qos_limit_t limits[], size_t limits_len, ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer); +/** + * Sets the obtained bw for the transfer operation + * + * @param[in] server The server to which the request is directed + * @param[in] transfer An ADM_TRANSFER identifying the originating transfer. + * @param[in] obtained_bw a float indicating the obtained bandwidth + * + * @return Returns if the remote procedure has been completed + * successfully or not. + */ +ADM_return_t +ADM_transfer_update(ADM_server_t server, uint64_t transfer_id, + float obtained_bw); /** * Sets information for the dataset identified by resource_id. diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index 469d7d41484394e7169f3e52eb8e93e7c9639def..045bdf282e616ee71485958243cf75ff6a27ca10 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -98,6 +98,9 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); +void +transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); + ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info); diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index d6e771d17ac219f47669b412794ee345db955370..ec767ccb30acb09c5c2207ca993feb95ab81d693 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -29,7 +29,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -712,7 +714,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::error_code& ec, FormatContext& ctx) const { + format(const scord::error_code& ec, FormatContext& ctx) const -> format_context::iterator { return formatter::format(ec.name(), ctx); } }; @@ -722,8 +724,8 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job_info& ji, FormatContext& ctx) const { - return format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}", + format(const scord::job_info& ji, FormatContext& ctx) const -> format_context::iterator { + return fmt::format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}", ji.adhoc_controller_address(), ji.io_procs()); } }; @@ -733,7 +735,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job& j, FormatContext& ctx) const { + format(const scord::job& j, FormatContext& ctx) const -> format_context::iterator { return formatter::format( fmt::format("{{id: {}, slurm_id: {}}}", j.id(), j.slurm_id()), ctx); @@ -745,8 +747,8 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::dataset& d, FormatContext& ctx) const { - const auto str = fmt::format("{{id: {}}}", std::quoted(d.id())); + format(const scord::dataset& d, FormatContext& ctx) const -> format_context::iterator { + const auto str = fmt::format("{{id: {:?}}}", d.id()); return formatter::format(str, ctx); } }; @@ -757,7 +759,7 @@ struct fmt::formatter> // parse is inherited from formatter. template auto - format(const std::vector& v, FormatContext& ctx) const { + format(const std::vector& v, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(v, ", ")); return formatter::format(str, ctx); } @@ -768,7 +770,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::dataset_route& r, FormatContext& ctx) const { + format(const scord::dataset_route& r, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{src: {}, dst: {}}}", r.source(), r.destination()); return formatter::format(str, ctx); @@ -782,7 +784,7 @@ struct fmt::formatter> template auto format(const std::vector& v, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(v, ", ")); return formatter::format(str, ctx); } @@ -793,7 +795,7 @@ struct fmt::formatter : fmt::formatter { // parse is inherited from formatter. template auto - format(const scord::node::type& t, FormatContext& ctx) const { + format(const scord::node::type& t, FormatContext& ctx) const -> format_context::iterator { using scord::node; std::string_view name = "unknown"; @@ -817,9 +819,9 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::node& n, FormatContext& ctx) const { - const auto str = fmt::format("{{hostname: {}, type: {}}}", - std::quoted(n.hostname()), n.get_type()); + format(const scord::node& n, FormatContext& ctx) const -> format_context::iterator { + const auto str = fmt::format("{{hostname: {:?}, type: {}}}", + n.hostname(), n.get_type()); return formatter::format(str, ctx); } }; @@ -830,7 +832,7 @@ struct fmt::formatter> // parse is inherited from formatter. template auto - format(const std::vector& v, FormatContext& ctx) const { + format(const std::vector& v, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(v, ", ")); return formatter::format(str, ctx); } @@ -841,7 +843,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::transfer::mapping& m, FormatContext& ctx) const { + format(const scord::transfer::mapping& m, FormatContext& ctx) const -> format_context::iterator { using mapping = scord::transfer::mapping; @@ -868,7 +870,7 @@ struct fmt::formatter : fmt::formatter { // parse is inherited from formatter. template auto - format(const scord::transfer& tx, FormatContext& ctx) const { + format(const scord::transfer& tx, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{id: {}}}", tx.id()); return formatter::format(str, ctx); } @@ -897,7 +899,7 @@ struct fmt::formatter { template auto - format(const enum scord::adhoc_storage::type& t, FormatContext& ctx) const { + format(const enum scord::adhoc_storage::type& t, FormatContext& ctx) const -> format_context::iterator { using scord::adhoc_storage; std::string_view name = "unknown"; @@ -921,7 +923,7 @@ struct fmt::formatter { break; } - return format_to(ctx.out(), "{}", name); + return fmt::format_to(ctx.out(), "{}", name); } }; @@ -932,7 +934,7 @@ struct fmt::formatter template auto format(const scord::adhoc_storage::execution_mode& exec_mode, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { using execution_mode = scord::adhoc_storage::execution_mode; @@ -964,7 +966,7 @@ struct fmt::formatter template auto format(const scord::adhoc_storage::access_type& type, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { using access_type = scord::adhoc_storage::access_type; @@ -991,13 +993,13 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const { - return format_to( + format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator { + return fmt::format_to( ctx.out(), - "{{controller: {}, data_stager: {}, execution_mode: {}, " + "{{controller: {:?}, data_stager: {:?}, execution_mode: {}, " "access_type: {}, walltime: {}, should_flush: {}}}", - std::quoted(c.controller_address()), - std::quoted(c.data_stager_address()), c.exec_mode(), + c.controller_address(), + c.data_stager_address(), c.exec_mode(), c.access_type(), c.walltime(), c.should_flush()); } }; @@ -1008,7 +1010,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const std::nullopt_t& /*t*/, FormatContext& ctx) const { + format(const std::nullopt_t& /*t*/, FormatContext& ctx) const -> format_context::iterator { return formatter::format("none", ctx); } }; @@ -1019,7 +1021,7 @@ struct fmt::formatter> : formatter { // parse is inherited from formatter. template auto - format(const std::optional& v, FormatContext& ctx) const { + format(const std::optional& v, FormatContext& ctx) const -> format_context::iterator { return formatter::format( v ? fmt::format("{}", v.value()) : "none", ctx); } @@ -1030,10 +1032,10 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::adhoc_storage& s, FormatContext& ctx) const { + format(const scord::adhoc_storage& s, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format( - "{{type: {}, id: {}, name: {}, context: {}}}", s.type(), s.id(), - std::quoted(s.name()), s.context()); + "{{type: {}, id: {}, name: {:?}, context: {}}}", s.type(), s.id(), + s.name(), s.context()); return formatter::format(str, ctx); } }; @@ -1044,7 +1046,7 @@ struct fmt::formatter // parse is inherited from formatter. template auto - format(const scord::adhoc_storage::resources& r, FormatContext& ctx) const { + format(const scord::adhoc_storage::resources& r, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{nodes: {}}}", r.nodes()); @@ -1058,7 +1060,7 @@ struct fmt::formatter // parse is inherited from formatter. template auto - format(const enum scord::pfs_storage::type& t, FormatContext& ctx) const { + format(const enum scord::pfs_storage::type& t, FormatContext& ctx) const -> format_context::iterator { using scord::pfs_storage; std::string_view name = "unknown"; @@ -1081,7 +1083,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::pfs_storage::ctx& c, FormatContext& ctx) const { + format(const scord::pfs_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{mount_point: {}}}", c.mount_point()); return formatter::format(str, ctx); } @@ -1092,7 +1094,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::pfs_storage& s, FormatContext& ctx) const { + format(const scord::pfs_storage& s, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{context: {}}}", s.context()); return formatter::format(str, ctx); } @@ -1103,7 +1105,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job::resources& r, FormatContext& ctx) const { + format(const scord::job::resources& r, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{nodes: {}}}", r.nodes()); return formatter::format(str, ctx); } @@ -1114,7 +1116,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::job::requirements& r, FormatContext& ctx) const { + format(const scord::job::requirements& r, FormatContext& ctx) const -> format_context::iterator { return formatter::format( fmt::format("{{inputs: {}, outputs: {}, " "expected_outputs: {}, adhoc_storage: {}}}", @@ -1129,7 +1131,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::qos::scope& s, FormatContext& ctx) const { + format(const scord::qos::scope& s, FormatContext& ctx) const -> format_context::iterator { using scope = scord::qos::scope; @@ -1161,7 +1163,7 @@ struct fmt::formatter> template auto format(const std::optional& e, - FormatContext& ctx) const { + FormatContext& ctx) const -> format_context::iterator { if(!e) { return formatter::format("none", ctx); @@ -1195,7 +1197,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::qos::subclass& sc, FormatContext& ctx) const { + format(const scord::qos::subclass& sc, FormatContext& ctx) const -> format_context::iterator { using subclass = scord::qos::subclass; @@ -1219,7 +1221,7 @@ struct fmt::formatter : formatter { // parse is inherited from formatter. template auto - format(const scord::qos::limit& l, FormatContext& ctx) const { + format(const scord::qos::limit& l, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("{{entity: {}, subclass: {}, value: {}}}", l.entity(), l.subclass(), l.value()); return formatter::format(str, ctx); @@ -1232,7 +1234,7 @@ struct fmt::formatter> // parse is inherited from formatter. template auto - format(const std::vector& l, FormatContext& ctx) const { + format(const std::vector& l, FormatContext& ctx) const -> format_context::iterator { const auto str = fmt::format("[{}]", fmt::join(l, ", ")); return formatter::format(str, ctx); } diff --git a/src/lib/types.cpp b/src/lib/types.cpp index e51c2a08ee7e9dc6812fc96e54213718f106174a..a889d336c43ef26477cbae4cd360048b6ebea953 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -1150,7 +1150,7 @@ private: return scord::transfer(entity->e_transfer); default: throw std::runtime_error(fmt::format( - "Unexpected scope value: {}", entity->e_scope)); + "Unexpected scope value: {}", (int)entity->e_scope)); } } diff --git a/src/scord-ctl/config_file.cpp b/src/scord-ctl/config_file.cpp index bb53f65352112a1d0dd72f42ad988e98b7ed0ddf..2cbb3b86b447c826e95a715932f7a6759aa15018 100644 --- a/src/scord-ctl/config_file.cpp +++ b/src/scord-ctl/config_file.cpp @@ -24,6 +24,9 @@ #include #include +#include +#include +#include #include #include #include "config_file.hpp" @@ -87,7 +90,7 @@ to_adhoc_storage_type(const ryml::csubstr& type) { throw std::runtime_error{ fmt::format("Unsupported adhoc storage type '{}' in " "configuration file", - type)}; + type.data())}; } return valid_types.at(type); @@ -155,7 +158,7 @@ parse_command_node(const ryml::ConstNodeRef& node) { ::validate_command(cmdline); } else { fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n", - child.key()); + child.key().data()); } } @@ -221,7 +224,7 @@ parse_adhoc_config_node(const ryml::ConstNodeRef& node) { shrink_command = ::parse_command_node(child); } else { fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n", - child.key()); + child.key().data()); } } @@ -297,7 +300,7 @@ parse_config_node(const ryml::ConstNodeRef& node) { adhoc_configs = ::parse_adhoc_storage_node(child); } else { fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n", - child.key()); + child.key().data()); } } diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index edff7198471bb9dbf5f3b786983527d604813221..13e6a938fa1afc20fb63ee4874f0eba5336f0f6b 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -70,12 +70,12 @@ rpc_server::print_configuration() const { if(const auto& env = command.env(); env.has_value()) { for(const auto& [k, v] : *env) { - LOGGER_INFO(" - {} = {}", k, std::quoted(v)); + LOGGER_INFO(" - {} = {:?}", k, (v)); } } LOGGER_INFO(" - command:"); - LOGGER_INFO(" {}", std::quoted(command.cmdline())); + LOGGER_INFO(" {:?}", (command.cmdline())); }; LOGGER_INFO(" - adhoc storage configurations:"); @@ -126,8 +126,8 @@ rpc_server::deploy_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc, + (adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; @@ -211,8 +211,8 @@ rpc_server::expand_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc, + (adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; @@ -272,8 +272,8 @@ rpc_server::shrink_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc, + (adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; @@ -331,8 +331,8 @@ rpc_server::terminate_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}}}", rpc, - std::quoted(adhoc_uuid), adhoc_type); + LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}}}", rpc, (adhoc_uuid), + adhoc_type); auto ec = scord::error_code::success; diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index 2f723ef6119c7ee03863259e98fe6af8a0bb1e4f..63f6ccc71e417bd4312e9e8893d50d4fe18a18aa 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -131,4 +131,5 @@ pfs_storage_metadata::update(scord::pfs_storage::ctx pfs_context) { m_pfs_storage.update(std::move(pfs_context)); } + } // namespace scord::internal diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 1448ff1fc2ee7f43565dc28a05540d296d53f10e..8ec74eb6e00a65fdae5df9ce1f9f272bb601d26e 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -65,6 +65,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(update_pfs_storage)); provider::define(EXPAND(remove_pfs_storage)); provider::define(EXPAND(transfer_datasets)); + provider::define(EXPAND(transfer_update)); #undef EXPAND } @@ -274,9 +275,9 @@ rpc_server::register_adhoc_storage( const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, adhoc_ctx: {}, " + LOGGER_INFO("rpc {:>} body: {{name: {:?}, type: {}, adhoc_ctx: {}, " "adhoc_resources: {}}}", - rpc, std::quoted(name), type, ctx, resources); + rpc, name, type, ctx, resources); scord::error_code ec; std::optional adhoc_id; @@ -370,8 +371,8 @@ rpc_server::update_adhoc_storage( const auto child_rpc = rpc_info::create( name, adhoc_storage.context().controller_address()); - LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", - child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), + LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), adhoc_storage.get_resources()); if(const auto call_rv = endp->call( @@ -467,8 +468,8 @@ rpc_server::deploy_adhoc_storage(const network::request& req, const auto child_rpc = rpc.add_child(adhoc_storage.context().controller_address()); - LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", - child_rpc, std::quoted(adhoc_metadata_ptr->uuid()), + LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), adhoc_storage.get_resources()); if(const auto call_rv = endp->call( @@ -546,8 +547,8 @@ rpc_server::terminate_adhoc_storage(const network::request& req, const auto child_rpc = rpc.add_child(adhoc_storage.context().controller_address()); - LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc, - std::quoted(adhoc_metadata_ptr->uuid()), + LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}}}", child_rpc, + adhoc_metadata_ptr->uuid(), adhoc_storage.type()); if(const auto call_rv = @@ -594,8 +595,8 @@ rpc_server::register_pfs_storage(const network::request& req, const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, - std::quoted(name), type, ctx); + LOGGER_INFO("rpc {:>} body: {{name: {:?}, type: {}, pfs_ctx: {}}}", rpc, + name, type, ctx); scord::error_code ec; std::optional pfs_id = 0; @@ -758,4 +759,42 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, req.respond(resp); } + +void +rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, + float obtained_bw) { + + using network::get_address; + using network::response_with_id; + using network::rpc_info; + + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, + transfer_id, obtained_bw); + + scord::error_code ec; + + // TODO: generate a global ID for the transfer and contact Cargo to + // actually request it + + const auto resp = response_with_id{rpc.id(), ec, transfer_id}; + + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + + // TODO: create a transfer in transfer manager + // We need the contact point, and different qos + + ec = m_transfer_manager.update(transfer_id, obtained_bw); + if(ec.no_such_entity) { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error updating transfer_storage\"", + rpc.id()); + } + + + req.respond(resp); +} + + } // namespace scord diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index f86a60c8aa52ccacfddc44c9f416673e22aab093..ef6b066cca4511ffc065ad59f256face678a58ec 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -105,6 +105,10 @@ private: const std::vector& limits, enum scord::transfer::mapping mapping); + void + transfer_update(const network::request& req, uint64_t transfer_id, + float obtained_bw); + job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager;