From 4a84a83ccbfd549c8210d23957659ac206f3e48d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 27 Feb 2024 12:16:43 +0100 Subject: [PATCH 01/19] FTIO interface created --- cli/CMakeLists.txt | 21 ++++++++- cli/ftio.cpp | 114 +++++++++++++++++++++++++++++++++++++++++++++ src/master.cpp | 67 +++++++++++++++++++++++++- src/master.hpp | 15 ++++++ 4 files changed, 214 insertions(+), 3 deletions(-) create mode 100644 cli/ftio.cpp diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index 68fea90..f779733 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -101,7 +101,26 @@ target_link_libraries(shaping cargo ) -install(TARGETS cargo_ping cargo_shutdown ccp shaping + +################################################################################ +## ftio: A CLI tool to send the ftio info to a Cargo server +add_executable(cargo_ftio) + +target_sources(cargo_ftio + PRIVATE + ftio.cpp +) + +target_link_libraries(cargo_ftio + PUBLIC + fmt::fmt + CLI11::CLI11 + net::rpc_client + cargo +) + + +install(TARGETS cargo_ping cargo_shutdown ccp shaping cargo_ftio RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) diff --git a/cli/ftio.cpp b/cli/ftio.cpp new file mode 100644 index 0000000..94713be --- /dev/null +++ b/cli/ftio.cpp @@ -0,0 +1,114 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +struct ftio_config { + std::string progname; + std::string server_address; + float confidence; + float probability; +}; + +ftio_config +parse_command_line(int argc, char* argv[]) { + + ftio_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Cargo ftio client", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + + app.add_option("-c,--conf", cfg.confidence, "confidence") + ->option_text("float") + ->required(); + + app.add_option("-p,--probability", cfg.probability, "probability") + ->option_text("float") + ->default_str("-1.0"); + + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + ftio_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + network::client rpc_client{protocol}; + + if(const auto result = rpc_client.lookup(address); result.has_value()) { + const auto& endpoint = result.value(); + const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability); + + if(retval.has_value()) { + + auto error_code = int{retval.value()}; + + fmt::print("ftio_int RPC was successful!\n"); + fmt::print(" (server replied with: {})\n", error_code); + return EXIT_SUCCESS; + } + + fmt::print(stderr, "ftio_int RPC failed\n"); + return EXIT_FAILURE; + + } else { + fmt::print(stderr, "Failed to lookup address: {}\n", address); + return EXIT_FAILURE; + } + } catch(const std::exception& ex) { + fmt::print(stderr, "Error: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/src/master.cpp b/src/master.cpp index ae46240..2913f86 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -88,7 +88,12 @@ master_server::master_server(std::string name, std::string address, provider(m_network_engine, 0), m_mpi_listener_ess(thallium::xstream::create()), m_mpi_listener_ult(m_mpi_listener_ess->make_thread( - [this]() { mpi_listener_ult(); })) { + [this]() { mpi_listener_ult(); })), + m_ftio_listener_ess(thallium::xstream::create()), + m_ftio_listener_ult(m_ftio_listener_ess->make_thread( + [this]() { ftio_scheduling_ult(); })) + +{ #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); @@ -97,6 +102,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(transfer_status)); provider::define(EXPAND(bw_control)); provider::define(EXPAND(transfer_statuses)); + provider::define(EXPAND(ftio_int)); #undef EXPAND @@ -110,6 +116,10 @@ master_server::master_server(std::string name, std::string address, m_mpi_listener_ult = thallium::managed{}; m_mpi_listener_ess->join(); m_mpi_listener_ess = thallium::managed{}; + m_ftio_listener_ult->join(); + m_ftio_listener_ult = thallium::managed{}; + m_ftio_listener_ess->join(); + m_ftio_listener_ess = thallium::managed{}; }); } @@ -125,7 +135,7 @@ master_server::mpi_listener_ult() { if(!msg) { std::this_thread::sleep_for(10ms); - //thallium::thread::self().sleep(m_network_engine, 10); + // thallium::thread::self().sleep(m_network_engine, 10); continue; } @@ -163,6 +173,28 @@ master_server::mpi_listener_ult() { LOGGER_INFO("Exit"); } + +void +master_server::ftio_scheduling_ult() { + + + while(!m_shutting_down) { + + std::this_thread::sleep_for(1000ms); + // thallium::thread::self().sleep(m_network_engine, 10); + + + // Do something with the confidence and probability + if (ftio_changed) { + ftio_changed = false; + LOGGER_INFO("Confidence is {}, probability is {}", confidence, + probability); + } + } + + LOGGER_INFO("Shutting down."); +} + #define RPC_NAME() (__FUNCTION__) void @@ -417,4 +449,35 @@ master_server::transfer_statuses(const network::request& req, }); } + +void +master_server::ftio_int(const network::request& req, float conf, + float prob) { + using network::get_address; + using network::rpc_info; + using proto::generic_response; + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + confidence = conf; + probability = prob; + ftio_changed = true; + LOGGER_INFO("rpc {:>} body: {{confidence: {}, probability: {}}}", rpc, + conf, prob); + + // do the magic here + + // 1. Update the confidence and probability values inside cargo + + // Scheduling thread should be running and waiting for them + + // + + const auto resp = generic_response{rpc.id(), error_code::success}; + + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + + req.respond(resp); +} + } // namespace cargo diff --git a/src/master.hpp b/src/master.hpp index f671e3a..3c84ba7 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -44,6 +44,9 @@ private: void mpi_listener_ult(); + void + ftio_scheduling_ult(); + void ping(const network::request& req); @@ -66,11 +69,23 @@ private: void bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping); + + void + ftio_int(const network::request& req, float confidence, float probability); + private: // Dedicated execution stream for the MPI listener ULT thallium::managed m_mpi_listener_ess; // ULT for the MPI listener thallium::managed m_mpi_listener_ult; + // Dedicated execution stream for the ftio scheduler + thallium::managed m_ftio_listener_ess; + // ULT for the ftio scheduler + thallium::managed m_ftio_listener_ult; + // FTIO decision values (below 0, implies not used) + float confidence = -1.0f; + float probability = -1.0f; + bool ftio_changed = true; // Request manager request_manager m_request_manager; }; -- GitLab From c47cd4664cd01eb82bb97d7a1d2263cdf1be32cb Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 29 Feb 2024 14:58:57 +0100 Subject: [PATCH 02/19] Adding period --- cli/ftio.cpp | 7 ++++++- src/master.cpp | 21 +++++++++++---------- src/master.hpp | 9 +++++---- 3 files changed, 22 insertions(+), 15 deletions(-) diff --git a/cli/ftio.cpp b/cli/ftio.cpp index 94713be..347d25d 100644 --- a/cli/ftio.cpp +++ b/cli/ftio.cpp @@ -34,6 +34,7 @@ struct ftio_config { std::string server_address; float confidence; float probability; + float period; }; ftio_config @@ -57,6 +58,10 @@ parse_command_line(int argc, char* argv[]) { ->option_text("float") ->default_str("-1.0"); + app.add_option("-t,--period", cfg.period, "period") + ->option_text("float") + ->required(); + try { app.parse(argc, argv); @@ -89,7 +94,7 @@ main(int argc, char* argv[]) { if(const auto result = rpc_client.lookup(address); result.has_value()) { const auto& endpoint = result.value(); - const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability); + const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, cfg.period); if(retval.has_value()) { diff --git a/src/master.cpp b/src/master.cpp index 2913f86..466159a 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -185,10 +185,10 @@ master_server::ftio_scheduling_ult() { // Do something with the confidence and probability - if (ftio_changed) { - ftio_changed = false; - LOGGER_INFO("Confidence is {}, probability is {}", confidence, - probability); + if (m_ftio_changed) { + m_ftio_changed = false; + LOGGER_INFO("Confidence is {}, probability is {} and period is {}", m_confidence, + m_probability, m_period); } } @@ -452,18 +452,19 @@ master_server::transfer_statuses(const network::request& req, void master_server::ftio_int(const network::request& req, float conf, - float prob) { + float prob, float period) { using network::get_address; using network::rpc_info; using proto::generic_response; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - confidence = conf; - probability = prob; - ftio_changed = true; - LOGGER_INFO("rpc {:>} body: {{confidence: {}, probability: {}}}", rpc, - conf, prob); + m_confidence = conf; + m_probability = prob; + m_period = period; + m_ftio_changed = true; + LOGGER_INFO("rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}", rpc, + conf, prob, period); // do the magic here diff --git a/src/master.hpp b/src/master.hpp index 3c84ba7..3874a89 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -71,7 +71,7 @@ private: void - ftio_int(const network::request& req, float confidence, float probability); + ftio_int(const network::request& req, float confidence, float probability, float period); private: // Dedicated execution stream for the MPI listener ULT @@ -83,9 +83,10 @@ private: // ULT for the ftio scheduler thallium::managed m_ftio_listener_ult; // FTIO decision values (below 0, implies not used) - float confidence = -1.0f; - float probability = -1.0f; - bool ftio_changed = true; + float m_confidence = -1.0f; + float m_probability = -1.0f; + float m_period = -1.0f; + bool m_ftio_changed = true; // Request manager request_manager m_request_manager; }; -- GitLab From ce0df018cfafb80b733b4c1dc9739a52848eefd9 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 1 Mar 2024 14:56:59 +0100 Subject: [PATCH 03/19] Added directory reading for FS Plugins, still bug with gekkofs writting, debugging --- src/master.cpp | 20 ++++----- .../posix_file/fs_plugin/fs_plugin.cpp | 1 + .../posix_file/fs_plugin/fs_plugin.hpp | 7 ++++ .../posix_file/fs_plugin/gekko_plugin.cpp | 42 +++++++++++++++++-- .../posix_file/fs_plugin/gekko_plugin.hpp | 6 +++ .../posix_file/fs_plugin/posix_plugin.cpp | 26 +++++++++++- .../posix_file/fs_plugin/posix_plugin.hpp | 8 ++++ 7 files changed, 95 insertions(+), 15 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index 466159a..648a1ea 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -66,7 +66,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno, } return std::make_tuple( - static_cast(cargo::tag::seq_mixed), + static_cast(cargo::tag::sequential), cargo::transfer_message{tid, seqno, input.path(), static_cast(input.get_type()), output.path(), @@ -290,15 +290,15 @@ master_server::transfer_datasets(const network::request& req, // bbb/xxx -> ttt/xxx const auto& p = s.path(); - std::vector files; - if(std::filesystem::is_directory(p)) { + std::vector files; + // Check stat of p using FSPlugin class + auto fs = FSPlugin::make_fs(static_cast(s.get_type())); + struct stat buf; + fs->stat(p, &buf); + if(buf.st_mode & S_IFDIR) { LOGGER_INFO("Expanding input directory {}", p); - for(const auto& f : - std::filesystem::recursive_directory_iterator(p)) { - if(std::filesystem::is_regular_file(f)) { - files.push_back(f.path()); - } - } + files = fs->readdir(p); + /* We have all the files expanded. Now create a new @@ -319,7 +319,7 @@ master_server::transfer_datasets(const network::request& req, } d_new.path(d.path() / std::filesystem::path( - f.string().substr(leading + 1))); + f.substr(leading + 1))); LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), d_new.path()); diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp index 7664aff..336e3c2 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -17,6 +17,7 @@ FSPlugin::make_fs(type t) { switch(t) { case type::posix: + case type::parallel: return std::make_unique(); #ifdef GEKKOFS_PLUGIN case type::gekkofs: diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp index c98e74a..1ecf75f 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp @@ -2,6 +2,7 @@ #define FS_PLUGIN_HPP #include +#include #include #include #include @@ -36,6 +37,12 @@ public: lseek(int fd, off_t offset, int whence) = 0; virtual off_t fallocate(int fd, int mode, off_t offset, off_t len) = 0; + virtual std::vector + readdir(const std::string& path) = 0; + virtual int + unlink(const std::string& path) = 0; + virtual int + stat(const std::string& path, struct stat* buf) = 0; }; } // namespace cargo #endif // FS_PLUGIN_HPP \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index 1ff0caa..e300b5a 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -7,14 +7,14 @@ namespace cargo { gekko_plugin::gekko_plugin() { int result = gkfs_init(); - if (result != 0) { + if(result != 0) { std::cerr << "Failed to initialize gekkofs" << std::endl; - } + } } gekko_plugin::~gekko_plugin() { int result = gkfs_end(); - if (result != 0) { + if(result != 0) { std::cerr << "Failed to finalize gekkofs" << std::endl; } } @@ -62,4 +62,40 @@ gekko_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { (void) len; return len; } + +int +gekko_plugin::unlink(const std::string& path) { + return gkfs::syscall::gkfs_remove(path); +} + + +std::vector +gekko_plugin::readdir(const std::string& path) { + // Fill recursively the files, checking if the file is a directory + std::vector files; + std::vector final_list; + files = gkfs::syscall::gkfs_get_file_list(path); + + for(auto& file : files) { + file = "/" + file; + struct stat buf; + stat(file, &buf); + if(S_ISDIR(buf.st_mode)) { + std::vector subfiles = readdir(file); + final_list.insert(final_list.end(), subfiles.begin(), + subfiles.end()); + } else { + final_list.push_back(file); + } + } + return final_list; +} + +// stat +int +gekko_plugin::stat(const std::string& path, struct stat* buf) { + return gkfs::syscall::gkfs_stat(path, buf); +} + + } // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp index 2d27132..b233282 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp @@ -25,6 +25,12 @@ public: // Fallocate is not needed in GekkoFS as pwrite takes care of it. off_t fallocate(int fd, int mode, off_t offset, off_t len) final; + std::vector + readdir(const std::string& path) final; + int + unlink(const std::string& path) final; + int + stat(const std::string& path, struct stat* buf) final; }; }; // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp index 4d0f134..92c7817 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp @@ -6,8 +6,7 @@ namespace cargo { -posix_plugin::posix_plugin() { -} +posix_plugin::posix_plugin() {} int @@ -45,4 +44,27 @@ posix_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { (void) mode; return ::posix_fallocate(fd, offset, static_cast(len)); } + +std::vector +posix_plugin::readdir(const std::string& path) { + std::vector files; + for(const auto& f : std::filesystem::recursive_directory_iterator(path)) { + if(std::filesystem::is_regular_file(f)) { + files.push_back(f.path()); + } + } + return files; +} + + +int +posix_plugin::unlink(const std::string& path) { + return ::unlink(path.c_str()); +} +int +posix_plugin::stat(const std::string& path, struct stat* buf) { + return ::stat(path.c_str(), buf); +} + + }; // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp index 0110284..dbd3541 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp @@ -24,6 +24,14 @@ public: lseek(int fd, off_t offset, int whence) final; off_t fallocate(int fd, int mode, off_t offset, off_t len) final; + std::vector + readdir(const std::string& path) final; + int + unlink(const std::string& path) final; + int + stat(const std::string& path, struct stat* buf) final; + + }; } // namespace cargo #endif // POSIX_PLUGIN_HPP -- GitLab From 50420484c9bafcafe7a78047495d6c1e473e4aaf Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Sun, 3 Mar 2024 21:19:07 +0100 Subject: [PATCH 04/19] Bug Solves (finishing gekkofs, corrupts hosts list) --- src/posix_file/posix_file/file.hpp | 11 ++++++++--- src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp | 10 +++++++--- src/worker/mpio_read.cpp | 6 +++++- 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 03d6fbe..8ee47ed 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -163,7 +163,7 @@ public: : m_path(std::move(filepath)) {} file(std::filesystem::path filepath, int fd, - std::unique_ptr fs_plugin) noexcept + std::shared_ptr fs_plugin) noexcept : m_path(std::move(filepath)), m_handle(fd), m_fs_plugin(std::move(fs_plugin)) {} @@ -203,6 +203,11 @@ public: } } + void + close() noexcept { + m_fs_plugin->close(m_handle.native()); + } + template std::size_t pread(MemoryBuffer&& buf, offset offset, std::size_t size) const { @@ -284,7 +289,7 @@ public: protected: const std::filesystem::path m_path; file_handle m_handle; - std::unique_ptr m_fs_plugin; + std::shared_ptr m_fs_plugin; }; @@ -292,7 +297,7 @@ static inline file open(const std::filesystem::path& filepath, int flags, ::mode_t mode, cargo::FSPlugin::type t) { - std::unique_ptr fs_plugin; + std::shared_ptr fs_plugin; fs_plugin = cargo::FSPlugin::make_fs(t); // We don't check if it exists, we just create it if flags is set to O_CREAT diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index e300b5a..7f05362 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -9,14 +9,18 @@ gekko_plugin::gekko_plugin() { int result = gkfs_init(); if(result != 0) { std::cerr << "Failed to initialize gekkofs" << std::endl; - } + } else { + std::cout << "Initialized gekkofs" << std::endl; + } } gekko_plugin::~gekko_plugin() { - int result = gkfs_end(); + /*int result = gkfs_end(); if(result != 0) { std::cerr << "Failed to finalize gekkofs" << std::endl; - } + } else { + std::cout << "Finalized gekkofs" << std::endl; + }*/ } // Override the open function int diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 92f0a05..0d8ccbf 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -47,6 +47,7 @@ mpio_read::operator()() { using posix_file::views::strided; m_status = error_code::transfer_in_progress; try { + const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); @@ -179,8 +180,10 @@ mpio_read::progress(int ongoing_index) { return index; } } - + // LOG indexes and sizes + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), file_range.size()); @@ -221,6 +224,7 @@ mpio_read::progress(int ongoing_index) { } m_status = error_code::success; + m_output_file->close(); return -1; } -- GitLab From f3d96593463a1d4a1d22e30b0e4f3c7f9ed38fe2 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 4 Mar 2024 07:25:22 +0100 Subject: [PATCH 05/19] Moved to static initialization of fs, avoids multiple init and finish --- src/posix_file/posix_file/file.hpp | 8 ++++---- src/posix_file/posix_file/fs_plugin/fs_plugin.cpp | 14 +++++++++++--- src/posix_file/posix_file/fs_plugin/fs_plugin.hpp | 4 +++- .../posix_file/fs_plugin/gekko_plugin.cpp | 8 ++------ 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 8ee47ed..f330bb6 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -155,7 +155,7 @@ class file { public: - file(cargo::FSPlugin::type t) { + explicit file(cargo::FSPlugin::type t) { m_fs_plugin = cargo::FSPlugin::make_fs(t); }; @@ -196,7 +196,7 @@ public: } int ret = m_fs_plugin->fallocate(m_handle.native(), mode, offset, - static_cast(len)); + static_cast(len)); if(ret == -1) { throw io_error("posix_file::file::fallocate", errno); @@ -305,9 +305,9 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode, if(flags & O_CREAT) { fs_plugin->mkdir(filepath.parent_path().c_str(), 0755); } - + int fd = fs_plugin->open(filepath.c_str(), flags, mode); - + if(fd == -1) { throw io_error("posix_file::open ", errno); } diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp index 336e3c2..fa39137 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -12,16 +12,24 @@ namespace cargo { -std::unique_ptr +static std::shared_ptr m_fs_posix; +static std::shared_ptr m_fs_gekkofs; + +std::shared_ptr FSPlugin::make_fs(type t) { switch(t) { case type::posix: case type::parallel: - return std::make_unique(); + if(m_fs_posix == nullptr) + m_fs_posix = std::make_shared(); + return m_fs_posix; #ifdef GEKKOFS_PLUGIN case type::gekkofs: - return std::make_unique(); + if(m_fs_gekkofs == nullptr) + m_fs_gekkofs = std::make_shared(); + return m_fs_gekkofs; + #endif #ifdef HERCULES_PLUGIN case type::hercules: diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp index 1ecf75f..3bead0c 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp @@ -19,7 +19,9 @@ public: expand, dataclay }; - static std::unique_ptr make_fs(type); + + static std::shared_ptr make_fs(type); + // One instance per fs type virtual ~FSPlugin() = default; diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index 7f05362..29dce45 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -9,18 +9,14 @@ gekko_plugin::gekko_plugin() { int result = gkfs_init(); if(result != 0) { std::cerr << "Failed to initialize gekkofs" << std::endl; - } else { - std::cout << "Initialized gekkofs" << std::endl; } } gekko_plugin::~gekko_plugin() { - /*int result = gkfs_end(); + int result = gkfs_end(); if(result != 0) { std::cerr << "Failed to finalize gekkofs" << std::endl; - } else { - std::cout << "Finalized gekkofs" << std::endl; - }*/ + } } // Override the open function int -- GitLab From fa560be03e7d9555e83ecd8a56e2337e2b23297c Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 6 Mar 2024 08:26:43 +0100 Subject: [PATCH 06/19] Reorder transfer op()()/progress(index) to seq them --- src/worker/worker.cpp | 46 ++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 0178b96..8a62527 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -52,8 +52,8 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, } void -update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - std::string name, cargo::transfer_state st, float bw, +update_state(int rank, std::uint64_t tid, std::uint32_t seqno, std::string name, + cargo::transfer_state st, float bw, std::optional ec = std::nullopt) { mpi::communicator world; @@ -115,11 +115,30 @@ worker::run() { auto op = I->second.first.get(); int index = I->second.second; if(op) { + if(index == -1) { + // operation not started + // Print error message + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::running, + -1.0f); + cargo::error_code ec = (*op)(); + if(ec != cargo::error_code::transfer_in_progress) { + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::failed, + -1.0f, ec); + I = m_ops.erase(I); + break; + } + + index = 0; + } + // Operation in progress index = op->progress(index); if(index == -1) { // operation finished cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), ec ? transfer_state::failed : transfer_state::completed, 0.0f, ec); @@ -129,8 +148,9 @@ worker::run() { } else { // update only if BW is set if(op->bw() > 0.0f) { - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), - transfer_state::running, op->bw()); + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::running, + op->bw()); } I->second.second = index; ++I; @@ -166,7 +186,7 @@ worker::run() { t, workers, m.input_path(), m.output_path(), m_block_size, m.i_type(), m.o_type()), - 0))); + -1))); const auto op = m_ops[make_pair(m.input_path(), m.output_path())] @@ -174,16 +194,8 @@ worker::run() { op->set_comm(msg->source(), m.tid(), m.seqno(), t); - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), - transfer_state::running, -1.0f); - // Different scenarios read -> write | write -> read - - cargo::error_code ec = (*op)(); - if(ec != cargo::error_code::transfer_in_progress) { - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), - transfer_state::failed, -1.0f, ec); - m_ops.erase(make_pair(m.input_path(), m.output_path())); - } + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::pending, -1.0f); break; } @@ -199,8 +211,6 @@ worker::run() { LOGGER_INFO("Operation non existent", msg->source(), m); } } - - break; } -- GitLab From 5e8efca30a9990950e9b737816449580415d0d99 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 6 Mar 2024 09:31:25 +0100 Subject: [PATCH 07/19] Full stage-out support for adhoc-fs (gekkoFS) --- src/posix_file/posix_file/file.hpp | 4 ++-- src/posix_file/posix_file/fs_plugin/fs_plugin.hpp | 2 ++ src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp | 10 ++++++++++ src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp | 2 ++ src/posix_file/posix_file/fs_plugin/posix_plugin.cpp | 4 ++++ src/posix_file/posix_file/fs_plugin/posix_plugin.hpp | 3 ++- src/worker/mpio_write.cpp | 8 +++++--- src/worker/seq_mixed.cpp | 6 +++--- src/worker/sequential.cpp | 7 +++++-- 9 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index f330bb6..4f1a347 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -180,12 +180,12 @@ public: std::size_t size() const noexcept { - return std::filesystem::file_size(m_path); + return m_fs_plugin->size(m_path); } auto remove() noexcept { - return std::filesystem::remove(m_path); + return m_fs_plugin->unlink(m_path); } void diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp index 3bead0c..495b10f 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp @@ -45,6 +45,8 @@ public: unlink(const std::string& path) = 0; virtual int stat(const std::string& path, struct stat* buf) = 0; + virtual ssize_t + size(const std::string& path) = 0; }; } // namespace cargo #endif // FS_PLUGIN_HPP \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index 29dce45..5fea7ba 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -97,5 +97,15 @@ gekko_plugin::stat(const std::string& path, struct stat* buf) { return gkfs::syscall::gkfs_stat(path, buf); } +ssize_t +gekko_plugin::size(const std::string& path) { + struct stat buf; + int res = gekko_plugin::stat(path, &buf); + if(res != 0) { + return -1; + } + return buf.st_size; +} + } // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp index b233282..fd4e5c2 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp @@ -31,6 +31,8 @@ public: unlink(const std::string& path) final; int stat(const std::string& path, struct stat* buf) final; + ssize_t + size(const std::string& path) final; }; }; // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp index 92c7817..f0ec739 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp @@ -66,5 +66,9 @@ posix_plugin::stat(const std::string& path, struct stat* buf) { return ::stat(path.c_str(), buf); } +ssize_t +posix_plugin::size(const std::string& path) { + return std::filesystem::file_size(path); +} }; // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp index dbd3541..c570488 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp @@ -31,7 +31,8 @@ public: int stat(const std::string& path, struct stat* buf) final; - + ssize_t + size(const std::string& path) final; }; } // namespace cargo #endif // POSIX_PLUGIN_HPP diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 44b8070..a2b6adc 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -41,7 +41,11 @@ mpio_write::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; - std::size_t file_size = std::filesystem::file_size(m_input_path); + // We need to open the file and ask size (using fs_plugin) + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + + std::size_t file_size = m_input_file->size(); // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); @@ -68,8 +72,6 @@ mpio_write::operator()() { block_size); } - m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); m_workers_size = workers_size; m_workers_rank = workers_rank; diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index b087122..aca9e4c 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -39,7 +39,9 @@ seq_mixed_operation::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; - std::size_t file_size = std::filesystem::file_size(m_input_path); + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + std::size_t file_size = m_input_file->size(); // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); @@ -66,8 +68,6 @@ seq_mixed_operation::operator()() { block_size); } - m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index b91d865..825b7e0 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -39,7 +39,10 @@ seq_operation::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; - std::size_t file_size = std::filesystem::file_size(m_input_path); + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + std::size_t file_size = m_input_file->size(); + // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); @@ -176,7 +179,7 @@ seq_operation::progress(int ongoing_index) { // We need to create the directory if it does not exists (using // FSPlugin) - if ( write and ongoing_index == 0 ) { + if(write and ongoing_index == 0) { m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); -- GitLab From f5b0b9ed1d7ba15d06b9c756b8e0c9aed14df8f5 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 6 Mar 2024 17:42:37 +0100 Subject: [PATCH 08/19] First prototype FTIO - ADHOC - CARGO integration --- src/master.cpp | 226 +++++++++++++++--- src/master.hpp | 35 ++- src/posix_file/posix_file/file.hpp | 5 +- .../posix_file/fs_plugin/gekko_plugin.cpp | 15 +- src/request_manager.cpp | 22 ++ src/request_manager.hpp | 3 + src/worker/mpio_read.cpp | 4 +- src/worker/sequential.cpp | 2 +- tests/posix_file_tests.cpp | 2 +- 9 files changed, 272 insertions(+), 42 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index 648a1ea..f6c7b2e 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -177,18 +177,58 @@ master_server::mpi_listener_ult() { void master_server::ftio_scheduling_ult() { - while(!m_shutting_down) { - std::this_thread::sleep_for(1000ms); - // thallium::thread::self().sleep(m_network_engine, 10); + if(!m_pending_transfer.m_work or m_period < 0.0f) { + std::this_thread::sleep_for(1000ms); + } // Do something with the confidence and probability - if (m_ftio_changed) { + + if(m_ftio_changed) { m_ftio_changed = false; - LOGGER_INFO("Confidence is {}, probability is {} and period is {}", m_confidence, - m_probability, m_period); + LOGGER_INFO("Confidence is {}, probability is {} and period is {}", + m_confidence, m_probability, m_period); + } + + if(!m_pending_transfer.m_work) + continue; + + LOGGER_INFO("Waiting period : {}", m_period); + std::this_thread::sleep_for( + std::chrono::seconds((int)(m_period ))); + + LOGGER_INFO("Checking if there is work to do in {}", + m_pending_transfer.m_sources); + transfer_dataset_internal(m_pending_transfer); + // This launches the workers to do the work... + // We wait until this transfer is finished + LOGGER_INFO("Transferring : {}", m_pending_transfer.m_expanded_sources); + bool finished = false; + while(!finished) { + std::this_thread::sleep_for(10ms); + m_request_manager.lookup(m_pending_transfer.m_p.tid()) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Failed to lookup request: {}", ec); + }) + .map([&](auto&& rs) { + if(rs.state() == transfer_state::completed) { + finished = true; + } + }); + } + + if(finished) { + // Delete all source files + LOGGER_INFO("Transfer finished for {}", + m_pending_transfer.m_expanded_sources); + auto fs = FSPlugin::make_fs(cargo::FSPlugin::type::gekkofs); + for(auto& file : m_pending_transfer.m_expanded_sources) { + LOGGER_INFO("Deleting {}", file.path()); + // We need to use gekkofs to delete + fs->unlink(file.path()); + } } } @@ -252,6 +292,120 @@ master_server::shutdown(const network::request& req) { server::shutdown(); } +// Function that gets a pending_request, fills the request and sends the mpi +// message for the transfer We only put files that has mtime < actual +// timestamp , intended for stage-out and ftio +void +master_server::transfer_dataset_internal(pending_transfer& pt) { + + mpi::communicator world; + std::vector v_s_new; + std::vector v_d_new; + time_t now = time(0); + now = now - 5; // Threshold for mtime + for(auto i = 0u; i < pt.m_sources.size(); ++i) { + + const auto& s = pt.m_sources[i]; + const auto& d = pt.m_targets[i]; + + // We need to expand directories to single files on the s + // Then create a new message for each file and append the + // file to the d prefix + // We will asume that the path is the original absolute + // The prefix selects the method of transfer + // And if not specified then we will use none + // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then + // bbb/xxx -> ttt/xxx + const auto& p = s.path(); + + std::vector files; + // Check stat of p using FSPlugin class + auto fs = FSPlugin::make_fs( + static_cast(s.get_type())); + struct stat buf; + fs->stat(p, &buf); + if(buf.st_mode & S_IFDIR) { + LOGGER_INFO("Expanding input directory {}", p); + files = fs->readdir(p); + + /* + We have all the files expanded. Now create a new + cargo::dataset for each file as s and a new + cargo::dataset appending the base directory in d to the + file name. + */ + for(const auto& f : files) { + cargo::dataset s_new(s); + cargo::dataset d_new(d); + s_new.path(f); + // We need to get filename from the original root + // path (d.path) plus the path from f, removing the + // initial path p (taking care of the trailing /) + auto leading = p.size(); + if(leading > 0 and p.back() == '/') { + leading--; + } + + d_new.path(d.path() / + std::filesystem::path(f.substr(leading + 1))); + + LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), + d_new.path()); + fs->stat(s_new.path(), &buf); + if(buf.st_mtime < now) { + v_s_new.push_back(s_new); + v_d_new.push_back(d_new); + } + } + } else { + fs->stat(s.path(), &buf); + if(buf.st_mtime < now) { + v_s_new.push_back(s); + v_d_new.push_back(d); + } + } + } + + // empty m_expanded_sources + pt.m_expanded_sources.assign(v_s_new.begin(), v_s_new.end()); + pt.m_expanded_targets.assign(v_d_new.begin(), v_d_new.end()); + + // We have two vectors, so we process the transfer + // [1] Update request_manager + // [2] Send message to worker + + auto ec = m_request_manager.update(pt.m_p.tid(), v_s_new.size(), + pt.m_p.nworkers()); + if(ec != error_code::success) { + LOGGER_ERROR("Failed to update request: {}", ec); + return; + }; + + assert(v_s_new.size() == v_d_new.size()); + + // For all the transfers + for(std::size_t i = 0; i < v_s_new.size(); ++i) { + const auto& s = v_s_new[i]; + const auto& d = v_d_new[i]; + + // Create the directory if it does not exist (only in + // parallel transfer) + if(!std::filesystem::path(d.path()).parent_path().empty() and + d.supports_parallel_transfer()) { + std::filesystem::create_directories( + std::filesystem::path(d.path()).parent_path()); + } + + + // Send message to worker + for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { + const auto [t, m] = make_message(pt.m_p.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } + } +} + void master_server::transfer_datasets(const network::request& req, const std::vector& sources, @@ -268,8 +422,8 @@ master_server::transfer_datasets(const network::request& req, targets); - // As we accept directories expanding directories should be done before and - // update sources and targets. + // As we accept directories expanding directories should be done before + // and update sources and targets. std::vector v_s_new; std::vector v_d_new; @@ -291,14 +445,15 @@ master_server::transfer_datasets(const network::request& req, const auto& p = s.path(); std::vector files; - // Check stat of p using FSPlugin class - auto fs = FSPlugin::make_fs(static_cast(s.get_type())); + // Check stat of p using FSPlugin class + auto fs = FSPlugin::make_fs( + static_cast(s.get_type())); struct stat buf; fs->stat(p, &buf); if(buf.st_mode & S_IFDIR) { LOGGER_INFO("Expanding input directory {}", p); files = fs->readdir(p); - + /* We have all the files expanded. Now create a new @@ -318,8 +473,8 @@ master_server::transfer_datasets(const network::request& req, leading--; } - d_new.path(d.path() / std::filesystem::path( - f.substr(leading + 1))); + d_new.path(d.path() / + std::filesystem::path(f.substr(leading + 1))); LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), d_new.path()); @@ -342,7 +497,7 @@ master_server::transfer_datasets(const network::request& req, .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); - // For all the files + // For all the transfers for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; @@ -357,10 +512,28 @@ master_server::transfer_datasets(const network::request& req, std::filesystem::path(d.path()).parent_path()); } - for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { - const auto [t, m] = make_message(r.tid(), i, s, d); - LOGGER_INFO("msg <= to: {} body: {}", rank, m); - world.send(static_cast(rank), t, m); + // If we are not using ftio start transfer if we are on + // stage-out + if(m_ftio) { + // If we are on stage-out + if(s.get_type() == cargo::dataset::type::gekkofs) { + + // We have only one pendingTransfer for FTIO + // that can be updated, the issue is that we + // need the tid. + m_pending_transfer.m_p = r; + m_pending_transfer.m_sources = sources; + m_pending_transfer.m_targets = targets; + m_pending_transfer.m_work = true; + LOGGER_INFO("Stored stage-out information"); + } + } else { + for(std::size_t rank = 1; rank <= r.nworkers(); + ++rank) { + const auto [t, m] = make_message(r.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } } } LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, @@ -451,8 +624,8 @@ master_server::transfer_statuses(const network::request& req, void -master_server::ftio_int(const network::request& req, float conf, - float prob, float period) { +master_server::ftio_int(const network::request& req, float conf, float prob, + float period) { using network::get_address; using network::rpc_info; using proto::generic_response; @@ -463,16 +636,11 @@ master_server::ftio_int(const network::request& req, float conf, m_probability = prob; m_period = period; m_ftio_changed = true; - LOGGER_INFO("rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}", rpc, - conf, prob, period); - - // do the magic here - - // 1. Update the confidence and probability values inside cargo - - // Scheduling thread should be running and waiting for them + m_ftio = true; - // + LOGGER_INFO( + "rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}", + rpc, conf, prob, period); const auto resp = generic_response{rpc.id(), error_code::success}; diff --git a/src/master.hpp b/src/master.hpp index 3874a89..5d88da7 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -28,9 +28,25 @@ #include "net/server.hpp" #include "cargo.hpp" #include "request_manager.hpp" +#include "parallel_request.hpp" namespace cargo { +class pending_transfer { +public: + pending_transfer():m_p(cargo::parallel_request(0,0,0)) { + m_work = false; + } + + bool m_work; + cargo::parallel_request m_p; + std::vector m_sources; + std::vector m_targets; + // Expanded sources and targets (those that are being processed by the worker) + std::vector m_expanded_sources; + std::vector m_expanded_targets; +}; + class master_server : public network::server, public network::provider { public: @@ -64,14 +80,16 @@ private: void transfer_statuses(const network::request& req, std::uint64_t tid); - // Receives a request to increase or decrease BW + // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void - bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping); + bw_control(const network::request& req, std::uint64_t tid, + std::int16_t shaping); - void - ftio_int(const network::request& req, float confidence, float probability, float period); + void + ftio_int(const network::request& req, float confidence, float probability, + float period); private: // Dedicated execution stream for the MPI listener ULT @@ -87,6 +105,15 @@ private: float m_probability = -1.0f; float m_period = -1.0f; bool m_ftio_changed = true; + // FTIO enabled flag, we need to call ftio once. + bool m_ftio = false; + + + pending_transfer m_pending_transfer; + + + void + transfer_dataset_internal(pending_transfer& pt); // Request manager request_manager m_request_manager; }; diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 4f1a347..0eb65d9 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -159,8 +159,9 @@ public: m_fs_plugin = cargo::FSPlugin::make_fs(t); }; - explicit file(std::filesystem::path filepath) noexcept - : m_path(std::move(filepath)) {} + explicit file(std::filesystem::path filepath, + cargo::FSPlugin::type t) noexcept + : m_path(std::move(filepath)), m_fs_plugin(cargo::FSPlugin::make_fs(t)) {} file(std::filesystem::path filepath, int fd, std::shared_ptr fs_plugin) noexcept diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index 5fea7ba..87b7e9a 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -77,17 +77,24 @@ gekko_plugin::readdir(const std::string& path) { files = gkfs::syscall::gkfs_get_file_list(path); for(auto& file : files) { - file = "/" + file; + struct stat buf; - stat(file, &buf); + stat("/" + file, &buf); if(S_ISDIR(buf.st_mode)) { - std::vector subfiles = readdir(file); + + std::vector subfiles = readdir("/" + file); final_list.insert(final_list.end(), subfiles.begin(), subfiles.end()); } else { - final_list.push_back(file); + + if(path.size() != 1) { + final_list.push_back(path + "/" + file); + } else { + final_list.push_back("/" + file); + } } } + return final_list; } diff --git a/src/request_manager.cpp b/src/request_manager.cpp index 567446d..0f62302 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -55,6 +55,28 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { return parallel_request{tid, nfiles, nworkers}; } +/** + * @brief Update the request for ftio processing (as it is modified by readdir) + * + * @param request + * @param nfiles + * @param nworkers + * @return error_code + */ +error_code +request_manager::update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers){ + abt::unique_lock lock(m_mutex); + + if(const auto it = m_requests.find(tid); it != m_requests.end()) { + it->second.resize(nfiles, std::vector{nworkers}); + return error_code::success; + + } + LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); + return error_code::no_such_transfer; +} + + error_code request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, diff --git a/src/request_manager.hpp b/src/request_manager.hpp index 46bedda..c813d45 100644 --- a/src/request_manager.hpp +++ b/src/request_manager.hpp @@ -58,6 +58,9 @@ public: tl::expected create(std::size_t nfiles, std::size_t nworkers); + error_code + update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers); + error_code update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 0d8ccbf..ad741c5 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -168,9 +168,11 @@ mpio_read::progress(int ongoing_index) { using posix_file::views::strided; try { int index = 0; + // TODO : FS not defined... + m_status = error_code::transfer_in_progress; for(const auto& file_range : - all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + all_of(posix_file::file{m_input_path, m_fs_i_type}) | as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { ++index; diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 825b7e0..dbd7ae3 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -190,7 +190,7 @@ seq_operation::progress(int ongoing_index) { int index = 0; m_status = error_code::transfer_in_progress; for(const auto& file_range : - all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + all_of(posix_file::file{m_input_path, m_fs_i_type}) | as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { ++index; diff --git a/tests/posix_file_tests.cpp b/tests/posix_file_tests.cpp index 55265a1..d0346b6 100644 --- a/tests/posix_file_tests.cpp +++ b/tests/posix_file_tests.cpp @@ -49,7 +49,7 @@ struct StringMaker { struct scoped_file : public posix_file::file { explicit scoped_file(std::filesystem::path filepath) - : posix_file::file(std::move(filepath)) {} + : posix_file::file(std::move(filepath), cargo::FSPlugin::type::posix) {} ~scoped_file() { remove(); -- GitLab From 1e3636c5c924e082f5819caa0d7005c5bdd9b55f Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 7 Mar 2024 15:59:35 +0100 Subject: [PATCH 09/19] Solved testing, still unkown why there are two conflicting definitions of scoped_file --- tests/posix_file_tests.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/posix_file_tests.cpp b/tests/posix_file_tests.cpp index d0346b6..ee83313 100644 --- a/tests/posix_file_tests.cpp +++ b/tests/posix_file_tests.cpp @@ -48,8 +48,8 @@ struct StringMaker { struct scoped_file : public posix_file::file { - explicit scoped_file(std::filesystem::path filepath) - : posix_file::file(std::move(filepath), cargo::FSPlugin::type::posix) {} + explicit scoped_file(std::filesystem::path filepath, cargo::FSPlugin::type type) + : posix_file::file(std::move(filepath), type) {} ~scoped_file() { remove(); @@ -72,7 +72,7 @@ create_temporary_file(std::size_t desired_size) { abort(); } - return scoped_file{std::filesystem::path{name}}; + return scoped_file{std::filesystem::path{name}, cargo::FSPlugin::type::posix}; } std::vector -- GitLab From d0605d9af1a008945ed92b01dd2ab4a5df173477 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 12 Mar 2024 14:25:33 +0100 Subject: [PATCH 10/19] Be able to change period --- src/master.cpp | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/master.cpp b/src/master.cpp index f6c7b2e..a9f81da 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -196,8 +196,17 @@ master_server::ftio_scheduling_ult() { continue; LOGGER_INFO("Waiting period : {}", m_period); + // Wait in small periods, just in case we change it, This should be mutexed... + auto elapsed = m_period; + while (elapsed > 0) { std::this_thread::sleep_for( - std::chrono::seconds((int)(m_period ))); + std::chrono::seconds((int)(1))); + elapsed -= 1; + if (m_ftio_changed) { + elapsed = m_period; + m_ftio_changed = false; + } + } LOGGER_INFO("Checking if there is work to do in {}", m_pending_transfer.m_sources); -- GitLab From 9173c21c0137c618c343e8844269f4a41b42580b Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 21 Mar 2024 17:37:41 +0100 Subject: [PATCH 11/19] DataClay User Lib support (stage-in) --- CMakeLists.txt | 8 ++ cli/copy.cpp | 2 + cmake/FindDataClay.cmake | 60 +++++++++++ cmake/FindExpand.cmake | 18 ++-- cmake/FindGekkoFS.cmake | 18 ++-- cmake/FindHercules.cmake | 18 ++-- src/posix_file/CMakeLists.txt | 10 +- .../posix_file/fs_plugin/dataclay_plugin.cpp | 101 ++++++++++++++++++ .../posix_file/fs_plugin/dataclay_plugin.hpp | 39 +++++++ .../posix_file/fs_plugin/fs_plugin.cpp | 12 ++- 10 files changed, 250 insertions(+), 36 deletions(-) create mode 100644 cmake/FindDataClay.cmake create mode 100644 src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp create mode 100644 src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 5fffd32..e67b9af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -281,6 +281,14 @@ if (Expand_FOUND) message(STATUS "[${PROJECT_NAME}] Found Expand") endif() +### DataClay: Optional for DataClay +find_package(DataClay) +if (DataClay_FOUND) + add_compile_definitions(DATACLAY_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found DataClay") +endif() + + ### Threads: required by ASIO find_package(Threads REQUIRED) diff --git a/cli/copy.cpp b/cli/copy.cpp index f55a402..dcccfa2 100644 --- a/cli/copy.cpp +++ b/cli/copy.cpp @@ -78,6 +78,7 @@ parse_command_line(int argc, char* argv[]) { "Flags for input datasets. Accepted values\n" " - posix: read data using POSIX (default)\n" " - parallel: read data using MPI-IO\n" + " - dataclay: read data using DATACLAY\n" " - gekkofs: read data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, @@ -87,6 +88,7 @@ parse_command_line(int argc, char* argv[]) { "Flags for output datasets. Accepted values\n" " - posix: write data using POSIX (default)\n" " - parallel: write data using MPI-IO\n" + " - dataclay: write data using DATACLAY\n" " - gekkofs: write data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, diff --git a/cmake/FindDataClay.cmake b/cmake/FindDataClay.cmake new file mode 100644 index 0000000..b60a0d2 --- /dev/null +++ b/cmake/FindDataClay.cmake @@ -0,0 +1,60 @@ +################################################################################ +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of cargo. # +# # +# cargo is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# cargo is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with cargo. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(DataClay_INCLUDE_DIR + NAMES dataclayplugin.h +) + +find_library(DataClay_LIBRARY + NAMES dataclay-plugin/libdataclayplugin.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + DataClay + DEFAULT_MSG + DataClay_INCLUDE_DIR + DataClay_LIBRARY +) + +if(DataClay_FOUND) + set(DataClay_LIBRARIES ${DataClay_LIBRARY}) + set(DataClay_INCLUDE_DIRS ${DataClay_INCLUDE_DIR}) + + + if(NOT TARGET DataClay::DataClay) + add_library(DataClay::DataClay UNKNOWN IMPORTED) + set_target_properties(DataClay::DataClay PROPERTIES + IMPORTED_LOCATION "${DataClay_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${DataClay_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + DataClay_INCLUDE_DIR + DataClay_LIBRARY +) diff --git a/cmake/FindExpand.cmake b/cmake/FindExpand.cmake index d780604..bcf79f3 100644 --- a/cmake/FindExpand.cmake +++ b/cmake/FindExpand.cmake @@ -1,27 +1,23 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # # # -# This software was partially supported by the # -# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # # # -# This software was partially supported by the # -# ADA-FS project under the SPPEXA project funded by the DFG. # +# This file is part of cargo. # # # -# This file is part of GekkoFS. # -# # -# GekkoFS is free software: you can redistribute it and/or modify # +# cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # -# GekkoFS is distributed in the hope that it will be useful, # +# cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # -# along with GekkoFS. If not, see . # +# along with cargo. If not, see . # # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ diff --git a/cmake/FindGekkoFS.cmake b/cmake/FindGekkoFS.cmake index 7aebff4..26f05cf 100644 --- a/cmake/FindGekkoFS.cmake +++ b/cmake/FindGekkoFS.cmake @@ -1,27 +1,23 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # # # -# This software was partially supported by the # -# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # # # -# This software was partially supported by the # -# ADA-FS project under the SPPEXA project funded by the DFG. # +# This file is part of cargo. # # # -# This file is part of GekkoFS. # -# # -# GekkoFS is free software: you can redistribute it and/or modify # +# cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # -# GekkoFS is distributed in the hope that it will be useful, # +# cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # -# along with GekkoFS. If not, see . # +# along with cargo. If not, see . # # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ diff --git a/cmake/FindHercules.cmake b/cmake/FindHercules.cmake index 2f877ab..1654e4f 100644 --- a/cmake/FindHercules.cmake +++ b/cmake/FindHercules.cmake @@ -1,27 +1,23 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # # # -# This software was partially supported by the # -# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # # # -# This software was partially supported by the # -# ADA-FS project under the SPPEXA project funded by the DFG. # +# This file is part of cargo. # # # -# This file is part of GekkoFS. # -# # -# GekkoFS is free software: you can redistribute it and/or modify # +# cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # -# GekkoFS is distributed in the hope that it will be useful, # +# cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # -# along with GekkoFS. If not, see . # +# along with cargo. If not, see . # # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ diff --git a/src/posix_file/CMakeLists.txt b/src/posix_file/CMakeLists.txt index f539920..68446ad 100644 --- a/src/posix_file/CMakeLists.txt +++ b/src/posix_file/CMakeLists.txt @@ -32,9 +32,11 @@ if (Expand_FOUND) set(EXPAND_INCLUDES posix_file/fs_plugin/expand_plugin.hpp posix_file/fs_plugin/expand_plugin.cpp) endif() if (Hercules_FOUND) - set(HERCULES_INCLUDES posix_file/fs_plugin/hercules_plugin.chpp posix_file/fs_plugin/hercules_plugin.cpp) + set(HERCULES_INCLUDES posix_file/fs_plugin/hercules_plugin.hpp posix_file/fs_plugin/hercules_plugin.cpp) +endif() +if (DataClay_FOUND) + set(DATACLAY_INCLUDES posix_file/fs_plugin/dataclay_plugin.hpp posix_file/fs_plugin/dataclay_plugin.cpp) endif() - target_sources( posix_file @@ -52,6 +54,7 @@ target_sources( ${GEKKO_INCLUDES} ${HERCULES_INCLUDES} ${EXPAND_INCLUDES} + ${DATACLAY_INCLUDES} ) @@ -73,6 +76,9 @@ if (Hercules_FOUND) set(ADHOC ${ADHOC} Hercules::Hercules) endif() +if (DataClay_FOUND) + set(ADHOC ${ADHOC} DataClay::DataClay) +endif() target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected PRIVATE ${ADHOC}) diff --git a/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp new file mode 100644 index 0000000..658cca2 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp @@ -0,0 +1,101 @@ + +#include "fs_plugin.hpp" +#include "dataclay_plugin.hpp" +extern "C" { +#include +} +#include +namespace cargo { +dataclay_plugin::dataclay_plugin() { + ::dataclay_plugin("cargo"); +} + +dataclay_plugin::~dataclay_plugin() {} +// Override the open function +int +dataclay_plugin::open(const std::string& path, int flags, unsigned int mode) { + // Call to dataclayfs has the signature inverted + return dataclay_open(path.c_str(), flags, mode); +} + +// Override the pread function +ssize_t +dataclay_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return dataclay_pread(fd, (char*) buf, count, offset); +} + +// Override the pwrite function +ssize_t +dataclay_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + int result = dataclay_pwrite(fd, (char*) buf, count, offset); + return result; +} + + +bool +dataclay_plugin::mkdir(const std::string& path, mode_t mode) { + // int result = gkfs::syscall::gkfs_create(path, mode | S_IFDIR); + (void) path; + (void) mode; + return true; // We don't have directories +} + +bool +dataclay_plugin::close(int fd) { + dataclay_close(fd); + return true; +} + +off_t +dataclay_plugin::lseek(int fd, off_t offset, int whence) { + (void) fd; + (void) offset; + (void) whence; + std::cerr << "dataclay_plugin lseek not supported" << std::endl; + return 0; +} + +off_t +dataclay_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + std::cerr << "dataclay_plugin fallocate not supported" << std::endl; + return len; +} + +int +dataclay_plugin::unlink(const std::string& path) { + + (void) path; + std::cerr << "dataclay_plugin unlink not supported" << std::endl; + return 0; +} + + +std::vector +dataclay_plugin::readdir(const std::string& path) { + (void) path; + std::cerr << "dataclay_plugin readdir not supported" << std::endl; + return {}; +} + +// stat +int +dataclay_plugin::stat(const std::string& path, struct stat* buf) { + (void) path; + (void) buf; + std::cerr << "dataclay_plugin stat not supported" << std::endl; + return 0; +} + +ssize_t +dataclay_plugin::size(const std::string& path) { + (void) path; + std::cerr << "dataclay_plugin size not supported" << std::endl; + return 0; +} + + +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp new file mode 100644 index 0000000..2d18de5 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp @@ -0,0 +1,39 @@ + +#ifndef DATACLAY_PLUGIN_HPP +#define DATACLAY_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class dataclay_plugin : public FSPlugin { + +public: + dataclay_plugin(); + ~dataclay_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + // Fallocate is not needed in dataclay as pwrite takes care of it. + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; + std::vector + readdir(const std::string& path) final; + int + unlink(const std::string& path) final; + int + stat(const std::string& path, struct stat* buf) final; + ssize_t + size(const std::string& path) final; +}; +}; // namespace cargo + +#endif // dataclay_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp index fa39137..f69d5cd 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -9,11 +9,14 @@ #ifdef EXPAND_PLUGIN #include "expand_plugin.hpp" #endif - +#ifdef DATACLAY_PLUGIN +#include "dataclay_plugin.hpp" +#endif namespace cargo { static std::shared_ptr m_fs_posix; static std::shared_ptr m_fs_gekkofs; +static std::shared_ptr m_fs_dataclay; std::shared_ptr FSPlugin::make_fs(type t) { @@ -30,6 +33,13 @@ FSPlugin::make_fs(type t) { m_fs_gekkofs = std::make_shared(); return m_fs_gekkofs; +#endif +#ifdef DATACLAY_PLUGIN + case type::dataclay: + if(m_fs_dataclay == nullptr) + m_fs_dataclay = std::make_shared(); + return m_fs_dataclay; + #endif #ifdef HERCULES_PLUGIN case type::hercules: -- GitLab From 9e6047635d1d1721a7bcaaaf0fab1766a45fa78c Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 22 Mar 2024 13:09:44 +0100 Subject: [PATCH 12/19] Dataclay minimal support integrated --- CMakeLists.txt | 3 +++ README.md | 4 ++-- cmake/FindDataClay.cmake | 17 +++++++++++++++-- .../posix_file/fs_plugin/dataclay_plugin.cpp | 11 ++++------- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e67b9af..5c80035 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -285,10 +285,13 @@ endif() find_package(DataClay) if (DataClay_FOUND) add_compile_definitions(DATACLAY_PLUGIN) + add_compile_definitions(DataClay_PATH="${DataClay_MODEL_DIR}/dataclay-plugin") message(STATUS "[${PROJECT_NAME}] Found DataClay") endif() + + ### Threads: required by ASIO find_package(Threads REQUIRED) diff --git a/README.md b/README.md index c43948a..ad2f739 100644 --- a/README.md +++ b/README.md @@ -190,8 +190,8 @@ cli/ccp --server ofi+tcp://127.0.0.1:62000 --input /directory/subdir --output /d `--if or --of` can be: posix, gekkofs, hercules, dataclay, expand and parallel (for MPIIO requests, but only one side is allowed). Typically you should use posix or parallel and then one specialized adhocfs. Posix is also able to be used with LD_PRELOAD, however -higher performance and flexibility can be obtained using the specific configuration. +higher performance and flexibility can be obtained using the specific configuration. Some backends are only available with directory support for stage-in. On the other hand, MPIIO (parallel) uses normally file locking so there is a performance imapact, and posix is faster (we supose no external modifications are done). -Other commands are `ping`, `shutdown` and `shaping` (for bw control). \ No newline at end of file +Other commands are `ping`, `shutdown`, `shaping` (for bw control) and `cargo_ftio` to interactions with ftio (stage-out and gekkofs) \ No newline at end of file diff --git a/cmake/FindDataClay.cmake b/cmake/FindDataClay.cmake index b60a0d2..2008f6a 100644 --- a/cmake/FindDataClay.cmake +++ b/cmake/FindDataClay.cmake @@ -25,8 +25,20 @@ find_path(DataClay_INCLUDE_DIR NAMES dataclayplugin.h + PREFIX dataclay-plugin ) +find_path(DataClay_MODEL_DIR + NAMES client.py + PREFIX dataclay-plugin +) +message(STATUS "[${PROJECT_NAME}] DataClay library MODEL DIR ${DataClay_MODEL_DIR}") + +find_package(Python3 REQUIRED Development) +message(STATUS "[${PROJECT_NAME}] DataClay library needs Python include ${Python3_INCLUDE_DIRS}") + + + find_library(DataClay_LIBRARY NAMES dataclay-plugin/libdataclayplugin.so ) @@ -37,18 +49,19 @@ find_package_handle_standard_args( DEFAULT_MSG DataClay_INCLUDE_DIR DataClay_LIBRARY + DataClay_MODEL_DIR ) if(DataClay_FOUND) set(DataClay_LIBRARIES ${DataClay_LIBRARY}) - set(DataClay_INCLUDE_DIRS ${DataClay_INCLUDE_DIR}) + set(DataClay_INCLUDE_DIRS ${DataClay_INCLUDE_DIR} ) if(NOT TARGET DataClay::DataClay) add_library(DataClay::DataClay UNKNOWN IMPORTED) set_target_properties(DataClay::DataClay PROPERTIES IMPORTED_LOCATION "${DataClay_LIBRARY}" - INTERFACE_INCLUDE_DIRECTORIES "${DataClay_INCLUDE_DIR}" + INTERFACE_INCLUDE_DIRECTORIES "${DataClay_INCLUDE_DIR};${Python3_INCLUDE_DIRS}" ) endif() endif() diff --git a/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp index 658cca2..7181be2 100644 --- a/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp @@ -7,15 +7,15 @@ extern "C" { #include namespace cargo { dataclay_plugin::dataclay_plugin() { - ::dataclay_plugin("cargo"); + ::dataclay_plugin("cargo", DataClay_PATH); + std::cout << "dataclay_plugin loaded" << std::endl; } dataclay_plugin::~dataclay_plugin() {} // Override the open function int dataclay_plugin::open(const std::string& path, int flags, unsigned int mode) { - // Call to dataclayfs has the signature inverted - return dataclay_open(path.c_str(), flags, mode); + return dataclay_open((char *)path.c_str(), flags, mode); } // Override the pread function @@ -27,14 +27,12 @@ dataclay_plugin::pread(int fd, void* buf, size_t count, off_t offset) { // Override the pwrite function ssize_t dataclay_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { - int result = dataclay_pwrite(fd, (char*) buf, count, offset); - return result; + return dataclay_pwrite(fd, (char*) buf, count, offset); } bool dataclay_plugin::mkdir(const std::string& path, mode_t mode) { - // int result = gkfs::syscall::gkfs_create(path, mode | S_IFDIR); (void) path; (void) mode; return true; // We don't have directories @@ -61,7 +59,6 @@ dataclay_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { (void) mode; (void) offset; (void) len; - std::cerr << "dataclay_plugin fallocate not supported" << std::endl; return len; } -- GitLab From 5b3d7d3bb56959ca75336310ac22583dcf715eb8 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 27 Mar 2024 11:17:04 +0100 Subject: [PATCH 13/19] Solved : pending/completed, ccp without files plus gekkofs library --- src/master.cpp | 44 ++++++++++--------- .../posix_file/fs_plugin/gekko_plugin.cpp | 4 +- src/request_manager.cpp | 33 +++++++------- 3 files changed, 41 insertions(+), 40 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index a9f81da..276ea68 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -181,7 +181,7 @@ master_server::ftio_scheduling_ult() { if(!m_pending_transfer.m_work or m_period < 0.0f) { std::this_thread::sleep_for(1000ms); - } + } // Do something with the confidence and probability @@ -196,13 +196,13 @@ master_server::ftio_scheduling_ult() { continue; LOGGER_INFO("Waiting period : {}", m_period); - // Wait in small periods, just in case we change it, This should be mutexed... + // Wait in small periods, just in case we change it, This should be + // mutexed... auto elapsed = m_period; - while (elapsed > 0) { - std::this_thread::sleep_for( - std::chrono::seconds((int)(1))); + while(elapsed > 0) { + std::this_thread::sleep_for(std::chrono::seconds((int) (1))); elapsed -= 1; - if (m_ftio_changed) { + if(m_ftio_changed) { elapsed = m_period; m_ftio_changed = false; } @@ -231,7 +231,7 @@ master_server::ftio_scheduling_ult() { if(finished) { // Delete all source files LOGGER_INFO("Transfer finished for {}", - m_pending_transfer.m_expanded_sources); + m_pending_transfer.m_expanded_sources); auto fs = FSPlugin::make_fs(cargo::FSPlugin::type::gekkofs); for(auto& file : m_pending_transfer.m_expanded_sources) { LOGGER_INFO("Deleting {}", file.path()); @@ -505,7 +505,19 @@ master_server::transfer_datasets(const network::request& req, }) .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); - + if(m_ftio) { + if(sources[0].get_type() == cargo::dataset::type::gekkofs) { + + // We have only one pendingTransfer for FTIO + // that can be updated, the issue is that we + // need the tid. + m_pending_transfer.m_p = r; + m_pending_transfer.m_sources = sources; + m_pending_transfer.m_targets = targets; + m_pending_transfer.m_work = true; + LOGGER_INFO("Stored stage-out information"); + } + } // For all the transfers for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; @@ -523,20 +535,10 @@ master_server::transfer_datasets(const network::request& req, // If we are not using ftio start transfer if we are on // stage-out - if(m_ftio) { + if(!m_ftio) { // If we are on stage-out - if(s.get_type() == cargo::dataset::type::gekkofs) { - - // We have only one pendingTransfer for FTIO - // that can be updated, the issue is that we - // need the tid. - m_pending_transfer.m_p = r; - m_pending_transfer.m_sources = sources; - m_pending_transfer.m_targets = targets; - m_pending_transfer.m_work = true; - LOGGER_INFO("Stored stage-out information"); - } - } else { + + for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), i, s, d); diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index 87b7e9a..aff3eac 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -28,13 +28,13 @@ gekko_plugin::open(const std::string& path, int flags, unsigned int mode) { // Override the pread function ssize_t gekko_plugin::pread(int fd, void* buf, size_t count, off_t offset) { - return gkfs::syscall::gkfs_pread_ws(fd, buf, count, offset); + return gkfs::syscall::gkfs_pread(fd, buf, count, offset); } // Override the pwrite function ssize_t gekko_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { - return gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, offset); + return gkfs::syscall::gkfs_pwrite(fd, buf, count, offset); } diff --git a/src/request_manager.cpp b/src/request_manager.cpp index 0f62302..4d697df 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -57,29 +57,28 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { /** * @brief Update the request for ftio processing (as it is modified by readdir) - * - * @param request - * @param nfiles - * @param nworkers - * @return error_code + * + * @param request + * @param nfiles + * @param nworkers + * @return error_code */ error_code -request_manager::update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers){ +request_manager::update(std::uint64_t tid, std::size_t nfiles, + std::size_t nworkers) { abt::unique_lock lock(m_mutex); + m_requests[tid] = std::vector{nfiles, + std::vector{nworkers}}; + + return error_code::success; + - if(const auto it = m_requests.find(tid); it != m_requests.end()) { - it->second.resize(nfiles, std::vector{nworkers}); - return error_code::success; - - } - LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); - return error_code::no_such_transfer; } error_code -request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, - transfer_state s, float bw, +request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, + std::string name, transfer_state s, float bw, std::optional ec) { abt::unique_lock lock(m_mutex); @@ -114,7 +113,7 @@ request_manager::lookup(std::uint64_t tid) { return request_status{ps}; } } - // TODO : completed should have the name of the file if its not found + // TODO : completed should have the name of the file if its not found return request_status{"", transfer_state::completed, 0.0f}; } @@ -143,7 +142,7 @@ request_manager::lookup_all(std::uint64_t tid) { // not finished rs = request_status{ps}; } - rs.bw(bw/(double)fs.size()); + rs.bw(bw / (double) fs.size()); result.push_back(rs); } return result; -- GitLab From 959561d919000550038dd5e30e545e093d8b5074 Mon Sep 17 00:00:00 2001 From: Marc Vef Date: Wed, 27 Mar 2024 20:01:37 +0100 Subject: [PATCH 14/19] Adding --run argument to cargo_ftio to trigger run now. Has no effect when period is set. --- .gitignore | 4 ++-- cli/ftio.cpp | 15 ++++++++++----- src/master.cpp | 47 +++++++++++++++++++++++++++++++++-------------- src/master.hpp | 4 ++-- 4 files changed, 47 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index a34243d..cf4f696 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ build* -cmake-build-debug +cmake-build-* .idea -.vscode +.vscode \ No newline at end of file diff --git a/cli/ftio.cpp b/cli/ftio.cpp index 347d25d..751f600 100644 --- a/cli/ftio.cpp +++ b/cli/ftio.cpp @@ -35,6 +35,7 @@ struct ftio_config { float confidence; float probability; float period; + bool run{false}; }; ftio_config @@ -52,16 +53,18 @@ parse_command_line(int argc, char* argv[]) { app.add_option("-c,--conf", cfg.confidence, "confidence") ->option_text("float") - ->required(); + ->default_val("-1.0"); app.add_option("-p,--probability", cfg.probability, "probability") ->option_text("float") - ->default_str("-1.0"); + ->default_val("-1.0"); app.add_option("-t,--period", cfg.period, "period") ->option_text("float") - ->required(); - + ->default_val("-1.0"); + app.add_flag( + "--run", cfg.run, + "Trigger stage operation to run now. Has no effect when period is set > 0"); try { app.parse(argc, argv); @@ -94,7 +97,9 @@ main(int argc, char* argv[]) { if(const auto result = rpc_client.lookup(address); result.has_value()) { const auto& endpoint = result.value(); - const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, cfg.period); + const auto retval = + endpoint.call("ftio_int", cfg.confidence, cfg.probability, + cfg.period, cfg.run); if(retval.has_value()) { diff --git a/src/master.cpp b/src/master.cpp index 276ea68..747abe1 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -179,34 +179,45 @@ master_server::ftio_scheduling_ult() { while(!m_shutting_down) { - if(!m_pending_transfer.m_work or m_period < 0.0f) { + if(!m_pending_transfer.m_work or !m_ftio_run) { std::this_thread::sleep_for(1000ms); } + // if(!m_pending_transfer.m_work or m_period < 0.0f) { + // std::this_thread::sleep_for(1000ms); + // } // Do something with the confidence and probability - if(m_ftio_changed) { - m_ftio_changed = false; - LOGGER_INFO("Confidence is {}, probability is {} and period is {}", - m_confidence, m_probability, m_period); - } + // if(m_ftio_run) { + // m_ftio_run = false; + // LOGGER_INFO("Confidence is {}, probability is {} and + // period is {}", + // m_confidence, m_probability, m_period); + // } if(!m_pending_transfer.m_work) continue; - - LOGGER_INFO("Waiting period : {}", m_period); + if(m_period > 0) { + LOGGER_INFO("Waiting period : {}", m_period); + } else { + LOGGER_INFO("Waiting for run trigger ..."); + } // Wait in small periods, just in case we change it, This should be // mutexed... auto elapsed = m_period; while(elapsed > 0) { std::this_thread::sleep_for(std::chrono::seconds((int) (1))); elapsed -= 1; - if(m_ftio_changed) { + // reset elapsed value when new RPC comes in + if(m_ftio_run) { elapsed = m_period; - m_ftio_changed = false; + m_ftio_run = false; } } + if(!m_ftio_run) { + continue; + } LOGGER_INFO("Checking if there is work to do in {}", m_pending_transfer.m_sources); @@ -239,6 +250,12 @@ master_server::ftio_scheduling_ult() { fs->unlink(file.path()); } } + if(m_period > 0) { + // always run whenever period is set + m_ftio_run = true; + } else { + m_ftio_run = false; + } } LOGGER_INFO("Shutting down."); @@ -636,7 +653,7 @@ master_server::transfer_statuses(const network::request& req, void master_server::ftio_int(const network::request& req, float conf, float prob, - float period) { + float period, bool run) { using network::get_address; using network::rpc_info; using proto::generic_response; @@ -646,12 +663,14 @@ master_server::ftio_int(const network::request& req, float conf, float prob, m_confidence = conf; m_probability = prob; m_period = period; - m_ftio_changed = true; + m_ftio_run = run; + if(m_period > 0) + m_ftio_run = true; m_ftio = true; LOGGER_INFO( - "rpc {:>} body: {{confidence: {}, probability: {}, period: {}}}", - rpc, conf, prob, period); + "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}}}", + rpc, conf, prob, period, run); const auto resp = generic_response{rpc.id(), error_code::success}; diff --git a/src/master.hpp b/src/master.hpp index 5d88da7..1fa39f5 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -89,7 +89,7 @@ private: void ftio_int(const network::request& req, float confidence, float probability, - float period); + float period, bool run); private: // Dedicated execution stream for the MPI listener ULT @@ -104,7 +104,7 @@ private: float m_confidence = -1.0f; float m_probability = -1.0f; float m_period = -1.0f; - bool m_ftio_changed = true; + bool m_ftio_run = true; // FTIO enabled flag, we need to call ftio once. bool m_ftio = false; -- GitLab From 560a72c41d48203db37917308adeb4d53a2a679d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 2 Apr 2024 16:03:27 +0200 Subject: [PATCH 15/19] Removed python requirement without DataClay --- cmake/FindDataClay.cmake | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmake/FindDataClay.cmake b/cmake/FindDataClay.cmake index 2008f6a..30ae1c0 100644 --- a/cmake/FindDataClay.cmake +++ b/cmake/FindDataClay.cmake @@ -34,8 +34,6 @@ find_path(DataClay_MODEL_DIR ) message(STATUS "[${PROJECT_NAME}] DataClay library MODEL DIR ${DataClay_MODEL_DIR}") -find_package(Python3 REQUIRED Development) -message(STATUS "[${PROJECT_NAME}] DataClay library needs Python include ${Python3_INCLUDE_DIRS}") @@ -53,6 +51,9 @@ find_package_handle_standard_args( ) if(DataClay_FOUND) + find_package(Python3 REQUIRED Development) + message(STATUS "[${PROJECT_NAME}] DataClay library needs Python include ${Python3_INCLUDE_DIRS}") + set(DataClay_LIBRARIES ${DataClay_LIBRARY}) set(DataClay_INCLUDE_DIRS ${DataClay_INCLUDE_DIR} ) -- GitLab From 2487d029f728147e2921ea611cb683dc4024395e Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 10 Apr 2024 09:52:38 +0200 Subject: [PATCH 16/19] update systemd (conflict with the one in scord) --- systemd/cargo@.service.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemd/cargo@.service.in b/systemd/cargo@.service.in index 6e13006..425c636 100644 --- a/systemd/cargo@.service.in +++ b/systemd/cargo@.service.in @@ -3,7 +3,7 @@ Description=Cargo parallel data stager [Service] Type=simple -EnvironmentFile=%S/cargo/%I.cfg +EnvironmentFile=%h/.config/cargo/%I.cfg ExecStart=@CMAKE_INSTALL_FULL_BINDIR@/cargoctl start -s ${CARGO_ADDRESS} -H ${CARGO_HOSTS} -n ${CARGO_NUM_NODES} ExecStop=@CMAKE_INSTALL_FULL_BINDIR@/cargoctl stop -s ${CARGO_ADDRESS} Restart=no -- GitLab From edd7a4ce480a95b84017507c623efd0e4596662f Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 24 Apr 2024 15:11:34 +0200 Subject: [PATCH 17/19] Changed TMP privileges, we need to access real tmp for gekko_hosts --- systemd/cargo@.service.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/systemd/cargo@.service.in b/systemd/cargo@.service.in index 425c636..425616b 100644 --- a/systemd/cargo@.service.in +++ b/systemd/cargo@.service.in @@ -7,5 +7,5 @@ EnvironmentFile=%h/.config/cargo/%I.cfg ExecStart=@CMAKE_INSTALL_FULL_BINDIR@/cargoctl start -s ${CARGO_ADDRESS} -H ${CARGO_HOSTS} -n ${CARGO_NUM_NODES} ExecStop=@CMAKE_INSTALL_FULL_BINDIR@/cargoctl stop -s ${CARGO_ADDRESS} Restart=no -PrivateTmp=true +PrivateTmp=false NoNewPrivileges=true -- GitLab From ccd48f9c0dab265f7ba4934094d75c07b0e2e73a Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 24 Apr 2024 17:38:59 +0200 Subject: [PATCH 18/19] Added --pause --resume flags at ftio cli (delays ongoing transfers) --- cli/ftio.cpp | 11 +++++++++- src/master.cpp | 43 ++++++++++++++++++++++++++++----------- src/master.hpp | 4 +++- src/worker/mpio_read.cpp | 21 ++++++++++++++----- src/worker/mpio_write.cpp | 14 +++++++++++-- 5 files changed, 72 insertions(+), 21 deletions(-) diff --git a/cli/ftio.cpp b/cli/ftio.cpp index 751f600..ddeb25e 100644 --- a/cli/ftio.cpp +++ b/cli/ftio.cpp @@ -36,6 +36,8 @@ struct ftio_config { float probability; float period; bool run{false}; + bool pause{false}; + bool resume{false}; }; ftio_config @@ -62,10 +64,17 @@ parse_command_line(int argc, char* argv[]) { app.add_option("-t,--period", cfg.period, "period") ->option_text("float") ->default_val("-1.0"); + app.add_flag( "--run", cfg.run, "Trigger stage operation to run now. Has no effect when period is set > 0"); + app.add_flag( + "--pause", cfg.pause, + "Trigger stage operation to slow down. Other parameters not used"); + + app.add_flag("--resume", cfg.resume, "Trigger stage operation to resume, only pause or resume will take into account. Others parameters not used."); + try { app.parse(argc, argv); return cfg; @@ -99,7 +108,7 @@ main(int argc, char* argv[]) { const auto& endpoint = result.value(); const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, - cfg.period, cfg.run); + cfg.period, cfg.run, cfg.pause, cfg.resume); if(retval.has_value()) { diff --git a/src/master.cpp b/src/master.cpp index 747abe1..4753936 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -562,6 +562,8 @@ master_server::transfer_datasets(const network::request& req, LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); } + } else { + m_ftio_tid = r.tid(); } } LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, @@ -653,24 +655,41 @@ master_server::transfer_statuses(const network::request& req, void master_server::ftio_int(const network::request& req, float conf, float prob, - float period, bool run) { + float period, bool run, bool pause, bool resume) { using network::get_address; using network::rpc_info; using proto::generic_response; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - - m_confidence = conf; - m_probability = prob; - m_period = period; - m_ftio_run = run; - if(m_period > 0) - m_ftio_run = true; - m_ftio = true; - + if(pause) { + // send shaping info + for(int rank = 1; rank < world.size(); ++rank) { + // Slowdown 1 second per block + const auto m = cargo::shaper_message{m_ftio_tid, +10}; + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), + static_cast(tag::bw_shaping), m); + } + } else if(resume) { + for(int rank = 1; rank < world.size(); ++rank) { + // Restart operation + const auto m = cargo::shaper_message{m_ftio_tid, -1}; + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), + static_cast(tag::bw_shaping), m); + } + } else { + m_confidence = conf; + m_probability = prob; + m_period = period; + m_ftio_run = run; + if(m_period > 0) + m_ftio_run = true; + m_ftio = true; + } LOGGER_INFO( - "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}}}", - rpc, conf, prob, period, run); + "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}, pause: {}, resume: {}}}", + rpc, conf, prob, period, run, pause, resume); const auto resp = generic_response{rpc.id(), error_code::success}; diff --git a/src/master.hpp b/src/master.hpp index 1fa39f5..612ae8a 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -89,7 +89,7 @@ private: void ftio_int(const network::request& req, float confidence, float probability, - float period, bool run); + float period, bool run, bool pause, bool resume); private: // Dedicated execution stream for the MPI listener ULT @@ -105,6 +105,8 @@ private: float m_probability = -1.0f; float m_period = -1.0f; bool m_ftio_run = true; + // We store the tid of the ftio transfer to proper slow it down. + std::uint64_t m_ftio_tid = 0; // FTIO enabled flag, we need to call ftio once. bool m_ftio = false; diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index ad741c5..526ef1b 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -47,7 +47,7 @@ mpio_read::operator()() { using posix_file::views::strided; m_status = error_code::transfer_in_progress; try { - + const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); @@ -172,7 +172,8 @@ mpio_read::progress(int ongoing_index) { m_status = error_code::transfer_in_progress; for(const auto& file_range : - all_of(posix_file::file{m_input_path, m_fs_i_type}) | as_blocks(m_block_size) | + all_of(posix_file::file{m_input_path, m_fs_i_type}) | + as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { ++index; @@ -183,14 +184,24 @@ mpio_read::progress(int ongoing_index) { } } // LOG indexes and sizes - + assert(m_buffer_regions[index].size() >= file_range.size()); - + auto start = std::chrono::steady_clock::now(); m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), file_range.size()); // Do sleep - std::this_thread::sleep_for(sleep_value()); + auto total_sleep = sleep_value(); + auto small_sleep = total_sleep / 100; + if(small_sleep == std::chrono::milliseconds(0)) + small_sleep = std::chrono::milliseconds(1); + while(total_sleep > std::chrono::milliseconds(0)) { + std::this_thread::sleep_for(small_sleep); + total_sleep -= small_sleep; + if(total_sleep > sleep_value()) { + break; + } + } auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index a2b6adc..ab401d0 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -144,8 +144,18 @@ mpio_write::progress(int ongoing_index) { m_bytes_per_rank += n; - // Do sleep - std::this_thread::sleep_for(sleep_value()); + // Do sleep (But be a bit reactive...) + auto total_sleep = sleep_value(); + auto small_sleep = total_sleep / 100; + if (small_sleep == std::chrono::milliseconds(0)) small_sleep = std::chrono::milliseconds(1); + while( total_sleep > std::chrono::milliseconds(0)) { + std::this_thread::sleep_for(small_sleep); + total_sleep -= small_sleep; + if (total_sleep > sleep_value()) { + break; + } + } + auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = -- GitLab From ff29f8c79ec728dc89f0220659a2a500eb02c0be Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 30 Apr 2024 09:38:10 +0200 Subject: [PATCH 19/19] 0.3.6 release --- CMakeLists.txt | 2 +- COPYRIGHT_NOTICE | 2 +- README.md | 19 ++++++++++++++++++- spack/packages/cargo/package.py | 6 +++--- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5c80035..7dd8c7b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.3.5 + VERSION 0.3.6 LANGUAGES C CXX ) diff --git a/COPYRIGHT_NOTICE b/COPYRIGHT_NOTICE index e8a05ef..4c06aeb 100644 --- a/COPYRIGHT_NOTICE +++ b/COPYRIGHT_NOTICE @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * Copyright 2022-2024, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). diff --git a/README.md b/README.md index ad2f739..b19c751 100644 --- a/README.md +++ b/README.md @@ -194,4 +194,21 @@ higher performance and flexibility can be obtained using the specific configurat On the other hand, MPIIO (parallel) uses normally file locking so there is a performance imapact, and posix is faster (we supose no external modifications are done). -Other commands are `ping`, `shutdown`, `shaping` (for bw control) and `cargo_ftio` to interactions with ftio (stage-out and gekkofs) \ No newline at end of file +Other commands are `ping`, `shutdown`, `shaping` (for bw control) and `cargo_ftio` to interactions with ftio (stage-out and gekkofs) + +`cargo_ftio` provides --resume, --pause and --run options to pause and resume the ftio related transfers. We set ftio transfers, the transfers that have gekkofs as --of, that had been setup after a ftio command. + +```shell +#SETUP FTIO, this enables stage-out to be delayed (10000 seconds) +cargo_ftio --server tcp://127.0.0.1:62000 -c -1 -p -1 -t 10000 +#SETUP Stage-out (monitors data directory and subdirs for new file) +ccp --server tcp://127.0.0.1:62000 --input /data --output ~/stage-out --if gekkofs --of parallel +#UPDATE FTIO (as needed, each 25 seconds will do the transfer order) +cargo_ftio --server tcp://127.0.0.1:62000 -c -1 -p -1 -t 25 +``` + +## User libraries for adhocfs +If Cargo finds the adhoc fs libraries (we support GekkoFS and dataclay, in this release), it will automatically use them. +The CMake command will show which adhocfs are detected. + +On the other hand, LD_preload techniques could be used. \ No newline at end of file diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index ec8198e..57976a8 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -27,7 +27,7 @@ class Cargo(CMakePackage): """A parallel data stager for malleable applications.""" homepage = "https://storage.bsc.es/gitlab/hpc/cargo" - url = "https://storage.bsc.es/gitlab/hpc/cargo/-/archive/v0.3.5/cargo-v0.3.5.tar.bz2" + url = "https://storage.bsc.es/gitlab/hpc/cargo/-/archive/v0.3.6/cargo-v0.3.6.tar.bz2" git = "https://storage.bsc.es/gitlab/hpc/cargo.git" maintainers("alberto-miranda") @@ -40,8 +40,8 @@ class Cargo(CMakePackage): version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960", deprecated=True) version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd", deprecated=True) version("0.3.4", sha256="42b740fb7e82c49d73dfb6caf7549876f72913afb75996c6558e956ea63de3da", deprecated=True) - version("0.3.5", sha256="5c2e998aa96b15bdf513e8c2fce5f20859cf9a6a51882c59b80d5d801a10edd8") - + version("0.3.5", sha256="5c2e998aa96b15bdf513e8c2fce5f20859cf9a6a51882c59b80d5d801a10edd8", deprecated=True) + version("0.3.6") # build variants variant('build_type', default='Release', -- GitLab