From 577a01a17d3c432566fa97b7e289e7816bdb41d2 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 18 Dec 2023 12:42:13 +0100 Subject: [PATCH 1/5] Add statuses (WIP) --- lib/cargo.hpp | 24 ++++++++++++++++ lib/libcargo.cpp | 58 +++++++++++++++++++++++++++++++++++++- src/proto/rpc/response.hpp | 5 ++++ 3 files changed, 86 insertions(+), 1 deletion(-) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 1c62d1c..62a3e2f 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -125,6 +125,15 @@ public: status() const; + /** + * @brief Get all the statuses of the associated transfer. + * + * @return std::vector + */ + [[nodiscard]] std::vector + statuses() const; + + /** * @brief updates the bw control of the transfer * @@ -164,10 +173,22 @@ class transfer_status { friend transfer_status transfer::status() const; + transfer_status(transfer_state status, float bw, error_code error) noexcept; public: + + transfer_status(std::string name, transfer_state status, float bw, error_code error) noexcept; + + /** + * Get the name of the associated dataset. + * + * @return The name of the dataset. + */ + [[nodiscard]] std::string + name() const noexcept; + /** * Get the current status of the associated transfer. * @@ -207,11 +228,14 @@ public: bw() const; private: + std::string m_name; transfer_state m_state; float m_bw; error_code m_error; }; + + /** * Request the transfer of a dataset collection. * diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 7968eda..d11f02f 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -133,6 +133,53 @@ transfer::status() const { throw std::runtime_error("rpc lookup failed"); } +std::vector +transfer::statuses() const { + using proto::statuses_response; + + network::client rpc_client{m_srv.protocol()}; + const auto rpc = + network::rpc_info::create("transfer_statuses", m_srv.address()); + + using response_type = + statuses_response; + + if(const auto lookup_rv = rpc_client.lookup(m_srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{tid: {}}}", rpc, m_id); + + if(const auto call_rv = endp.call(rpc.name(), m_id); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + const auto& v = resp.value(); + + LOGGER_EVAL(resp.error_code(), ERROR, INFO, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); + + if(resp.error_code()) { + throw std::runtime_error( + fmt::format("rpc call failed: {}", resp.error_code())); + } + // convert vector of tuples to vector of transfer_status + // (for some reason it asks for a public constructor) + + std::vector v_statuses; + for(const auto& [name, s, bw, ec] : v) { + v_statuses.emplace_back(transfer_status{ + name, s, bw, ec.value_or(error_code::success)}); + } + + return v_statuses; + } + } + + throw std::runtime_error("rpc lookup failed"); +} + void transfer::bw_control(std::int16_t bw_control) const { @@ -170,13 +217,22 @@ transfer::bw_control(std::int16_t bw_control) const { transfer_status::transfer_status(transfer_state status, float bw, error_code error) noexcept - : m_state(status), m_bw(bw), m_error(error) {} + : m_name(""), m_state(status), m_bw(bw), m_error(error) {} + +transfer_status::transfer_status(std::string name, transfer_state status, + float bw, error_code error) noexcept + : m_name(name), m_state(status), m_bw(bw), m_error(error) {} transfer_state transfer_status::state() const noexcept { return m_state; } +std::string +transfer_status::name() const noexcept { + return m_name; +} + bool transfer_status::done() const noexcept { return m_state == transfer_state::completed; diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp index 7257fb1..6313344 100644 --- a/src/proto/rpc/response.hpp +++ b/src/proto/rpc/response.hpp @@ -109,6 +109,11 @@ using status_response = response_with_value>, Error>; +template +using statuses_response = + response_with_value > >, + Error>; + } // namespace cargo::proto #endif // CARGO_PROTO_RPC_RESPONSE_HPP -- GitLab From e3993fd540630edd60867722c8454c047f498b28 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 19 Dec 2023 14:30:55 +0100 Subject: [PATCH 2/5] update transfer status for list files --- src/master.cpp | 52 +++++++++++++++++++++++++++++++++++++- src/master.hpp | 3 +++ src/parallel_request.cpp | 18 ++++++++++--- src/parallel_request.hpp | 12 +++++++-- src/proto/mpi/message.hpp | 11 ++++++-- src/proto/rpc/response.hpp | 5 ++-- src/request_manager.cpp | 30 +++++++++++++++++++--- src/request_manager.hpp | 5 +++- src/worker/ops.cpp | 12 +++++++++ src/worker/ops.hpp | 9 +++++++ src/worker/seq_mixed.cpp | 13 +++++----- src/worker/sequential.cpp | 1 - src/worker/worker.cpp | 14 +++++----- 13 files changed, 154 insertions(+), 31 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index 594f6ac..882229b 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -96,6 +96,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); provider::define(EXPAND(bw_control)); + provider::define(EXPAND(transfer_statuses)); #undef EXPAND @@ -135,7 +136,8 @@ master_server::mpi_listener_ult() { msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, - m.state(), m.bw(), m.error_code()); + m.name(), m.state(), m.bw(), + m.error_code()); break; } @@ -367,4 +369,52 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { }); } + +void +master_server::transfer_statuses(const network::request& req, + std::uint64_t tid) { + + using network::get_address; + using network::rpc_info; + using proto::generic_response; + using proto::statuses_response; + + using response_type = statuses_response; + + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); + // Get all the statuses of the associated transfer. returns a vector + // of transfer_status objects + + m_request_manager.lookup_all(tid) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Failed to lookup request: {}", ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + req.respond(generic_response{rpc.id(), ec}); + }) + .map([&](auto&& rs) { + // We get a vector of request_status objects, we need to + // convert them to a vector of tuples with the same + // informations + std::vector>> + v{}; + for(auto& r : rs) { + v.push_back(std::make_tuple(r.name(), r.state(), r.bw(), + r.error().value())); + LOGGER_INFO( + "rpc {:<} body: {{retval: {}, name: {}, status: {}}}", + rpc, error_code::success, r.name(), r.state()); + } + // Generate a response type with the vector of tuples and + // respond + + + req.respond(response_type{rpc.id(), error_code::success, v}); + }); +} + } // namespace cargo diff --git a/src/master.hpp b/src/master.hpp index 0adfcb7..f671e3a 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -58,6 +58,9 @@ private: void transfer_status(const network::request& req, std::uint64_t tid); + void + transfer_statuses(const network::request& req, std::uint64_t tid); + // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp index 5763198..afc7d52 100644 --- a/src/parallel_request.cpp +++ b/src/parallel_request.cpp @@ -47,17 +47,22 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) - : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} + : m_name(s.name()), m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} -request_status::request_status(transfer_state s, float bw, +request_status::request_status(std::string name, transfer_state s, float bw, std::optional ec) - : m_state(s), m_bw(bw), m_error_code(ec) {} + : m_name(name), m_state(s), m_bw(bw), m_error_code(ec) {} transfer_state request_status::state() const { return m_state; } +std::string +request_status::name() const { + return m_name; +} + std::optional request_status::error() const { return m_error_code; @@ -67,6 +72,10 @@ float request_status::bw() const { return m_bw; } +std::string +part_status::name() const { + return m_name; +} transfer_state part_status::state() const { @@ -84,8 +93,9 @@ part_status::error() const { } void -part_status::update(transfer_state s, float bw, +part_status::update(std::string name, transfer_state s, float bw, std::optional ec) noexcept { + m_name = std::move(name); m_state = s; m_bw = bw; m_error_code = ec; diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp index 222cd57..6868e3f 100644 --- a/src/parallel_request.hpp +++ b/src/parallel_request.hpp @@ -65,6 +65,9 @@ class part_status { public: part_status() = default; + [[nodiscard]] std::string + name() const; + [[nodiscard]] transfer_state state() const; @@ -75,9 +78,10 @@ public: bw() const; void - update(transfer_state s, float bw, std::optional ec) noexcept; + update(std::string name, transfer_state s, float bw, std::optional ec) noexcept; private: + std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional m_error_code{}; @@ -86,10 +90,13 @@ private: class request_status { public: request_status() = default; - explicit request_status(transfer_state s, float bw, + explicit request_status(std::string name, transfer_state s, float bw, std::optional ec = {}); explicit request_status(part_status s); + [[nodiscard]] std::string + name() const; + [[nodiscard]] transfer_state state() const; @@ -100,6 +107,7 @@ public: bw() const; private: + std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional m_error_code{}; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index a6b1d2c..b323bc4 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -120,10 +120,10 @@ class status_message { public: status_message() = default; - status_message(std::uint64_t tid, std::uint32_t seqno, + status_message(std::uint64_t tid, std::uint32_t seqno, std::string name, cargo::transfer_state state, float bw, std::optional error_code = std::nullopt) - : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), + : m_tid(tid), m_seqno(seqno), m_name(name), m_state(state), m_bw(bw), m_error_code(error_code) {} [[nodiscard]] std::uint64_t @@ -136,6 +136,11 @@ public: return m_seqno; } + [[nodiscard]] const std::string& + name() const { + return m_name; + } + [[nodiscard]] cargo::transfer_state state() const { return m_state; @@ -160,6 +165,7 @@ private: ar& m_tid; ar& m_seqno; + ar& m_name; ar& m_state; ar& m_bw; ar& m_error_code; @@ -167,6 +173,7 @@ private: std::uint64_t m_tid{}; std::uint32_t m_seqno{}; + std::string m_name{}; cargo::transfer_state m_state{}; float m_bw{}; std::optional m_error_code{}; diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp index 6313344..4dd6f72 100644 --- a/src/proto/rpc/response.hpp +++ b/src/proto/rpc/response.hpp @@ -110,9 +110,8 @@ using status_response = Error>; template -using statuses_response = - response_with_value > >, - Error>; +using statuses_response = response_with_value< + std::vector>>, Error>; } // namespace cargo::proto diff --git a/src/request_manager.cpp b/src/request_manager.cpp index 1496c03..e4843ec 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -56,7 +56,7 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { } error_code -request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, +request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, std::optional ec) { @@ -65,7 +65,7 @@ request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, if(const auto it = m_requests.find(tid); it != m_requests.end()) { assert(seqno < it->second.size()); assert(wid < it->second[seqno].size()); - it->second[seqno][wid].update(s, bw, ec); + it->second[seqno][wid].update(name, s, bw, ec); return error_code::success; } @@ -92,8 +92,32 @@ request_manager::lookup(std::uint64_t tid) { return request_status{ps}; } } + // TODO : completed should have the name of the file if its not found + return request_status{"", transfer_state::completed, 0.0f}; + } + + LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); + return tl::make_unexpected(error_code::no_such_transfer); +} - return request_status{transfer_state::completed, 0.0f}; +tl::expected, error_code> +request_manager::lookup_all(std::uint64_t tid) { + + abt::shared_lock lock(m_mutex); + + std::vector result; + if(const auto it = m_requests.find(tid); it != m_requests.end()) { + + const auto& file_statuses = it->second; + + for(const auto& fs : file_statuses) { + for(const auto& ps : fs) { + + request_status rs{ps}; + result.push_back(rs); + } + } + return result; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); diff --git a/src/request_manager.hpp b/src/request_manager.hpp index 1928d61..46bedda 100644 --- a/src/request_manager.hpp +++ b/src/request_manager.hpp @@ -60,12 +60,15 @@ public: error_code update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, - transfer_state s, float bw, + std::string name, transfer_state s, float bw, std::optional ec = std::nullopt); tl::expected lookup(std::uint64_t tid); + tl::expected, error_code> + lookup_all(std::uint64_t tid); + error_code remove(std::uint64_t tid); diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 0d72c18..30d1f1f 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -39,6 +39,7 @@ operation::make_operation(cargo::tag t, mpi::communicator workers, std::uint64_t block_size, FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) { using cargo::tag; + switch(t) { case tag::pread: return std::make_unique( @@ -115,4 +116,15 @@ operation::progress() const { return error_code::other; } + +std::string +operation::output_path() { + return m_output_path; +} + +std::string +operation::input_path() { + return m_input_path; +} + } // namespace cargo diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index ba3c156..32b0bd5 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -78,6 +78,13 @@ public: void bw(float_t bw); + std::string + output_path(); + + std::string + input_path(); + + private: std::int16_t m_sleep_value = 0; int m_rank; @@ -85,6 +92,8 @@ private: std::uint32_t m_seqno; cargo::tag m_t; float m_bw; + std::filesystem::path m_input_path; + std::filesystem::path m_output_path; }; } // namespace cargo diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index b6dec2f..b087122 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -69,10 +69,10 @@ seq_mixed_operation::operator()() { m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - m_output_file = std::make_unique(posix_file::create( - m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); - m_output_file->fallocate(0, 0, file_size); + m_output_file->fallocate(0, 0, file_size); m_workers_size = workers_size; m_workers_rank = workers_rank; @@ -141,9 +141,9 @@ seq_mixed_operation::progress(int ongoing_index) { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); -/* Do write */ - m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), - file_range.size()); + /* Do write */ + m_output_file->pwrite(m_buffer_regions[index], + file_range.offset(), file_range.size()); m_bytes_per_rank += n; @@ -184,5 +184,4 @@ seq_mixed_operation::progress(int ongoing_index) { return -1; } - } // namespace cargo diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 4254d2c..b91d865 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -239,5 +239,4 @@ seq_operation::progress(int ongoing_index) { return -1; } - } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index a680c84..0178b96 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -53,11 +53,11 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, void update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state st, float bw, + std::string name, cargo::transfer_state st, float bw, std::optional ec = std::nullopt) { mpi::communicator world; - const cargo::status_message m{tid, seqno, st, bw, ec}; + const cargo::status_message m{tid, seqno, name, st, bw, ec}; LOGGER_DEBUG("msg <= to: {} body: {{payload: {}}}", rank, m); world.send(rank, static_cast(cargo::tag::status), m); } @@ -119,7 +119,7 @@ worker::run() { if(index == -1) { // operation finished cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), ec ? transfer_state::failed : transfer_state::completed, 0.0f, ec); @@ -129,7 +129,7 @@ worker::run() { } else { // update only if BW is set if(op->bw() > 0.0f) { - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), transfer_state::running, op->bw()); } I->second.second = index; @@ -144,7 +144,7 @@ worker::run() { if(!msg) { // Only wait if there are no pending operations and no messages if(m_ops.size() == 0) { - std::this_thread::sleep_for(150ms); + std::this_thread::sleep_for(10ms); } continue; } @@ -174,13 +174,13 @@ worker::run() { op->set_comm(msg->source(), m.tid(), m.seqno(), t); - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), transfer_state::running, -1.0f); // Different scenarios read -> write | write -> read cargo::error_code ec = (*op)(); if(ec != cargo::error_code::transfer_in_progress) { - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), transfer_state::failed, -1.0f, ec); m_ops.erase(make_pair(m.input_path(), m.output_path())); } -- GitLab From f7c4db338566b5a4718045824b44981e703977b5 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 20 Dec 2023 14:33:45 +0100 Subject: [PATCH 3/5] Implemented statuses (completed/running/failed) has the output filename BW shows mean BW for the file. --- src/master.cpp | 3 +-- src/parallel_request.cpp | 11 +++++++++-- src/parallel_request.hpp | 5 ++++- src/proto/mpi/message.hpp | 8 ++++---- src/request_manager.cpp | 15 +++++++++++---- src/worker/mpio_read.hpp | 15 ++++++++++++--- src/worker/mpio_write.hpp | 16 ++++++++++++---- src/worker/ops.cpp | 11 ----------- src/worker/ops.hpp | 10 ++++------ src/worker/seq_mixed.hpp | 23 +++++++++++++++++------ src/worker/sequential.hpp | 15 ++++++++++++--- 11 files changed, 86 insertions(+), 46 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index 882229b..e8321f1 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -309,7 +309,6 @@ master_server::transfer_datasets(const network::request& req, .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); - // For all the files for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; @@ -404,7 +403,7 @@ master_server::transfer_statuses(const network::request& req, v{}; for(auto& r : rs) { v.push_back(std::make_tuple(r.name(), r.state(), r.bw(), - r.error().value())); + r.error())); LOGGER_INFO( "rpc {:<} body: {{retval: {}, name: {}, status: {}}}", rpc, error_code::success, r.name(), r.state()); diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp index afc7d52..bd62cec 100644 --- a/src/parallel_request.cpp +++ b/src/parallel_request.cpp @@ -47,7 +47,8 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) - : m_name(s.name()), m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} + : m_name(s.name()), m_state(s.state()), m_bw(s.bw()), + m_error_code(s.error()) {} request_status::request_status(std::string name, transfer_state s, float bw, std::optional ec) @@ -72,6 +73,12 @@ float request_status::bw() const { return m_bw; } + +void +request_status::bw(float bw) { + m_bw = bw; +} + std::string part_status::name() const { return m_name; @@ -95,7 +102,7 @@ part_status::error() const { void part_status::update(std::string name, transfer_state s, float bw, std::optional ec) noexcept { - m_name = std::move(name); + m_name = name; m_state = s; m_bw = bw; m_error_code = ec; diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp index 6868e3f..41f271f 100644 --- a/src/parallel_request.hpp +++ b/src/parallel_request.hpp @@ -29,7 +29,7 @@ #include #include #include - +#include "../lib/cargo.hpp" namespace cargo { class dataset; @@ -106,6 +106,9 @@ public: [[nodiscard]] float bw() const; + void + bw (float bw); + private: std::string m_name; transfer_state m_state{transfer_state::pending}; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index b323bc4..b61705a 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -253,13 +253,13 @@ struct fmt::formatter : formatter { const auto str = s.error_code() ? fmt::format( - "{{tid: {}, seqno: {}, state: {}, bw: {}, " + "{{tid: {}, seqno: {}, name: {}, state: {}, bw: {}, " "error_code: {}}}", - s.tid(), s.seqno(), s.state(), s.bw(), + s.tid(), s.seqno(), s.name(), s.state(), s.bw(), *s.error_code()) : fmt::format( - "{{tid: {}, seqno: {}, state: {}, bw: {}}}", - s.tid(), s.seqno(), s.state(), s.bw()); + "{{tid: {}, seqno: {}, name: {}, state: {}, bw: {}}}", + s.tid(), s.seqno(), s.name(), s.state(), s.bw()); return formatter::format(str, ctx); } }; diff --git a/src/request_manager.cpp b/src/request_manager.cpp index e4843ec..567446d 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -109,13 +109,20 @@ request_manager::lookup_all(std::uint64_t tid) { if(const auto it = m_requests.find(tid); it != m_requests.end()) { const auto& file_statuses = it->second; - + // we calculate always the mean of the BW for(const auto& fs : file_statuses) { + float bw = 0; + request_status rs(*fs.begin()); for(const auto& ps : fs) { - - request_status rs{ps}; - result.push_back(rs); + bw += ps.bw(); + if(ps.state() == transfer_state::completed) { + continue; + } + // not finished + rs = request_status{ps}; } + rs.bw(bw/(double)fs.size()); + result.push_back(rs); } return result; } diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index bd72a6e..10984c6 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -50,12 +50,21 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; cargo::error_code m_status; - + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; std::unique_ptr m_output_file; int m_workers_size; int m_workers_rank; diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index eb50129..0edf735 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -54,14 +54,22 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; - cargo::error_code m_status; - + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; std::unique_ptr m_input_file; int m_workers_size; diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 30d1f1f..9a3cdf4 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -116,15 +116,4 @@ operation::progress() const { return error_code::other; } - -std::string -operation::output_path() { - return m_output_path; -} - -std::string -operation::input_path() { - return m_input_path; -} - } // namespace cargo diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 32b0bd5..2acd884 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -78,11 +78,11 @@ public: void bw(float_t bw); - std::string - output_path(); + virtual std::string + output_path() const = 0; - std::string - input_path(); + virtual std::string + input_path() const = 0; private: @@ -92,8 +92,6 @@ private: std::uint32_t m_seqno; cargo::tag m_t; float m_bw; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; }; } // namespace cargo diff --git a/src/worker/seq_mixed.hpp b/src/worker/seq_mixed.hpp index c3545cc..45b3ba6 100644 --- a/src/worker/seq_mixed.hpp +++ b/src/worker/seq_mixed.hpp @@ -38,9 +38,11 @@ namespace cargo { class seq_mixed_operation : public operation { public: - seq_mixed_operation(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, - FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + seq_mixed_operation(mpi::communicator workers, + std::filesystem::path input_path, + std::filesystem::path output_path, + std::uint64_t block_size, FSPlugin::type fs_i_type, + FSPlugin::type fs_o_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), @@ -54,11 +56,20 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; - + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; std::unique_ptr m_input_file; std::unique_ptr m_output_file; int m_workers_size; diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 508c197..8073ceb 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -54,13 +54,22 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; - std::unique_ptr m_input_file; std::unique_ptr m_output_file; + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; int m_workers_size; int m_workers_rank; std::size_t m_block_size; -- GitLab From ba7759b4b2123577d49fc2c98a7930aaee9ac3d8 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 27 Dec 2023 19:23:10 +0100 Subject: [PATCH 4/5] Readme changes for input/output --- README.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 6881db1..c43948a 100644 --- a/README.md +++ b/README.md @@ -185,9 +185,13 @@ There are a few utility command line programs that can be used to interact with cli/ccp --server ofi+tcp://127.0.0.1:62000 --input /directory/subdir --output /directorydst/subdirdst --if --of ``` `--input` and `--output` are required arguments, and can be a directory or a file path. -`--if` and `--of`select the specific transfer method, on V0.3.2 there are only to possibilities: +`--if` and `--of`select the specific transfer method, on V0.4.0 there are many combinations: -`--if mpio` (It will read in parallel from i.e. lustre using MPI, and write using posix calls.) -`--of mpio` (It will read using posix calls, and write using MPI (i.e. to lustre)) +`--if or --of` can be: posix, gekkofs, hercules, dataclay, expand and parallel (for MPIIO requests, but only one side is allowed). + +Typically you should use posix or parallel and then one specialized adhocfs. Posix is also able to be used with LD_PRELOAD, however +higher performance and flexibility can be obtained using the specific configuration. + +On the other hand, MPIIO (parallel) uses normally file locking so there is a performance imapact, and posix is faster (we supose no external modifications are done). Other commands are `ping`, `shutdown` and `shaping` (for bw control). \ No newline at end of file -- GitLab From 4c8cb068c0993da0eb7d49b6071ec1fdc5908d49 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 9 Jan 2024 13:31:09 +0100 Subject: [PATCH 5/5] Prepare Release 0.3.4 --- CMakeLists.txt | 2 +- spack/packages/cargo/package.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f3bc4b..8a1abdf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.3.3 + VERSION 0.3.4 LANGUAGES C CXX ) diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index ae1e451..bf5a4a9 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -39,7 +39,7 @@ class Cargo(CMakePackage): version("0.3.1", sha256="613485354e24c4b97cb6d045657569f94dc1d9bbb391b5a166f8d18b3595428b") version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960") version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd") - version("0.3.4", branch="rnou/fallocate") + # build variants variant('build_type', default='Release', -- GitLab