diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index d349e97143aa3d08f2d19f490edcdfda15be5f1b..68fea90ffd7cae09c4422a0be73facf9bcb65f93 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -84,7 +84,24 @@ target_link_libraries(ccp cargo ) -install(TARGETS cargo_ping cargo_shutdown ccp +################################################################################ +## shaping: A CLI tool to request a Cargo server to slowdown transfers +add_executable(shaping) + +target_sources(shaping + PRIVATE + shaping.cpp +) + +target_link_libraries(shaping + PUBLIC + fmt::fmt + CLI11::CLI11 + net::rpc_client + cargo +) + +install(TARGETS cargo_ping cargo_shutdown ccp shaping RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) diff --git a/cli/shaping.cpp b/cli/shaping.cpp new file mode 100644 index 0000000000000000000000000000000000000000..edefb7cbc00616b64e9d560624cf0b9a0bb396a3 --- /dev/null +++ b/cli/shaping.cpp @@ -0,0 +1,114 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +struct shaping_config { + std::string progname; + std::string server_address; + std::int64_t tid; + std::int16_t shaping; +}; + +shaping_config +parse_command_line(int argc, char* argv[]) { + + shaping_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Cargo shaping client", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + + app.add_option("-i,--tid", cfg.tid, "transfer id") + ->option_text("integer") + ->required(); + + app.add_option("-b,--bw", cfg.shaping, "bw shaping") + ->option_text("integer") + ->required(); + + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + shaping_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + network::client rpc_client{protocol}; + + if(const auto result = rpc_client.lookup(address); result.has_value()) { + const auto& endpoint = result.value(); + const auto retval = endpoint.call("bw_shaping", cfg.tid, cfg.shaping); + + if(retval.has_value()) { + + auto error_code = int{retval.value()}; + + fmt::print("bw_shaping RPC was successful!\n"); + fmt::print(" (server replied with: {})\n", error_code); + return EXIT_SUCCESS; + } + + fmt::print(stderr, "bw_shaping RPC failed\n"); + return EXIT_FAILURE; + + } else { + fmt::print(stderr, "Failed to lookup address: {}\n", address); + return EXIT_FAILURE; + } + } catch(const std::exception& ex) { + fmt::print(stderr, "Error: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/lib/cargo.hpp b/lib/cargo.hpp index cd0f04dbd802ae692ff35365b66bae89616b2177..04b07565901d1fa236ea1a01efd6d0de39f3324b 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -26,6 +26,7 @@ #ifndef CARGO_HPP #define CARGO_HPP +#include #include #include #include @@ -76,8 +77,8 @@ public: template void serialize(Archive& ar) { - ar & m_path; - ar & m_type; + ar& m_path; + ar& m_type; } private: @@ -149,7 +150,7 @@ class transfer_status { friend transfer_status transfer::status() const; - transfer_status(transfer_state status, error_code error) noexcept; + transfer_status(transfer_state status, float bw, error_code error) noexcept; public: /** @@ -187,8 +188,12 @@ public: [[nodiscard]] error_code error() const; + [[nodiscard]] float + bw() const; + private: transfer_state m_state; + float m_bw; error_code m_error; }; diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 634d73f76ff1906e22e48c81cd00749c9607df8c..f084a0d3888674c28b749c57520dc2294ae022dd 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -93,7 +93,7 @@ transfer::status() const { network::client rpc_client{m_srv.protocol()}; const auto rpc = network::rpc_info::create("transfer_status", m_srv.address()); - using response_type = status_response; + using response_type = status_response; if(const auto lookup_rv = rpc_client.lookup(m_srv.address()); lookup_rv.has_value()) { @@ -105,7 +105,7 @@ transfer::status() const { call_rv.has_value()) { const response_type resp{call_rv.value()}; - const auto& [s, ec] = resp.value(); + const auto& [s, bw, ec] = resp.value(); LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, @@ -116,16 +116,16 @@ transfer::status() const { fmt::format("rpc call failed: {}", resp.error_code())); } - return transfer_status{s, ec.value_or(error_code::success)}; + return transfer_status{s, bw, ec.value_or(error_code::success)}; } } throw std::runtime_error("rpc lookup failed"); } -transfer_status::transfer_status(transfer_state status, +transfer_status::transfer_status(transfer_state status, float bw, error_code error) noexcept - : m_state(status), m_error(error) {} + : m_state(status), m_bw(bw), m_error(error) {} transfer_state transfer_status::state() const noexcept { @@ -142,6 +142,11 @@ transfer_status::failed() const noexcept { return m_state == transfer_state::failed; } +float +transfer_status::bw() const { + return m_bw; +} + error_code transfer_status::error() const { switch(m_state) { diff --git a/src/master.cpp b/src/master.cpp index 4b106eb68f74f794f0e38fe67659f249cf549a36..c9db8aee7e4de8c1648fd313ba867e1d12f16e77 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -85,6 +85,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(shutdown)); provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); + provider::define(EXPAND(bw_shaping)); #undef EXPAND @@ -125,7 +126,7 @@ master_server::mpi_listener_ult() { msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, - m.state(), m.error_code()); + m.state(), m.bw(), m.error_code()); break; } @@ -169,6 +170,32 @@ master_server::ping(const network::request& req) { req.respond(resp); } + +void +master_server::bw_shaping(const network::request& req, std::uint64_t tid, + std::int16_t shaping) { + using network::get_address; + using network::rpc_info; + using proto::generic_response; + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{tid: {}, shaping: {}}}", rpc, tid, shaping); + + for(int rank = 1; rank < world.size(); ++rank) { + const auto m = cargo::shaper_message{tid, shaping}; + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), static_cast(tag::bw_shaping), + m); + } + + const auto resp = generic_response{rpc.id(), error_code::success}; + + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + + req.respond(resp); +} + void master_server::shutdown(const network::request& req) { using network::get_address; @@ -232,7 +259,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { using proto::status_response; using response_type = - status_response; + status_response; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); @@ -248,9 +275,9 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { .map([&](auto&& rs) { LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc, error_code::success, rs); - req.respond( - response_type{rpc.id(), error_code::success, - std::make_pair(rs.state(), rs.error())}); + req.respond(response_type{ + rpc.id(), error_code::success, + std::make_tuple(rs.state(), rs.bw(), rs.error())}); }); } diff --git a/src/master.hpp b/src/master.hpp index 3d5797ac8919965f8451138357890f9f382b50c4..6243dafa26f39b6547607197c08c0f6e884e2fdf 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -58,6 +58,11 @@ private: void transfer_status(const network::request& req, std::uint64_t tid); + // Receives a request to increase or decrease BW + // -1 faster, 0 , +1 slower + void + bw_shaping(const network::request& req, std::uint64_t tid, std::int16_t shaping); + private: // Dedicated execution stream for the MPI listener ULT thallium::managed m_mpi_listener_ess; diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 35183220c61de077271ff8872245e41bdbee0185..87244dc4d91adf94635cee8dc77243b83e4f65d6 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -30,6 +30,7 @@ #include #include #include +#include #include // very simple RAII wrappers for some MPI types + utility functions diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp index 15cbeabecbd86a87d7d1616883ef93bc47da19e9..57631988ad9f60b28464cf3792f7331c9dfb4a30 100644 --- a/src/parallel_request.cpp +++ b/src/parallel_request.cpp @@ -47,10 +47,11 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) - : m_state(s.state()), m_error_code(s.error()) {} + : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} -request_status::request_status(transfer_state s, std::optional ec) - : m_state(s), m_error_code(ec) {} +request_status::request_status(transfer_state s, float bw, + std::optional ec) + : m_state(s), m_bw(bw), m_error_code(ec) {} transfer_state request_status::state() const { @@ -62,19 +63,31 @@ request_status::error() const { return m_error_code; } +float +request_status::bw() const { + return m_bw; +} + transfer_state part_status::state() const { return m_state; } +float +part_status::bw() const { + return m_bw; +} + std::optional part_status::error() const { return m_error_code; } void -part_status::update(transfer_state s, std::optional ec) noexcept { +part_status::update(transfer_state s, float bw, + std::optional ec) noexcept { m_state = s; + m_bw = bw; m_error_code = ec; } diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp index d2e78d9a1fbe6a30620217d4a170e1ee925e9ad8..222cd57de1927360d6e1e9f4eea7bef1fe4d94a5 100644 --- a/src/parallel_request.hpp +++ b/src/parallel_request.hpp @@ -71,18 +71,22 @@ public: [[nodiscard]] std::optional error() const; + [[nodiscard]] float + bw() const; + void - update(transfer_state s, std::optional ec) noexcept; + update(transfer_state s, float bw, std::optional ec) noexcept; private: transfer_state m_state{transfer_state::pending}; + float m_bw; std::optional m_error_code{}; }; class request_status { public: request_status() = default; - explicit request_status(transfer_state s, + explicit request_status(transfer_state s, float bw, std::optional ec = {}); explicit request_status(part_status s); @@ -92,8 +96,12 @@ public: [[nodiscard]] std::optional error() const; + [[nodiscard]] float + bw() const; + private: transfer_state m_state{transfer_state::pending}; + float m_bw; std::optional m_error_code{}; }; @@ -121,8 +129,8 @@ struct fmt::formatter : formatter { } }; - const auto str = fmt::format("{{state: {}, error_code: {}}}", - state_name(s), s.error()); + const auto str = fmt::format("{{state: {}, bw: {}, error_code: {}}}", + state_name(s), s.bw(), s.error()); return formatter::format(str, ctx); } }; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index bc8c5a1a88ced96ca1020e619f86177354900b7d..e7cdc583132fba03653a960e5bfa24b4c4d3ec83 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -35,7 +35,14 @@ namespace cargo { -enum class tag : int { pread, pwrite, sequential, status, shutdown }; +enum class tag : int { + pread, + pwrite, + sequential, + bw_shaping, + status, + shutdown +}; class transfer_message { @@ -75,10 +82,10 @@ private: serialize(Archive& ar, const unsigned int version) { (void) version; - ar & m_tid; - ar & m_seqno; - ar & m_input_path; - ar & m_output_path; + ar& m_tid; + ar& m_seqno; + ar& m_input_path; + ar& m_output_path; } std::uint64_t m_tid{}; @@ -95,10 +102,10 @@ public: status_message() = default; status_message(std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state state, + cargo::transfer_state state, float bw, std::optional error_code = std::nullopt) - : m_tid(tid), m_seqno(seqno), m_state(state), m_error_code(error_code) { - } + : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), + m_error_code(error_code) {} [[nodiscard]] std::uint64_t tid() const { @@ -115,6 +122,12 @@ public: return m_state; } + [[nodiscard]] float + bw() const { + return m_bw; + } + + [[nodiscard]] std::optional error_code() const { return m_error_code; @@ -126,18 +139,55 @@ private: serialize(Archive& ar, const unsigned int version) { (void) version; - ar & m_tid; - ar & m_seqno; - ar & m_state; - ar & m_error_code; + ar& m_tid; + ar& m_seqno; + ar& m_state; + ar& m_bw; + ar& m_error_code; } std::uint64_t m_tid{}; std::uint32_t m_seqno{}; cargo::transfer_state m_state{}; + float m_bw{}; std::optional m_error_code{}; }; +class shaper_message { + + friend class boost::serialization::access; + +public: + shaper_message() = default; + + shaper_message(std::uint64_t tid, std::int16_t shaping) + : m_tid(tid), m_shaping(shaping) {} + + [[nodiscard]] std::uint64_t + tid() const { + return m_tid; + } + + [[nodiscard]] std::int16_t + shaping() const { + return m_shaping; + } + +private: + template + void + serialize(Archive& ar, const unsigned int version) { + (void) version; + + ar& m_tid; + ar& m_shaping; + } + + std::uint64_t m_tid{}; + std::uint16_t m_shaping{}; +}; + + class shutdown_message { friend class boost::serialization::access; @@ -176,12 +226,26 @@ struct fmt::formatter : formatter { format(const cargo::status_message& s, FormatContext& ctx) const { const auto str = s.error_code() - ? fmt::format("{{tid: {}, seqno: {}, state: {}, " - "error_code: {}}}", - s.tid(), s.seqno(), s.state(), - *s.error_code()) - : fmt::format("{{tid: {}, seqno: {}, state: {}}}", - s.tid(), s.seqno(), s.state()); + ? fmt::format( + "{{tid: {}, seqno: {}, state: {}, bw: {}, " + "error_code: {}}}", + s.tid(), s.seqno(), s.state(), s.bw(), + *s.error_code()) + : fmt::format( + "{{tid: {}, seqno: {}, state: {}, bw: {}}}", + s.tid(), s.seqno(), s.state(), s.bw()); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const cargo::shaper_message& s, FormatContext& ctx) const { + const auto str = + fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); return formatter::format(str, ctx); } }; diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp index 5aa487c3fd0d5b4eed8e94a6be1954087837eb35..7257fb1fc86e8ee828f4f7494585f6f3d6751559 100644 --- a/src/proto/rpc/response.hpp +++ b/src/proto/rpc/response.hpp @@ -56,8 +56,8 @@ public: template constexpr void serialize(Archive&& ar) { - ar & m_op_id; - ar & m_error_code; + ar& m_op_id; + ar& m_error_code; } private: @@ -104,9 +104,10 @@ template using response_with_id = response_with_value; -template +template using status_response = - response_with_value>, Error>; + response_with_value>, + Error>; } // namespace cargo::proto diff --git a/src/request_manager.cpp b/src/request_manager.cpp index f39dea9237db15086596fa03c7bd5cb9da6a6415..1496c03d4ef8ccd8d438e53e5767af5c8759a074 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -57,14 +57,15 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { error_code request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, - transfer_state s, std::optional ec) { + transfer_state s, float bw, + std::optional ec) { abt::unique_lock lock(m_mutex); if(const auto it = m_requests.find(tid); it != m_requests.end()) { assert(seqno < it->second.size()); assert(wid < it->second[seqno].size()); - it->second[seqno][wid].update(s, ec); + it->second[seqno][wid].update(s, bw, ec); return error_code::success; } @@ -92,7 +93,7 @@ request_manager::lookup(std::uint64_t tid) { } } - return request_status{transfer_state::completed}; + return request_status{transfer_state::completed, 0.0f}; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); diff --git a/src/request_manager.hpp b/src/request_manager.hpp index ba26cee5765708b7dceebdee2cf32e757eeb4377..1928d61b819bd1669a6b1ad547eeda93c8b3f2ec 100644 --- a/src/request_manager.hpp +++ b/src/request_manager.hpp @@ -60,7 +60,8 @@ public: error_code update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, - transfer_state s, std::optional ec = std::nullopt); + transfer_state s, float bw, + std::optional ec = std::nullopt); tl::expected lookup(std::uint64_t tid); diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 1c6bbd0e4b187bd13069bd8c677f99dbfb41de44..1e0a6ab5b64ec1a7a4fcc8aabea6fa69a000b39e 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -22,11 +22,11 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include -#include + #include "mpio_read.hpp" #include "mpioxx.hpp" #include "memory.hpp" +#include namespace cargo { @@ -37,18 +37,18 @@ mpio_read::mpio_read(mpi::communicator workers, m_output_path(std::move(output_path)) {} cargo::error_code -mpio_read::operator()() const { +mpio_read::operator()() { using posix_file::views::all_of; using posix_file::views::as_blocks; using posix_file::views::strided; - + m_status = error_code::transfer_in_progress; try { const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = input_file.size(); - std::size_t block_size = 512u; + std::size_t block_size = 512 * 1024u; // create block type MPI_Datatype block_type; @@ -99,21 +99,20 @@ mpio_read::operator()() const { } // step 1. acquire buffers - memory_buffer buffer; - buffer.resize(blocks_per_rank * block_size); - std::vector buffer_regions; - buffer_regions.reserve(blocks_per_rank); + m_buffer.resize(blocks_per_rank * block_size); + + m_buffer_regions.reserve(blocks_per_rank); for(std::size_t i = 0; i < blocks_per_rank; ++i) { - buffer_regions.emplace_back(buffer.data() + i * block_size, - block_size); + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); } MPI_Datatype datatype = block_type; // step2. parallel read data into buffers - if(const auto ec = MPI_File_read_all(input_file, buffer.data(), + if(const auto ec = MPI_File_read_all(input_file, m_buffer.data(), static_cast(blocks_per_rank), datatype, MPI_STATUS_IGNORE); ec != MPI_SUCCESS) { @@ -123,36 +122,106 @@ mpio_read::operator()() const { } // step3. POSIX write data - const auto output_file = - posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR); + m_output_file = std::make_unique( + posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR)); - output_file.fallocate(0, 0, file_size); + m_output_file->fallocate(0, 0, file_size); + + + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; - int index = 0; - for(const auto& file_range : - all_of(posix_file::file{m_input_path}) | as_blocks(block_size) | - strided(workers_size, workers_rank)) { - assert(buffer_regions[index].size() >= file_range.size()); - output_file.pwrite(buffer_regions[index], file_range.offset(), - file_range.size()); - ++index; - } } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_mpi_error(e.error_code()); return make_mpi_error(e.error_code()); } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); return make_system_error(e.error_code()); - } catch (const std::system_error& e) { + } catch(const std::system_error& e) { LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); return make_system_error(e.code().value()); } catch(const std::exception& e) { LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; return error_code::other; } + m_status = error_code::transfer_in_progress; + return error_code::transfer_in_progress; +} + +int +mpio_read::progress(int ongoing_index) { + + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + try { + int index = 0; + m_status = error_code::transfer_in_progress; + for(const auto& file_range : + all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), + file_range.size()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast>( + end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + // Do sleep + std::this_thread::sleep_for(sleep_value()); + + ++index; + } + } catch(const mpioxx::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_mpi_error(e.error_code()); + return -1; + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + + m_status = error_code::success; + return -1; +} - return error_code::success; +// This needs to be go through different phases... +cargo::error_code +mpio_read::progress() const { + return m_status; } } // namespace cargo \ No newline at end of file diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 5ce311dfb1f7efae78b8889acbf163e06505887f..d135b05bbbc82c85f6e71af60678adbe3c7c0cc5 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -26,6 +26,9 @@ #define CARGO_WORKER_MPIO_READ_HPP #include "ops.hpp" +#include "memory.hpp" +#include +#include namespace mpi = boost::mpi; @@ -38,12 +41,28 @@ public: std::filesystem::path output_path); cargo::error_code - operator()() const final; + operator()() final; + + cargo::error_code + progress() const final; + + int + progress(int ongoing_index ) final; private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + cargo::error_code m_status; + + + std::unique_ptr m_output_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + memory_buffer m_buffer; + std::vector m_buffer_regions; + }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 5f40048faaecf7c60d3a632446c0f19bb6e04e2e..ccf81e017b6de3efd65d17faff29f524436222f0 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -22,26 +22,25 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include -#include + #include "mpio_write.hpp" #include "mpioxx.hpp" -#include "memory.hpp" +#include namespace cargo { cargo::error_code -mpio_write::operator()() const { +mpio_write::operator()() { using posix_file::views::all_of; using posix_file::views::as_blocks; using posix_file::views::strided; - + m_status = error_code::transfer_in_progress; try { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); - std::size_t block_size = 512u; + std::size_t block_size = 512 * 1024u; // 512 kb std::size_t file_size = std::filesystem::file_size(m_input_path); // compute the number of blocks in the file @@ -60,30 +59,80 @@ mpio_write::operator()() const { } // step 1. acquire buffers - memory_buffer buffer; - buffer.resize(blocks_per_rank * block_size); - std::vector buffer_regions; - buffer_regions.reserve(blocks_per_rank); + m_buffer.resize(blocks_per_rank * block_size); + m_buffer_regions.reserve(blocks_per_rank); for(std::size_t i = 0; i < blocks_per_rank; ++i) { - buffer_regions.emplace_back(buffer.data() + i * block_size, - block_size); + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); } - const auto input_file = posix_file::open(m_input_path, O_RDONLY); + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY)); - int index = 0; - std::size_t bytes_per_rank = 0; + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + m_file_size = file_size; + m_total_blocks = total_blocks; - for(const auto& file_range : - all_of(input_file) | as_blocks(block_size) | - strided(workers_size, workers_rank)) { + } catch(const mpioxx::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_mpi_error(e.error_code()); + return make_mpi_error(e.error_code()); + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } - assert(buffer_regions[index].size() >= file_range.size()); + return error_code::transfer_in_progress; +} +cargo::error_code +mpio_write::progress() const { + return m_status; +} + +int +mpio_write::progress(int ongoing_index) { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + // compute the number of blocks in the file + + int index = 0; + if(ongoing_index == 0) { + m_bytes_per_rank = 0; + } + try { + for(const auto& file_range : + all_of(*m_input_file) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + m_status = error_code::transfer_in_progress; + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); const std::size_t n = - input_file.pread(buffer_regions[index], file_range.offset(), - file_range.size()); + m_input_file->pread(m_buffer_regions[index], + file_range.offset(), file_range.size()); LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", fmt::join(buffer_regions[index].begin(), @@ -91,7 +140,22 @@ mpio_write::operator()() const { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); - bytes_per_rank += n; + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast>( + end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); ++index; } @@ -103,7 +167,7 @@ mpio_write::operator()() const { // create block type MPI_Datatype block_type; - MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, + MPI_Type_contiguous(static_cast(m_block_size), MPI_BYTE, &block_type); MPI_Type_commit(&block_type); @@ -115,46 +179,55 @@ mpio_write::operator()() const { * blocklen: number of `oldtype` elements in each block * stride: number of `oldtype` elements between start of each block */ - MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, - /* stride: */ workers_size, /* oldtype: */ block_type, + MPI_Type_vector(/* count: */ m_total_blocks, /* blocklength: */ 1, + /* stride: */ m_workers_size, /* oldtype: */ block_type, &file_type); MPI_Type_commit(&file_type); if(const auto ec = MPI_File_set_view(output_file, - /* disp: */ workers_rank * block_size, + /* disp: */ m_workers_rank * m_block_size, /* elementary_type: */ block_type, file_type, "native", MPI_INFO_NULL); ec != MPI_SUCCESS) { LOGGER_ERROR("MPI_File_set_view() failed: {}", mpi::error_string(ec)); - return make_mpi_error(ec); + m_status = make_mpi_error(ec); + return -1; } // step 3. parallel write data from buffers - if(const auto ec = MPI_File_write_all(output_file, buffer.data(), - static_cast(bytes_per_rank), - MPI_BYTE, MPI_STATUS_IGNORE); + if(const auto ec = + MPI_File_write_all(output_file, m_buffer.data(), + static_cast(m_bytes_per_rank), + MPI_BYTE, MPI_STATUS_IGNORE); ec != MPI_SUCCESS) { LOGGER_ERROR("MPI_File_write_all() failed: {}", mpi::error_string(ec)); - return make_mpi_error(ec); + m_status = make_mpi_error(ec); + return -1; } } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); - return make_mpi_error(e.error_code()); + m_status = make_mpi_error(e.error_code()); + return -1; } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); - return make_system_error(e.error_code()); - } catch (const std::system_error& e) { + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { LOGGER_ERROR("Unexpected system error: {}", e.what()); - return make_system_error(e.code().value()); + m_status = make_system_error(e.code().value()); + return -1; } catch(const std::exception& e) { LOGGER_ERROR("Unexpected exception: {}", e.what()); - return error_code::other; + m_status = error_code::other; + return -1; } - return error_code::success; + m_status = error_code::success; + + return -1; } } // namespace cargo diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 69b2ced46e6c3770e599780ee100919ac41e4063..8c3d5c1cb054332ed15b26564314b11bc570e606 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -25,7 +25,10 @@ #ifndef CARGO_WORKER_MPIO_WRITE_HPP #define CARGO_WORKER_MPIO_WRITE_HPP +#include +#include #include "ops.hpp" +#include "memory.hpp" namespace mpi = boost::mpi; @@ -40,12 +43,33 @@ public: m_output_path(std::move(output_path)) {} cargo::error_code - operator()() const final; + operator()() final; + + cargo::error_code + progress() const final; + + int + progress(int ongoing_index) final; + private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + + cargo::error_code m_status; + + + std::unique_ptr m_input_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + std::size_t m_file_size; + int m_total_blocks; + + memory_buffer m_buffer; + std::vector m_buffer_regions; + std::size_t m_bytes_per_rank; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 17c16c9bfe4627b04777790d1bba7f69fca6be61..288991fc10324dcd3003c7cfd408b3f3e052151f 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -54,4 +54,57 @@ operation::make_operation(cargo::tag t, mpi::communicator workers, } } +std::chrono::milliseconds +operation::sleep_value() const { + if(m_sleep_value <= 0) + return std::chrono::milliseconds{0}; + else + return std::chrono::milliseconds{m_sleep_value * 100}; +} + +void +operation::set_bw_shaping(std::int16_t incr) { + m_sleep_value += incr; +} + +int +operation::source() { + return m_rank; +} +std::uint64_t +operation::tid() { + return m_tid; +} +std::uint32_t +operation::seqno() { + return m_seqno; +} + +cargo::tag +operation::t() { + return m_t; +} + +float_t +operation::bw() { + return m_bw; +} + +void operation::bw(float_t bw) { + m_bw = bw; +} +void +operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, + cargo::tag t) { + m_rank = rank; + m_tid = tid; + m_seqno = seqno; + m_t = t; +} + +cargo::error_code +operation::progress() const { + return error_code::other; +} + } // namespace cargo diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index b9a20cfada8a627cd08e87d5bf796e757c9b09e0..637cdc7a45a553890883d89d1cfef9503292d021 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -30,7 +30,6 @@ #include #include "proto/mpi/message.hpp" #include "cargo.hpp" - namespace cargo { /** @@ -47,7 +46,43 @@ public: virtual ~operation() = default; virtual cargo::error_code - operator()() const = 0; + operator()() = 0; + + + std::chrono::milliseconds + sleep_value() const; + // We pass a - or + value to decrease or increase the bw shaping. + void + set_bw_shaping(std::int16_t incr); + virtual cargo::error_code + progress() const = 0; + virtual int + progress(int index) = 0; + + + int + source(); + std::uint64_t + tid(); + std::uint32_t + seqno(); + void + set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, cargo::tag t); + cargo::tag + t(); + + float_t + bw(); + void + bw(float_t bw); + +private: + std::int16_t m_sleep_value = 0; + int m_rank; + std::uint64_t m_tid; + std::uint32_t m_seqno; + cargo::tag m_t; + float m_bw; }; } // namespace cargo diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index a1cf8ed9f60707ba676981d84520acfae8810992..83d3da5289b04cfcefd51a571b1130d4250ce589 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -29,9 +29,19 @@ namespace cargo { cargo::error_code -seq_operation::operator()() const { +seq_operation::operator()() { LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); return cargo::error_code::not_implemented; } +cargo::error_code +seq_operation::progress() const { + return error_code::success; +} + +int +seq_operation::progress(int ongoing_index) { + return ++ongoing_index; +} + } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 82fa6196cecd80ba4f600e43b3306b34bb194545..f5d54da17ca642235d19da348c8b77f2edc47d6e 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -40,7 +40,12 @@ public: m_output_path(std::move(output_path)) {} cargo::error_code - operator()() const final; + operator()() final; + cargo::error_code + progress() const; + + int + progress(int ongoing_index ) final; private: mpi::communicator m_comm; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 484f3b0e3142b3529ddafd9ae0141f63254ea9e8..988a926aa98e1af5da5f23b028ebef059016282c 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -27,9 +27,8 @@ #include #include #include -#include "ops.hpp" + #include "worker.hpp" -#include "proto/mpi/message.hpp" #include "fmt_formatters.hpp" namespace mpi = boost::mpi; @@ -54,11 +53,11 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, void update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state st, + cargo::transfer_state st, float bw, std::optional ec = std::nullopt) { mpi::communicator world; - const cargo::status_message m{tid, seqno, st, ec}; + const cargo::status_message m{tid, seqno, st, bw, ec}; LOGGER_INFO("msg <= to: {} body: {{payload: {}}}", rank, m); world.send(rank, static_cast(cargo::tag::status), m); } @@ -102,14 +101,45 @@ worker::run() { LOGGER_INFO("{:=>{}}", "", greeting.size()); bool done = false; - while(!done) { auto msg = world.iprobe(); if(!msg) { // FIXME: sleep time should be configurable - std::this_thread::sleep_for(150ms); + + // Progress through all transfers + for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + auto op = I->second.first.get(); + int index = I->second.second; + if(op) { + if(op->t() == tag::pread or op->t() == tag::pwrite) { + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + 0.0f, ec); + + // Transfer finished + I = m_ops.erase(I); + if(I == m_ops.end()) { + break; + } + } else { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running, op->bw()); + I->second.second = index; + } + } + } + } + + if(m_ops.size() == 0) { + std::this_thread::sleep_for(150ms); + } continue; } @@ -122,19 +152,47 @@ worker::run() { transfer_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + m_ops.emplace(std::make_pair( + make_pair(m.input_path(), m.output_path()), + make_pair(operation::make_operation(t, workers, + m.input_path(), + m.output_path()), + 0))); + + const auto op = + m_ops[make_pair(m.input_path(), m.output_path())] + .first.get(); - const auto op = operation::make_operation( - t, workers, m.input_path(), m.output_path()); + op->set_comm(msg->source(), m.tid(), m.seqno(), t); - update_state(msg->source(), m.tid(), m.seqno(), - transfer_state::running); + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running, -1.0f); + // Different scenarios read -> write | write -> read cargo::error_code ec = (*op)(); + if(ec != cargo::error_code::transfer_in_progress) { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::failed, -1.0f, ec); + m_ops.erase(make_pair(m.input_path(), m.output_path())); + } + break; + } + + case tag::bw_shaping: { + shaper_message m; + world.recv(msg->source(), msg->tag(), m); + LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + const auto op = I->second.first.get(); + if(op) { + + op->set_bw_shaping(0); + } else { + LOGGER_INFO("Operation non existent", msg->source(), m); + } + } + - update_state(msg->source(), m.tid(), m.seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - ec); break; } diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp index aaeea068e40b0ef7812bb9d9d4dda264262b8257..1890974bd11aae2bd92fe63279df800963e31509 100644 --- a/src/worker/worker.hpp +++ b/src/worker/worker.hpp @@ -26,6 +26,9 @@ #ifndef CARGO_WORKER_HPP #define CARGO_WORKER_HPP +#include "proto/mpi/message.hpp" +#include +#include "ops.hpp" namespace cargo { class worker { @@ -39,9 +42,11 @@ public: run(); private: + std::map, std::pair< std::unique_ptr, int> > m_ops; std::string m_name; int m_rank; std::optional m_output_file; + }; } // namespace cargo diff --git a/tests/tests.cpp b/tests/tests.cpp index e2b235784eb32c4301ab4aa590b62e57709069b5..c8b514a88e0f3d08f830c5983c1c2c35ac3c3cfb 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -238,6 +238,7 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { std::filesystem::remove(dataset.path()); }); + WHEN("Transferring datasets to a POSIX storage system") { const auto tx = cargo::transfer_datasets(server, sources, targets); @@ -269,7 +270,7 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { - std::size_t file_size = 10000; // GENERATE(1000, 10000); + std::size_t file_size = 1000; // GENERATE(1000, 10000); [[maybe_unused]] ascii_data_generator ascii_gen{512}; [[maybe_unused]] random_data_generator rng{