From e08f232adadd23ef24dd3c97ffd046211d253da3 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 20 Nov 2023 14:23:38 +0100 Subject: [PATCH] Fix QoS handling in rpc_server::scheduler_update() and add scord_adhoc utility --- README.md | 16 ++- cli/CMakeLists.txt | 21 ++++ cli/scord_adhoc.cpp | 249 +++++++++++++++++++++++++++++++++++++++ src/scord/rpc_server.cpp | 7 +- 4 files changed, 287 insertions(+), 6 deletions(-) create mode 100644 cli/scord_adhoc.cpp diff --git a/README.md b/README.md index c42b4aaa..67ec64bd 100644 --- a/README.md +++ b/README.md @@ -135,10 +135,9 @@ and need to be available in the system: - [JSON-C](https://github.com/json-c/json-c) version 0.13.1. - [Thallium](https://github.com/mochi-hpc/mochi-thallium) version 0.10.1 or later. -- [libconfig-dev] version 1.4.9 or later. - [redis-plus-plus](https://github.com/sewenew/redis-plus-plus) version 1.3. 3 or later, and its dependencies: - - [hiredis](https://github.com/redis/hiredis) version 0.14.1 or later. + - [hiredis](https://github.com/redis/hiredis) version 1.1.0 or later. The following libraries are also required by Scord, but will be automatically downloaded and compiled by the project as part of the standard @@ -320,3 +319,16 @@ Once finished, the server can be stopped by pressing Ctrl+C: [2021-11-19 13:19:26.552280] [scord] [158733] [info] [2021-11-19 13:19:26.552289] [scord] [158733] [info] [Stopped] ``` + +## CLI utils +Scord provides three command-line utilities: `scord_ping`, `scord_query` and `scord_adhoc`.
+ +`scord_adhoc` can be used when the Slurm integration is not available and the user wants to try the `scord-adhocfs-cargo` integration. + +The following command copies `/tmp/temporal` to `/tmp/temporal2` using `cargo`, and tries to deploy an `adhocfs` (using the `gekkofs.sh` script): + +``` +scord_adhoc --server tcp://127.0.0.1:52000 --controller tcp://127.0.0.1:52001 --stager tcp://127.0.0.1:62000 --slurm_id 10 --adhocfs gekkofs --function create --inputs lustre:/tmp/temporal,gekkofs:/tmp/temporal2 +``` + +Since the Slurm integration is not ready yet, all three servers (`--server`, `--controller` and `--stager`) must be launched manually. This also makes it possible to use `cargo` with `LD_PRELOAD` (which is not available within Slurm as of v0.3.2) and to use an adhocfs. diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index 39ea9270..bedc6a59 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -56,3 +56,24 @@ set_target_properties( scord_query PROPERTIES __INSTALLED_PATH ${CMAKE_INSTALL_FULL_BINDIR}/scord_query ) + + +# scord_adhoc: creates an adhocfs with cargo transfers +add_executable(scord_adhoc) + +target_sources(scord_adhoc + PRIVATE + scord_adhoc.cpp +) + +target_link_libraries(scord_adhoc + PUBLIC fmt::fmt CLI11::CLI11 libscord) + +install(TARGETS scord_adhoc + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} +) + +set_target_properties( + scord_adhoc PROPERTIES __INSTALLED_PATH + ${CMAKE_INSTALL_FULL_BINDIR}/scord_adhoc +) diff --git a/cli/scord_adhoc.cpp b/cli/scord_adhoc.cpp new file mode 100644 index 00000000..8e05de3a --- /dev/null +++ b/cli/scord_adhoc.cpp @@ -0,0 +1,249 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord.
+ * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include + +struct query_config { + std::string progname; + std::string server_address; + std::string controller_address; + std::string stager_address; + std::uint32_t slurm_id{}; + std::uint32_t adhocid{}; + std::string nodes; + std::string adhocfs; + std::string inputs; + std::string outputs; + std::string function; +}; + + +/* Function that delimits a std::string into a vector of strings, using a + * specified delimiter */ +std::vector +split(const std::string& s, char delimiter) { + std::vector tokens; + std::string token; + std::istringstream tokenStream(s); + while(std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + return tokens; +} + + +query_config +parse_command_line(int argc, char* argv[]) { + + query_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Scord adhoc client [Register a job, does adhoc fs actions]", + cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + app.add_option("-c,--controller", cfg.controller_address, + "Controller address") + ->option_text("CONTROLLERADDRESS") + ->required(); + app.add_option("-d,--stager", 
cfg.stager_address, "Cargo address") + ->option_text("CARGOADDRESS") + ->required(); + app.add_option("-j,--slurm_id", cfg.slurm_id, "Slurm ID")->required(); + + app.add_option("-n,--nodes", cfg.nodes, "Nodes"); + + app.add_option("-a,--adhocfs", cfg.adhocfs, "Adhoc FS type")->required(); + + app.add_option("--adhocid", cfg.adhocid, "Adhoc ID"); + + app.add_option("-i,--inputs", cfg.inputs, + "Input dataset {lustre:/a,gekkofs:/b;lustre/a1...}"); + + app.add_option("-o,--outputs", cfg.outputs, "Output dataset"); + + app.add_option("-f,--function", cfg.function, + "Function {create, stage-in, stage-out, destroy}") + ->required(); + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + using namespace std::chrono_literals; + + query_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + + scord::server srv{protocol, address}; + + // Step 1 : If function is create, register job and adhoc fs server + if(cfg.function == "create") { + + // Step 1a : Define job resources + + // separate SLURM nodes string into vector of nodes + auto v_nodes = split(cfg.nodes, ','); + std::vector nodes; + for(auto& node : v_nodes) { + nodes.push_back(scord::node{node}); + } + + scord::job::resources job_resources(nodes); + + // Step 1b : Define adhoc_storage + /* type needs to process the sctring in cfg.adhocfs */ + auto type = scord::adhoc_storage::type::gekkofs; + if(cfg.adhocfs == "gekkofs") { + type = scord::adhoc_storage::type::gekkofs; + } else if(cfg.adhocfs == "hercules") { + type = 
scord::adhoc_storage::type::hercules; + } else if(cfg.adhocfs == "expand") { + type = scord::adhoc_storage::type::expand; + } else { + throw std::runtime_error( + fmt::format("Invalid adhoc fs type: {}", cfg.adhocfs)); + } + + + std::string adhoc_name = cfg.adhocfs + std::to_string(cfg.slurm_id); + scord::adhoc_storage::resources resources{nodes}; + + + scord::adhoc_storage::ctx ctx{ + cfg.controller_address, + cfg.stager_address, + scord::adhoc_storage::execution_mode::separate_new, + scord::adhoc_storage::access_type::read_write, + 100, + false}; + + + scord::adhoc_storage adhoc_storage = register_adhoc_storage( + srv, adhoc_name, type, ctx, resources); + + + fmt::print("AdhocStorage ID: {}\n", adhoc_storage.id()); + + auto path = deploy_adhoc_storage(srv, adhoc_storage); + + + // Step 1c : Define job_requirements + /* + + job::requirements::requirements( + std::vector inputs, + std::vector outputs, + std::vector expected_outputs) + : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)), + m_expected_outputs(std::move(expected_outputs)) {} + + job::requirements::requirements( + std::vector inputs, + std::vector outputs, + std::vector expected_outputs, + scord::adhoc_storage adhoc_storage) + */ + + + /* Separate inputs into vector of inputs */ + + std::vector inputs; + auto v_routes_in = split(cfg.inputs, ';'); + + for(auto& src_dst : v_routes_in) { + auto route = split(src_dst, ','); + + inputs.push_back(scord::dataset_route( + scord::dataset{route[0]}, scord::dataset{route[1]})); + } + + /* Separate outputs into vector of outputs */ + std::vector outputs; + auto v_routes_out = split(cfg.outputs, ';'); + + for(auto& src_dst : v_routes_out) { + auto route = split(src_dst, ','); + + outputs.push_back(scord::dataset_route( + scord::dataset{route[0]}, scord::dataset{route[1]})); + } + + + scord::job::requirements job_requirements{ + inputs, outputs, std::vector{}, + adhoc_storage}; + + auto job = scord::register_job(srv, job_resources, job_requirements, + 
cfg.slurm_id); + + + // Now Tranfer Datasets + + // convert inputs to split inputs (src, dst) + std::vector inputs_src, inputs_dst; + + for(auto& route : inputs) { + inputs_src.push_back(route.source()); + inputs_dst.push_back(route.destination()); + } + + scord::transfer_datasets(srv, job, inputs_src, inputs_dst, + std::vector{}, + scord::transfer::mapping::n_to_n); + } + + } catch(const std::exception& ex) { + fmt::print(stderr, "Error: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index a883c0e0..10bb1e41 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -258,8 +258,7 @@ rpc_server::register_job(const network::request& req, }; - m_redis.value().hmset(std::to_string(slurm_id), m.begin(), - m.end()); + m_redis.value().hmset(std::to_string(slurm_id), m.begin(), m.end()); } } else { @@ -913,9 +912,9 @@ rpc_server::scheduler_update() { tr_info->update(status.bw()); auto bw = tr_info->measured_bandwidth(); uint64_t qos = 0; - try { + if(!tr_info->qos().empty()) { qos = tr_info->qos().front().value(); - } catch(const std::exception& e) { + } else { continue; } -- GitLab