From 57f5693ef89636ab81d06eaa27f87c9763470c5a Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 27 Oct 2023 14:32:37 +0200 Subject: [PATCH 01/14] WIP BW Shaping messages --- cli/shaping.cpp | 114 ++++++++++++++++++++++++++++++++++++++ src/master.cpp | 26 +++++++++ src/master.hpp | 5 ++ src/proto/mpi/message.hpp | 49 +++++++++++++++- src/worker/worker.cpp | 10 ++++ 5 files changed, 203 insertions(+), 1 deletion(-) create mode 100644 cli/shaping.cpp diff --git a/cli/shaping.cpp b/cli/shaping.cpp new file mode 100644 index 0000000..edefb7c --- /dev/null +++ b/cli/shaping.cpp @@ -0,0 +1,114 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +struct shaping_config { + std::string progname; + std::string server_address; + std::int64_t tid; + std::int16_t shaping; +}; + +shaping_config +parse_command_line(int argc, char* argv[]) { + + shaping_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Cargo shaping client", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + + app.add_option("-i,--tid", cfg.tid, "transfer id") + ->option_text("integer") + ->required(); + + app.add_option("-b,--bw", cfg.shaping, "bw shaping") + ->option_text("integer") + ->required(); + + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + shaping_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + network::client rpc_client{protocol}; + + if(const auto result = rpc_client.lookup(address); result.has_value()) { + const auto& endpoint = result.value(); + const auto retval = endpoint.call("bw_shaping", cfg.tid, cfg.shaping); + + if(retval.has_value()) { + + auto error_code = int{retval.value()}; + + fmt::print("bw_shaping RPC was successful!\n"); + fmt::print(" (server replied with: {})\n", error_code); + return EXIT_SUCCESS; + } + + fmt::print(stderr, "bw_shaping RPC failed\n"); + return EXIT_FAILURE; + + } else { + fmt::print(stderr, "Failed to lookup address: {}\n", address); + return EXIT_FAILURE; + } + } catch(const std::exception& ex) { + fmt::print(stderr, "Error: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/src/master.cpp b/src/master.cpp index 4b106eb..ea9fc9a 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -85,6 +85,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(shutdown)); provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); + provider::define(EXPAND(bw_shaping)); #undef EXPAND @@ -169,6 +170,31 @@ master_server::ping(const network::request& req) { req.respond(resp); } + +void +master_server::bw_shaping(const network::request& req, std::uint64_t tid, std::int16_t shaping) { + using network::get_address; + using network::rpc_info; + using proto::generic_response; + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{tid: {}, shaping: {}}}", rpc, tid, shaping); + + for(int rank = 1; rank < world.size(); ++rank) { + const auto m = cargo::shaper_message{tid, shaping}; + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), static_cast(tag::bw_shaping), m); + } + + const auto resp = generic_response{rpc.id(), error_code::success}; + + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + + req.respond(resp); + +} + void master_server::shutdown(const network::request& req) { using network::get_address; diff --git a/src/master.hpp b/src/master.hpp index 3d5797a..6243daf 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -58,6 +58,11 @@ private: void transfer_status(const network::request& req, std::uint64_t tid); + // Receives a request to increase or decrease BW + // -1 faster, 0 , +1 slower + void + bw_shaping(const network::request& req, std::uint64_t tid, std::int16_t shaping); + private: // Dedicated execution stream for the MPI listener ULT thallium::managed m_mpi_listener_ess; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index bc8c5a1..b2818b4 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -35,7 +35,7 @@ namespace cargo { -enum class tag : int { pread, pwrite, sequential, status, shutdown }; +enum class tag : int { pread, pwrite, sequential, bw_shaping, status, shutdown }; class transfer_message { @@ -138,6 +138,42 @@ private: std::optional m_error_code{}; }; +class shaper_message { + + friend class boost::serialization::access; + +public: + shaper_message() = default; + + shaper_message(std::uint64_t tid, std::int16_t shaping) + : m_tid(tid), m_shaping(shaping) { + } + + [[nodiscard]] std::uint64_t + tid() const { + return m_tid; + } + + [[nodiscard]] std::int16_t + shaping() const { + return m_shaping; + } + +private: + template + void + serialize(Archive& ar, const unsigned int version) { + (void) version; + + ar & m_tid; + ar & m_shaping; + } + + std::uint64_t m_tid{}; + std::uint16_t m_shaping{}; +}; + + class shutdown_message { friend class boost::serialization::access; @@ -186,6 +222,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const cargo::shaper_message& s, FormatContext& ctx) const { + const auto str = fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 484f3b0..41bf8be 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -138,6 +138,16 @@ worker::run() { break; } + case tag::bw_shaping: { + shaper_message m; + world.recv(msg->source(), msg->tag(), m); + LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + + // TODO: Do something with m.shaping; + + break; + } + case tag::shutdown: LOGGER_INFO("msg => from: {} body: {{shutdown}}", msg->source()); -- GitLab From 0e186ffdba1191ac8c8505c7f79caad72beeff53 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 30 Oct 2023 12:54:38 +0100 Subject: [PATCH 02/14] WIP sleep --- src/worker/mpio_read.cpp | 19 ++++++++++++++++--- src/worker/mpio_read.hpp | 5 +++++ src/worker/mpio_write.cpp | 18 +++++++++++++++++- src/worker/mpio_write.hpp | 4 ++++ 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 1c6bbd0..c386adc 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -48,7 +48,7 @@ mpio_read::operator()() const { m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = input_file.size(); - std::size_t block_size = 512u; + std::size_t block_size = 512*1024u; // create block type MPI_Datatype block_type; @@ -133,9 +133,22 @@ mpio_read::operator()() const { all_of(posix_file::file{m_input_path}) | as_blocks(block_size) | strided(workers_size, workers_rank)) { assert(buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); output_file.pwrite(buffer_regions[index], file_range.offset(), file_range.size()); - + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast>( + end - start) + .count(); + if((elapsed_seconds) > 0) { + LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s", block_size/1024.0, + elapsed_seconds, (block_size/(1024.0*1024.0)) / (elapsed_seconds)); + } + // Do sleep + std::this_thread::sleep_for(sleep_value()); + ++index; } } catch(const mpioxx::io_error& e) { @@ -144,7 +157,7 @@ mpio_read::operator()() const { } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); return make_system_error(e.error_code()); - } catch (const std::system_error& e) { + } catch(const std::system_error& e) { LOGGER_ERROR("Unexpected system error: {}", e.what()); return make_system_error(e.code().value()); } catch(const std::exception& e) { diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 5ce311d..8122015 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -40,10 +40,15 @@ public: cargo::error_code operator()() const final; + std::chrono::milliseconds sleep_value(); + // We pass a - or + value to decrease or increase the bw shaping. + void sleep_value(std::int16_t incr); + private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + std::int16_t m_sleep_value; }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 5f40048..86add66 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -41,7 +41,7 @@ mpio_write::operator()() const { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); - std::size_t block_size = 512u; + std::size_t block_size = 512*1024u; // 512 kb std::size_t file_size = std::filesystem::file_size(m_input_path); // compute the number of blocks in the file @@ -81,6 +81,7 @@ mpio_write::operator()() const { strided(workers_size, workers_rank)) { assert(buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); const std::size_t n = input_file.pread(buffer_regions[index], file_range.offset(), file_range.size()); @@ -91,7 +92,22 @@ mpio_write::operator()() const { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); + auto start = std::chrono::steady_clock::now(); + + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast>( + end - start) + .count(); + if((elapsed_seconds) > 0) { + LOGGER_INFO("BW (pread) Update: {} / {} = {} mb/s", block_size/1024.0, + elapsed_seconds, (block_size/(1024.0*1024.0)) / (elapsed_seconds)); + } + bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); ++index; } diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 69b2ced..08ef35f 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -41,11 +41,15 @@ public: cargo::error_code operator()() const final; + std::chrono::milliseconds sleep_value(); + // We pass a - or + value to decrease or increase the bw shaping. + void sleep_value(std::int16_t incr); private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + std::int16_t m_sleep_value; }; } // namespace cargo -- GitLab From fbdda65c7dcf7ee76560104fe5eea8a8da1d1dc4 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 30 Oct 2023 13:27:27 +0100 Subject: [PATCH 03/14] updated thread --- src/worker/mpio_read.cpp | 22 ++++++++++++++++++---- src/worker/mpio_read.hpp | 2 +- src/worker/mpio_write.cpp | 25 +++++++++++++++++-------- src/worker/mpio_write.hpp | 2 +- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index c386adc..0ececfa 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -27,6 +27,7 @@ #include "mpio_read.hpp" #include "mpioxx.hpp" #include "memory.hpp" +#include namespace cargo { @@ -48,7 +49,7 @@ mpio_read::operator()() const { m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = input_file.size(); - std::size_t block_size = 512*1024u; + std::size_t block_size = 512 * 1024u; // create block type MPI_Datatype block_type; @@ -143,12 +144,14 @@ mpio_read::operator()() const { end - start) .count(); if((elapsed_seconds) > 0) { - LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s", block_size/1024.0, - elapsed_seconds, (block_size/(1024.0*1024.0)) / (elapsed_seconds)); + LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s", + block_size / 1024.0, elapsed_seconds, + (block_size / (1024.0 * 1024.0)) / + (elapsed_seconds)); } // Do sleep std::this_thread::sleep_for(sleep_value()); - + ++index; } } catch(const mpioxx::io_error& e) { @@ -168,4 +171,15 @@ mpio_read::operator()() const { return error_code::success; } +std::chrono::milliseconds +mpio_read::sleep_value() const { + return std::chrono::milliseconds{0}; +} + +void +mpio_read::sleep_value(std::int16_t incr) { + m_sleep_value += incr; +} + + } // namespace cargo \ No newline at end of file diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 8122015..3486830 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -40,7 +40,7 @@ public: cargo::error_code operator()() const final; - std::chrono::milliseconds sleep_value(); + std::chrono::milliseconds sleep_value() const; // We pass a - or + value to decrease or increase the bw shaping. void sleep_value(std::int16_t incr); diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 86add66..90eb2d6 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -27,7 +27,7 @@ #include "mpio_write.hpp" #include "mpioxx.hpp" #include "memory.hpp" - +#include namespace cargo { cargo::error_code @@ -41,7 +41,7 @@ mpio_write::operator()() const { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); - std::size_t block_size = 512*1024u; // 512 kb + std::size_t block_size = 512 * 1024u; // 512 kb std::size_t file_size = std::filesystem::file_size(m_input_path); // compute the number of blocks in the file @@ -81,7 +81,7 @@ mpio_write::operator()() const { strided(workers_size, workers_rank)) { assert(buffer_regions[index].size() >= file_range.size()); - auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::steady_clock::now(); const std::size_t n = input_file.pread(buffer_regions[index], file_range.offset(), file_range.size()); @@ -92,8 +92,6 @@ mpio_write::operator()() const { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); - auto start = std::chrono::steady_clock::now(); - auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -101,8 +99,10 @@ mpio_write::operator()() const { end - start) .count(); if((elapsed_seconds) > 0) { - LOGGER_INFO("BW (pread) Update: {} / {} = {} mb/s", block_size/1024.0, - elapsed_seconds, (block_size/(1024.0*1024.0)) / (elapsed_seconds)); + LOGGER_INFO("BW (pread) Update: {} / {} = {} mb/s", + block_size / 1024.0, elapsed_seconds, + (block_size / (1024.0 * 1024.0)) / + (elapsed_seconds)); } bytes_per_rank += n; @@ -162,7 +162,7 @@ mpio_write::operator()() const { } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); return make_system_error(e.error_code()); - } catch (const std::system_error& e) { + } catch(const std::system_error& e) { LOGGER_ERROR("Unexpected system error: {}", e.what()); return make_system_error(e.code().value()); } catch(const std::exception& e) { @@ -172,5 +172,14 @@ mpio_write::operator()() const { return error_code::success; } +std::chrono::milliseconds +mpio_write::sleep_value() const { + return std::chrono::milliseconds{0}; +} + +void +mpio_write::sleep_value(std::int16_t incr) { + m_sleep_value += incr; +} } // namespace cargo diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 08ef35f..55b2b06 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -41,7 +41,7 @@ public: cargo::error_code operator()() const final; - std::chrono::milliseconds sleep_value(); + std::chrono::milliseconds sleep_value() const; // We pass a - or + value to decrease or increase the bw shaping. void sleep_value(std::int16_t incr); -- GitLab From d48109723cf457f217265e68059ddfe98e177679 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 31 Oct 2023 12:29:27 +0100 Subject: [PATCH 04/14] Moving info from master to worker progress --- src/worker/mpio_read.cpp | 17 ++++++----------- src/worker/mpio_read.hpp | 6 ++---- src/worker/mpio_write.cpp | 16 ++++++---------- src/worker/mpio_write.hpp | 8 ++++---- src/worker/ops.cpp | 39 +++++++++++++++++++++++++++++++++++++++ src/worker/ops.hpp | 26 +++++++++++++++++++++++++- src/worker/sequential.cpp | 5 +++++ src/worker/sequential.hpp | 2 ++ src/worker/worker.cpp | 32 ++++++++++++++++++++++++++------ 9 files changed, 115 insertions(+), 36 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 0ececfa..13c31b5 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -144,10 +144,11 @@ mpio_read::operator()() const { end - start) .count(); if((elapsed_seconds) > 0) { - LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s", + LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", block_size / 1024.0, elapsed_seconds, (block_size / (1024.0 * 1024.0)) / - (elapsed_seconds)); + (elapsed_seconds), + sleep_value()); } // Do sleep std::this_thread::sleep_for(sleep_value()); @@ -171,15 +172,9 @@ mpio_read::operator()() const { return error_code::success; } -std::chrono::milliseconds -mpio_read::sleep_value() const { - return std::chrono::milliseconds{0}; -} - -void -mpio_read::sleep_value(std::int16_t incr) { - m_sleep_value += incr; +cargo::error_code +mpio_read::progress() const { + return error_code::success; } - } // namespace cargo \ No newline at end of file diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 3486830..503c507 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -40,15 +40,13 @@ public: cargo::error_code operator()() const final; - std::chrono::milliseconds sleep_value() const; - // We pass a - or + value to decrease or increase the bw shaping. - void sleep_value(std::int16_t incr); + cargo::error_code + progress() const final; private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; - std::int16_t m_sleep_value; }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 90eb2d6..9c5c5c6 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -99,10 +99,11 @@ mpio_write::operator()() const { end - start) .count(); if((elapsed_seconds) > 0) { - LOGGER_INFO("BW (pread) Update: {} / {} = {} mb/s", + LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", block_size / 1024.0, elapsed_seconds, (block_size / (1024.0 * 1024.0)) / - (elapsed_seconds)); + (elapsed_seconds), + sleep_value()); } bytes_per_rank += n; @@ -172,14 +173,9 @@ mpio_write::operator()() const { return error_code::success; } -std::chrono::milliseconds -mpio_write::sleep_value() const { - return std::chrono::milliseconds{0}; -} - -void -mpio_write::sleep_value(std::int16_t incr) { - m_sleep_value += incr; +cargo::error_code +mpio_write::progress() const { + return error_code::success; } } // namespace cargo diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 55b2b06..d9f6631 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -41,15 +41,15 @@ public: cargo::error_code operator()() const final; - std::chrono::milliseconds sleep_value() const; - // We pass a - or + value to decrease or increase the bw shaping. - void sleep_value(std::int16_t incr); + + cargo::error_code + progress() const final; + private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; - std::int16_t m_sleep_value; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 17c16c9..1ad8f95 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -54,4 +54,43 @@ operation::make_operation(cargo::tag t, mpi::communicator workers, } } +std::chrono::milliseconds +operation::sleep_value() const { + if(m_sleep_value <= 0) + return std::chrono::milliseconds{0}; + else + return std::chrono::milliseconds{m_sleep_value * 100}; +} + +void +operation::set_bw_shaping(std::int16_t incr) { + m_sleep_value += incr; +} + +int +operation::source() { + return m_rank; +} +std::uint64_t +operation::tid() { + return m_tid; +} +std::uint32_t +operation::seqno() { + return m_seqno; +} + + +void +operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno) { + m_rank = rank; + m_tid = tid; + m_seqno = seqno; +} + +cargo::error_code +operation::progress() const { + return error_code::other; +} + } // namespace cargo diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index b9a20cf..c5770fa 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -30,7 +30,6 @@ #include #include "proto/mpi/message.hpp" #include "cargo.hpp" - namespace cargo { /** @@ -48,6 +47,31 @@ public: virtual cargo::error_code operator()() const = 0; + + + std::chrono::milliseconds + sleep_value() const; + // We pass a - or + value to decrease or increase the bw shaping. + void + set_bw_shaping(std::int16_t incr); + virtual cargo::error_code + progress() const = 0; + + int + source(); + std::uint64_t + tid(); + std::uint32_t + seqno(); + void + set_comm(int rank, std::uint64_t tid, std::uint32_t seqno); + + +private: + std::int16_t m_sleep_value; + int m_rank; + std::uint64_t m_tid; + std::uint32_t m_seqno; }; } // namespace cargo diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index a1cf8ed..66cc123 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -34,4 +34,9 @@ seq_operation::operator()() const { return cargo::error_code::not_implemented; } +cargo::error_code +seq_operation::progress() const { + return error_code::success; +} + } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 82fa619..223766b 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -41,6 +41,8 @@ public: cargo::error_code operator()() const final; + cargo::error_code + progress() const; private: mpi::communicator m_comm; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 41bf8be..0da0a80 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -67,6 +67,9 @@ update_state(int rank, std::uint64_t tid, std::uint32_t seqno, namespace cargo { + +std::shared_ptr op; + worker::worker(std::string name, int rank) : m_name(std::move(name)), m_rank(rank) {} @@ -109,6 +112,15 @@ worker::run() { if(!msg) { // FIXME: sleep time should be configurable + if(op.get()) { + cargo::error_code ec = op->progress(); + if(ec != cargo::error_code::other) { + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + ec); + } + } std::this_thread::sleep_for(150ms); continue; } @@ -123,18 +135,22 @@ worker::run() { world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); - const auto op = operation::make_operation( - t, workers, m.input_path(), m.output_path()); + op = operation::make_operation(t, workers, m.input_path(), + m.output_path()); - update_state(msg->source(), m.tid(), m.seqno(), - transfer_state::running); + op->set_comm(msg->source(), m.tid(), m.seqno()); + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running); + op->set_bw_shaping(1); cargo::error_code ec = (*op)(); - update_state(msg->source(), m.tid(), m.seqno(), + update_state(op->source(), op->tid(), op->seqno(), ec ? transfer_state::failed : transfer_state::completed, ec); + + op.reset(); break; } @@ -142,8 +158,12 @@ worker::run() { shaper_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + if(op.get()) { + op->set_bw_shaping(0); + } else { + LOGGER_INFO("Operation non existent", msg->source(), m); + } - // TODO: Do something with m.shaping; break; } -- GitLab From 5926eaf9474e70c3b674e9e3b34b64c15f854ca6 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 31 Oct 2023 12:41:42 +0100 Subject: [PATCH 05/14] loop update --- src/worker/worker.cpp | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 0da0a80..7231f58 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -119,6 +119,8 @@ worker::run() { ec ? transfer_state::failed : transfer_state::completed, ec); + // Transfer finished + op.reset(); } } std::this_thread::sleep_for(150ms); @@ -143,14 +145,7 @@ worker::run() { update_state(op->source(), op->tid(), op->seqno(), transfer_state::running); op->set_bw_shaping(1); - cargo::error_code ec = (*op)(); - - update_state(op->source(), op->tid(), op->seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - ec); - - op.reset(); + break; } -- GitLab From ba6f183242868c49010157689f1137210c81fa5d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 31 Oct 2023 12:46:11 +0100 Subject: [PATCH 06/14] rebase task --- cli/CMakeLists.txt | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index d349e97..68fea90 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -84,7 +84,24 @@ target_link_libraries(ccp cargo ) -install(TARGETS cargo_ping cargo_shutdown ccp +################################################################################ +## shaping: A CLI tool to request a Cargo server to slowdown transfers +add_executable(shaping) + +target_sources(shaping + PRIVATE + shaping.cpp +) + +target_link_libraries(shaping + PUBLIC + fmt::fmt + CLI11::CLI11 + net::rpc_client + cargo +) + +install(TARGETS cargo_ping cargo_shutdown ccp shaping RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) -- GitLab From a46edf594a1bc5f1877d66545953411e78439ce0 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 31 Oct 2023 13:05:38 +0100 Subject: [PATCH 07/14] added chrono fmt --- src/mpioxx.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 3518322..87244dc 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include // very simple RAII wrappers for some MPI types + utility functions -- GitLab From d4d9eccdf31d36666da028f10649294e5fadda44 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 2 Nov 2023 11:40:24 +0100 Subject: [PATCH 08/14] Progress loop extracted --- src/worker/mpio_read.cpp | 100 ++++++++++++++++++++++-------- src/worker/mpio_read.hpp | 18 +++++- src/worker/mpio_write.cpp | 125 +++++++++++++++++++++++++++----------- src/worker/mpio_write.hpp | 22 ++++++- src/worker/ops.hpp | 5 +- src/worker/sequential.cpp | 7 ++- src/worker/sequential.hpp | 5 +- src/worker/worker.cpp | 60 +++++++++++++----- src/worker/worker.hpp | 3 + 9 files changed, 263 insertions(+), 82 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 13c31b5..322d056 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -22,8 +22,7 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include -#include + #include "mpio_read.hpp" #include "mpioxx.hpp" #include "memory.hpp" @@ -38,11 +37,12 @@ mpio_read::mpio_read(mpi::communicator workers, m_output_path(std::move(output_path)) {} cargo::error_code -mpio_read::operator()() const { +mpio_read::operator()() { using posix_file::views::all_of; using posix_file::views::as_blocks; using posix_file::views::strided; + m_status = error_code::transfer_in_progress; try { const auto input_file = mpioxx::file::open( @@ -100,21 +100,20 @@ mpio_read::operator()() const { } // step 1. acquire buffers - memory_buffer buffer; - buffer.resize(blocks_per_rank * block_size); - std::vector buffer_regions; - buffer_regions.reserve(blocks_per_rank); + m_buffer.resize(blocks_per_rank * block_size); + + m_buffer_regions.reserve(blocks_per_rank); for(std::size_t i = 0; i < blocks_per_rank; ++i) { - buffer_regions.emplace_back(buffer.data() + i * block_size, - block_size); + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); } MPI_Datatype datatype = block_type; // step2. parallel read data into buffers - if(const auto ec = MPI_File_read_all(input_file, buffer.data(), + if(const auto ec = MPI_File_read_all(input_file, m_buffer.data(), static_cast(blocks_per_rank), datatype, MPI_STATUS_IGNORE); ec != MPI_SUCCESS) { @@ -124,19 +123,62 @@ mpio_read::operator()() const { } // step3. POSIX write data - const auto output_file = - posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR); + m_output_file = std::make_unique( + posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR)); + + m_output_file->fallocate(0, 0, file_size); + + + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + + + } catch(const mpioxx::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_mpi_error(e.error_code()); + return make_mpi_error(e.error_code()); + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } + m_status = error_code::success; + return error_code::success; +} - output_file.fallocate(0, 0, file_size); +int +mpio_read::progress(int ongoing_index) { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + try { int index = 0; for(const auto& file_range : - all_of(posix_file::file{m_input_path}) | as_blocks(block_size) | - strided(workers_size, workers_rank)) { - assert(buffer_regions[index].size() >= file_range.size()); + all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + + assert(m_buffer_regions[index].size() >= file_range.size()); auto start = std::chrono::steady_clock::now(); - output_file.pwrite(buffer_regions[index], file_range.offset(), - file_range.size()); + m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), + file_range.size()); auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -145,8 +187,8 @@ mpio_read::operator()() const { .count(); if((elapsed_seconds) > 0) { LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", - block_size / 1024.0, elapsed_seconds, - (block_size / (1024.0 * 1024.0)) / + m_block_size / 1024.0, elapsed_seconds, + (m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds), sleep_value()); } @@ -157,24 +199,30 @@ mpio_read::operator()() const { } } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); - return make_mpi_error(e.error_code()); + m_status = make_mpi_error(e.error_code()); + return -1; } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); - return make_system_error(e.error_code()); + m_status = make_system_error(e.error_code()); + return -1; } catch(const std::system_error& e) { LOGGER_ERROR("Unexpected system error: {}", e.what()); - return make_system_error(e.code().value()); + m_status = make_system_error(e.code().value()); + return -1; } catch(const std::exception& e) { LOGGER_ERROR("Unexpected exception: {}", e.what()); - return error_code::other; + m_status = error_code::other; + return -1; } - return error_code::success; + m_status = error_code::success; + return -1; } +// This needs to be go through different phases... cargo::error_code mpio_read::progress() const { - return error_code::success; + return m_status; } } // namespace cargo \ No newline at end of file diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 503c507..d135b05 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -26,6 +26,9 @@ #define CARGO_WORKER_MPIO_READ_HPP #include "ops.hpp" +#include "memory.hpp" +#include +#include namespace mpi = boost::mpi; @@ -38,15 +41,28 @@ public: std::filesystem::path output_path); cargo::error_code - operator()() const final; + operator()() final; cargo::error_code progress() const final; + int + progress(int ongoing_index ) final; + private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + cargo::error_code m_status; + + + std::unique_ptr m_output_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + memory_buffer m_buffer; + std::vector m_buffer_regions; + }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 9c5c5c6..8b48dd3 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -22,16 +22,15 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include -#include + #include "mpio_write.hpp" #include "mpioxx.hpp" -#include "memory.hpp" + #include namespace cargo { cargo::error_code -mpio_write::operator()() const { +mpio_write::operator()() { using posix_file::views::all_of; using posix_file::views::as_blocks; @@ -50,7 +49,7 @@ mpio_write::operator()() const { if(file_size % block_size != 0) { ++total_blocks; } - + m_status = error_code::transfer_in_progress; // find how many blocks this rank is responsible for std::size_t blocks_per_rank = total_blocks / workers_size; @@ -60,31 +59,78 @@ mpio_write::operator()() const { } // step 1. acquire buffers - memory_buffer buffer; - buffer.resize(blocks_per_rank * block_size); - std::vector buffer_regions; - buffer_regions.reserve(blocks_per_rank); + m_buffer.resize(blocks_per_rank * block_size); + m_buffer_regions.reserve(blocks_per_rank); for(std::size_t i = 0; i < blocks_per_rank; ++i) { - buffer_regions.emplace_back(buffer.data() + i * block_size, - block_size); + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); } - const auto input_file = posix_file::open(m_input_path, O_RDONLY); + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY)); - int index = 0; - std::size_t bytes_per_rank = 0; + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + m_file_size = file_size; + m_total_blocks = total_blocks; - for(const auto& file_range : - all_of(input_file) | as_blocks(block_size) | - strided(workers_size, workers_rank)) { + } catch(const mpioxx::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_mpi_error(e.error_code()); + return make_mpi_error(e.error_code()); + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } - assert(buffer_regions[index].size() >= file_range.size()); + return error_code::success; +} +cargo::error_code +mpio_write::progress() const { + return error_code::success; +} + +int +mpio_write::progress(int ongoing_index) { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + // compute the number of blocks in the file + int total_blocks = static_cast(m_file_size / m_block_size); + + int index = 0; + std::size_t bytes_per_rank = 0; + try { + for(const auto& file_range : + all_of(*m_input_file) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + assert(m_buffer_regions[index].size() >= file_range.size()); auto start = std::chrono::steady_clock::now(); const std::size_t n = - input_file.pread(buffer_regions[index], file_range.offset(), - file_range.size()); + m_input_file->pread(m_buffer_regions[index], + file_range.offset(), file_range.size()); LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", fmt::join(buffer_regions[index].begin(), @@ -100,8 +146,8 @@ mpio_write::operator()() const { .count(); if((elapsed_seconds) > 0) { LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", - block_size / 1024.0, elapsed_seconds, - (block_size / (1024.0 * 1024.0)) / + m_block_size / 1024.0, elapsed_seconds, + (m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds), sleep_value()); } @@ -112,6 +158,7 @@ mpio_write::operator()() const { ++index; } + // step 2. write buffer data in parallel to the PFS const auto output_file = mpioxx::file::open(m_workers, m_output_path, @@ -120,7 +167,7 @@ mpio_write::operator()() const { // create block type MPI_Datatype block_type; - MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, + MPI_Type_contiguous(static_cast(m_block_size), MPI_BYTE, &block_type); MPI_Type_commit(&block_type); @@ -133,49 +180,53 @@ mpio_write::operator()() const { * stride: number of `oldtype` elements between start of each block */ MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, - /* stride: */ workers_size, /* oldtype: */ block_type, + /* stride: */ m_workers_size, /* oldtype: */ block_type, &file_type); MPI_Type_commit(&file_type); if(const auto ec = MPI_File_set_view(output_file, - /* disp: */ workers_rank * block_size, + /* disp: */ m_workers_rank * m_block_size, /* elementary_type: */ block_type, file_type, "native", MPI_INFO_NULL); ec != MPI_SUCCESS) { LOGGER_ERROR("MPI_File_set_view() failed: {}", mpi::error_string(ec)); - return make_mpi_error(ec); + m_status = make_mpi_error(ec); + return -1; } // step 3. parallel write data from buffers - if(const auto ec = MPI_File_write_all(output_file, buffer.data(), + if(const auto ec = MPI_File_write_all(output_file, m_buffer.data(), static_cast(bytes_per_rank), MPI_BYTE, MPI_STATUS_IGNORE); ec != MPI_SUCCESS) { LOGGER_ERROR("MPI_File_write_all() failed: {}", mpi::error_string(ec)); - return make_mpi_error(ec); + m_status = make_mpi_error(ec); + return -1; } } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); - return make_mpi_error(e.error_code()); + m_status = make_mpi_error(e.error_code()); + return -1; } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); - return make_system_error(e.error_code()); + m_status = make_system_error(e.error_code()); + return -1; } catch(const std::system_error& e) { LOGGER_ERROR("Unexpected system error: {}", e.what()); - return make_system_error(e.code().value()); + m_status = make_system_error(e.code().value()); + return -1; } catch(const std::exception& e) { LOGGER_ERROR("Unexpected exception: {}", e.what()); - return error_code::other; + m_status = error_code::other; + return -1; } - return error_code::success; -} -cargo::error_code -mpio_write::progress() const { - return error_code::success; + m_status = error_code::success; + + return -1; } } // namespace cargo diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index d9f6631..fb57dea 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -25,7 +25,10 @@ #ifndef CARGO_WORKER_MPIO_WRITE_HPP #define CARGO_WORKER_MPIO_WRITE_HPP +#include +#include #include "ops.hpp" +#include "memory.hpp" namespace mpi = boost::mpi; @@ -40,16 +43,33 @@ public: m_output_path(std::move(output_path)) {} cargo::error_code - operator()() const final; + operator()() final; cargo::error_code progress() const final; + int + progress(int ongoing_index) final; + + private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + + cargo::error_code m_status; + + + std::unique_ptr m_input_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + std::size_t m_file_size; + int m_total_blocks; + + memory_buffer m_buffer; + std::vector m_buffer_regions; }; } // namespace cargo diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index c5770fa..61cccec 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -46,7 +46,7 @@ public: virtual ~operation() = default; virtual cargo::error_code - operator()() const = 0; + operator()() = 0; std::chrono::milliseconds @@ -56,6 +56,9 @@ public: set_bw_shaping(std::int16_t incr); virtual cargo::error_code progress() const = 0; + virtual int + progress(int index) = 0; + int source(); diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 66cc123..83d3da5 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -29,7 +29,7 @@ namespace cargo { cargo::error_code -seq_operation::operator()() const { +seq_operation::operator()() { LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); return cargo::error_code::not_implemented; } @@ -39,4 +39,9 @@ seq_operation::progress() const { return error_code::success; } +int +seq_operation::progress(int ongoing_index) { + return ++ongoing_index; +} + } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 223766b..f5d54da 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -40,10 +40,13 @@ public: m_output_path(std::move(output_path)) {} cargo::error_code - operator()() const final; + operator()() final; cargo::error_code progress() const; + int + progress(int ongoing_index ) final; + private: mpi::communicator m_comm; std::filesystem::path m_input_path; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 7231f58..6f27d79 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -29,7 +29,7 @@ #include #include "ops.hpp" #include "worker.hpp" -#include "proto/mpi/message.hpp" + #include "fmt_formatters.hpp" namespace mpi = boost::mpi; @@ -105,25 +105,55 @@ worker::run() { LOGGER_INFO("{:=>{}}", "", greeting.size()); bool done = false; - + int index = 0; while(!done) { auto msg = world.iprobe(); if(!msg) { // FIXME: sleep time should be configurable + if(op.get()) { - cargo::error_code ec = op->progress(); - if(ec != cargo::error_code::other) { - update_state(op->source(), op->tid(), op->seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - ec); - // Transfer finished - op.reset(); + if(m_t == tag::pread) { + LOGGER_INFO("progressing read? : {}", index); + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + LOGGER_INFO("transfer FINISHED (R) {}", ec); + if(ec == cargo::error_code::success) { + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + ec); + // Transfer finished + op.reset(); + } + } else { + // running + } + } else if(m_t == tag::pwrite) { + LOGGER_INFO("progressing write? : {}", index); + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + LOGGER_INFO("transfer FINISHED (W) {}", ec); + if(ec == cargo::error_code::success) { + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + ec); + // Transfer finished + op.reset(); + } + } else { + // running + } } + } else { + std::this_thread::sleep_for(150ms); } - std::this_thread::sleep_for(150ms); continue; } @@ -136,7 +166,7 @@ worker::run() { transfer_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); - + m_t = t; op = operation::make_operation(t, workers, m.input_path(), m.output_path()); @@ -144,8 +174,10 @@ worker::run() { update_state(op->source(), op->tid(), op->seqno(), transfer_state::running); - op->set_bw_shaping(1); - + // Different scenarios read -> write | write -> read + cargo::error_code ec = (*op)(); + // MPI part ? error code? + LOGGER_INFO("error? : {}", ec); break; } diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp index aaeea06..6881e8c 100644 --- a/src/worker/worker.hpp +++ b/src/worker/worker.hpp @@ -26,6 +26,8 @@ #ifndef CARGO_WORKER_HPP #define CARGO_WORKER_HPP +#include "proto/mpi/message.hpp" + namespace cargo { class worker { @@ -42,6 +44,7 @@ private: std::string m_name; int m_rank; std::optional m_output_file; + cargo::tag m_t; }; } // namespace cargo -- GitLab From c4c4c8f19b89440082831b417eabb93b3f4a7a6a Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 2 Nov 2023 17:48:38 +0100 Subject: [PATCH 09/14] Reduce simultaneous transfers --- src/worker/mpio_read.cpp | 5 ++- src/worker/ops.cpp | 8 +++- src/worker/ops.hpp | 8 ++-- src/worker/worker.cpp | 93 +++++++++++++++++++--------------------- src/worker/worker.hpp | 6 ++- tests/tests.cpp | 2 +- 6 files changed, 65 insertions(+), 57 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 322d056..6f858bc 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -151,8 +151,8 @@ mpio_read::operator()() { m_status = error_code::other; return error_code::other; } - m_status = error_code::success; - return error_code::success; + m_status = error_code::transfer_in_progress; + return error_code::transfer_in_progress; } int @@ -163,6 +163,7 @@ mpio_read::progress(int ongoing_index) { using posix_file::views::strided; try { int index = 0; + m_status = error_code::transfer_in_progress; for(const auto& file_range : all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 1ad8f95..3771ba0 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -80,12 +80,18 @@ operation::seqno() { return m_seqno; } +cargo::tag +operation::t() { + return m_t; +} void -operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno) { +operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, + cargo::tag t) { m_rank = rank; m_tid = tid; m_seqno = seqno; + m_t = t; } cargo::error_code diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 61cccec..52726aa 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -56,7 +56,7 @@ public: set_bw_shaping(std::int16_t incr); virtual cargo::error_code progress() const = 0; - virtual int + virtual int progress(int index) = 0; @@ -67,14 +67,16 @@ public: std::uint32_t seqno(); void - set_comm(int rank, std::uint64_t tid, std::uint32_t seqno); - + set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, cargo::tag t); + cargo::tag + t(); private: std::int16_t m_sleep_value; int m_rank; std::uint64_t m_tid; std::uint32_t m_seqno; + cargo::tag m_t; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 6f27d79..468b868 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -27,9 +27,8 @@ #include #include #include -#include "ops.hpp" -#include "worker.hpp" +#include "worker.hpp" #include "fmt_formatters.hpp" namespace mpi = boost::mpi; @@ -67,9 +66,6 @@ update_state(int rank, std::uint64_t tid, std::uint32_t seqno, namespace cargo { - -std::shared_ptr op; - worker::worker(std::string name, int rank) : m_name(std::move(name)), m_rank(rank) {} @@ -105,7 +101,6 @@ worker::run() { LOGGER_INFO("{:=>{}}", "", greeting.size()); bool done = false; - int index = 0; while(!done) { auto msg = world.iprobe(); @@ -113,45 +108,37 @@ worker::run() { if(!msg) { // FIXME: sleep time should be configurable - if(op.get()) { - if(m_t == tag::pread) { - LOGGER_INFO("progressing read? : {}", index); - index = op->progress(index); - if(index == -1) { - // operation finished - cargo::error_code ec = op->progress(); - LOGGER_INFO("transfer FINISHED (R) {}", ec); - if(ec == cargo::error_code::success) { - update_state(op->source(), op->tid(), op->seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - ec); - // Transfer finished - op.reset(); - } - } else { - // running - } - } else if(m_t == tag::pwrite) { - LOGGER_INFO("progressing write? : {}", index); - index = op->progress(index); - if(index == -1) { - // operation finished - cargo::error_code ec = op->progress(); - LOGGER_INFO("transfer FINISHED (W) {}", ec); - if(ec == cargo::error_code::success) { + // Progress through all transfers + for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + auto op = I->second.first.get(); + int index = I->second.second; + if(op) { + + if(op->t() == tag::pread or op->t() == tag::pwrite) { + + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + LOGGER_INFO("transfer FINISHED (R) {}", ec); update_state(op->source(), op->tid(), op->seqno(), ec ? transfer_state::failed : transfer_state::completed, ec); + // Transfer finished - op.reset(); + I = m_ops.erase(I); + if(I == m_ops.end()) { + break; + } + } else { + I->second.second = index; } - } else { - // running } } - } else { + } + + if(m_ops.size() == 0) { std::this_thread::sleep_for(150ms); } continue; @@ -166,18 +153,24 @@ worker::run() { transfer_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); - m_t = t; - op = operation::make_operation(t, workers, m.input_path(), - m.output_path()); + m_ops.emplace(std::make_pair( + make_pair(m.input_path(), m.output_path()), + make_pair(operation::make_operation(t, workers, + m.input_path(), + m.output_path()), + 0))); - op->set_comm(msg->source(), m.tid(), m.seqno()); + const auto op = + m_ops[make_pair(m.input_path(), m.output_path())] + .first.get(); + + op->set_comm(msg->source(), m.tid(), m.seqno(), t); update_state(op->source(), op->tid(), op->seqno(), transfer_state::running); // Different scenarios read -> write | write -> read - cargo::error_code ec = (*op)(); - // MPI part ? error code? - LOGGER_INFO("error? : {}", ec); + /*cargo::error_code ec =*/ (*op)(); + break; } @@ -185,10 +178,14 @@ worker::run() { shaper_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); - if(op.get()) { - op->set_bw_shaping(0); - } else { - LOGGER_INFO("Operation non existent", msg->source(), m); + for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + const auto op = I->second.first.get(); + if(op) { + + op->set_bw_shaping(0); + } else { + LOGGER_INFO("Operation non existent", msg->source(), m); + } } diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp index 6881e8c..1890974 100644 --- a/src/worker/worker.hpp +++ b/src/worker/worker.hpp @@ -27,7 +27,8 @@ #define CARGO_WORKER_HPP #include "proto/mpi/message.hpp" - +#include +#include "ops.hpp" namespace cargo { class worker { @@ -41,10 +42,11 @@ public: run(); private: + std::map, std::pair< std::unique_ptr, int> > m_ops; std::string m_name; int m_rank; std::optional m_output_file; - cargo::tag m_t; + }; } // namespace cargo diff --git a/tests/tests.cpp b/tests/tests.cpp index e2b2357..1ea359b 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -206,7 +206,7 @@ equal(const std::filesystem::path& filepath1, uint32_t catch2_seed; std::string server_address; -#define NDATASETS 10 +#define NDATASETS 4 SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { -- GitLab From eea8b63ab3f2505828c5028b5271df57e4e4606b Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 2 Nov 2023 17:59:13 +0100 Subject: [PATCH 10/14] wrong Error code in write --- src/worker/mpio_write.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 8b48dd3..e2b9af5 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -35,7 +35,7 @@ mpio_write::operator()() { using posix_file::views::all_of; using posix_file::views::as_blocks; using posix_file::views::strided; - + m_status = error_code::transfer_in_progress; try { const auto workers_size = m_workers.size(); @@ -49,7 +49,7 @@ mpio_write::operator()() { if(file_size % block_size != 0) { ++total_blocks; } - m_status = error_code::transfer_in_progress; + // find how many blocks this rank is responsible for std::size_t blocks_per_rank = total_blocks / workers_size; @@ -95,11 +95,11 @@ mpio_write::operator()() { return error_code::other; } - return error_code::success; + return error_code::transfer_in_progress; } cargo::error_code mpio_write::progress() const { - return error_code::success; + return m_status; } int @@ -126,6 +126,7 @@ mpio_write::progress(int ongoing_index) { return index; } } + m_status = error_code::transfer_in_progress; assert(m_buffer_regions[index].size() >= file_range.size()); auto start = std::chrono::steady_clock::now(); const std::size_t n = -- GitLab From f7cfe1c6f7332ee32ee83d671aecbeda75d65112 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 2 Nov 2023 18:44:11 +0100 Subject: [PATCH 11/14] error control --- src/worker/mpio_write.cpp | 4 +++- src/worker/worker.cpp | 11 +++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index e2b9af5..2641de5 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -119,7 +119,9 @@ mpio_write::progress(int ongoing_index) { strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { + bytes_per_rank += file_range.size(); ++index; + continue; } else { if(index > ongoing_index) { @@ -159,7 +161,7 @@ mpio_write::progress(int ongoing_index) { ++index; } - + LOGGER_INFO("Second Write part"); // step 2. write buffer data in parallel to the PFS const auto output_file = mpioxx::file::open(m_workers, m_output_path, diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 468b868..ab40808 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -113,9 +113,8 @@ worker::run() { auto op = I->second.first.get(); int index = I->second.second; if(op) { - if(op->t() == tag::pread or op->t() == tag::pwrite) { - + LOGGER_INFO("transfer progress (R) {} {}", index, op->seqno()); index = op->progress(index); if(index == -1) { // operation finished @@ -169,8 +168,12 @@ worker::run() { update_state(op->source(), op->tid(), op->seqno(), transfer_state::running); // Different scenarios read -> write | write -> read - /*cargo::error_code ec =*/ (*op)(); - + cargo::error_code ec = (*op)(); + if (ec != cargo::error_code::transfer_in_progress) { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::failed); + m_ops.erase(make_pair(m.input_path(), m.output_path())); + } break; } -- GitLab From 5c054b8acf1e7e83d615b4462855ef0b85c64e08 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 2 Nov 2023 19:03:37 +0100 Subject: [PATCH 12/14] Missing size for write --- src/worker/mpio_write.cpp | 14 ++++++-------- src/worker/mpio_write.hpp | 1 + 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 2641de5..e025ed1 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -109,19 +109,18 @@ mpio_write::progress(int ongoing_index) { using posix_file::views::strided; // compute the number of blocks in the file - int total_blocks = static_cast(m_file_size / m_block_size); int index = 0; - std::size_t bytes_per_rank = 0; + if(ongoing_index == 0) { + m_bytes_per_rank = 0; + } try { for(const auto& file_range : all_of(*m_input_file) | as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { - bytes_per_rank += file_range.size(); ++index; - continue; } else { if(index > ongoing_index) { @@ -155,13 +154,12 @@ mpio_write::progress(int ongoing_index) { sleep_value()); } - bytes_per_rank += n; + m_bytes_per_rank += n; // Do sleep std::this_thread::sleep_for(sleep_value()); ++index; } - LOGGER_INFO("Second Write part"); // step 2. write buffer data in parallel to the PFS const auto output_file = mpioxx::file::open(m_workers, m_output_path, @@ -182,7 +180,7 @@ mpio_write::progress(int ongoing_index) { * blocklen: number of `oldtype` elements in each block * stride: number of `oldtype` elements between start of each block */ - MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, + MPI_Type_vector(/* count: */ m_total_blocks, /* blocklength: */ 1, /* stride: */ m_workers_size, /* oldtype: */ block_type, &file_type); MPI_Type_commit(&file_type); @@ -201,7 +199,7 @@ mpio_write::progress(int ongoing_index) { // step 3. parallel write data from buffers if(const auto ec = MPI_File_write_all(output_file, m_buffer.data(), - static_cast(bytes_per_rank), + static_cast(m_bytes_per_rank), MPI_BYTE, MPI_STATUS_IGNORE); ec != MPI_SUCCESS) { LOGGER_ERROR("MPI_File_write_all() failed: {}", diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index fb57dea..9eb64cb 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -70,6 +70,7 @@ private: memory_buffer m_buffer; std::vector m_buffer_regions; + std::size_t m_bytes_per_rank; }; } // namespace cargo -- GitLab From 76b1476384d5d94bcec88acbe8f2493db36441b4 Mon Sep 17 00:00:00 2001 From: rnou Date: Fri, 3 Nov 2023 11:24:19 +0100 Subject: [PATCH 13/14] Added BW, solved sleep bug (intialization) --- src/proto/mpi/message.hpp | 20 ++++++++++++++------ src/worker/mpio_read.cpp | 6 ++---- src/worker/mpio_write.cpp | 16 ++++++++-------- src/worker/mpio_write.hpp | 1 - src/worker/ops.cpp | 8 ++++++++ src/worker/ops.hpp | 6 +++++- src/worker/worker.cpp | 17 +++++++++-------- 7 files changed, 46 insertions(+), 28 deletions(-) diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index b2818b4..7e057e3 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -95,9 +95,9 @@ public: status_message() = default; status_message(std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state state, + cargo::transfer_state state, std::float_t bw, std::optional error_code = std::nullopt) - : m_tid(tid), m_seqno(seqno), m_state(state), m_error_code(error_code) { + : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), m_error_code(error_code) { } [[nodiscard]] std::uint64_t @@ -115,6 +115,12 @@ public: return m_state; } + [[nodiscard]] std::float_t + bw() const { + return m_bw; + } + + [[nodiscard]] std::optional error_code() const { return m_error_code; @@ -129,12 +135,14 @@ private: ar & m_tid; ar & m_seqno; ar & m_state; + ar & m_bw; ar & m_error_code; } std::uint64_t m_tid{}; std::uint32_t m_seqno{}; cargo::transfer_state m_state{}; + std::float_t m_bw{}; std::optional m_error_code{}; }; @@ -212,12 +220,12 @@ struct fmt::formatter : formatter { format(const cargo::status_message& s, FormatContext& ctx) const { const auto str = s.error_code() - ? fmt::format("{{tid: {}, seqno: {}, state: {}, " + ? fmt::format("{{tid: {}, seqno: {}, state: {}, bw: {}, " "error_code: {}}}", - s.tid(), s.seqno(), s.state(), + s.tid(), s.seqno(), s.state(), s.bw(), *s.error_code()) - : fmt::format("{{tid: {}, seqno: {}, state: {}}}", - s.tid(), s.seqno(), s.state()); + : fmt::format("{{tid: {}, seqno: {}, state: {}, bw: {}}}", + s.tid(), s.seqno(), s.state(), s.bw()); return formatter::format(str, ctx); } }; diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 6f858bc..1e0a6ab 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -43,7 +43,6 @@ mpio_read::operator()() { using posix_file::views::as_blocks; using posix_file::views::strided; m_status = error_code::transfer_in_progress; - try { const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); @@ -187,10 +186,9 @@ mpio_read::progress(int ongoing_index) { end - start) .count(); if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", - m_block_size / 1024.0, elapsed_seconds, - (m_block_size / (1024.0 * 1024.0)) / - (elapsed_seconds), + m_block_size / 1024.0, elapsed_seconds, bw(), sleep_value()); } // Do sleep diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index e025ed1..ccf81e0 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -49,7 +49,7 @@ mpio_write::operator()() { if(file_size % block_size != 0) { ++total_blocks; } - + // find how many blocks this rank is responsible for std::size_t blocks_per_rank = total_blocks / workers_size; @@ -113,7 +113,7 @@ mpio_write::progress(int ongoing_index) { int index = 0; if(ongoing_index == 0) { m_bytes_per_rank = 0; - } + } try { for(const auto& file_range : all_of(*m_input_file) | as_blocks(m_block_size) | @@ -147,10 +147,9 @@ mpio_write::progress(int ongoing_index) { end - start) .count(); if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", - m_block_size / 1024.0, elapsed_seconds, - (m_block_size / (1024.0 * 1024.0)) / - (elapsed_seconds), + m_block_size / 1024.0, elapsed_seconds, bw(), sleep_value()); } @@ -198,9 +197,10 @@ mpio_write::progress(int ongoing_index) { } // step 3. parallel write data from buffers - if(const auto ec = MPI_File_write_all(output_file, m_buffer.data(), - static_cast(m_bytes_per_rank), - MPI_BYTE, MPI_STATUS_IGNORE); + if(const auto ec = + MPI_File_write_all(output_file, m_buffer.data(), + static_cast(m_bytes_per_rank), + MPI_BYTE, MPI_STATUS_IGNORE); ec != MPI_SUCCESS) { LOGGER_ERROR("MPI_File_write_all() failed: {}", mpi::error_string(ec)); diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 9eb64cb..8c3d5c1 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -48,7 +48,6 @@ public: cargo::error_code progress() const final; - int progress(int ongoing_index) final; diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 3771ba0..288991f 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -85,6 +85,14 @@ operation::t() { return m_t; } +float_t +operation::bw() { + return m_bw; +} + +void operation::bw(float_t bw) { + m_bw = bw; +} void operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, cargo::tag t) { diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 52726aa..9af7cbe 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -71,12 +71,16 @@ public: cargo::tag t(); + float_t bw(); + void bw(float_t bw); + private: - std::int16_t m_sleep_value; + std::int16_t m_sleep_value = 0; int m_rank; std::uint64_t m_tid; std::uint32_t m_seqno; cargo::tag m_t; + std::float_t m_bw; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index ab40808..8b97191 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -53,11 +53,11 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, void update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state st, + cargo::transfer_state st, std::float_t bw, std::optional ec = std::nullopt) { mpi::communicator world; - const cargo::status_message m{tid, seqno, st, ec}; + const cargo::status_message m{tid, seqno, st, bw, ec}; LOGGER_INFO("msg <= to: {} body: {{payload: {}}}", rank, m); world.send(rank, static_cast(cargo::tag::status), m); } @@ -114,16 +114,14 @@ worker::run() { int index = I->second.second; if(op) { if(op->t() == tag::pread or op->t() == tag::pwrite) { - LOGGER_INFO("transfer progress (R) {} {}", index, op->seqno()); index = op->progress(index); if(index == -1) { // operation finished cargo::error_code ec = op->progress(); - LOGGER_INFO("transfer FINISHED (R) {}", ec); update_state(op->source(), op->tid(), op->seqno(), ec ? transfer_state::failed : transfer_state::completed, - ec); + 0.0f, ec); // Transfer finished I = m_ops.erase(I); @@ -131,6 +129,8 @@ worker::run() { break; } } else { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running, op->bw()); I->second.second = index; } } @@ -166,12 +166,13 @@ worker::run() { op->set_comm(msg->source(), m.tid(), m.seqno(), t); update_state(op->source(), op->tid(), op->seqno(), - transfer_state::running); + transfer_state::running, -1.0f); // Different scenarios read -> write | write -> read + cargo::error_code ec = (*op)(); - if (ec != cargo::error_code::transfer_in_progress) { + if(ec != cargo::error_code::transfer_in_progress) { update_state(op->source(), op->tid(), op->seqno(), - transfer_state::failed); + transfer_state::failed, -1.0f, ec); m_ops.erase(make_pair(m.input_path(), m.output_path())); } break; -- GitLab From 471cee53597bb7e9de6b83f4eee0dfca9428c367 Mon Sep 17 00:00:00 2001 From: rnou Date: Fri, 3 Nov 2023 12:48:40 +0100 Subject: [PATCH 14/14] BW workflow finished --- lib/cargo.hpp | 11 +++++-- lib/libcargo.cpp | 15 ++++++---- src/master.cpp | 19 ++++++------ src/parallel_request.cpp | 21 ++++++++++--- src/parallel_request.hpp | 16 +++++++--- src/proto/mpi/message.hpp | 61 ++++++++++++++++++++++---------------- src/proto/rpc/response.hpp | 9 +++--- src/request_manager.cpp | 7 +++-- src/request_manager.hpp | 3 +- src/worker/ops.hpp | 8 +++-- src/worker/worker.cpp | 2 +- tests/tests.cpp | 5 ++-- 12 files changed, 112 insertions(+), 65 deletions(-) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index cd0f04d..04b0756 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -26,6 +26,7 @@ #ifndef CARGO_HPP #define CARGO_HPP +#include #include #include #include @@ -76,8 +77,8 @@ public: template void serialize(Archive& ar) { - ar & m_path; - ar & m_type; + ar& m_path; + ar& m_type; } private: @@ -149,7 +150,7 @@ class transfer_status { friend transfer_status transfer::status() const; - transfer_status(transfer_state status, error_code error) noexcept; + transfer_status(transfer_state status, float bw, error_code error) noexcept; public: /** @@ -187,8 +188,12 @@ public: [[nodiscard]] error_code error() const; + [[nodiscard]] float + bw() const; + private: transfer_state m_state; + float m_bw; error_code m_error; }; diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 634d73f..f084a0d 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -93,7 +93,7 @@ transfer::status() const { network::client rpc_client{m_srv.protocol()}; const auto rpc = network::rpc_info::create("transfer_status", m_srv.address()); - using response_type = status_response; + using response_type = status_response; if(const auto lookup_rv = rpc_client.lookup(m_srv.address()); lookup_rv.has_value()) { @@ -105,7 +105,7 @@ transfer::status() const { call_rv.has_value()) { const response_type resp{call_rv.value()}; - const auto& [s, ec] = resp.value(); + const auto& [s, bw, ec] = resp.value(); LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, @@ -116,16 +116,16 @@ transfer::status() const { fmt::format("rpc call failed: {}", resp.error_code())); } - return transfer_status{s, ec.value_or(error_code::success)}; + return transfer_status{s, bw, ec.value_or(error_code::success)}; } } throw std::runtime_error("rpc lookup failed"); } -transfer_status::transfer_status(transfer_state status, +transfer_status::transfer_status(transfer_state status, float bw, error_code error) noexcept - : m_state(status), m_error(error) {} + : m_state(status), m_bw(bw), m_error(error) {} transfer_state transfer_status::state() const noexcept { @@ -142,6 +142,11 @@ transfer_status::failed() const noexcept { return m_state == transfer_state::failed; } +float +transfer_status::bw() const { + return m_bw; +} + error_code transfer_status::error() const { switch(m_state) { diff --git a/src/master.cpp b/src/master.cpp index ea9fc9a..c9db8ae 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -126,7 +126,7 @@ master_server::mpi_listener_ult() { msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, - m.state(), m.error_code()); + m.state(), m.bw(), m.error_code()); break; } @@ -172,11 +172,12 @@ master_server::ping(const network::request& req) { void -master_server::bw_shaping(const network::request& req, std::uint64_t tid, std::int16_t shaping) { +master_server::bw_shaping(const network::request& req, std::uint64_t tid, + std::int16_t shaping) { using network::get_address; using network::rpc_info; using proto::generic_response; - mpi::communicator world; + mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{tid: {}, shaping: {}}}", rpc, tid, shaping); @@ -184,7 +185,8 @@ master_server::bw_shaping(const network::request& req, std::uint64_t tid, std::i for(int rank = 1; rank < world.size(); ++rank) { const auto m = cargo::shaper_message{tid, shaping}; LOGGER_INFO("msg <= to: {} body: {}", rank, m); - world.send(static_cast(rank), static_cast(tag::bw_shaping), m); + world.send(static_cast(rank), static_cast(tag::bw_shaping), + m); } const auto resp = generic_response{rpc.id(), error_code::success}; @@ -192,7 +194,6 @@ master_server::bw_shaping(const network::request& req, std::uint64_t tid, std::i LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); req.respond(resp); - } void @@ -258,7 +259,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { using proto::status_response; using response_type = - status_response; + status_response; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); @@ -274,9 +275,9 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { .map([&](auto&& rs) { LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc, error_code::success, rs); - req.respond( - response_type{rpc.id(), error_code::success, - std::make_pair(rs.state(), rs.error())}); + req.respond(response_type{ + rpc.id(), error_code::success, + std::make_tuple(rs.state(), rs.bw(), rs.error())}); }); } diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp index 15cbeab..5763198 100644 --- a/src/parallel_request.cpp +++ b/src/parallel_request.cpp @@ -47,10 +47,11 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) - : m_state(s.state()), m_error_code(s.error()) {} + : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} -request_status::request_status(transfer_state s, std::optional ec) - : m_state(s), m_error_code(ec) {} +request_status::request_status(transfer_state s, float bw, + std::optional ec) + : m_state(s), m_bw(bw), m_error_code(ec) {} transfer_state request_status::state() const { @@ -62,19 +63,31 @@ request_status::error() const { return m_error_code; } +float +request_status::bw() const { + return m_bw; +} + transfer_state part_status::state() const { return m_state; } +float +part_status::bw() const { + return m_bw; +} + std::optional part_status::error() const { return m_error_code; } void -part_status::update(transfer_state s, std::optional ec) noexcept { +part_status::update(transfer_state s, float bw, + std::optional ec) noexcept { m_state = s; + m_bw = bw; m_error_code = ec; } diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp index d2e78d9..222cd57 100644 --- a/src/parallel_request.hpp +++ b/src/parallel_request.hpp @@ -71,18 +71,22 @@ public: [[nodiscard]] std::optional error() const; + [[nodiscard]] float + bw() const; + void - update(transfer_state s, std::optional ec) noexcept; + update(transfer_state s, float bw, std::optional ec) noexcept; private: transfer_state m_state{transfer_state::pending}; + float m_bw; std::optional m_error_code{}; }; class request_status { public: request_status() = default; - explicit request_status(transfer_state s, + explicit request_status(transfer_state s, float bw, std::optional ec = {}); explicit request_status(part_status s); @@ -92,8 +96,12 @@ public: [[nodiscard]] std::optional error() const; + [[nodiscard]] float + bw() const; + private: transfer_state m_state{transfer_state::pending}; + float m_bw; std::optional m_error_code{}; }; @@ -121,8 +129,8 @@ struct fmt::formatter : formatter { } }; - const auto str = fmt::format("{{state: {}, error_code: {}}}", - state_name(s), s.error()); + const auto str = fmt::format("{{state: {}, bw: {}, error_code: {}}}", + state_name(s), s.bw(), s.error()); return formatter::format(str, ctx); } }; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index 7e057e3..e7cdc58 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -35,7 +35,14 @@ namespace cargo { -enum class tag : int { pread, pwrite, sequential, bw_shaping, status, shutdown }; +enum class tag : int { + pread, + pwrite, + sequential, + bw_shaping, + status, + shutdown +}; class transfer_message { @@ -75,10 +82,10 @@ private: serialize(Archive& ar, const unsigned int version) { (void) version; - ar & m_tid; - ar & m_seqno; - ar & m_input_path; - ar & m_output_path; + ar& m_tid; + ar& m_seqno; + ar& m_input_path; + ar& m_output_path; } std::uint64_t m_tid{}; @@ -95,10 +102,10 @@ public: status_message() = default; status_message(std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state state, std::float_t bw, + cargo::transfer_state state, float bw, std::optional error_code = std::nullopt) - : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), m_error_code(error_code) { - } + : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), + m_error_code(error_code) {} [[nodiscard]] std::uint64_t tid() const { @@ -115,7 +122,7 @@ public: return m_state; } - [[nodiscard]] std::float_t + [[nodiscard]] float bw() const { return m_bw; } @@ -132,17 +139,17 @@ private: serialize(Archive& ar, const unsigned int version) { (void) version; - ar & m_tid; - ar & m_seqno; - ar & m_state; - ar & m_bw; - ar & m_error_code; + ar& m_tid; + ar& m_seqno; + ar& m_state; + ar& m_bw; + ar& m_error_code; } std::uint64_t m_tid{}; std::uint32_t m_seqno{}; cargo::transfer_state m_state{}; - std::float_t m_bw{}; + float m_bw{}; std::optional m_error_code{}; }; @@ -154,8 +161,7 @@ public: shaper_message() = default; shaper_message(std::uint64_t tid, std::int16_t shaping) - : m_tid(tid), m_shaping(shaping) { - } + : m_tid(tid), m_shaping(shaping) {} [[nodiscard]] std::uint64_t tid() const { @@ -173,8 +179,8 @@ private: serialize(Archive& ar, const unsigned int version) { (void) version; - ar & m_tid; - ar & m_shaping; + ar& m_tid; + ar& m_shaping; } std::uint64_t m_tid{}; @@ -220,12 +226,14 @@ struct fmt::formatter : formatter { format(const cargo::status_message& s, FormatContext& ctx) const { const auto str = s.error_code() - ? fmt::format("{{tid: {}, seqno: {}, state: {}, bw: {}, " - "error_code: {}}}", - s.tid(), s.seqno(), s.state(), s.bw(), - *s.error_code()) - : fmt::format("{{tid: {}, seqno: {}, state: {}, bw: {}}}", - s.tid(), s.seqno(), s.state(), s.bw()); + ? fmt::format( + "{{tid: {}, seqno: {}, state: {}, bw: {}, " + "error_code: {}}}", + s.tid(), s.seqno(), s.state(), s.bw(), + *s.error_code()) + : fmt::format( + "{{tid: {}, seqno: {}, state: {}, bw: {}}}", + s.tid(), s.seqno(), s.state(), s.bw()); return formatter::format(str, ctx); } }; @@ -236,7 +244,8 @@ struct fmt::formatter : formatter { template auto format(const cargo::shaper_message& s, FormatContext& ctx) const { - const auto str = fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); + const auto str = + fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); return formatter::format(str, ctx); } }; diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp index 5aa487c..7257fb1 100644 --- a/src/proto/rpc/response.hpp +++ b/src/proto/rpc/response.hpp @@ -56,8 +56,8 @@ public: template constexpr void serialize(Archive&& ar) { - ar & m_op_id; - ar & m_error_code; + ar& m_op_id; + ar& m_error_code; } private: @@ -104,9 +104,10 @@ template using response_with_id = response_with_value; -template +template using status_response = - response_with_value>, Error>; + response_with_value>, + Error>; } // namespace cargo::proto diff --git a/src/request_manager.cpp b/src/request_manager.cpp index f39dea9..1496c03 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -57,14 +57,15 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { error_code request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, - transfer_state s, std::optional ec) { + transfer_state s, float bw, + std::optional ec) { abt::unique_lock lock(m_mutex); if(const auto it = m_requests.find(tid); it != m_requests.end()) { assert(seqno < it->second.size()); assert(wid < it->second[seqno].size()); - it->second[seqno][wid].update(s, ec); + it->second[seqno][wid].update(s, bw, ec); return error_code::success; } @@ -92,7 +93,7 @@ request_manager::lookup(std::uint64_t tid) { } } - return request_status{transfer_state::completed}; + return request_status{transfer_state::completed, 0.0f}; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); diff --git a/src/request_manager.hpp b/src/request_manager.hpp index ba26cee..1928d61 100644 --- a/src/request_manager.hpp +++ b/src/request_manager.hpp @@ -60,7 +60,8 @@ public: error_code update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, - transfer_state s, std::optional ec = std::nullopt); + transfer_state s, float bw, + std::optional ec = std::nullopt); tl::expected lookup(std::uint64_t tid); diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 9af7cbe..637cdc7 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -71,8 +71,10 @@ public: cargo::tag t(); - float_t bw(); - void bw(float_t bw); + float_t + bw(); + void + bw(float_t bw); private: std::int16_t m_sleep_value = 0; @@ -80,7 +82,7 @@ private: std::uint64_t m_tid; std::uint32_t m_seqno; cargo::tag m_t; - std::float_t m_bw; + float m_bw; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 8b97191..988a926 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -53,7 +53,7 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, void update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state st, std::float_t bw, + cargo::transfer_state st, float bw, std::optional ec = std::nullopt) { mpi::communicator world; diff --git a/tests/tests.cpp b/tests/tests.cpp index 1ea359b..c8b514a 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -206,7 +206,7 @@ equal(const std::filesystem::path& filepath1, uint32_t catch2_seed; std::string server_address; -#define NDATASETS 4 +#define NDATASETS 10 SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { @@ -238,6 +238,7 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { std::filesystem::remove(dataset.path()); }); + WHEN("Transferring datasets to a POSIX storage system") { const auto tx = cargo::transfer_datasets(server, sources, targets); @@ -269,7 +270,7 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { - std::size_t file_size = 10000; // GENERATE(1000, 10000); + std::size_t file_size = 1000; // GENERATE(1000, 10000); [[maybe_unused]] ascii_data_generator ascii_gen{512}; [[maybe_unused]] random_data_generator rng{ -- GitLab