diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt
index d349e97143aa3d08f2d19f490edcdfda15be5f1b..68fea90ffd7cae09c4422a0be73facf9bcb65f93 100644
--- a/cli/CMakeLists.txt
+++ b/cli/CMakeLists.txt
@@ -84,7 +84,24 @@ target_link_libraries(ccp
cargo
)
-install(TARGETS cargo_ping cargo_shutdown ccp
+################################################################################
+## shaping: A CLI tool to request a Cargo server to slowdown transfers
+add_executable(shaping)
+
+target_sources(shaping
+ PRIVATE
+ shaping.cpp
+)
+
+target_link_libraries(shaping
+ PUBLIC
+ fmt::fmt
+ CLI11::CLI11
+ net::rpc_client
+ cargo
+)
+
+install(TARGETS cargo_ping cargo_shutdown ccp shaping
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
diff --git a/cli/shaping.cpp b/cli/shaping.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..edefb7cbc00616b64e9d560624cf0b9a0bb396a3
--- /dev/null
+++ b/cli/shaping.cpp
@@ -0,0 +1,114 @@
+/******************************************************************************
+ * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of Cargo.
+ *
+ * Cargo is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Cargo is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Cargo. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+struct shaping_config {
+ std::string progname;
+ std::string server_address;
+ std::int64_t tid;
+ std::int16_t shaping;
+};
+
+shaping_config
+parse_command_line(int argc, char* argv[]) {
+
+ shaping_config cfg;
+
+ cfg.progname = std::filesystem::path{argv[0]}.filename().string();
+
+ CLI::App app{"Cargo shaping client", cfg.progname};
+
+ app.add_option("-s,--server", cfg.server_address, "Server address")
+ ->option_text("ADDRESS")
+ ->required();
+
+ app.add_option("-i,--tid", cfg.tid, "transfer id")
+ ->option_text("integer")
+ ->required();
+
+ app.add_option("-b,--bw", cfg.shaping, "bw shaping")
+ ->option_text("integer")
+ ->required();
+
+
+ try {
+ app.parse(argc, argv);
+ return cfg;
+ } catch(const CLI::ParseError& ex) {
+ std::exit(app.exit(ex));
+ }
+}
+
+auto
+parse_address(const std::string& address) {
+ const auto pos = address.find("://");
+ if(pos == std::string::npos) {
+ throw std::runtime_error(fmt::format("Invalid address: {}", address));
+ }
+
+ const auto protocol = address.substr(0, pos);
+ return std::make_pair(protocol, address);
+}
+
+
+int
+main(int argc, char* argv[]) {
+
+ shaping_config cfg = parse_command_line(argc, argv);
+
+ try {
+ const auto [protocol, address] = parse_address(cfg.server_address);
+ network::client rpc_client{protocol};
+
+ if(const auto result = rpc_client.lookup(address); result.has_value()) {
+ const auto& endpoint = result.value();
+ const auto retval = endpoint.call("bw_shaping", cfg.tid, cfg.shaping);
+
+ if(retval.has_value()) {
+
+ auto error_code = int{retval.value()};
+
+ fmt::print("bw_shaping RPC was successful!\n");
+ fmt::print(" (server replied with: {})\n", error_code);
+ return EXIT_SUCCESS;
+ }
+
+ fmt::print(stderr, "bw_shaping RPC failed\n");
+ return EXIT_FAILURE;
+
+ } else {
+ fmt::print(stderr, "Failed to lookup address: {}\n", address);
+ return EXIT_FAILURE;
+ }
+ } catch(const std::exception& ex) {
+ fmt::print(stderr, "Error: {}\n", ex.what());
+ return EXIT_FAILURE;
+ }
+}
diff --git a/lib/cargo.hpp b/lib/cargo.hpp
index cd0f04dbd802ae692ff35365b66bae89616b2177..04b07565901d1fa236ea1a01efd6d0de39f3324b 100644
--- a/lib/cargo.hpp
+++ b/lib/cargo.hpp
@@ -26,6 +26,7 @@
#ifndef CARGO_HPP
#define CARGO_HPP
+#include
#include
#include
#include
@@ -76,8 +77,8 @@ public:
template
void
serialize(Archive& ar) {
- ar & m_path;
- ar & m_type;
+ ar& m_path;
+ ar& m_type;
}
private:
@@ -149,7 +150,7 @@ class transfer_status {
friend transfer_status
transfer::status() const;
- transfer_status(transfer_state status, error_code error) noexcept;
+ transfer_status(transfer_state status, float bw, error_code error) noexcept;
public:
/**
@@ -187,8 +188,12 @@ public:
[[nodiscard]] error_code
error() const;
+ [[nodiscard]] float
+ bw() const;
+
private:
transfer_state m_state;
+ float m_bw;
error_code m_error;
};
diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp
index 634d73f76ff1906e22e48c81cd00749c9607df8c..f084a0d3888674c28b749c57520dc2294ae022dd 100644
--- a/lib/libcargo.cpp
+++ b/lib/libcargo.cpp
@@ -93,7 +93,7 @@ transfer::status() const {
network::client rpc_client{m_srv.protocol()};
const auto rpc =
network::rpc_info::create("transfer_status", m_srv.address());
- using response_type = status_response;
+ using response_type = status_response;
if(const auto lookup_rv = rpc_client.lookup(m_srv.address());
lookup_rv.has_value()) {
@@ -105,7 +105,7 @@ transfer::status() const {
call_rv.has_value()) {
const response_type resp{call_rv.value()};
- const auto& [s, ec] = resp.value();
+ const auto& [s, bw, ec] = resp.value();
LOGGER_EVAL(resp.error_code(), INFO, ERROR,
"rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc,
@@ -116,16 +116,16 @@ transfer::status() const {
fmt::format("rpc call failed: {}", resp.error_code()));
}
- return transfer_status{s, ec.value_or(error_code::success)};
+ return transfer_status{s, bw, ec.value_or(error_code::success)};
}
}
throw std::runtime_error("rpc lookup failed");
}
-transfer_status::transfer_status(transfer_state status,
+transfer_status::transfer_status(transfer_state status, float bw,
error_code error) noexcept
- : m_state(status), m_error(error) {}
+ : m_state(status), m_bw(bw), m_error(error) {}
transfer_state
transfer_status::state() const noexcept {
@@ -142,6 +142,11 @@ transfer_status::failed() const noexcept {
return m_state == transfer_state::failed;
}
+float
+transfer_status::bw() const {
+ return m_bw;
+}
+
error_code
transfer_status::error() const {
switch(m_state) {
diff --git a/src/master.cpp b/src/master.cpp
index 4b106eb68f74f794f0e38fe67659f249cf549a36..c9db8aee7e4de8c1648fd313ba867e1d12f16e77 100644
--- a/src/master.cpp
+++ b/src/master.cpp
@@ -85,6 +85,7 @@ master_server::master_server(std::string name, std::string address,
provider::define(EXPAND(shutdown));
provider::define(EXPAND(transfer_datasets));
provider::define(EXPAND(transfer_status));
+ provider::define(EXPAND(bw_shaping));
#undef EXPAND
@@ -125,7 +126,7 @@ master_server::mpi_listener_ult() {
msg->source(), m);
m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1,
- m.state(), m.error_code());
+ m.state(), m.bw(), m.error_code());
break;
}
@@ -169,6 +170,32 @@ master_server::ping(const network::request& req) {
req.respond(resp);
}
+
+void
+master_server::bw_shaping(const network::request& req, std::uint64_t tid,
+ std::int16_t shaping) {
+ using network::get_address;
+ using network::rpc_info;
+ using proto::generic_response;
+ mpi::communicator world;
+ const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
+
+ LOGGER_INFO("rpc {:>} body: {{tid: {}, shaping: {}}}", rpc, tid, shaping);
+
+ for(int rank = 1; rank < world.size(); ++rank) {
+ const auto m = cargo::shaper_message{tid, shaping};
+ LOGGER_INFO("msg <= to: {} body: {}", rank, m);
+ world.send(static_cast(rank), static_cast(tag::bw_shaping),
+ m);
+ }
+
+ const auto resp = generic_response{rpc.id(), error_code::success};
+
+ LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
+
+ req.respond(resp);
+}
+
void
master_server::shutdown(const network::request& req) {
using network::get_address;
@@ -232,7 +259,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
using proto::status_response;
using response_type =
- status_response;
+ status_response;
mpi::communicator world;
const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
@@ -248,9 +275,9 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
.map([&](auto&& rs) {
LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc,
error_code::success, rs);
- req.respond(
- response_type{rpc.id(), error_code::success,
- std::make_pair(rs.state(), rs.error())});
+ req.respond(response_type{
+ rpc.id(), error_code::success,
+ std::make_tuple(rs.state(), rs.bw(), rs.error())});
});
}
diff --git a/src/master.hpp b/src/master.hpp
index 3d5797ac8919965f8451138357890f9f382b50c4..6243dafa26f39b6547607197c08c0f6e884e2fdf 100644
--- a/src/master.hpp
+++ b/src/master.hpp
@@ -58,6 +58,11 @@ private:
void
transfer_status(const network::request& req, std::uint64_t tid);
+ // Receives a request to increase or decrease BW
+ // -1 faster, 0 , +1 slower
+ void
+ bw_shaping(const network::request& req, std::uint64_t tid, std::int16_t shaping);
+
private:
// Dedicated execution stream for the MPI listener ULT
thallium::managed m_mpi_listener_ess;
diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp
index 35183220c61de077271ff8872245e41bdbee0185..87244dc4d91adf94635cee8dc77243b83e4f65d6 100644
--- a/src/mpioxx.hpp
+++ b/src/mpioxx.hpp
@@ -30,6 +30,7 @@
#include
#include
#include
+#include
#include
// very simple RAII wrappers for some MPI types + utility functions
diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp
index 15cbeabecbd86a87d7d1616883ef93bc47da19e9..57631988ad9f60b28464cf3792f7331c9dfb4a30 100644
--- a/src/parallel_request.cpp
+++ b/src/parallel_request.cpp
@@ -47,10 +47,11 @@ parallel_request::nworkers() const {
}
request_status::request_status(part_status s)
- : m_state(s.state()), m_error_code(s.error()) {}
+ : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {}
-request_status::request_status(transfer_state s, std::optional ec)
- : m_state(s), m_error_code(ec) {}
+request_status::request_status(transfer_state s, float bw,
+ std::optional ec)
+ : m_state(s), m_bw(bw), m_error_code(ec) {}
transfer_state
request_status::state() const {
@@ -62,19 +63,31 @@ request_status::error() const {
return m_error_code;
}
+float
+request_status::bw() const {
+ return m_bw;
+}
+
transfer_state
part_status::state() const {
return m_state;
}
+float
+part_status::bw() const {
+ return m_bw;
+}
+
std::optional
part_status::error() const {
return m_error_code;
}
void
-part_status::update(transfer_state s, std::optional ec) noexcept {
+part_status::update(transfer_state s, float bw,
+ std::optional ec) noexcept {
m_state = s;
+ m_bw = bw;
m_error_code = ec;
}
diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp
index d2e78d9a1fbe6a30620217d4a170e1ee925e9ad8..222cd57de1927360d6e1e9f4eea7bef1fe4d94a5 100644
--- a/src/parallel_request.hpp
+++ b/src/parallel_request.hpp
@@ -71,18 +71,22 @@ public:
[[nodiscard]] std::optional
error() const;
+ [[nodiscard]] float
+ bw() const;
+
void
- update(transfer_state s, std::optional ec) noexcept;
+ update(transfer_state s, float bw, std::optional ec) noexcept;
private:
transfer_state m_state{transfer_state::pending};
+ float m_bw;
std::optional m_error_code{};
};
class request_status {
public:
request_status() = default;
- explicit request_status(transfer_state s,
+ explicit request_status(transfer_state s, float bw,
std::optional ec = {});
explicit request_status(part_status s);
@@ -92,8 +96,12 @@ public:
[[nodiscard]] std::optional
error() const;
+ [[nodiscard]] float
+ bw() const;
+
private:
transfer_state m_state{transfer_state::pending};
+ float m_bw;
std::optional m_error_code{};
};
@@ -121,8 +129,8 @@ struct fmt::formatter : formatter {
}
};
- const auto str = fmt::format("{{state: {}, error_code: {}}}",
- state_name(s), s.error());
+ const auto str = fmt::format("{{state: {}, bw: {}, error_code: {}}}",
+ state_name(s), s.bw(), s.error());
return formatter::format(str, ctx);
}
};
diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp
index bc8c5a1a88ced96ca1020e619f86177354900b7d..e7cdc583132fba03653a960e5bfa24b4c4d3ec83 100644
--- a/src/proto/mpi/message.hpp
+++ b/src/proto/mpi/message.hpp
@@ -35,7 +35,14 @@
namespace cargo {
-enum class tag : int { pread, pwrite, sequential, status, shutdown };
+enum class tag : int {
+ pread,
+ pwrite,
+ sequential,
+ bw_shaping,
+ status,
+ shutdown
+};
class transfer_message {
@@ -75,10 +82,10 @@ private:
serialize(Archive& ar, const unsigned int version) {
(void) version;
- ar & m_tid;
- ar & m_seqno;
- ar & m_input_path;
- ar & m_output_path;
+ ar& m_tid;
+ ar& m_seqno;
+ ar& m_input_path;
+ ar& m_output_path;
}
std::uint64_t m_tid{};
@@ -95,10 +102,10 @@ public:
status_message() = default;
status_message(std::uint64_t tid, std::uint32_t seqno,
- cargo::transfer_state state,
+ cargo::transfer_state state, float bw,
std::optional error_code = std::nullopt)
- : m_tid(tid), m_seqno(seqno), m_state(state), m_error_code(error_code) {
- }
+ : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw),
+ m_error_code(error_code) {}
[[nodiscard]] std::uint64_t
tid() const {
@@ -115,6 +122,12 @@ public:
return m_state;
}
+ [[nodiscard]] float
+ bw() const {
+ return m_bw;
+ }
+
+
[[nodiscard]] std::optional
error_code() const {
return m_error_code;
@@ -126,18 +139,55 @@ private:
serialize(Archive& ar, const unsigned int version) {
(void) version;
- ar & m_tid;
- ar & m_seqno;
- ar & m_state;
- ar & m_error_code;
+ ar& m_tid;
+ ar& m_seqno;
+ ar& m_state;
+ ar& m_bw;
+ ar& m_error_code;
}
std::uint64_t m_tid{};
std::uint32_t m_seqno{};
cargo::transfer_state m_state{};
+ float m_bw{};
std::optional m_error_code{};
};
+class shaper_message {
+
+ friend class boost::serialization::access;
+
+public:
+ shaper_message() = default;
+
+ shaper_message(std::uint64_t tid, std::int16_t shaping)
+ : m_tid(tid), m_shaping(shaping) {}
+
+ [[nodiscard]] std::uint64_t
+ tid() const {
+ return m_tid;
+ }
+
+ [[nodiscard]] std::int16_t
+ shaping() const {
+ return m_shaping;
+ }
+
+private:
+ template
+ void
+ serialize(Archive& ar, const unsigned int version) {
+ (void) version;
+
+ ar& m_tid;
+ ar& m_shaping;
+ }
+
+ std::uint64_t m_tid{};
+ std::uint16_t m_shaping{};
+};
+
+
class shutdown_message {
friend class boost::serialization::access;
@@ -176,12 +226,26 @@ struct fmt::formatter : formatter {
format(const cargo::status_message& s, FormatContext& ctx) const {
const auto str =
s.error_code()
- ? fmt::format("{{tid: {}, seqno: {}, state: {}, "
- "error_code: {}}}",
- s.tid(), s.seqno(), s.state(),
- *s.error_code())
- : fmt::format("{{tid: {}, seqno: {}, state: {}}}",
- s.tid(), s.seqno(), s.state());
+ ? fmt::format(
+ "{{tid: {}, seqno: {}, state: {}, bw: {}, "
+ "error_code: {}}}",
+ s.tid(), s.seqno(), s.state(), s.bw(),
+ *s.error_code())
+ : fmt::format(
+ "{{tid: {}, seqno: {}, state: {}, bw: {}}}",
+ s.tid(), s.seqno(), s.state(), s.bw());
+ return formatter::format(str, ctx);
+ }
+};
+
+template <>
+struct fmt::formatter : formatter {
+ // parse is inherited from formatter.
+ template
+ auto
+ format(const cargo::shaper_message& s, FormatContext& ctx) const {
+ const auto str =
+ fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping());
return formatter::format(str, ctx);
}
};
diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp
index 5aa487c3fd0d5b4eed8e94a6be1954087837eb35..7257fb1fc86e8ee828f4f7494585f6f3d6751559 100644
--- a/src/proto/rpc/response.hpp
+++ b/src/proto/rpc/response.hpp
@@ -56,8 +56,8 @@ public:
template
constexpr void
serialize(Archive&& ar) {
- ar & m_op_id;
- ar & m_error_code;
+ ar& m_op_id;
+ ar& m_error_code;
}
private:
@@ -104,9 +104,10 @@ template
using response_with_id = response_with_value;
-template
+template
using status_response =
- response_with_value>, Error>;
+ response_with_value>,
+ Error>;
} // namespace cargo::proto
diff --git a/src/request_manager.cpp b/src/request_manager.cpp
index f39dea9237db15086596fa03c7bd5cb9da6a6415..1496c03d4ef8ccd8d438e53e5767af5c8759a074 100644
--- a/src/request_manager.cpp
+++ b/src/request_manager.cpp
@@ -57,14 +57,15 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) {
error_code
request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid,
- transfer_state s, std::optional ec) {
+ transfer_state s, float bw,
+ std::optional ec) {
abt::unique_lock lock(m_mutex);
if(const auto it = m_requests.find(tid); it != m_requests.end()) {
assert(seqno < it->second.size());
assert(wid < it->second[seqno].size());
- it->second[seqno][wid].update(s, ec);
+ it->second[seqno][wid].update(s, bw, ec);
return error_code::success;
}
@@ -92,7 +93,7 @@ request_manager::lookup(std::uint64_t tid) {
}
}
- return request_status{transfer_state::completed};
+ return request_status{transfer_state::completed, 0.0f};
}
LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid);
diff --git a/src/request_manager.hpp b/src/request_manager.hpp
index ba26cee5765708b7dceebdee2cf32e757eeb4377..1928d61b819bd1669a6b1ad547eeda93c8b3f2ec 100644
--- a/src/request_manager.hpp
+++ b/src/request_manager.hpp
@@ -60,7 +60,8 @@ public:
error_code
update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid,
- transfer_state s, std::optional ec = std::nullopt);
+ transfer_state s, float bw,
+ std::optional ec = std::nullopt);
tl::expected
lookup(std::uint64_t tid);
diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp
index 1c6bbd0e4b187bd13069bd8c677f99dbfb41de44..1e0a6ab5b64ec1a7a4fcc8aabea6fa69a000b39e 100644
--- a/src/worker/mpio_read.cpp
+++ b/src/worker/mpio_read.cpp
@@ -22,11 +22,11 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
-#include
-#include
+
#include "mpio_read.hpp"
#include "mpioxx.hpp"
#include "memory.hpp"
+#include
namespace cargo {
@@ -37,18 +37,18 @@ mpio_read::mpio_read(mpi::communicator workers,
m_output_path(std::move(output_path)) {}
cargo::error_code
-mpio_read::operator()() const {
+mpio_read::operator()() {
using posix_file::views::all_of;
using posix_file::views::as_blocks;
using posix_file::views::strided;
-
+ m_status = error_code::transfer_in_progress;
try {
const auto input_file = mpioxx::file::open(
m_workers, m_input_path, mpioxx::file_open_mode::rdonly);
mpioxx::offset file_size = input_file.size();
- std::size_t block_size = 512u;
+ std::size_t block_size = 512 * 1024u;
// create block type
MPI_Datatype block_type;
@@ -99,21 +99,20 @@ mpio_read::operator()() const {
}
// step 1. acquire buffers
- memory_buffer buffer;
- buffer.resize(blocks_per_rank * block_size);
- std::vector buffer_regions;
- buffer_regions.reserve(blocks_per_rank);
+ m_buffer.resize(blocks_per_rank * block_size);
+
+ m_buffer_regions.reserve(blocks_per_rank);
for(std::size_t i = 0; i < blocks_per_rank; ++i) {
- buffer_regions.emplace_back(buffer.data() + i * block_size,
- block_size);
+ m_buffer_regions.emplace_back(m_buffer.data() + i * block_size,
+ block_size);
}
MPI_Datatype datatype = block_type;
// step2. parallel read data into buffers
- if(const auto ec = MPI_File_read_all(input_file, buffer.data(),
+ if(const auto ec = MPI_File_read_all(input_file, m_buffer.data(),
static_cast(blocks_per_rank),
datatype, MPI_STATUS_IGNORE);
ec != MPI_SUCCESS) {
@@ -123,36 +122,106 @@ mpio_read::operator()() const {
}
// step3. POSIX write data
- const auto output_file =
- posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR);
+ m_output_file = std::make_unique(
+ posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR));
- output_file.fallocate(0, 0, file_size);
+ m_output_file->fallocate(0, 0, file_size);
+
+
+ m_workers_size = workers_size;
+ m_workers_rank = workers_rank;
+ m_block_size = block_size;
- int index = 0;
- for(const auto& file_range :
- all_of(posix_file::file{m_input_path}) | as_blocks(block_size) |
- strided(workers_size, workers_rank)) {
- assert(buffer_regions[index].size() >= file_range.size());
- output_file.pwrite(buffer_regions[index], file_range.offset(),
- file_range.size());
- ++index;
- }
} catch(const mpioxx::io_error& e) {
LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
+ m_status = make_mpi_error(e.error_code());
return make_mpi_error(e.error_code());
} catch(const posix_file::io_error& e) {
LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
+ m_status = make_system_error(e.error_code());
return make_system_error(e.error_code());
- } catch (const std::system_error& e) {
+ } catch(const std::system_error& e) {
LOGGER_ERROR("Unexpected system error: {}", e.what());
+ m_status = make_system_error(e.code().value());
return make_system_error(e.code().value());
} catch(const std::exception& e) {
LOGGER_ERROR("Unexpected exception: {}", e.what());
+ m_status = error_code::other;
return error_code::other;
}
+ m_status = error_code::transfer_in_progress;
+ return error_code::transfer_in_progress;
+}
+
+int
+mpio_read::progress(int ongoing_index) {
+
+ using posix_file::views::all_of;
+ using posix_file::views::as_blocks;
+ using posix_file::views::strided;
+ try {
+ int index = 0;
+ m_status = error_code::transfer_in_progress;
+ for(const auto& file_range :
+ all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) |
+ strided(m_workers_size, m_workers_rank)) {
+ if(index < ongoing_index) {
+ ++index;
+ continue;
+ } else {
+ if(index > ongoing_index) {
+ return index;
+ }
+ }
+
+ assert(m_buffer_regions[index].size() >= file_range.size());
+ auto start = std::chrono::steady_clock::now();
+ m_output_file->pwrite(m_buffer_regions[index], file_range.offset(),
+ file_range.size());
+ auto end = std::chrono::steady_clock::now();
+ // Send transfer bw
+ double elapsed_seconds =
+ std::chrono::duration_cast>(
+ end - start)
+ .count();
+ if((elapsed_seconds) > 0) {
+ bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds));
+ LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]",
+ m_block_size / 1024.0, elapsed_seconds, bw(),
+ sleep_value());
+ }
+ // Do sleep
+ std::this_thread::sleep_for(sleep_value());
+
+ ++index;
+ }
+ } catch(const mpioxx::io_error& e) {
+ LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
+ m_status = make_mpi_error(e.error_code());
+ return -1;
+ } catch(const posix_file::io_error& e) {
+ LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
+ m_status = make_system_error(e.error_code());
+ return -1;
+ } catch(const std::system_error& e) {
+ LOGGER_ERROR("Unexpected system error: {}", e.what());
+ m_status = make_system_error(e.code().value());
+ return -1;
+ } catch(const std::exception& e) {
+ LOGGER_ERROR("Unexpected exception: {}", e.what());
+ m_status = error_code::other;
+ return -1;
+ }
+
+ m_status = error_code::success;
+ return -1;
+}
- return error_code::success;
+// This needs to be go through different phases...
+cargo::error_code
+mpio_read::progress() const {
+ return m_status;
}
} // namespace cargo
\ No newline at end of file
diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp
index 5ce311dfb1f7efae78b8889acbf163e06505887f..d135b05bbbc82c85f6e71af60678adbe3c7c0cc5 100644
--- a/src/worker/mpio_read.hpp
+++ b/src/worker/mpio_read.hpp
@@ -26,6 +26,9 @@
#define CARGO_WORKER_MPIO_READ_HPP
#include "ops.hpp"
+#include "memory.hpp"
+#include
+#include
namespace mpi = boost::mpi;
@@ -38,12 +41,28 @@ public:
std::filesystem::path output_path);
cargo::error_code
- operator()() const final;
+ operator()() final;
+
+ cargo::error_code
+ progress() const final;
+
+ int
+ progress(int ongoing_index ) final;
private:
mpi::communicator m_workers;
std::filesystem::path m_input_path;
std::filesystem::path m_output_path;
+ cargo::error_code m_status;
+
+
+ std::unique_ptr m_output_file;
+ int m_workers_size;
+ int m_workers_rank;
+ std::size_t m_block_size;
+ memory_buffer m_buffer;
+ std::vector m_buffer_regions;
+
};
} // namespace cargo
diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp
index 5f40048faaecf7c60d3a632446c0f19bb6e04e2e..ccf81e017b6de3efd65d17faff29f524436222f0 100644
--- a/src/worker/mpio_write.cpp
+++ b/src/worker/mpio_write.cpp
@@ -22,26 +22,25 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
-#include
-#include
+
#include "mpio_write.hpp"
#include "mpioxx.hpp"
-#include "memory.hpp"
+#include
namespace cargo {
cargo::error_code
-mpio_write::operator()() const {
+mpio_write::operator()() {
using posix_file::views::all_of;
using posix_file::views::as_blocks;
using posix_file::views::strided;
-
+ m_status = error_code::transfer_in_progress;
try {
const auto workers_size = m_workers.size();
const auto workers_rank = m_workers.rank();
- std::size_t block_size = 512u;
+ std::size_t block_size = 512 * 1024u; // 512 kb
std::size_t file_size = std::filesystem::file_size(m_input_path);
// compute the number of blocks in the file
@@ -60,30 +59,80 @@ mpio_write::operator()() const {
}
// step 1. acquire buffers
- memory_buffer buffer;
- buffer.resize(blocks_per_rank * block_size);
- std::vector buffer_regions;
- buffer_regions.reserve(blocks_per_rank);
+ m_buffer.resize(blocks_per_rank * block_size);
+ m_buffer_regions.reserve(blocks_per_rank);
for(std::size_t i = 0; i < blocks_per_rank; ++i) {
- buffer_regions.emplace_back(buffer.data() + i * block_size,
- block_size);
+ m_buffer_regions.emplace_back(m_buffer.data() + i * block_size,
+ block_size);
}
- const auto input_file = posix_file::open(m_input_path, O_RDONLY);
+ m_input_file = std::make_unique(
+ posix_file::open(m_input_path, O_RDONLY));
- int index = 0;
- std::size_t bytes_per_rank = 0;
+ m_workers_size = workers_size;
+ m_workers_rank = workers_rank;
+ m_block_size = block_size;
+ m_file_size = file_size;
+ m_total_blocks = total_blocks;
- for(const auto& file_range :
- all_of(input_file) | as_blocks(block_size) |
- strided(workers_size, workers_rank)) {
+ } catch(const mpioxx::io_error& e) {
+ LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
+ m_status = make_mpi_error(e.error_code());
+ return make_mpi_error(e.error_code());
+ } catch(const posix_file::io_error& e) {
+ LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
+ m_status = make_system_error(e.error_code());
+ return make_system_error(e.error_code());
+ } catch(const std::system_error& e) {
+ LOGGER_ERROR("Unexpected system error: {}", e.what());
+ m_status = make_system_error(e.code().value());
+ return make_system_error(e.code().value());
+ } catch(const std::exception& e) {
+ LOGGER_ERROR("Unexpected exception: {}", e.what());
+ m_status = error_code::other;
+ return error_code::other;
+ }
- assert(buffer_regions[index].size() >= file_range.size());
+ return error_code::transfer_in_progress;
+}
+cargo::error_code
+mpio_write::progress() const {
+ return m_status;
+}
+
+int
+mpio_write::progress(int ongoing_index) {
+ using posix_file::views::all_of;
+ using posix_file::views::as_blocks;
+ using posix_file::views::strided;
+
+ // compute the number of blocks in the file
+
+ int index = 0;
+ if(ongoing_index == 0) {
+ m_bytes_per_rank = 0;
+ }
+ try {
+ for(const auto& file_range :
+ all_of(*m_input_file) | as_blocks(m_block_size) |
+ strided(m_workers_size, m_workers_rank)) {
+
+ if(index < ongoing_index) {
+ ++index;
+ continue;
+ } else {
+ if(index > ongoing_index) {
+ return index;
+ }
+ }
+ m_status = error_code::transfer_in_progress;
+ assert(m_buffer_regions[index].size() >= file_range.size());
+ auto start = std::chrono::steady_clock::now();
const std::size_t n =
- input_file.pread(buffer_regions[index], file_range.offset(),
- file_range.size());
+ m_input_file->pread(m_buffer_regions[index],
+ file_range.offset(), file_range.size());
LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]",
fmt::join(buffer_regions[index].begin(),
@@ -91,7 +140,22 @@ mpio_write::operator()() const {
fmt::join(buffer_regions[index].end() - 10,
buffer_regions[index].end(), ""));
- bytes_per_rank += n;
+ auto end = std::chrono::steady_clock::now();
+ // Send transfer bw
+ double elapsed_seconds =
+ std::chrono::duration_cast>(
+ end - start)
+ .count();
+ if((elapsed_seconds) > 0) {
+ bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds));
+ LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]",
+ m_block_size / 1024.0, elapsed_seconds, bw(),
+ sleep_value());
+ }
+
+ m_bytes_per_rank += n;
+ // Do sleep
+ std::this_thread::sleep_for(sleep_value());
++index;
}
@@ -103,7 +167,7 @@ mpio_write::operator()() const {
// create block type
MPI_Datatype block_type;
- MPI_Type_contiguous(static_cast(block_size), MPI_BYTE,
+ MPI_Type_contiguous(static_cast(m_block_size), MPI_BYTE,
&block_type);
MPI_Type_commit(&block_type);
@@ -115,46 +179,55 @@ mpio_write::operator()() const {
* blocklen: number of `oldtype` elements in each block
* stride: number of `oldtype` elements between start of each block
*/
- MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1,
- /* stride: */ workers_size, /* oldtype: */ block_type,
+ MPI_Type_vector(/* count: */ m_total_blocks, /* blocklength: */ 1,
+ /* stride: */ m_workers_size, /* oldtype: */ block_type,
&file_type);
MPI_Type_commit(&file_type);
if(const auto ec =
MPI_File_set_view(output_file,
- /* disp: */ workers_rank * block_size,
+ /* disp: */ m_workers_rank * m_block_size,
/* elementary_type: */ block_type,
file_type, "native", MPI_INFO_NULL);
ec != MPI_SUCCESS) {
LOGGER_ERROR("MPI_File_set_view() failed: {}",
mpi::error_string(ec));
- return make_mpi_error(ec);
+ m_status = make_mpi_error(ec);
+ return -1;
}
// step 3. parallel write data from buffers
- if(const auto ec = MPI_File_write_all(output_file, buffer.data(),
- static_cast(bytes_per_rank),
- MPI_BYTE, MPI_STATUS_IGNORE);
+ if(const auto ec =
+ MPI_File_write_all(output_file, m_buffer.data(),
+ static_cast(m_bytes_per_rank),
+ MPI_BYTE, MPI_STATUS_IGNORE);
ec != MPI_SUCCESS) {
LOGGER_ERROR("MPI_File_write_all() failed: {}",
mpi::error_string(ec));
- return make_mpi_error(ec);
+ m_status = make_mpi_error(ec);
+ return -1;
}
} catch(const mpioxx::io_error& e) {
LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
- return make_mpi_error(e.error_code());
+ m_status = make_mpi_error(e.error_code());
+ return -1;
} catch(const posix_file::io_error& e) {
LOGGER_ERROR("{}() failed: {}", e.where(), e.what());
- return make_system_error(e.error_code());
- } catch (const std::system_error& e) {
+ m_status = make_system_error(e.error_code());
+ return -1;
+ } catch(const std::system_error& e) {
LOGGER_ERROR("Unexpected system error: {}", e.what());
- return make_system_error(e.code().value());
+ m_status = make_system_error(e.code().value());
+ return -1;
} catch(const std::exception& e) {
LOGGER_ERROR("Unexpected exception: {}", e.what());
- return error_code::other;
+ m_status = error_code::other;
+ return -1;
}
- return error_code::success;
+ m_status = error_code::success;
+
+ return -1;
}
} // namespace cargo
diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp
index 69b2ced46e6c3770e599780ee100919ac41e4063..8c3d5c1cb054332ed15b26564314b11bc570e606 100644
--- a/src/worker/mpio_write.hpp
+++ b/src/worker/mpio_write.hpp
@@ -25,7 +25,10 @@
#ifndef CARGO_WORKER_MPIO_WRITE_HPP
#define CARGO_WORKER_MPIO_WRITE_HPP
+#include
+#include
#include "ops.hpp"
+#include "memory.hpp"
namespace mpi = boost::mpi;
@@ -40,12 +43,33 @@ public:
m_output_path(std::move(output_path)) {}
cargo::error_code
- operator()() const final;
+ operator()() final;
+
+ cargo::error_code
+ progress() const final;
+
+ int
+ progress(int ongoing_index) final;
+
private:
mpi::communicator m_workers;
std::filesystem::path m_input_path;
std::filesystem::path m_output_path;
+
+ cargo::error_code m_status;
+
+
+ std::unique_ptr m_input_file;
+ int m_workers_size;
+ int m_workers_rank;
+ std::size_t m_block_size;
+ std::size_t m_file_size;
+ int m_total_blocks;
+
+ memory_buffer m_buffer;
+ std::vector m_buffer_regions;
+ std::size_t m_bytes_per_rank;
};
} // namespace cargo
diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp
index 17c16c9bfe4627b04777790d1bba7f69fca6be61..288991fc10324dcd3003c7cfd408b3f3e052151f 100644
--- a/src/worker/ops.cpp
+++ b/src/worker/ops.cpp
@@ -54,4 +54,57 @@ operation::make_operation(cargo::tag t, mpi::communicator workers,
}
}
+std::chrono::milliseconds
+operation::sleep_value() const {
+ if(m_sleep_value <= 0)
+ return std::chrono::milliseconds{0};
+ else
+ return std::chrono::milliseconds{m_sleep_value * 100};
+}
+
+void
+operation::set_bw_shaping(std::int16_t incr) {
+ m_sleep_value += incr;
+}
+
+int
+operation::source() {
+ return m_rank;
+}
+std::uint64_t
+operation::tid() {
+ return m_tid;
+}
+std::uint32_t
+operation::seqno() {
+ return m_seqno;
+}
+
+cargo::tag
+operation::t() {
+ return m_t;
+}
+
+float_t
+operation::bw() {
+ return m_bw;
+}
+
+void operation::bw(float_t bw) {
+ m_bw = bw;
+}
+void
+operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno,
+ cargo::tag t) {
+ m_rank = rank;
+ m_tid = tid;
+ m_seqno = seqno;
+ m_t = t;
+}
+
+cargo::error_code
+operation::progress() const {
+ return error_code::other;
+}
+
} // namespace cargo
diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp
index b9a20cfada8a627cd08e87d5bf796e757c9b09e0..637cdc7a45a553890883d89d1cfef9503292d021 100644
--- a/src/worker/ops.hpp
+++ b/src/worker/ops.hpp
@@ -30,7 +30,6 @@
#include
#include "proto/mpi/message.hpp"
#include "cargo.hpp"
-
namespace cargo {
/**
@@ -47,7 +46,43 @@ public:
virtual ~operation() = default;
virtual cargo::error_code
- operator()() const = 0;
+ operator()() = 0;
+
+
+ std::chrono::milliseconds
+ sleep_value() const;
+ // We pass a - or + value to decrease or increase the bw shaping.
+ void
+ set_bw_shaping(std::int16_t incr);
+ virtual cargo::error_code
+ progress() const = 0;
+ virtual int
+ progress(int index) = 0;
+
+
+ int
+ source();
+ std::uint64_t
+ tid();
+ std::uint32_t
+ seqno();
+ void
+ set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, cargo::tag t);
+ cargo::tag
+ t();
+
+ float_t
+ bw();
+ void
+ bw(float_t bw);
+
+private:
+ std::int16_t m_sleep_value = 0;
+ int m_rank;
+ std::uint64_t m_tid;
+ std::uint32_t m_seqno;
+ cargo::tag m_t;
+ float m_bw;
};
} // namespace cargo
diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp
index a1cf8ed9f60707ba676981d84520acfae8810992..83d3da5289b04cfcefd51a571b1130d4250ce589 100644
--- a/src/worker/sequential.cpp
+++ b/src/worker/sequential.cpp
@@ -29,9 +29,19 @@
namespace cargo {
cargo::error_code
-seq_operation::operator()() const {
+seq_operation::operator()() {
LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
return cargo::error_code::not_implemented;
}
+cargo::error_code
+seq_operation::progress() const {
+ return error_code::success;
+}
+
+int
+seq_operation::progress(int ongoing_index) {
+ return ++ongoing_index;
+}
+
} // namespace cargo
diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp
index 82fa6196cecd80ba4f600e43b3306b34bb194545..f5d54da17ca642235d19da348c8b77f2edc47d6e 100644
--- a/src/worker/sequential.hpp
+++ b/src/worker/sequential.hpp
@@ -40,7 +40,12 @@ public:
m_output_path(std::move(output_path)) {}
cargo::error_code
- operator()() const final;
+ operator()() final;
+ cargo::error_code
+ progress() const;
+
+ int
+ progress(int ongoing_index ) final;
private:
mpi::communicator m_comm;
diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp
index 484f3b0e3142b3529ddafd9ae0141f63254ea9e8..988a926aa98e1af5da5f23b028ebef059016282c 100644
--- a/src/worker/worker.cpp
+++ b/src/worker/worker.cpp
@@ -27,9 +27,8 @@
#include
#include
#include
-#include "ops.hpp"
+
#include "worker.hpp"
-#include "proto/mpi/message.hpp"
#include "fmt_formatters.hpp"
namespace mpi = boost::mpi;
@@ -54,11 +53,11 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group,
void
update_state(int rank, std::uint64_t tid, std::uint32_t seqno,
- cargo::transfer_state st,
+ cargo::transfer_state st, float bw,
std::optional ec = std::nullopt) {
mpi::communicator world;
- const cargo::status_message m{tid, seqno, st, ec};
+ const cargo::status_message m{tid, seqno, st, bw, ec};
LOGGER_INFO("msg <= to: {} body: {{payload: {}}}", rank, m);
world.send(rank, static_cast(cargo::tag::status), m);
}
@@ -102,14 +101,45 @@ worker::run() {
LOGGER_INFO("{:=>{}}", "", greeting.size());
bool done = false;
-
while(!done) {
auto msg = world.iprobe();
if(!msg) {
// FIXME: sleep time should be configurable
- std::this_thread::sleep_for(150ms);
+
+ // Progress through all transfers
+ for(auto I = m_ops.begin(); I != m_ops.end(); I++) {
+ auto op = I->second.first.get();
+ int index = I->second.second;
+ if(op) {
+ if(op->t() == tag::pread or op->t() == tag::pwrite) {
+ index = op->progress(index);
+ if(index == -1) {
+ // operation finished
+ cargo::error_code ec = op->progress();
+ update_state(op->source(), op->tid(), op->seqno(),
+ ec ? transfer_state::failed
+ : transfer_state::completed,
+ 0.0f, ec);
+
+ // Transfer finished
+ I = m_ops.erase(I);
+ if(I == m_ops.end()) {
+ break;
+ }
+ } else {
+ update_state(op->source(), op->tid(), op->seqno(),
+ transfer_state::running, op->bw());
+ I->second.second = index;
+ }
+ }
+ }
+ }
+
+ if(m_ops.size() == 0) {
+ std::this_thread::sleep_for(150ms);
+ }
continue;
}
@@ -122,19 +152,47 @@ worker::run() {
transfer_message m;
world.recv(msg->source(), msg->tag(), m);
LOGGER_INFO("msg => from: {} body: {}", msg->source(), m);
+ m_ops.emplace(std::make_pair(
+ make_pair(m.input_path(), m.output_path()),
+ make_pair(operation::make_operation(t, workers,
+ m.input_path(),
+ m.output_path()),
+ 0)));
+
+ const auto op =
+ m_ops[make_pair(m.input_path(), m.output_path())]
+ .first.get();
- const auto op = operation::make_operation(
- t, workers, m.input_path(), m.output_path());
+ op->set_comm(msg->source(), m.tid(), m.seqno(), t);
- update_state(msg->source(), m.tid(), m.seqno(),
- transfer_state::running);
+ update_state(op->source(), op->tid(), op->seqno(),
+ transfer_state::running, -1.0f);
+ // Different scenarios read -> write | write -> read
cargo::error_code ec = (*op)();
+ if(ec != cargo::error_code::transfer_in_progress) {
+ update_state(op->source(), op->tid(), op->seqno(),
+ transfer_state::failed, -1.0f, ec);
+ m_ops.erase(make_pair(m.input_path(), m.output_path()));
+ }
+ break;
+ }
+
+ case tag::bw_shaping: {
+ shaper_message m;
+ world.recv(msg->source(), msg->tag(), m);
+ LOGGER_INFO("msg => from: {} body: {}", msg->source(), m);
+ for(auto I = m_ops.begin(); I != m_ops.end(); I++) {
+ const auto op = I->second.first.get();
+ if(op) {
+
+ op->set_bw_shaping(0);
+ } else {
+ LOGGER_INFO("Operation non existent", msg->source(), m);
+ }
+ }
+
- update_state(msg->source(), m.tid(), m.seqno(),
- ec ? transfer_state::failed
- : transfer_state::completed,
- ec);
break;
}
diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp
index aaeea068e40b0ef7812bb9d9d4dda264262b8257..1890974bd11aae2bd92fe63279df800963e31509 100644
--- a/src/worker/worker.hpp
+++ b/src/worker/worker.hpp
@@ -26,6 +26,9 @@
#ifndef CARGO_WORKER_HPP
#define CARGO_WORKER_HPP
+#include "proto/mpi/message.hpp"
+#include