diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 93eed114e0bb2848bd6b933259e1bd9e5432a8cf..634d73f76ff1906e22e48c81cd00749c9607df8c 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -158,6 +158,12 @@ transfer transfer_datasets(const server& srv, const std::vector& sources, const std::vector& targets) { + if(sources.size() != targets.size()) { + throw std::runtime_error( + "The number of input datasets does not match the number of " + "output datasets"); + } + network::client rpc_client{srv.protocol()}; const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt index a1ee38475250e0f4089d9e2535c240390feda6cd..9b47c0046be7be64b8c0a9a3bd5e11532c074df4 100644 --- a/util/CMakeLists.txt +++ b/util/CMakeLists.txt @@ -52,6 +52,21 @@ target_link_libraries(cargo_shutdown cargo ) -install(TARGETS cargo_ping cargo_shutdown +add_executable(ccp) + +target_sources(ccp + PRIVATE + copy.cpp +) + +target_link_libraries(ccp + PUBLIC + fmt::fmt + CLI11::CLI11 + net::rpc_client + cargo +) + +install(TARGETS cargo_ping cargo_shutdown ccp RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) diff --git a/util/copy.cpp b/util/copy.cpp new file mode 100644 index 0000000000000000000000000000000000000000..745263a064335c04eb29770d08fa90f2221e74e7 --- /dev/null +++ b/util/copy.cpp @@ -0,0 +1,144 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +enum class dataset_flags { posix, mpio }; + +std::map dataset_flags_map{ + {"posix", dataset_flags::posix}, + {"mpio", dataset_flags::mpio}}; + +struct copy_config { + std::string progname; + std::string server_address; + std::vector inputs; + dataset_flags input_flags = dataset_flags::posix; + std::vector outputs; + dataset_flags output_flags = dataset_flags::posix; +}; + +copy_config +parse_command_line(int argc, char* argv[]) { + + copy_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Cargo parallel copy tool", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, + "Address of the Cargo server (can also be\n" + "provided via the CCP_SERVER environment\n" + "variable)") + ->option_text("ADDRESS") + ->envname("CCP_SERVER") + ->required(); + + app.add_option("-i,--input", cfg.inputs, "Input dataset(s)") + ->option_text("SRC...") + ->required(); + + app.add_option("-o,--output", cfg.outputs, "Output dataset(s)") + ->option_text("DST...") + ->required(); + + app.add_option("--if", cfg.input_flags, + "Flags for input datasets. Accepted values\n" + " - posix: read data using POSIX (default)\n" + " - mpio: read data using MPI-IO") + ->option_text("FLAGS") + ->transform(CLI::CheckedTransformer(dataset_flags_map, + CLI::ignore_case)); + + app.add_option("--of", cfg.output_flags, + "Flags for output datasets. Accepted values\n" + " - posix: write data using POSIX (default)\n" + " - mpio: write data using MPI-IO") + ->option_text("FLAGS") + ->transform(CLI::CheckedTransformer(dataset_flags_map, + CLI::ignore_case)); + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + +int +main(int argc, char* argv[]) { + + const auto cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + + cargo::server server{address}; + std::vector inputs; + std::vector outputs; + + std::transform(cfg.inputs.cbegin(), cfg.inputs.cend(), + std::back_inserter(inputs), [&](const auto& src) { + return cargo::dataset{ + src, cfg.input_flags == dataset_flags::mpio + ? cargo::dataset::type::parallel + : cargo::dataset::type::posix}; + }); + + std::transform(cfg.outputs.cbegin(), cfg.outputs.cend(), + std::back_inserter(outputs), [&cfg](const auto& tgt) { + return cargo::dataset{ + tgt, cfg.output_flags == dataset_flags::mpio + ? cargo::dataset::type::parallel + : cargo::dataset::type::posix}; + }); + + const auto tx = cargo::transfer_datasets(server, inputs, outputs); + + if(const auto st = tx.wait(); st.failed()) { + throw std::runtime_error(st.error().message()); + } + } catch(const std::exception& ex) { + fmt::print(stderr, "{}: Error: {}\n", cfg.progname, ex.what()); + return EXIT_FAILURE; + } +}