From bbb15c7054639ef1858512577d3a57be6cd21307 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 27 Oct 2023 14:32:37 +0200 Subject: [PATCH 1/3] WIP BW Shaping messages --- src/proto/mpi/message.hpp | 11 +++++++++++ src/worker/worker.cpp | 10 ++++++++++ 2 files changed, 21 insertions(+) diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index e7cdc58..a638b2f 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -250,6 +250,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const cargo::shaper_message& s, FormatContext& ctx) const { + const auto str = fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 988a926..c1d4e82 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -196,6 +196,16 @@ worker::run() { break; } + case tag::bw_shaping: { + shaper_message m; + world.recv(msg->source(), msg->tag(), m); + LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + + // TODO: Do something with m.shaping; + + break; + } + case tag::shutdown: LOGGER_INFO("msg => from: {} body: {{shutdown}}", msg->source()); -- GitLab From 1c80c7a9683c2a8e06007c1d2b3c3675806b1ff6 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 30 Oct 2023 12:54:38 +0100 Subject: [PATCH 2/3] WIP sleep updated thread Moving info from master to worker progress loop update rebase task added chrono fmt Progress loop extracted Reduce simultaneous transfers wrong Error code in write error control Missing size for write Added BW, solved sleep bug (intialization) BW workflow finished Support for creating directories Directory Support Reduce CPU usage with dynamic wait Only call create directories, if the path is not 0 Updated Spack for 0.4.0 --- CMakeLists.txt | 2 +- lib/cargo.hpp | 4 ++ lib/libcargo.cpp | 5 +++ spack/packages/cargo/package.py | 1 + src/master.cpp | 65 +++++++++++++++++++++++++++++---- src/mpioxx.hpp | 15 ++++++++ src/proto/mpi/message.hpp | 3 +- src/worker/mpio_read.cpp | 12 ++++-- src/worker/mpio_write.cpp | 11 ++++-- src/worker/worker.cpp | 12 +++++- 10 files changed, 113 insertions(+), 17 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d5fade9..f51619c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.2.0 + VERSION 0.4.0 LANGUAGES C CXX ) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 04b0756..e10d897 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -32,6 +32,7 @@ #include #include +constexpr const uint64_t TIMES = 100; namespace cargo { using transfer_id = std::uint64_t; @@ -74,6 +75,9 @@ public: [[nodiscard]] bool supports_parallel_transfer() const noexcept; + void + path(std::string path); + template void serialize(Archive& ar) { diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index f084a0d..4cf89d2 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -72,6 +72,11 @@ dataset::path() const noexcept { return m_path; }; +void +dataset::path(std::string path) { + m_path = std::move(path); +}; + bool dataset::supports_parallel_transfer() const noexcept { return m_type == dataset::type::parallel; diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index 591f8b7..b2f2d60 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -36,6 +36,7 @@ class Cargo(CMakePackage): version("latest", branch="main") version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True) version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f") + version("0.4.0", sha256="7a3de25165a6c6ce9dc356634d89f7052f8d2bef") # build variants variant('build_type', diff --git a/src/master.cpp b/src/master.cpp index c9db8ae..54b5d70 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -108,16 +108,19 @@ void master_server::mpi_listener_ult() { mpi::communicator world; - + uint64_t times = 0; while(!m_shutting_down) { auto msg = world.iprobe(); if(!msg) { - thallium::thread::self().sleep(m_network_engine, 150); + thallium::thread::self().sleep(m_network_engine, 150*times); + if (times < TIMES) { + times++; + } continue; } - + times=0; switch(static_cast(msg->tag())) { case tag::status: { status_message m; @@ -233,13 +236,61 @@ master_server::transfer_datasets(const network::request& req, assert(sources.size() == targets.size()); for(auto i = 0u; i < sources.size(); ++i) { + const auto& s = sources[i]; const auto& d = targets[i]; - for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { - const auto [t, m] = make_message(r.tid(), i, s, d); - LOGGER_INFO("msg <= to: {} body: {}", rank, m); - world.send(static_cast(rank), t, m); + + // We need to expand directories to single files on the s + // Then create a new message for each file and append the + // file to the d prefix + // We will asume that the path is the original relative + // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then + // bbb/xxx -> ttt/xxx + const auto& p = s.path(); + std::vector files; + if(std::filesystem::is_directory(p)) { + LOGGER_INFO("Expanding input directory {}", p); + for(const auto& f : + std::filesystem::recursive_directory_iterator(p)) { + if (std::filesystem::is_regular_file(f)) { + files.push_back(f.path()); + } + } + + /* + We have all the files expanded. Now create a new + cargo::dataset for each file as s and a new + cargo::dataset appending the base directory in d to the + file name. + */ + for(const auto& f : files) { + cargo::dataset s_new(s); + cargo::dataset d_new(d); + s_new.path(f); + // We need to get filename from the original root path (d.path) plus the path from f, removing the initial path p + d_new.path(d.path() / std::filesystem::path(f.string().substr(p.size() + 1))); + + LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), + d_new.path()); + for(std::size_t rank = 1; rank <= r.nworkers(); + ++rank) { + const auto [t, m] = + make_message(r.tid(), i, s_new, d_new); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } + } + + } else { + // normal use case, we are specifying files + + for(std::size_t rank = 1; rank <= r.nworkers(); + ++rank) { + const auto [t, m] = make_message(r.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } } } diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 87244dc..4138e48 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -135,6 +135,21 @@ public: MPI_File result; + // At this point we may face the possibility of an unexistent directory + // The File open semantics will not create the directory and fail. + // As the operation are done in the prolog, we may not been able to create + // such directory in the parallel filesystem nor the adhoc fs. + + // We will create the needed directories if we are writing. + + if (mode == file_open_mode::wronly) { + // Decompose the filepath and create the needed directories. + const std::filesystem::path dir = filepath.parent_path(); + if (!std::filesystem::exists(dir)) { + std::filesystem::create_directories(dir); + } + } + if(const auto ec = MPI_File_open(comm, filepath.c_str(), static_cast(mode), MPI_INFO_NULL, &result); diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index a638b2f..5660ecc 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -256,7 +256,8 @@ struct fmt::formatter : formatter { template auto format(const cargo::shaper_message& s, FormatContext& ctx) const { - const auto str = fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); + const auto str = + fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); return formatter::format(str, ctx); } }; diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 1e0a6ab..ad531d0 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -121,6 +121,11 @@ mpio_read::operator()() { return make_mpi_error(ec); } + + // step-pre3. Create needed directories + if(!m_output_path.parent_path().empty()) { + std::filesystem::create_directories(m_output_path.parent_path()); + } // step3. POSIX write data m_output_file = std::make_unique( posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR)); @@ -187,9 +192,10 @@ mpio_read::progress(int ongoing_index) { .count(); if((elapsed_seconds) > 0) { bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); - LOGGER_INFO("BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", - m_block_size / 1024.0, elapsed_seconds, bw(), - sleep_value()); + LOGGER_DEBUG( + "BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); } // Do sleep std::this_thread::sleep_for(sleep_value()); diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index ccf81e0..46865ae 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -148,9 +148,9 @@ mpio_write::progress(int ongoing_index) { .count(); if((elapsed_seconds) > 0) { bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); - LOGGER_INFO("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", - m_block_size / 1024.0, elapsed_seconds, bw(), - sleep_value()); + LOGGER_DEBUG("BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); } m_bytes_per_rank += n; @@ -159,6 +159,11 @@ mpio_write::progress(int ongoing_index) { ++index; } + + // step pre-2 Create the directory if it does not exist + if(!m_output_path.parent_path().empty()) { + std::filesystem::create_directories(m_output_path.parent_path()); + } // step 2. write buffer data in parallel to the PFS const auto output_file = mpioxx::file::open(m_workers, m_output_path, diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index c1d4e82..0aba755 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -142,7 +142,7 @@ worker::run() { } continue; } - + times = 0; switch(const auto t = static_cast(msg->tag())) { case tag::pread: [[fallthrough]]; @@ -200,8 +200,16 @@ worker::run() { shaper_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + const auto op = I->second.first.get(); + if(op) { + + op->set_bw_shaping(0); + } else { + LOGGER_INFO("Operation non existent", msg->source(), m); + } + } - // TODO: Do something with m.shaping; break; } -- GitLab From 360f472799ee0ed09d9cc31fe4fd96c1d6844763 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 7 Nov 2023 20:02:35 +0100 Subject: [PATCH 3/3] Solved MPIopen bug --- CMakeLists.txt | 2 +- lib/cargo.hpp | 1 - spack/packages/cargo/package.py | 2 +- src/master.cpp | 145 +++++++++++++++++--------------- src/mpioxx.hpp | 16 +--- src/proto/mpi/message.hpp | 11 --- src/worker/mpio_read.cpp | 5 -- src/worker/mpio_write.cpp | 5 -- src/worker/worker.cpp | 27 ++---- 9 files changed, 87 insertions(+), 127 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f51619c..fe8e8d4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.4.0 + VERSION 0.3.0 LANGUAGES C CXX ) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index e10d897..90d0c9a 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -32,7 +32,6 @@ #include #include -constexpr const uint64_t TIMES = 100; namespace cargo { using transfer_id = std::uint64_t; diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index b2f2d60..ebae2b7 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -36,7 +36,7 @@ class Cargo(CMakePackage): version("latest", branch="main") version("0.1.0", sha256="981d00adefbc2ea530f57f8428bd7980e4aab2993a86d8ae4274334c8f055bdb", deprecated=True) version("0.2.0", sha256="fd7fa31891b3961dcb376556ec5fa028bf512d96a7c688a160f9dade58dae36f") - version("0.4.0", sha256="7a3de25165a6c6ce9dc356634d89f7052f8d2bef") + version("0.3.0", branch="rnou/directory_support") # build variants variant('build_type', diff --git a/src/master.cpp b/src/master.cpp index 54b5d70..8583b3e 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -108,19 +108,15 @@ void master_server::mpi_listener_ult() { mpi::communicator world; - uint64_t times = 0; while(!m_shutting_down) { auto msg = world.iprobe(); if(!msg) { - thallium::thread::self().sleep(m_network_engine, 150*times); - if (times < TIMES) { - times++; - } + thallium::thread::self().sleep(m_network_engine, 150); continue; } - times=0; + switch(static_cast(msg->tag())) { case tag::status: { status_message m; @@ -226,74 +222,91 @@ master_server::transfer_datasets(const network::request& req, LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources, targets); - m_request_manager.create(sources.size(), world.size() - 1) + + // As we accept directories expanding directories should be done before and + // update sources and targets. + + std::vector v_s_new; + std::vector v_d_new; + + for(auto i = 0u; i < sources.size(); ++i) { + + const auto& s = sources[i]; + const auto& d = targets[i]; + + + // We need to expand directories to single files on the s + // Then create a new message for each file and append the + // file to the d prefix + // We will asume that the path is the original relative + // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then + // bbb/xxx -> ttt/xxx + const auto& p = s.path(); + std::vector files; + if(std::filesystem::is_directory(p)) { + LOGGER_INFO("Expanding input directory {}", p); + for(const auto& f : + std::filesystem::recursive_directory_iterator(p)) { + if(std::filesystem::is_regular_file(f)) { + files.push_back(f.path()); + } + } + + /* + We have all the files expanded. Now create a new + cargo::dataset for each file as s and a new + cargo::dataset appending the base directory in d to the + file name. + */ + for(const auto& f : files) { + cargo::dataset s_new(s); + cargo::dataset d_new(d); + s_new.path(f); + // We need to get filename from the original root + // path (d.path) plus the path from f, removing the + // initial path p + d_new.path(d.path() / std::filesystem::path( + f.string().substr(p.size() + 1))); + + LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), + d_new.path()); + v_s_new.push_back(s_new); + v_d_new.push_back(d_new); + } + + } else { + v_s_new.push_back(s); + v_d_new.push_back(d); + } + } + + m_request_manager.create(v_s_new.size(), world.size() - 1) .or_else([&](auto&& ec) { LOGGER_ERROR("Failed to create request: {}", ec); LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(generic_response{rpc.id(), ec}); }) .map([&](auto&& r) { - assert(sources.size() == targets.size()); - - for(auto i = 0u; i < sources.size(); ++i) { - - const auto& s = sources[i]; - const auto& d = targets[i]; - - - // We need to expand directories to single files on the s - // Then create a new message for each file and append the - // file to the d prefix - // We will asume that the path is the original relative - // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then - // bbb/xxx -> ttt/xxx - const auto& p = s.path(); - std::vector files; - if(std::filesystem::is_directory(p)) { - LOGGER_INFO("Expanding input directory {}", p); - for(const auto& f : - std::filesystem::recursive_directory_iterator(p)) { - if (std::filesystem::is_regular_file(f)) { - files.push_back(f.path()); - } - } - - /* - We have all the files expanded. Now create a new - cargo::dataset for each file as s and a new - cargo::dataset appending the base directory in d to the - file name. - */ - for(const auto& f : files) { - cargo::dataset s_new(s); - cargo::dataset d_new(d); - s_new.path(f); - // We need to get filename from the original root path (d.path) plus the path from f, removing the initial path p - d_new.path(d.path() / std::filesystem::path(f.string().substr(p.size() + 1))); - - LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), - d_new.path()); - for(std::size_t rank = 1; rank <= r.nworkers(); - ++rank) { - const auto [t, m] = - make_message(r.tid(), i, s_new, d_new); - LOGGER_INFO("msg <= to: {} body: {}", rank, m); - world.send(static_cast(rank), t, m); - } - } - - } else { - // normal use case, we are specifying files - - for(std::size_t rank = 1; rank <= r.nworkers(); - ++rank) { - const auto [t, m] = make_message(r.tid(), i, s, d); - LOGGER_INFO("msg <= to: {} body: {}", rank, m); - world.send(static_cast(rank), t, m); - } + assert(v_s_new.size() == v_d_new.size()); + + + // For all the files + for(std::size_t i = 0; i < v_s_new.size(); ++i) { + const auto& s = v_s_new[i]; + const auto& d = v_d_new[i]; + + // Create the directory if it does not exist + if(!std::filesystem::path(d.path()).parent_path().empty()) { + std::filesystem::create_directories( + std::filesystem::path(d.path()).parent_path()); } - } + for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { + const auto [t, m] = make_message(r.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } + } LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, error_code::success, r.tid()); req.respond(response_with_id{rpc.id(), error_code::success, diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 4138e48..7566534 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -135,21 +135,7 @@ public: MPI_File result; - // At this point we may face the possibility of an unexistent directory - // The File open semantics will not create the directory and fail. - // As the operation are done in the prolog, we may not been able to create - // such directory in the parallel filesystem nor the adhoc fs. - - // We will create the needed directories if we are writing. - - if (mode == file_open_mode::wronly) { - // Decompose the filepath and create the needed directories. - const std::filesystem::path dir = filepath.parent_path(); - if (!std::filesystem::exists(dir)) { - std::filesystem::create_directories(dir); - } - } - + if(const auto ec = MPI_File_open(comm, filepath.c_str(), static_cast(mode), MPI_INFO_NULL, &result); diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index 5660ecc..bc618f2 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -250,17 +250,6 @@ struct fmt::formatter : formatter { } }; -template <> -struct fmt::formatter : formatter { - // parse is inherited from formatter. - template - auto - format(const cargo::shaper_message& s, FormatContext& ctx) const { - const auto str = - fmt::format("{{tid: {}, shaping: {}}}", s.tid(), s.shaping()); - return formatter::format(str, ctx); - } -}; template <> struct fmt::formatter : formatter { diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index ad531d0..e16064d 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -121,11 +121,6 @@ mpio_read::operator()() { return make_mpi_error(ec); } - - // step-pre3. Create needed directories - if(!m_output_path.parent_path().empty()) { - std::filesystem::create_directories(m_output_path.parent_path()); - } // step3. POSIX write data m_output_file = std::make_unique( posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR)); diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 46865ae..c4b51e8 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -159,11 +159,6 @@ mpio_write::progress(int ongoing_index) { ++index; } - - // step pre-2 Create the directory if it does not exist - if(!m_output_path.parent_path().empty()) { - std::filesystem::create_directories(m_output_path.parent_path()); - } // step 2. write buffer data in parallel to the PFS const auto output_file = mpioxx::file::open(m_workers, m_output_path, diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 0aba755..b8f218f 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -109,7 +109,10 @@ worker::run() { // FIXME: sleep time should be configurable // Progress through all transfers - for(auto I = m_ops.begin(); I != m_ops.end(); I++) { + + auto I = m_ops.begin(); + auto IE = m_ops.end(); + if (I != IE) { auto op = I->second.first.get(); int index = I->second.second; if(op) { @@ -125,9 +128,6 @@ worker::run() { // Transfer finished I = m_ops.erase(I); - if(I == m_ops.end()) { - break; - } } else { update_state(op->source(), op->tid(), op->seqno(), transfer_state::running, op->bw()); @@ -142,7 +142,7 @@ worker::run() { } continue; } - times = 0; + switch(const auto t = static_cast(msg->tag())) { case tag::pread: [[fallthrough]]; @@ -196,23 +196,6 @@ worker::run() { break; } - case tag::bw_shaping: { - shaper_message m; - world.recv(msg->source(), msg->tag(), m); - LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); - for(auto I = m_ops.begin(); I != m_ops.end(); I++) { - const auto op = I->second.first.get(); - if(op) { - - op->set_bw_shaping(0); - } else { - LOGGER_INFO("Operation non existent", msg->source(), m); - } - } - - - break; - } case tag::shutdown: LOGGER_INFO("msg => from: {} body: {{shutdown}}", -- GitLab