diff --git a/README.md b/README.md
index c42b4aaa0ec7efc4f8b95c0f17e13f73997df9e0..67ec64bdefc61eb5ddf96be2cc16410bb7b93187 100644
--- a/README.md
+++ b/README.md
@@ -135,10 +135,9 @@ and need to be available in the system:
- [JSON-C](https://github.com/json-c/json-c) version 0.13.1.
- [Thallium](https://github.com/mochi-hpc/mochi-thallium) version 0.10.1 or
later.
-- [libconfig-dev] version 1.4.9 or later.
- [redis-plus-plus](https://github.com/sewenew/redis-plus-plus) version 1.3.
3 or later, and its dependencies:
- - [hiredis](https://github.com/redis/hiredis) version 0.14.1 or later.
+ - [hiredis](https://github.com/redis/hiredis) version 1.1.0 or later.
The following libraries are also required by Scord, but will be
automatically downloaded and compiled by the project as part of the standard
@@ -320,3 +319,16 @@ Once finished, the server can be stopped by pressing Ctrl+C:
[2021-11-19 13:19:26.552280] [scord] [158733] [info]
[2021-11-19 13:19:26.552289] [scord] [158733] [info] [Stopped]
```
+
+## CLI utils
+Scord provides three command-line utilities: `scord_ping`, `scord_query` and `scord_adhoc`.
+
+`scord_adhoc` can be used when the Slurm integration is not available and the user wants to try the `scord-adhocfs-cargo` integration.
+
+The following command will copy `/tmp/temporal` to `/tmp/temporal2` using `cargo`, and will try to deploy an `adhocfs` (using the `gekkofs.sh` script):
+
+```
+scord_adhoc --server tcp://127.0.0.1:52000 --controller tcp://127.0.0.1:52001 --stager tcp://127.0.0.1:62000 --slurm_id 10 --adhocfs gekkofs --function create --inputs lustre:/tmp/temporal,gekkofs:/tmp/temporal2
+```
+
+Because the Slurm integration is not ready yet, all three servers must be launched manually. This also makes it possible to use `cargo` with `LD_PRELOAD`, which is not available within Slurm (as of v0.3.2), and to use an adhocfs.
diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index 39ea9270ccc82f621fce58e247e17b7e0afccba1..bedc6a59c3ccec852fff69c2da4882e2ac4118eb 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -56,3 +56,24 @@ set_target_properties(
scord_query PROPERTIES __INSTALLED_PATH
${CMAKE_INSTALL_FULL_BINDIR}/scord_query
)
+
+
+# scord_adhoc: CLI client that creates an adhocfs with cargo transfers
+add_executable(scord_adhoc)
+
+target_sources(scord_adhoc
+ PRIVATE
+ scord_adhoc.cpp
+)
+
+target_link_libraries(scord_adhoc
+ PUBLIC fmt::fmt CLI11::CLI11 libscord)
+
+install(TARGETS scord_adhoc
+ RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
+)
+# Store the full install path of the binary in the __INSTALLED_PATH property
+set_target_properties(
+ scord_adhoc PROPERTIES __INSTALLED_PATH
+ ${CMAKE_INSTALL_FULL_BINDIR}/scord_adhoc # NOTE(review): consumer of this property is not visible here
+)
diff --git a/cli/scord_adhoc.cpp b/cli/scord_adhoc.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..8e05de3a2cd7b1706e1965952671dd8a76cb1e00
--- /dev/null
+++ b/cli/scord_adhoc.cpp
@@ -0,0 +1,249 @@
+/******************************************************************************
+ * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see <https://www.gnu.org/licenses/>.
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include
+#include
+#include
+#include
+
+struct query_config { // values collected from the command line by parse_command_line()
+ std::string progname; // file name component of argv[0]
+ std::string server_address; // -s,--server (required); must contain "://"
+ std::string controller_address; // -c,--controller (required); passed to adhoc_storage::ctx
+ std::string stager_address; // -d,--stager (required); the "Cargo address"
+ std::uint32_t slurm_id{}; // -j,--slurm_id (required); used as the job id and in the adhoc name
+ std::uint32_t adhocid{}; // --adhocid (optional); not read by main() in this file
+ std::string nodes; // -n,--nodes (optional); comma-separated node names
+ std::string adhocfs; // -a,--adhocfs (required); "gekkofs", "hercules" or "expand"
+ std::string inputs; // -i,--inputs; routes separated by ';', each "source,destination"
+ std::string outputs; // -o,--outputs; parsed with the same format as inputs
+ std::string function; // -f,--function (required); main() only acts on "create"
+};
+
+
+/* Splits `s` at every `delimiter` and returns the pieces in order. An empty
+ * `s` yields no tokens; a trailing delimiter does not add an empty token. */
+std::vector
+split(const std::string& s, char delimiter) {
+ std::vector tokens;
+ std::string token;
+ std::istringstream tokenStream(s);
+ while(std::getline(tokenStream, token, delimiter)) { // stops once the stream is exhausted
+ tokens.push_back(token);
+ }
+ return tokens;
+}
+
+
+query_config
+parse_command_line(int argc, char* argv[]) {
+ // Declares the CLI11 options, parses argv and returns the filled-in config.
+ query_config cfg;
+
+ cfg.progname = std::filesystem::path{argv[0]}.filename().string();
+
+ CLI::App app{"Scord adhoc client [Register a job, does adhoc fs actions]",
+ cfg.progname};
+
+ app.add_option("-s,--server", cfg.server_address, "Server address")
+ ->option_text("ADDRESS")
+ ->required();
+ app.add_option("-c,--controller", cfg.controller_address,
+ "Controller address")
+ ->option_text("CONTROLLERADDRESS")
+ ->required();
+ app.add_option("-d,--stager", cfg.stager_address, "Cargo address")
+ ->option_text("CARGOADDRESS")
+ ->required();
+ app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required();
+
+ app.add_option("-n,--nodes", cfg.nodes, "Nodes");
+
+ app.add_option("-a,--adhocfs", cfg.adhocfs, "Adhoc FS type")->required();
+
+ app.add_option("--adhocid", cfg.adhocid, "Adhoc ID");
+
+ app.add_option("-i,--inputs", cfg.inputs,
+ "Input dataset {lustre:/a,gekkofs:/b;lustre/a1...}");
+
+ app.add_option("-o,--outputs", cfg.outputs, "Output dataset");
+
+ app.add_option("-f,--function", cfg.function,
+ "Function {create, stage-in, stage-out, destroy}")
+ ->required();
+ // A parse failure never returns to the caller: the process exits below.
+ try {
+ app.parse(argc, argv);
+ return cfg;
+ } catch(const CLI::ParseError& ex) {
+ std::exit(app.exit(ex)); // app.exit() reports the problem and supplies the exit code
+ }
+}
+
+auto // pair of {protocol, address}; throws std::runtime_error when "://" is missing
+parse_address(const std::string& address) {
+ const auto pos = address.find("://"); // separator between protocol and the rest
+ if(pos == std::string::npos) {
+ throw std::runtime_error(fmt::format("Invalid address: {}", address));
+ }
+ // The second element is the complete, unmodified address (protocol included).
+ const auto protocol = address.substr(0, pos);
+ return std::make_pair(protocol, address);
+}
+
+
+int
+main(int argc, char* argv[]) {
+ // For "--function create": registers and deploys an adhoc storage, registers a job, then requests the input transfers.
+ using namespace std::chrono_literals;
+
+ query_config cfg = parse_command_line(argc, argv);
+
+ try {
+ const auto [protocol, address] = parse_address(cfg.server_address);
+
+ scord::server srv{protocol, address};
+
+ // Step 1 : If function is create, register job and adhoc fs server
+ if(cfg.function == "create") {
+
+ // Step 1a : Define job resources
+
+ // separate SLURM nodes string into vector of nodes
+ auto v_nodes = split(cfg.nodes, ',');
+ std::vector nodes;
+ for(auto& node : v_nodes) {
+ nodes.push_back(scord::node{node});
+ }
+
+ scord::job::resources job_resources(nodes);
+
+ // Step 1b : Define adhoc_storage
+ /* type is derived from the string in cfg.adhocfs */
+ auto type = scord::adhoc_storage::type::gekkofs; // placeholder: reassigned or rejected just below
+ if(cfg.adhocfs == "gekkofs") {
+ type = scord::adhoc_storage::type::gekkofs;
+ } else if(cfg.adhocfs == "hercules") {
+ type = scord::adhoc_storage::type::hercules;
+ } else if(cfg.adhocfs == "expand") {
+ type = scord::adhoc_storage::type::expand;
+ } else {
+ throw std::runtime_error(
+ fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs));
+ }
+
+ // The storage name is the fs type followed by the Slurm id, e.g. "gekkofs10"
+ std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id);
+ scord::adhoc_storage::resources resources{nodes};
+
+
+ scord::adhoc_storage::ctx ctx{
+ cfg.controller_address,
+ cfg.stager_address,
+ scord::adhoc_storage::execution_mode::separate_new,
+ scord::adhoc_storage::access_type::read_write,
+ 100,
+ false};
+ // NOTE(review): the meaning of the literals 100 and false is set by adhoc_storage::ctx — confirm
+
+ scord::adhoc_storage adhoc_storage = register_adhoc_storage(
+ srv, adhoc_name, type, ctx, resources);
+
+
+ fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id());
+
+ auto path = deploy_adhoc_storage(srv, adhoc_storage);
+ // NOTE(review): 'path' is not used anywhere after this point
+
+ // Step 1c : Define job_requirements
+ /*
+
+ job::requirements::requirements(
+ std::vector inputs,
+ std::vector outputs,
+ std::vector expected_outputs)
+ : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)),
+ m_expected_outputs(std::move(expected_outputs)) {}
+
+ job::requirements::requirements(
+ std::vector inputs,
+ std::vector outputs,
+ std::vector expected_outputs,
+ scord::adhoc_storage adhoc_storage)
+ */
+
+
+ /* Separate inputs into vector of inputs */
+
+ std::vector inputs;
+ auto v_routes_in = split(cfg.inputs, ';');
+
+ for(auto& src_dst : v_routes_in) {
+ auto route = split(src_dst, ',');
+ // NOTE(review): route[1] is read unchecked; an entry without ',' is out of range
+ inputs.push_back(scord::dataset_route(
+ scord::dataset{route[0]}, scord::dataset{route[1]}));
+ }
+
+ /* Separate outputs into vector of outputs */
+ std::vector outputs;
+ auto v_routes_out = split(cfg.outputs, ';');
+
+ for(auto& src_dst : v_routes_out) {
+ auto route = split(src_dst, ',');
+ // NOTE(review): same unchecked route[1] access as in the inputs loop
+ outputs.push_back(scord::dataset_route(
+ scord::dataset{route[0]}, scord::dataset{route[1]}));
+ }
+
+
+ scord::job::requirements job_requirements{
+ inputs, outputs, std::vector{},
+ adhoc_storage};
+
+ auto job = scord::register_job(srv, job_resources, job_requirements,
+ cfg.slurm_id);
+
+
+ // Now Transfer Datasets
+
+ // convert inputs to split inputs (src, dst)
+ std::vector inputs_src, inputs_dst;
+
+ for(auto& route : inputs) {
+ inputs_src.push_back(route.source());
+ inputs_dst.push_back(route.destination());
+ }
+
+ scord::transfer_datasets(srv, job, inputs_src, inputs_dst,
+ std::vector{},
+ scord::transfer::mapping::n_to_n);
+ }
+ // NOTE(review): any --function value other than "create" reaches this point without doing anything
+ } catch(const std::exception& ex) {
+ fmt::print(stderr, "Error: {}\n", ex.what());
+ return EXIT_FAILURE;
+ }
+}
diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp
index a883c0e08964ca5ddcdbc0d7b7efbad5618fa144..10bb1e41de766d6c352a31753c3997a8546e9452 100644
--- a/src/scord/rpc_server.cpp
+++ b/src/scord/rpc_server.cpp
@@ -258,8 +258,7 @@ rpc_server::register_job(const network::request& req,
};
- m_redis.value().hmset(std::to_string(slurm_id), m.begin(),
- m.end());
+ m_redis.value().hmset(std::to_string(slurm_id), m.begin(), m.end());
}
} else {
@@ -913,9 +912,9 @@ rpc_server::scheduler_update() {
tr_info->update(status.bw());
auto bw = tr_info->measured_bandwidth();
uint64_t qos = 0;
- try {
+ if(!tr_info->qos().empty()) {
qos = tr_info->qos().front().value();
- } catch(const std::exception& e) {
+ } else {
continue;
}