diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f3bc4b74883a946130aa14e94a20dff1077f039..8a1abdf130fa04c5465a2b263221e56a8f2d38a4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.3.3 + VERSION 0.3.4 LANGUAGES C CXX ) diff --git a/README.md b/README.md index 6881db1f92fb209c01eab531fd9bdf23da201395..c43948aa6d1df5809ec4ca35cc1842c357e594a1 100644 --- a/README.md +++ b/README.md @@ -185,9 +185,13 @@ There are a few utility command line programs that can be used to interact with cli/ccp --server ofi+tcp://127.0.0.1:62000 --input /directory/subdir --output /directorydst/subdirdst --if --of ``` `--input` and `--output` are required arguments, and can be a directory or a file path. -`--if` and `--of`select the specific transfer method, on V0.3.2 there are only to possibilities: +`--if` and `--of`select the specific transfer method, on V0.4.0 there are many combinations: -`--if mpio` (It will read in parallel from i.e. lustre using MPI, and write using posix calls.) -`--of mpio` (It will read using posix calls, and write using MPI (i.e. to lustre)) +`--if or --of` can be: posix, gekkofs, hercules, dataclay, expand and parallel (for MPIIO requests, but only one side is allowed). + +Typically you should use posix or parallel and then one specialized adhocfs. Posix is also able to be used with LD_PRELOAD, however +higher performance and flexibility can be obtained using the specific configuration. + +On the other hand, MPIIO (parallel) uses normally file locking so there is a performance imapact, and posix is faster (we supose no external modifications are done). Other commands are `ping`, `shutdown` and `shaping` (for bw control). \ No newline at end of file diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 1c62d1c493eba638df118b8948bc621f64240558..62a3e2f2bd3bfacb98bf0c7974e8935d3b04896f 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -125,6 +125,15 @@ public: status() const; + /** + * @brief Get all the statuses of the associated transfer. + * + * @return std::vector + */ + [[nodiscard]] std::vector + statuses() const; + + /** * @brief updates the bw control of the transfer * @@ -164,10 +173,22 @@ class transfer_status { friend transfer_status transfer::status() const; + transfer_status(transfer_state status, float bw, error_code error) noexcept; public: + + transfer_status(std::string name, transfer_state status, float bw, error_code error) noexcept; + + /** + * Get the name of the associated dataset. + * + * @return The name of the dataset. + */ + [[nodiscard]] std::string + name() const noexcept; + /** * Get the current status of the associated transfer. * @@ -207,11 +228,14 @@ public: bw() const; private: + std::string m_name; transfer_state m_state; float m_bw; error_code m_error; }; + + /** * Request the transfer of a dataset collection. * diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 7968edaf55d5e4b098f6c94fb2f6724ac72868a3..d11f02f23c0d7dd20352640e3def81826592fedf 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -133,6 +133,53 @@ transfer::status() const { throw std::runtime_error("rpc lookup failed"); } +std::vector +transfer::statuses() const { + using proto::statuses_response; + + network::client rpc_client{m_srv.protocol()}; + const auto rpc = + network::rpc_info::create("transfer_statuses", m_srv.address()); + + using response_type = + statuses_response; + + if(const auto lookup_rv = rpc_client.lookup(m_srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{tid: {}}}", rpc, m_id); + + if(const auto call_rv = endp.call(rpc.name(), m_id); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + const auto& v = resp.value(); + + LOGGER_EVAL(resp.error_code(), ERROR, INFO, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); + + if(resp.error_code()) { + throw std::runtime_error( + fmt::format("rpc call failed: {}", resp.error_code())); + } + // convert vector of tuples to vector of transfer_status + // (for some reason it asks for a public constructor) + + std::vector v_statuses; + for(const auto& [name, s, bw, ec] : v) { + v_statuses.emplace_back(transfer_status{ + name, s, bw, ec.value_or(error_code::success)}); + } + + return v_statuses; + } + } + + throw std::runtime_error("rpc lookup failed"); +} + void transfer::bw_control(std::int16_t bw_control) const { @@ -170,13 +217,22 @@ transfer::bw_control(std::int16_t bw_control) const { transfer_status::transfer_status(transfer_state status, float bw, error_code error) noexcept - : m_state(status), m_bw(bw), m_error(error) {} + : m_name(""), m_state(status), m_bw(bw), m_error(error) {} + +transfer_status::transfer_status(std::string name, transfer_state status, + float bw, error_code error) noexcept + : m_name(name), m_state(status), m_bw(bw), m_error(error) {} transfer_state transfer_status::state() const noexcept { return m_state; } +std::string +transfer_status::name() const noexcept { + return m_name; +} + bool transfer_status::done() const noexcept { return m_state == transfer_state::completed; diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index ae1e451bd9af6a7cd784c6eea6792f979c06ef5f..bf5a4a934407082fea663c6e4b04da2bd523c352 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -39,7 +39,7 @@ class Cargo(CMakePackage): version("0.3.1", sha256="613485354e24c4b97cb6d045657569f94dc1d9bbb391b5a166f8d18b3595428b") version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960") version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd") - version("0.3.4", branch="rnou/fallocate") + # build variants variant('build_type', default='Release', diff --git a/src/master.cpp b/src/master.cpp index 594f6ac4bebf05de3b2c645ec823931853befaea..e8321f173ce977db449609d00c4e5cb0c1dc028d 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -96,6 +96,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); provider::define(EXPAND(bw_control)); + provider::define(EXPAND(transfer_statuses)); #undef EXPAND @@ -135,7 +136,8 @@ master_server::mpi_listener_ult() { msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, - m.state(), m.bw(), m.error_code()); + m.name(), m.state(), m.bw(), + m.error_code()); break; } @@ -307,7 +309,6 @@ master_server::transfer_datasets(const network::request& req, .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); - // For all the files for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; @@ -367,4 +368,52 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { }); } + +void +master_server::transfer_statuses(const network::request& req, + std::uint64_t tid) { + + using network::get_address; + using network::rpc_info; + using proto::generic_response; + using proto::statuses_response; + + using response_type = statuses_response; + + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); + // Get all the statuses of the associated transfer. returns a vector + // of transfer_status objects + + m_request_manager.lookup_all(tid) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Failed to lookup request: {}", ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + req.respond(generic_response{rpc.id(), ec}); + }) + .map([&](auto&& rs) { + // We get a vector of request_status objects, we need to + // convert them to a vector of tuples with the same + // informations + std::vector>> + v{}; + for(auto& r : rs) { + v.push_back(std::make_tuple(r.name(), r.state(), r.bw(), + r.error())); + LOGGER_INFO( + "rpc {:<} body: {{retval: {}, name: {}, status: {}}}", + rpc, error_code::success, r.name(), r.state()); + } + // Generate a response type with the vector of tuples and + // respond + + + req.respond(response_type{rpc.id(), error_code::success, v}); + }); +} + } // namespace cargo diff --git a/src/master.hpp b/src/master.hpp index 0adfcb73550b646d3c540df75453eb3e533bf049..f671e3a67f95d37ae07991f682e15744838e6996 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -58,6 +58,9 @@ private: void transfer_status(const network::request& req, std::uint64_t tid); + void + transfer_statuses(const network::request& req, std::uint64_t tid); + // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp index 57631988ad9f60b28464cf3792f7331c9dfb4a30..bd62cecd8f66052800d75397c881c079cc78f465 100644 --- a/src/parallel_request.cpp +++ b/src/parallel_request.cpp @@ -47,17 +47,23 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) - : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} + : m_name(s.name()), m_state(s.state()), m_bw(s.bw()), + m_error_code(s.error()) {} -request_status::request_status(transfer_state s, float bw, +request_status::request_status(std::string name, transfer_state s, float bw, std::optional ec) - : m_state(s), m_bw(bw), m_error_code(ec) {} + : m_name(name), m_state(s), m_bw(bw), m_error_code(ec) {} transfer_state request_status::state() const { return m_state; } +std::string +request_status::name() const { + return m_name; +} + std::optional request_status::error() const { return m_error_code; @@ -68,6 +74,16 @@ request_status::bw() const { return m_bw; } +void +request_status::bw(float bw) { + m_bw = bw; +} + +std::string +part_status::name() const { + return m_name; +} + transfer_state part_status::state() const { return m_state; @@ -84,8 +100,9 @@ part_status::error() const { } void -part_status::update(transfer_state s, float bw, +part_status::update(std::string name, transfer_state s, float bw, std::optional ec) noexcept { + m_name = name; m_state = s; m_bw = bw; m_error_code = ec; diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp index 222cd57de1927360d6e1e9f4eea7bef1fe4d94a5..41f271faebdf48809322c9bc289a0692aa68be83 100644 --- a/src/parallel_request.hpp +++ b/src/parallel_request.hpp @@ -29,7 +29,7 @@ #include #include #include - +#include "../lib/cargo.hpp" namespace cargo { class dataset; @@ -65,6 +65,9 @@ class part_status { public: part_status() = default; + [[nodiscard]] std::string + name() const; + [[nodiscard]] transfer_state state() const; @@ -75,9 +78,10 @@ public: bw() const; void - update(transfer_state s, float bw, std::optional ec) noexcept; + update(std::string name, transfer_state s, float bw, std::optional ec) noexcept; private: + std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional m_error_code{}; @@ -86,10 +90,13 @@ private: class request_status { public: request_status() = default; - explicit request_status(transfer_state s, float bw, + explicit request_status(std::string name, transfer_state s, float bw, std::optional ec = {}); explicit request_status(part_status s); + [[nodiscard]] std::string + name() const; + [[nodiscard]] transfer_state state() const; @@ -99,7 +106,11 @@ public: [[nodiscard]] float bw() const; + void + bw (float bw); + private: + std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional m_error_code{}; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index a6b1d2c9ec1000d48206517806d52ba785159683..b61705a3af9e7427e34995395117cec8d7e8a30d 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -120,10 +120,10 @@ class status_message { public: status_message() = default; - status_message(std::uint64_t tid, std::uint32_t seqno, + status_message(std::uint64_t tid, std::uint32_t seqno, std::string name, cargo::transfer_state state, float bw, std::optional error_code = std::nullopt) - : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), + : m_tid(tid), m_seqno(seqno), m_name(name), m_state(state), m_bw(bw), m_error_code(error_code) {} [[nodiscard]] std::uint64_t @@ -136,6 +136,11 @@ public: return m_seqno; } + [[nodiscard]] const std::string& + name() const { + return m_name; + } + [[nodiscard]] cargo::transfer_state state() const { return m_state; @@ -160,6 +165,7 @@ private: ar& m_tid; ar& m_seqno; + ar& m_name; ar& m_state; ar& m_bw; ar& m_error_code; @@ -167,6 +173,7 @@ private: std::uint64_t m_tid{}; std::uint32_t m_seqno{}; + std::string m_name{}; cargo::transfer_state m_state{}; float m_bw{}; std::optional m_error_code{}; @@ -246,13 +253,13 @@ struct fmt::formatter : formatter { const auto str = s.error_code() ? fmt::format( - "{{tid: {}, seqno: {}, state: {}, bw: {}, " + "{{tid: {}, seqno: {}, name: {}, state: {}, bw: {}, " "error_code: {}}}", - s.tid(), s.seqno(), s.state(), s.bw(), + s.tid(), s.seqno(), s.name(), s.state(), s.bw(), *s.error_code()) : fmt::format( - "{{tid: {}, seqno: {}, state: {}, bw: {}}}", - s.tid(), s.seqno(), s.state(), s.bw()); + "{{tid: {}, seqno: {}, name: {}, state: {}, bw: {}}}", + s.tid(), s.seqno(), s.name(), s.state(), s.bw()); return formatter::format(str, ctx); } }; diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp index 7257fb1fc86e8ee828f4f7494585f6f3d6751559..4dd6f722b1c8857e2ed5fde4cff8a382ad1ee28a 100644 --- a/src/proto/rpc/response.hpp +++ b/src/proto/rpc/response.hpp @@ -109,6 +109,10 @@ using status_response = response_with_value>, Error>; +template +using statuses_response = response_with_value< + std::vector>>, Error>; + } // namespace cargo::proto #endif // CARGO_PROTO_RPC_RESPONSE_HPP diff --git a/src/request_manager.cpp b/src/request_manager.cpp index 1496c03d4ef8ccd8d438e53e5767af5c8759a074..567446d4cab3aa898a29a4063a215f51857089ff 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -56,7 +56,7 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { } error_code -request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, +request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, std::optional ec) { @@ -65,7 +65,7 @@ request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, if(const auto it = m_requests.find(tid); it != m_requests.end()) { assert(seqno < it->second.size()); assert(wid < it->second[seqno].size()); - it->second[seqno][wid].update(s, bw, ec); + it->second[seqno][wid].update(name, s, bw, ec); return error_code::success; } @@ -92,8 +92,39 @@ request_manager::lookup(std::uint64_t tid) { return request_status{ps}; } } + // TODO : completed should have the name of the file if its not found + return request_status{"", transfer_state::completed, 0.0f}; + } + + LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); + return tl::make_unexpected(error_code::no_such_transfer); +} + +tl::expected, error_code> +request_manager::lookup_all(std::uint64_t tid) { + + abt::shared_lock lock(m_mutex); + + std::vector result; + if(const auto it = m_requests.find(tid); it != m_requests.end()) { - return request_status{transfer_state::completed, 0.0f}; + const auto& file_statuses = it->second; + // we calculate always the mean of the BW + for(const auto& fs : file_statuses) { + float bw = 0; + request_status rs(*fs.begin()); + for(const auto& ps : fs) { + bw += ps.bw(); + if(ps.state() == transfer_state::completed) { + continue; + } + // not finished + rs = request_status{ps}; + } + rs.bw(bw/(double)fs.size()); + result.push_back(rs); + } + return result; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); diff --git a/src/request_manager.hpp b/src/request_manager.hpp index 1928d61b819bd1669a6b1ad547eeda93c8b3f2ec..46beddad3a703ec4c98f4f1ff668ea4a58848996 100644 --- a/src/request_manager.hpp +++ b/src/request_manager.hpp @@ -60,12 +60,15 @@ public: error_code update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, - transfer_state s, float bw, + std::string name, transfer_state s, float bw, std::optional ec = std::nullopt); tl::expected lookup(std::uint64_t tid); + tl::expected, error_code> + lookup_all(std::uint64_t tid); + error_code remove(std::uint64_t tid); diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index bd72a6e4a38a4860bc6a1faeb2b7635f5ded0ff5..10984c6fc97bd57f03ad815185d57a313a4def4f 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -50,12 +50,21 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; cargo::error_code m_status; - + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; std::unique_ptr m_output_file; int m_workers_size; int m_workers_rank; diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index eb50129e7a14514e3986eac469a1d81f4f26fec8..0edf7356b8e0f70d78494d78f0296c9212fad77e 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -54,14 +54,22 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; - cargo::error_code m_status; - + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; std::unique_ptr m_input_file; int m_workers_size; diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 0d72c18542115371156fc5d4c9bd5967d6f89f0b..9a3cdf4195b692329f5f1b6b38cf15a5e1769e68 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -39,6 +39,7 @@ operation::make_operation(cargo::tag t, mpi::communicator workers, std::uint64_t block_size, FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) { using cargo::tag; + switch(t) { case tag::pread: return std::make_unique( diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index ba3c1565cbf6ac892c049fbf35b469eda1f1dab9..2acd884cd56d2d7945f66c3b9bffbd17b93bab92 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -78,6 +78,13 @@ public: void bw(float_t bw); + virtual std::string + output_path() const = 0; + + virtual std::string + input_path() const = 0; + + private: std::int16_t m_sleep_value = 0; int m_rank; diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index b6dec2f28d49ca501d3e2d50fe23b28347ac6388..b0871223c5bbd99cef45321813a74f2ba734494f 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -69,10 +69,10 @@ seq_mixed_operation::operator()() { m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - m_output_file = std::make_unique(posix_file::create( - m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); - m_output_file->fallocate(0, 0, file_size); + m_output_file->fallocate(0, 0, file_size); m_workers_size = workers_size; m_workers_rank = workers_rank; @@ -141,9 +141,9 @@ seq_mixed_operation::progress(int ongoing_index) { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); -/* Do write */ - m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), - file_range.size()); + /* Do write */ + m_output_file->pwrite(m_buffer_regions[index], + file_range.offset(), file_range.size()); m_bytes_per_rank += n; @@ -184,5 +184,4 @@ seq_mixed_operation::progress(int ongoing_index) { return -1; } - } // namespace cargo diff --git a/src/worker/seq_mixed.hpp b/src/worker/seq_mixed.hpp index c3545ccfada3145587e51ac7ae5b68fa80955eb0..45b3ba6445ac3dc71bc6c7e9acd6117fea96f34e 100644 --- a/src/worker/seq_mixed.hpp +++ b/src/worker/seq_mixed.hpp @@ -38,9 +38,11 @@ namespace cargo { class seq_mixed_operation : public operation { public: - seq_mixed_operation(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, - FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + seq_mixed_operation(mpi::communicator workers, + std::filesystem::path input_path, + std::filesystem::path output_path, + std::uint64_t block_size, FSPlugin::type fs_i_type, + FSPlugin::type fs_o_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), @@ -54,11 +56,20 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; - + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; std::unique_ptr m_input_file; std::unique_ptr m_output_file; int m_workers_size; diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 4254d2c64256362ffee0369eae873ab0364be6cc..b91d865a902e4099b319e0914f6beaf84cc73ee2 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -239,5 +239,4 @@ seq_operation::progress(int ongoing_index) { return -1; } - } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 508c1979fb6f22690ee1b60b8ff33795e3922b26..8073cebb73c48d9c7629a297e85341354eb934af 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -54,13 +54,22 @@ public: int progress(int ongoing_index) final; + std::string + output_path() const { + return m_output_path; + } + + std::string + input_path() const { + return m_input_path; + } + private: mpi::communicator m_workers; - std::filesystem::path m_input_path; - std::filesystem::path m_output_path; - std::unique_ptr m_input_file; std::unique_ptr m_output_file; + std::filesystem::path m_input_path{}; + std::filesystem::path m_output_path{}; int m_workers_size; int m_workers_rank; std::size_t m_block_size; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index a680c840603be908b83db24627597a734f688675..0178b961c5206a60e77a30d3fa0fdd5a78192c53 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -53,11 +53,11 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, void update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - cargo::transfer_state st, float bw, + std::string name, cargo::transfer_state st, float bw, std::optional ec = std::nullopt) { mpi::communicator world; - const cargo::status_message m{tid, seqno, st, bw, ec}; + const cargo::status_message m{tid, seqno, name, st, bw, ec}; LOGGER_DEBUG("msg <= to: {} body: {{payload: {}}}", rank, m); world.send(rank, static_cast(cargo::tag::status), m); } @@ -119,7 +119,7 @@ worker::run() { if(index == -1) { // operation finished cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), ec ? transfer_state::failed : transfer_state::completed, 0.0f, ec); @@ -129,7 +129,7 @@ worker::run() { } else { // update only if BW is set if(op->bw() > 0.0f) { - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), transfer_state::running, op->bw()); } I->second.second = index; @@ -144,7 +144,7 @@ worker::run() { if(!msg) { // Only wait if there are no pending operations and no messages if(m_ops.size() == 0) { - std::this_thread::sleep_for(150ms); + std::this_thread::sleep_for(10ms); } continue; } @@ -174,13 +174,13 @@ worker::run() { op->set_comm(msg->source(), m.tid(), m.seqno(), t); - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), transfer_state::running, -1.0f); // Different scenarios read -> write | write -> read cargo::error_code ec = (*op)(); if(ec != cargo::error_code::transfer_in_progress) { - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), transfer_state::failed, -1.0f, ec); m_ops.erase(make_pair(m.input_path(), m.output_path())); }