diff --git a/CMakeLists.txt b/CMakeLists.txt index d5fade9109e63c79dd6910acfba75cf0aeb5cd57..fe8e8d4921b9fea89984ef2f8ef51a7faeb876d1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.2.0 + VERSION 0.3.0 LANGUAGES C CXX ) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 04b07565901d1fa236ea1a01efd6d0de39f3324b..90d0c9af7312856847838a68eda12e238deb4811 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -74,6 +74,9 @@ public: [[nodiscard]] bool supports_parallel_transfer() const noexcept; + void + path(std::string path); + template <class Archive> void serialize(Archive& ar) { diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index f084a0d3888674c28b749c57520dc2294ae022dd..4cf89d24442f461a31cd02f0aa4c9ab2f61c5e7a 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -72,6 +72,11 @@ dataset::path() const noexcept { return m_path; }; +void +dataset::path(std::string path) { + m_path = std::move(path); +}; + bool dataset::supports_parallel_transfer() const noexcept { return m_type == dataset::type::parallel; diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index 591f8b75644ae6a862a2a0dece080546d88f1cdc..ebae2b763bc83203cf25771580c686a5bfe41a38 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -36,6 +36,7 @@ class Cargo(CMakePackage): version("latest", branch="main") version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True) version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f") + version("0.3.0", branch="rnou/directory_support") # build variants variant('build_type', diff --git a/src/master.cpp b/src/master.cpp index c9db8aee7e4de8c1648fd313ba867e1d12f16e77..8583b3ed492fce963082f1aa4d5cec0aa13a7821 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -108,7 +108,6 @@ void master_server::mpi_listener_ult() { mpi::communicator world; - while(!m_shutting_down) 
{ auto msg = world.iprobe(); @@ -223,18 +222,84 @@ master_server::transfer_datasets(const network::request& req, LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources, targets); - m_request_manager.create(sources.size(), world.size() - 1) + + // As we accept directories expanding directories should be done before and + // update sources and targets. + + std::vector<cargo::dataset> v_s_new; + std::vector<cargo::dataset> v_d_new; + + for(auto i = 0u; i < sources.size(); ++i) { + + const auto& s = sources[i]; + const auto& d = targets[i]; + + + // We need to expand directories to single files on the s + // Then create a new message for each file and append the + // file to the d prefix + // We will assume that the path is the original relative + // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then + // bbb/xxx -> ttt/xxx + const auto& p = s.path(); + std::vector<std::filesystem::path> files; + if(std::filesystem::is_directory(p)) { + LOGGER_INFO("Expanding input directory {}", p); + for(const auto& f : + std::filesystem::recursive_directory_iterator(p)) { + if(std::filesystem::is_regular_file(f)) { + files.push_back(f.path()); + } + } + + /* + We have all the files expanded. Now create a new + cargo::dataset for each file as s and a new + cargo::dataset appending the base directory in d to the + file name. 
+ */ + for(const auto& f : files) { + cargo::dataset s_new(s); + cargo::dataset d_new(d); + s_new.path(f); + // We need to get filename from the original root + // path (d.path) plus the path from f, removing the + // initial path p + d_new.path(d.path() / std::filesystem::path( + f.string().substr(p.size() + 1))); + + LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), + d_new.path()); + v_s_new.push_back(s_new); + v_d_new.push_back(d_new); + } + + } else { + v_s_new.push_back(s); + v_d_new.push_back(d); + } + } + + m_request_manager.create(v_s_new.size(), world.size() - 1) .or_else([&](auto&& ec) { LOGGER_ERROR("Failed to create request: {}", ec); LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(generic_response{rpc.id(), ec}); }) .map([&](auto&& r) { - assert(sources.size() == targets.size()); + assert(v_s_new.size() == v_d_new.size()); - for(auto i = 0u; i < sources.size(); ++i) { - const auto& s = sources[i]; - const auto& d = targets[i]; + + // For all the files + for(std::size_t i = 0; i < v_s_new.size(); ++i) { + const auto& s = v_s_new[i]; + const auto& d = v_d_new[i]; + + // Create the directory if it does not exist + if(!std::filesystem::path(d.path()).parent_path().empty()) { + std::filesystem::create_directories( + std::filesystem::path(d.path()).parent_path()); + } for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), i, s, d); @@ -242,7 +307,6 @@ master_server::transfer_datasets(const network::request& req, world.send(static_cast<int>(rank), t, m); } } - LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, error_code::success, r.tid()); req.respond(response_with_id{rpc.id(), error_code::success, diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 87244dc4d91adf94635cee8dc77243b83e4f65d6..756653483d06d477906e5dfb8bcad98477f32188 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -135,6 +135,7 @@ public: MPI_File result; + if(const auto ec = MPI_File_open(comm, filepath.c_str(), 
static_cast<int>(mode), MPI_INFO_NULL, &result); diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index e7cdc583132fba03653a960e5bfa24b4c4d3ec83..bc618f2ec05bc6acfb8ecbecac662e00bbcacfbe 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -250,6 +250,7 @@ struct fmt::formatter : formatter { } }; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 1e0a6ab5b64ec1a7a4fcc8aabea6fa69a000b39e..e16064d8d73856b57e3f112ff4f843d26026d758 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -187,9 +187,10 @@ mpio_read::progress(int ongoing_index) { .count(); if((elapsed_seconds) > 0) { bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); - LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", - m_block_size / 1024.0, elapsed_seconds, bw(), - sleep_value()); + LOGGER_DEBUG( + "BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); } // Do sleep std::this_thread::sleep_for(sleep_value()); diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index ccf81e017b6de3efd65d17faff29f524436222f0..c4b51e8e4d361772c0ef137177306743e2908bb4 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -148,9 +148,9 @@ mpio_write::progress(int ongoing_index) { .count(); if((elapsed_seconds) > 0) { bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); - LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", - m_block_size / 1024.0, elapsed_seconds, bw(), - sleep_value()); + LOGGER_DEBUG("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); } m_bytes_per_rank += n; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 988a926aa98e1af5da5f23b028ebef059016282c..b8f218fb77dc7a734603b1d9a908314b246d4a93 100644 --- a/src/worker/worker.cpp +++ 
b/src/worker/worker.cpp @@ -109,7 +109,10 @@ worker::run() { // FIXME: sleep time should be configurable // Progress through all transfers - for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + + auto I = m_ops.begin(); + auto IE = m_ops.end(); + if (I != IE) { auto op = I->second.first.get(); int index = I->second.second; if(op) { @@ -125,9 +128,6 @@ worker::run() { // Transfer finished I = m_ops.erase(I); - if(I == m_ops.end()) { - break; - } } else { update_state(op->source(), op->tid(), op->seqno(), transfer_state::running, op->bw()); @@ -196,6 +196,7 @@ worker::run() { break; } + case tag::shutdown: LOGGER_INFO("msg => from: {} body: {{shutdown}}", msg->source());