diff --git a/CMakeLists.txt b/CMakeLists.txt
index 622c1c165b2b729c2ff805db6b219e749c1d939c..f2b5c05ee0fba3c79a6e711314e5705af1161a92 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -213,7 +213,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building {fmt}")
FetchContent_Declare(
fmt
GIT_REPOSITORY https://github.com/fmtlib/fmt
- GIT_TAG d141cdbeb0fb422a3fb7173b285fd38e0d1772dc # v8.0.1
+ GIT_TAG a33701196adfad74917046096bf5a2aa0ab0bb50 # v9.1.0
GIT_SHALLOW ON
GIT_PROGRESS ON
)
@@ -226,7 +226,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building spdlog")
FetchContent_Declare(
spdlog
GIT_REPOSITORY https://github.com/gabime/spdlog
- GIT_TAG eb3220622e73a4889eee355ffa37972b3cac3df5 # v1.9.2
+ GIT_TAG 7e635fca68d014934b4af8a1cf874f63989352b7 # v1.12.0
GIT_SHALLOW ON
GIT_PROGRESS ON
)
@@ -280,7 +280,7 @@ if (SCORD_BUILD_TESTS)
FetchContent_Declare(
Catch2
GIT_REPOSITORY https://github.com/catchorg/Catch2.git
- GIT_TAG 605a34765aa5d5ecbf476b4598a862ada971b0cc # v3.0.1
+ GIT_TAG 6e79e682b726f524310d55dec8ddac4e9c52fb5f # v3.4.0
GIT_SHALLOW ON
GIT_PROGRESS ON
)
diff --git a/ci/check_rpcs.py b/ci/check_rpcs.py
index 444bf155f48fbb659c57a9316a76eb74e71ae6ac..4ab96fb9e5d4bb36ef8f028d3da32020c8b6164f 100755
--- a/ci/check_rpcs.py
+++ b/ci/check_rpcs.py
@@ -19,7 +19,7 @@ RPC_NAMES = {
'ADM_remove_pfs_storage',
'ADM_transfer_datasets', 'ADM_get_transfer_priority',
'ADM_set_transfer_priority', 'ADM_cancel_transfer',
- 'ADM_get_pending_transfers',
+ 'ADM_get_pending_transfers', 'ADM_transfer_update',
'ADM_set_qos_constraints', 'ADM_get_qos_constraints',
'ADM_define_data_operation', 'ADM_connect_data_operation',
'ADM_finalize_data_operation',
diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt
index 070a57133acdc04b2eeb69c469a7eefc9c2ed398..1be1d9388bd74c9ae644e3d927c34779e459851f 100644
--- a/examples/c/CMakeLists.txt
+++ b/examples/c/CMakeLists.txt
@@ -52,7 +52,7 @@ list(APPEND c_examples_without_controller
add_library(c_examples_common STATIC)
target_sources(c_examples_common PUBLIC common.h PRIVATE common.c)
-target_link_libraries(c_examples_common libscord_c_types)
+target_link_libraries(c_examples_common libscord_c_types spdlog::spdlog fmt::fmt)
foreach(example IN LISTS c_examples_with_controller c_examples_without_controller)
add_executable(${example}_c)
diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..060633b1ba345c3f6885ef05ea5e37a60fcb21ed
--- /dev/null
+++ b/examples/cxx/ADM_transfer_update.cpp
@@ -0,0 +1,95 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include
+#include "common.hpp"
+
+#define NJOB_NODES 50
+#define NADHOC_NODES 25
+#define NINPUTS 10
+#define NOUTPUTS 5
+#define NSOURCES 5
+#define NTARGETS 5
+#define NLIMITS 4
+
+int
+main(int argc, char* argv[]) {
+
+ test_info test_info{
+ .name = TESTNAME,
+ .requires_server = true,
+ .requires_controller = true,
+ .requires_data_stager = true,
+ };
+
+ const auto cli_args = process_args(argc, argv, test_info);
+
+ scord::server server{"tcp", cli_args.server_address};
+
+ const auto job_nodes = prepare_nodes(NJOB_NODES);
+ const auto adhoc_nodes = prepare_nodes(NADHOC_NODES);
+ const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS);
+ const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS);
+ const auto expected_outputs =
+ prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS);
+
+ const auto sources = prepare_datasets("source-dataset-{}", NSOURCES);
+ const auto targets = prepare_datasets("target-dataset-{}", NTARGETS);
+ const auto qos_limits = prepare_qos_limits(NLIMITS);
+ const auto mapping = scord::transfer::mapping::n_to_n;
+
+ std::string name = "adhoc_storage_42";
+ const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{
+ cli_args.controller_address,
+ cli_args.data_stager_address,
+ scord::adhoc_storage::execution_mode::separate_new,
+ scord::adhoc_storage::access_type::read_write,
+ 100,
+ false};
+ const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes};
+
+ try {
+ const auto adhoc_storage = scord::register_adhoc_storage(
+ server, name, scord::adhoc_storage::type::gekkofs,
+ adhoc_storage_ctx, adhoc_resources);
+
+ scord::job::requirements reqs(inputs, outputs, expected_outputs,
+ adhoc_storage);
+
+ const auto job = scord::register_job(
+ server, scord::job::resources{job_nodes}, reqs, 0);
+ const auto transfer = scord::transfer_datasets(
+ server, job, sources, targets, qos_limits, mapping);
+
+ scord::transfer_update(server, transfer.id(), 10.0f);
+ fmt::print(stdout, "ADM_transfer_update() remote procedure completed "
+ "successfully\n");
+ exit(EXIT_SUCCESS);
+ } catch(const std::exception& e) {
+ fmt::print(stderr, "FATAL: ADM_transfer_update() failed: {}\n",
+ e.what());
+ exit(EXIT_FAILURE);
+ }
+}
diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt
index 86580f1f903e150e16b8ff55e332fe26a85879fd..67bd704db4d63d0eb59a569bd5809aafb66b3b0f 100644
--- a/examples/cxx/CMakeLists.txt
+++ b/examples/cxx/CMakeLists.txt
@@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller
ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage
# transfers
ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority
- ADM_cancel_transfer ADM_get_pending_transfers
+ ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update
# qos
ADM_set_qos_constraints ADM_get_qos_constraints
# data operations
@@ -56,7 +56,7 @@ foreach(example IN LISTS cxx_examples_with_controller cxx_examples_without_contr
add_executable(${example}_cxx)
target_sources(${example}_cxx PRIVATE ${example}.cpp)
target_link_libraries(${example}_cxx
- PUBLIC fmt::fmt libscord cxx_examples_common)
+ PUBLIC fmt::fmt spdlog::spdlog libscord cxx_examples_common)
set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example})
endforeach()
diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp
index 30723ada3bedafa02a05c01d0a6253a9edc3aa4e..464a02ea2abf97a3ad3309343944b94d34d4c592 100644
--- a/src/common/net/server.cpp
+++ b/src/common/net/server.cpp
@@ -31,6 +31,7 @@
#include
#include
#include
+#include
#include
#ifdef SCORD_DEBUG_BUILD
diff --git a/src/common/net/utilities.hpp b/src/common/net/utilities.hpp
index cc1dda421a4df64194e88428fdfa9f204f11fa9a..b45c21213d932de18ad5b7dcad4e01c3fb9ce485 100644
--- a/src/common/net/utilities.hpp
+++ b/src/common/net/utilities.hpp
@@ -120,15 +120,15 @@ struct fmt::formatter {
}
template
- constexpr auto
- format(const network::rpc_info& rpc, FormatContext& ctx) const {
- format_to(ctx.out(), "{}{} id: {} name: {} ", m_outbound ? "<=" : "=>",
+ auto
+ format(const network::rpc_info& rpc, FormatContext& ctx) const -> format_context::iterator {
+ fmt::format_to(ctx.out(), "{}{} id: {} name: {:?} ", m_outbound ? "<=" : "=>",
rpc.pid() ? fmt::format(" pid: {}", *rpc.pid()) : "",
- rpc.id(), std::quoted(rpc.name()));
- return m_outbound ? format_to(ctx.out(), "to: {}",
- std::quoted(rpc.address()))
- : format_to(ctx.out(), "from: {}",
- std::quoted(rpc.address()));
+ rpc.id(), rpc.name());
+ return m_outbound ? fmt::format_to(ctx.out(), "to: {:?}",
+ rpc.address())
+ : fmt::format_to(ctx.out(), "from: {:?}",
+ rpc.address());
}
};
diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp
index 1b9f797ae3586f46ea313a8af49bf576c3b99c1d..71f03dc54e591aa4dbcf1706262a2ad6fa6942ab 100644
--- a/src/lib/c_wrapper.cpp
+++ b/src/lib/c_wrapper.cpp
@@ -245,6 +245,15 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
return ADM_SUCCESS;
}
+ADM_return_t
+ADM_transfer_update(ADM_server_t server, uint64_t transfer_id,
+ float obtained_bw) {
+
+ return scord::detail::transfer_update(scord::server{server}, transfer_id,
+ obtained_bw);
+}
+
+
ADM_return_t
ADM_set_dataset_information(ADM_server_t server, ADM_job_t job,
ADM_dataset_t target, ADM_dataset_info_t info) {
diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp
index 17c8c2c5e5b20e1d33e6387c68a316687b9b1708..cc37fb09fc60a6301e8aad218aae8c532196ecb9 100644
--- a/src/lib/detail/impl.cpp
+++ b/src/lib/detail/impl.cpp
@@ -530,4 +530,37 @@ transfer_datasets(const server& srv, const job& job,
return tl::make_unexpected(scord::error_code::other);
}
+
+scord::error_code
+transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) {
+
+ network::client rpc_client{srv.protocol()};
+
+ const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address());
+
+ if(const auto& lookup_rv = rpc_client.lookup(srv.address());
+ lookup_rv.has_value()) {
+ const auto& endp = lookup_rv.value();
+
+ LOGGER_INFO("rpc {:<} body: {{transfer_id: {}, obtained_bw: {}}}", rpc,
+ transfer_id, obtained_bw);
+
+ if(const auto& call_rv =
+ endp.call(rpc.name(), transfer_id, obtained_bw);
+ call_rv.has_value()) {
+
+ const network::generic_response resp{call_rv.value()};
+
+ LOGGER_EVAL(resp.error_code(), INFO, ERROR,
+ "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc,
+ resp.error_code(), resp.op_id());
+
+ return resp.error_code();
+ }
+ }
+
+ LOGGER_ERROR("rpc call failed");
+ return scord::error_code::other;
+}
+
} // namespace scord::detail
diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp
index 31529bcea64791e04c85b76fea48bc1ddddcdfb2..d26575c1800f758e0cfac52d38614b0a47f65034 100644
--- a/src/lib/detail/impl.hpp
+++ b/src/lib/detail/impl.hpp
@@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job,
const std::vector& limits,
transfer::mapping mapping);
+scord::error_code
+transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw);
+
+
} // namespace scord::detail
#endif // SCORD_ADMIRE_IMPL_HPP
diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp
index 8686636971f809ee875ef60954d6366d7ee14f23..4b341d640d5f2859839a4e98de855c01c419a5de 100644
--- a/src/lib/libscord.cpp
+++ b/src/lib/libscord.cpp
@@ -377,6 +377,19 @@ transfer_datasets(const server& srv, const job& job,
return rv.value();
}
+
+void
+transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) {
+
+ const auto ec = detail::transfer_update(srv, transfer_id, obtained_bw);
+
+ if(!ec) {
+ throw std::runtime_error(
+ fmt::format("ADM_transfer_update() error: {}", ec.message()));
+ }
+}
+
+
ADM_return_t
set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
ADM_dataset_info_t info) {
diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h
index dc6527af60341e56184483024fa80fd22c8f7a1c..d3c75164701b2a148945580707a885362e053579 100644
--- a/src/lib/scord/scord.h
+++ b/src/lib/scord/scord.h
@@ -251,6 +251,19 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
ADM_qos_limit_t limits[], size_t limits_len,
ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer);
+/**
+ * Sets the obtained bw for the transfer operation
+ *
+ * @param[in] server The server to which the request is directed
+ * @param[in] transfer An ADM_TRANSFER identifying the originating transfer.
+ * @param[in] obtained_bw a float indicating the obtained bandwidth
+ *
+ * @return Returns if the remote procedure has been completed
+ * successfully or not.
+ */
+ADM_return_t
+ADM_transfer_update(ADM_server_t server, uint64_t transfer_id,
+ float obtained_bw);
/**
* Sets information for the dataset identified by resource_id.
diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp
index 469d7d41484394e7169f3e52eb8e93e7c9639def..045bdf282e616ee71485958243cf75ff6a27ca10 100644
--- a/src/lib/scord/scord.hpp
+++ b/src/lib/scord/scord.hpp
@@ -98,6 +98,9 @@ transfer_datasets(const server& srv, const job& job,
const std::vector& limits,
transfer::mapping mapping);
+void
+transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw);
+
ADM_return_t
set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target,
ADM_dataset_info_t info);
diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp
index d6e771d17ac219f47669b412794ee345db955370..ec767ccb30acb09c5c2207ca993feb95ab81d693 100644
--- a/src/lib/scord/types.hpp
+++ b/src/lib/scord/types.hpp
@@ -29,7 +29,9 @@
#include
#include
#include
+#include
#include
+#include
#include
#include
#include
@@ -712,7 +714,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::error_code& ec, FormatContext& ctx) const {
+ format(const scord::error_code& ec, FormatContext& ctx) const -> format_context::iterator {
return formatter::format(ec.name(), ctx);
}
};
@@ -722,8 +724,8 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::job_info& ji, FormatContext& ctx) const {
- return format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}",
+ format(const scord::job_info& ji, FormatContext& ctx) const -> format_context::iterator {
+ return fmt::format_to(ctx.out(), "{{adhoc_controller: {}, io_procs: {}}}",
ji.adhoc_controller_address(), ji.io_procs());
}
};
@@ -733,7 +735,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::job& j, FormatContext& ctx) const {
+ format(const scord::job& j, FormatContext& ctx) const -> format_context::iterator {
return formatter::format(
fmt::format("{{id: {}, slurm_id: {}}}", j.id(), j.slurm_id()),
ctx);
@@ -745,8 +747,8 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::dataset& d, FormatContext& ctx) const {
- const auto str = fmt::format("{{id: {}}}", std::quoted(d.id()));
+ format(const scord::dataset& d, FormatContext& ctx) const -> format_context::iterator {
+ const auto str = fmt::format("{{id: {:?}}}", d.id());
return formatter::format(str, ctx);
}
};
@@ -757,7 +759,7 @@ struct fmt::formatter>
// parse is inherited from formatter.
template
auto
- format(const std::vector& v, FormatContext& ctx) const {
+ format(const std::vector& v, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("[{}]", fmt::join(v, ", "));
return formatter::format(str, ctx);
}
@@ -768,7 +770,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::dataset_route& r, FormatContext& ctx) const {
+ format(const scord::dataset_route& r, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{src: {}, dst: {}}}", r.source(),
r.destination());
return formatter::format(str, ctx);
@@ -782,7 +784,7 @@ struct fmt::formatter>
template
auto
format(const std::vector& v,
- FormatContext& ctx) const {
+ FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("[{}]", fmt::join(v, ", "));
return formatter::format(str, ctx);
}
@@ -793,7 +795,7 @@ struct fmt::formatter : fmt::formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::node::type& t, FormatContext& ctx) const {
+ format(const scord::node::type& t, FormatContext& ctx) const -> format_context::iterator {
using scord::node;
std::string_view name = "unknown";
@@ -817,9 +819,9 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::node& n, FormatContext& ctx) const {
- const auto str = fmt::format("{{hostname: {}, type: {}}}",
- std::quoted(n.hostname()), n.get_type());
+ format(const scord::node& n, FormatContext& ctx) const -> format_context::iterator {
+ const auto str = fmt::format("{{hostname: {:?}, type: {}}}",
+ n.hostname(), n.get_type());
return formatter::format(str, ctx);
}
};
@@ -830,7 +832,7 @@ struct fmt::formatter>
// parse is inherited from formatter.
template
auto
- format(const std::vector& v, FormatContext& ctx) const {
+ format(const std::vector& v, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("[{}]", fmt::join(v, ", "));
return formatter::format(str, ctx);
}
@@ -841,7 +843,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::transfer::mapping& m, FormatContext& ctx) const {
+ format(const scord::transfer::mapping& m, FormatContext& ctx) const -> format_context::iterator {
using mapping = scord::transfer::mapping;
@@ -868,7 +870,7 @@ struct fmt::formatter : fmt::formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::transfer& tx, FormatContext& ctx) const {
+ format(const scord::transfer& tx, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{id: {}}}", tx.id());
return formatter::format(str, ctx);
}
@@ -897,7 +899,7 @@ struct fmt::formatter {
template
auto
- format(const enum scord::adhoc_storage::type& t, FormatContext& ctx) const {
+ format(const enum scord::adhoc_storage::type& t, FormatContext& ctx) const -> format_context::iterator {
using scord::adhoc_storage;
std::string_view name = "unknown";
@@ -921,7 +923,7 @@ struct fmt::formatter {
break;
}
- return format_to(ctx.out(), "{}", name);
+ return fmt::format_to(ctx.out(), "{}", name);
}
};
@@ -932,7 +934,7 @@ struct fmt::formatter
template
auto
format(const scord::adhoc_storage::execution_mode& exec_mode,
- FormatContext& ctx) const {
+ FormatContext& ctx) const -> format_context::iterator {
using execution_mode = scord::adhoc_storage::execution_mode;
@@ -964,7 +966,7 @@ struct fmt::formatter
template
auto
format(const scord::adhoc_storage::access_type& type,
- FormatContext& ctx) const {
+ FormatContext& ctx) const -> format_context::iterator {
using access_type = scord::adhoc_storage::access_type;
@@ -991,13 +993,13 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const {
- return format_to(
+ format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator {
+ return fmt::format_to(
ctx.out(),
- "{{controller: {}, data_stager: {}, execution_mode: {}, "
+ "{{controller: {:?}, data_stager: {:?}, execution_mode: {}, "
"access_type: {}, walltime: {}, should_flush: {}}}",
- std::quoted(c.controller_address()),
- std::quoted(c.data_stager_address()), c.exec_mode(),
+ c.controller_address(),
+ c.data_stager_address(), c.exec_mode(),
c.access_type(), c.walltime(), c.should_flush());
}
};
@@ -1008,7 +1010,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const std::nullopt_t& /*t*/, FormatContext& ctx) const {
+ format(const std::nullopt_t& /*t*/, FormatContext& ctx) const -> format_context::iterator {
return formatter::format("none", ctx);
}
};
@@ -1019,7 +1021,7 @@ struct fmt::formatter> : formatter {
// parse is inherited from formatter.
template
auto
- format(const std::optional& v, FormatContext& ctx) const {
+ format(const std::optional& v, FormatContext& ctx) const -> format_context::iterator {
return formatter::format(
v ? fmt::format("{}", v.value()) : "none", ctx);
}
@@ -1030,10 +1032,10 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::adhoc_storage& s, FormatContext& ctx) const {
+ format(const scord::adhoc_storage& s, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format(
- "{{type: {}, id: {}, name: {}, context: {}}}", s.type(), s.id(),
- std::quoted(s.name()), s.context());
+ "{{type: {}, id: {}, name: {:?}, context: {}}}", s.type(), s.id(),
+ s.name(), s.context());
return formatter::format(str, ctx);
}
};
@@ -1044,7 +1046,7 @@ struct fmt::formatter
// parse is inherited from formatter.
template
auto
- format(const scord::adhoc_storage::resources& r, FormatContext& ctx) const {
+ format(const scord::adhoc_storage::resources& r, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{nodes: {}}}", r.nodes());
@@ -1058,7 +1060,7 @@ struct fmt::formatter
// parse is inherited from formatter.
template
auto
- format(const enum scord::pfs_storage::type& t, FormatContext& ctx) const {
+ format(const enum scord::pfs_storage::type& t, FormatContext& ctx) const -> format_context::iterator {
using scord::pfs_storage;
std::string_view name = "unknown";
@@ -1081,7 +1083,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::pfs_storage::ctx& c, FormatContext& ctx) const {
+ format(const scord::pfs_storage::ctx& c, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{mount_point: {}}}", c.mount_point());
return formatter::format(str, ctx);
}
@@ -1092,7 +1094,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::pfs_storage& s, FormatContext& ctx) const {
+ format(const scord::pfs_storage& s, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{context: {}}}", s.context());
return formatter::format(str, ctx);
}
@@ -1103,7 +1105,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::job::resources& r, FormatContext& ctx) const {
+ format(const scord::job::resources& r, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{nodes: {}}}", r.nodes());
return formatter::format(str, ctx);
}
@@ -1114,7 +1116,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::job::requirements& r, FormatContext& ctx) const {
+ format(const scord::job::requirements& r, FormatContext& ctx) const -> format_context::iterator {
return formatter::format(
fmt::format("{{inputs: {}, outputs: {}, "
"expected_outputs: {}, adhoc_storage: {}}}",
@@ -1129,7 +1131,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::qos::scope& s, FormatContext& ctx) const {
+ format(const scord::qos::scope& s, FormatContext& ctx) const -> format_context::iterator {
using scope = scord::qos::scope;
@@ -1161,7 +1163,7 @@ struct fmt::formatter>
template
auto
format(const std::optional& e,
- FormatContext& ctx) const {
+ FormatContext& ctx) const -> format_context::iterator {
if(!e) {
return formatter::format("none", ctx);
@@ -1195,7 +1197,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::qos::subclass& sc, FormatContext& ctx) const {
+ format(const scord::qos::subclass& sc, FormatContext& ctx) const -> format_context::iterator {
using subclass = scord::qos::subclass;
@@ -1219,7 +1221,7 @@ struct fmt::formatter : formatter {
// parse is inherited from formatter.
template
auto
- format(const scord::qos::limit& l, FormatContext& ctx) const {
+ format(const scord::qos::limit& l, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("{{entity: {}, subclass: {}, value: {}}}",
l.entity(), l.subclass(), l.value());
return formatter::format(str, ctx);
@@ -1232,7 +1234,7 @@ struct fmt::formatter>
// parse is inherited from formatter.
template
auto
- format(const std::vector& l, FormatContext& ctx) const {
+ format(const std::vector& l, FormatContext& ctx) const -> format_context::iterator {
const auto str = fmt::format("[{}]", fmt::join(l, ", "));
return formatter::format(str, ctx);
}
diff --git a/src/lib/types.cpp b/src/lib/types.cpp
index e51c2a08ee7e9dc6812fc96e54213718f106174a..a889d336c43ef26477cbae4cd360048b6ebea953 100644
--- a/src/lib/types.cpp
+++ b/src/lib/types.cpp
@@ -1150,7 +1150,7 @@ private:
return scord::transfer(entity->e_transfer);
default:
throw std::runtime_error(fmt::format(
- "Unexpected scope value: {}", entity->e_scope));
+ "Unexpected scope value: {}", (int)entity->e_scope));
}
}
diff --git a/src/scord-ctl/config_file.cpp b/src/scord-ctl/config_file.cpp
index bb53f65352112a1d0dd72f42ad988e98b7ed0ddf..2cbb3b86b447c826e95a715932f7a6759aa15018 100644
--- a/src/scord-ctl/config_file.cpp
+++ b/src/scord-ctl/config_file.cpp
@@ -24,6 +24,9 @@
#include
#include
+#include
+#include
+#include
#include
#include
#include "config_file.hpp"
@@ -87,7 +90,7 @@ to_adhoc_storage_type(const ryml::csubstr& type) {
throw std::runtime_error{
fmt::format("Unsupported adhoc storage type '{}' in "
"configuration file",
- type)};
+ type.data())};
}
return valid_types.at(type);
@@ -155,7 +158,7 @@ parse_command_node(const ryml::ConstNodeRef& node) {
::validate_command(cmdline);
} else {
fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n",
- child.key());
+ child.key().data());
}
}
@@ -221,7 +224,7 @@ parse_adhoc_config_node(const ryml::ConstNodeRef& node) {
shrink_command = ::parse_command_node(child);
} else {
fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n",
- child.key());
+ child.key().data());
}
}
@@ -297,7 +300,7 @@ parse_config_node(const ryml::ConstNodeRef& node) {
adhoc_configs = ::parse_adhoc_storage_node(child);
} else {
fmt::print(stderr, "WARNING: Unknown key: '{}'. Ignored.\n",
- child.key());
+ child.key().data());
}
}
diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp
index edff7198471bb9dbf5f3b786983527d604813221..13e6a938fa1afc20fb63ee4874f0eba5336f0f6b 100644
--- a/src/scord-ctl/rpc_server.cpp
+++ b/src/scord-ctl/rpc_server.cpp
@@ -70,12 +70,12 @@ rpc_server::print_configuration() const {
if(const auto& env = command.env(); env.has_value()) {
for(const auto& [k, v] : *env) {
- LOGGER_INFO(" - {} = {}", k, std::quoted(v));
+ LOGGER_INFO(" - {} = {:?}", k, (v));
}
}
LOGGER_INFO(" - command:");
- LOGGER_INFO(" {}", std::quoted(command.cmdline()));
+ LOGGER_INFO(" {:?}", (command.cmdline()));
};
LOGGER_INFO(" - adhoc storage configurations:");
@@ -126,8 +126,8 @@ rpc_server::deploy_adhoc_storage(
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
std::optional adhoc_dir;
- LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc,
- std::quoted(adhoc_uuid), adhoc_type, adhoc_resources);
+ LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc,
+ (adhoc_uuid), adhoc_type, adhoc_resources);
auto ec = scord::error_code::success;
@@ -211,8 +211,8 @@ rpc_server::expand_adhoc_storage(
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
std::optional adhoc_dir;
- LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc,
- std::quoted(adhoc_uuid), adhoc_type, adhoc_resources);
+ LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc,
+ (adhoc_uuid), adhoc_type, adhoc_resources);
auto ec = scord::error_code::success;
@@ -272,8 +272,8 @@ rpc_server::shrink_adhoc_storage(
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
std::optional adhoc_dir;
- LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc,
- std::quoted(adhoc_uuid), adhoc_type, adhoc_resources);
+ LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}, resources: {}}}", rpc,
+ (adhoc_uuid), adhoc_type, adhoc_resources);
auto ec = scord::error_code::success;
@@ -331,8 +331,8 @@ rpc_server::terminate_adhoc_storage(
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
- LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}}}", rpc,
- std::quoted(adhoc_uuid), adhoc_type);
+ LOGGER_INFO("rpc {:>} body: {{uuid: {:?}, type: {}}}", rpc, (adhoc_uuid),
+ adhoc_type);
auto ec = scord::error_code::success;
diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp
index 2f723ef6119c7ee03863259e98fe6af8a0bb1e4f..63f6ccc71e417bd4312e9e8893d50d4fe18a18aa 100644
--- a/src/scord/internal_types.cpp
+++ b/src/scord/internal_types.cpp
@@ -131,4 +131,5 @@ pfs_storage_metadata::update(scord::pfs_storage::ctx pfs_context) {
m_pfs_storage.update(std::move(pfs_context));
}
+
} // namespace scord::internal
diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp
index 1448ff1fc2ee7f43565dc28a05540d296d53f10e..8ec74eb6e00a65fdae5df9ce1f9f272bb601d26e 100644
--- a/src/scord/rpc_server.cpp
+++ b/src/scord/rpc_server.cpp
@@ -65,6 +65,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize,
provider::define(EXPAND(update_pfs_storage));
provider::define(EXPAND(remove_pfs_storage));
provider::define(EXPAND(transfer_datasets));
+ provider::define(EXPAND(transfer_update));
#undef EXPAND
}
@@ -274,9 +275,9 @@ rpc_server::register_adhoc_storage(
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
- LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, adhoc_ctx: {}, "
+ LOGGER_INFO("rpc {:>} body: {{name: {:?}, type: {}, adhoc_ctx: {}, "
"adhoc_resources: {}}}",
- rpc, std::quoted(name), type, ctx, resources);
+ rpc, name, type, ctx, resources);
scord::error_code ec;
std::optional adhoc_id;
@@ -370,8 +371,8 @@ rpc_server::update_adhoc_storage(
const auto child_rpc = rpc_info::create(
name, adhoc_storage.context().controller_address());
- LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}",
- child_rpc, std::quoted(adhoc_metadata_ptr->uuid()),
+ LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}",
+ child_rpc, adhoc_metadata_ptr->uuid(),
adhoc_storage.type(), adhoc_storage.get_resources());
if(const auto call_rv = endp->call(
@@ -467,8 +468,8 @@ rpc_server::deploy_adhoc_storage(const network::request& req,
const auto child_rpc =
rpc.add_child(adhoc_storage.context().controller_address());
- LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}",
- child_rpc, std::quoted(adhoc_metadata_ptr->uuid()),
+ LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}",
+ child_rpc, adhoc_metadata_ptr->uuid(),
adhoc_storage.type(), adhoc_storage.get_resources());
if(const auto call_rv = endp->call(
@@ -546,8 +547,8 @@ rpc_server::terminate_adhoc_storage(const network::request& req,
const auto child_rpc =
rpc.add_child(adhoc_storage.context().controller_address());
- LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc,
- std::quoted(adhoc_metadata_ptr->uuid()),
+ LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}}}", child_rpc,
+ adhoc_metadata_ptr->uuid(),
adhoc_storage.type());
if(const auto call_rv =
@@ -594,8 +595,8 @@ rpc_server::register_pfs_storage(const network::request& req,
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
- LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc,
- std::quoted(name), type, ctx);
+ LOGGER_INFO("rpc {:>} body: {{name: {:?}, type: {}, pfs_ctx: {}}}", rpc,
+ name, type, ctx);
scord::error_code ec;
std::optional pfs_id = 0;
@@ -758,4 +759,42 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id,
req.respond(resp);
}
+
+void
+rpc_server::transfer_update(const network::request& req, uint64_t transfer_id,
+ float obtained_bw) {
+
+ using network::get_address;
+ using network::response_with_id;
+ using network::rpc_info;
+
+ const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
+
+ LOGGER_INFO("rpc {:>} body: {{transfer_id: {}, obtained_bw: {}}}", rpc,
+ transfer_id, obtained_bw);
+
+ scord::error_code ec;
+
+ // TODO: generate a global ID for the transfer and contact Cargo to
+ // actually request it
+
+ const auto resp = response_with_id{rpc.id(), ec, transfer_id};
+
+ LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);
+
+ // TODO: create a transfer in transfer manager
+ // We need the contact point, and different qos
+
+ ec = m_transfer_manager.update(transfer_id, obtained_bw);
+ if(ec.no_such_entity) {
+ LOGGER_ERROR(
+ "rpc id: {} error_msg: \"Error updating transfer_storage\"",
+ rpc.id());
+ }
+
+
+ req.respond(resp);
+}
+
+
} // namespace scord
diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp
index f86a60c8aa52ccacfddc44c9f416673e22aab093..ef6b066cca4511ffc065ad59f256face678a58ec 100644
--- a/src/scord/rpc_server.hpp
+++ b/src/scord/rpc_server.hpp
@@ -105,6 +105,10 @@ private:
const std::vector& limits,
enum scord::transfer::mapping mapping);
+ void
+ transfer_update(const network::request& req, uint64_t transfer_id,
+ float obtained_bw);
+
job_manager m_job_manager;
adhoc_storage_manager m_adhoc_manager;
pfs_storage_manager m_pfs_manager;