From e105dc101ba031cad0136677d7743f4343c04c1f Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 9 Apr 2025 20:33:43 +0200 Subject: [PATCH 01/10] gcc update --- CMakeLists.txt | 6 +++--- lib/fmt_formatters.hpp | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5ea946c..10c0327 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -213,7 +213,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building {fmt}") FetchContent_Declare( fmt GIT_REPOSITORY https://github.com/fmtlib/fmt - GIT_TAG 10.2.1 + GIT_TAG 11.1.4 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -226,7 +226,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building spdlog") FetchContent_Declare( spdlog GIT_REPOSITORY https://github.com/gabime/spdlog - GIT_TAG v1.14.0 + GIT_TAG v1.15.2 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -329,7 +329,7 @@ if (CARGO_BUILD_TESTS) FetchContent_Declare( Catch2 GIT_REPOSITORY https://github.com/catchorg/Catch2.git - GIT_TAG 605a34765aa5d5ecbf476b4598a862ada971b0cc # v3.0.1 + GIT_TAG v3.8.0 # v3.0.1 GIT_SHALLOW ON GIT_PROGRESS ON ) diff --git a/lib/fmt_formatters.hpp b/lib/fmt_formatters.hpp index 7ab61be..288cebf 100644 --- a/lib/fmt_formatters.hpp +++ b/lib/fmt_formatters.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #include "cargo/error.hpp" -- GitLab From bf854d00a77ee17f99469cd6a5fc5717642ea061 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 9 Apr 2025 22:24:38 +0200 Subject: [PATCH 02/10] close file --- CMakeLists.txt | 3 ++- lib/CMakeLists.txt | 1 + src/master.cpp | 5 +++-- src/posix_file/posix_file/file.hpp | 5 ++++- src/worker/mpio_write.cpp | 10 ++++++---- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 10c0327..0fe1497 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -238,7 +238,7 @@ set_target_properties(spdlog PROPERTIES POSITION_INDEPENDENT_CODE ON) message(STATUS "[${PROJECT_NAME}] Searching for CLI11") FetchContent_Declare(cli11 GIT_REPOSITORY https://github.com/CLIUtils/CLI11 -GIT_TAG 291c58789c031208f08f4f261a858b5b7083e8e2 # v2.3.2 +GIT_TAG v2.5.0 # v2.3.2 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -293,6 +293,7 @@ endif() ### Threads: required by ASIO find_package(Threads REQUIRED) +find_package(Python3 COMPONENTS Development) ### ASIO: used for signal handling ### diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index ea292ac..e35cce5 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -45,6 +45,7 @@ target_link_libraries(cargo PRIVATE Boost::serialization Boost::mpi net::rpc_client + Python3::Python ) ## Install library + targets ################################################### diff --git a/src/master.cpp b/src/master.cpp index 2b88b7e..cdb2c41 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -242,8 +242,9 @@ master_server::ftio_scheduling_ult() { continue; } - LOGGER_INFO("Checking if there is work to do in {}", - m_pending_transfer.m_sources); + LOGGER_INFO("Checking if there is work to do in {} (should be empty : {})", + m_pending_transfer.m_sources, m_pending_transfer.m_expanded_sources ); + m_pending_transfer.m_expanded_sources = {}; m_pending_transfer.m_expanded_targets = {}; diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 8b12b14..2742423 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -288,7 +288,10 @@ public: return bytes_written; } - + int handle() const noexcept { + return m_handle.native(); + } + protected: const std::filesystem::path m_path; file_handle m_handle; diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 7e2c051..62e2e11 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -50,7 +50,7 @@ mpio_write::operator()() { // We need to open the file and ask size (using fs_plugin) m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - + LOGGER_INFO("Worker: Opening {} --> handler {} ", m_input_path, m_input_file->handle()); std::size_t file_size = m_file_size; // compute the number of blocks in the file @@ -117,7 +117,7 @@ mpio_write::progress(int ongoing_index) { using posix_file::views::strided; // compute the number of blocks in the file - + // LOGGER_INFO("Worker: Progress {}/{} --> handler {} ", ongoing_index, m_input_path, m_input_file->handle()); int index = 0; if(ongoing_index == 0) { m_bytes_per_rank = 0; @@ -177,7 +177,7 @@ mpio_write::progress(int ongoing_index) { ++index; } - + m_input_file->close(); // step 2. write buffer data in parallel to the PFS //LOGGER_INFO("START WRITING file {}", m_output_path); const auto output_file = @@ -227,6 +227,8 @@ mpio_write::progress(int ongoing_index) { m_status = make_mpi_error(ec); return -1; } + + } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_mpi_error(e.error_code()); @@ -247,7 +249,7 @@ mpio_write::progress(int ongoing_index) { //LOGGER_INFO("END WRITING file {}", m_output_path); m_status = error_code::success; - + return -1; } -- GitLab From 08597452c568faef7a9416431594a8c11b90562c Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 10:46:44 +0200 Subject: [PATCH 03/10] updated other backends --- src/worker/mpio_read.cpp | 3 ++- src/worker/seq_mixed.cpp | 1 + src/worker/sequential.cpp | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index ee9c572..6739780 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -128,7 +128,8 @@ mpio_read::operator()() { mpi::error_string(ec)); return make_mpi_error(ec); } - + + // step3. POSIX write data // We need to create the directory if it does not exists (using // FSPlugin) diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index 7663759..b18bd4a 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -185,6 +185,7 @@ seq_mixed_operation::progress(int ongoing_index) { m_status = error_code::success; + m_input_file->close(); m_output_file->close(); return -1; } diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 42e3b9a..4ddeaf8 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -185,7 +185,7 @@ seq_operation::progress(int ongoing_index) { // step3. POSIX write data // We need to create the directory if it does not exists (using // FSPlugin) - + m_input_file->close(); if(write and ongoing_index == 0) { m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); @@ -230,7 +230,7 @@ seq_operation::progress(int ongoing_index) { ++index; } - + m_output_file->close(); } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_system_error(e.error_code()); -- GitLab From 6a898b681841fac7d5ef6444ccd5071b82a00210 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 10:50:51 +0200 Subject: [PATCH 04/10] solve python on ci --- CMakeLists.txt | 2 +- lib/CMakeLists.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0fe1497..5820afe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -293,7 +293,7 @@ endif() ### Threads: required by ASIO find_package(Threads REQUIRED) -find_package(Python3 COMPONENTS Development) +find_package(Python COMPONENTS Development) ### ASIO: used for signal handling ### diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index e35cce5..cc03b0d 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -45,7 +45,7 @@ target_link_libraries(cargo PRIVATE Boost::serialization Boost::mpi net::rpc_client - Python3::Python + Python::Python ) ## Install library + targets ################################################### -- GitLab From 5517dc0ea2a1ebbdaf9b75cf483f33ca7f68e0eb Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 11:00:41 +0200 Subject: [PATCH 05/10] conditional python --- lib/CMakeLists.txt | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index cc03b0d..4f3d416 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -45,9 +45,15 @@ target_link_libraries(cargo PRIVATE Boost::serialization Boost::mpi net::rpc_client - Python::Python ) +## If Python::Python available. update link libraries +if (Python_FOUND) + target_link_libraries(cargo PRIVATE Python::Python) +endif () + + + ## Install library + targets ################################################### set_target_properties( -- GitLab From 595f00329bf21881f1d2dd33c446c19ba7477cb7 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 11:50:05 +0200 Subject: [PATCH 06/10] remove output file --- src/posix_file/posix_file/file.hpp | 1 + src/worker/mpio_write.cpp | 4 +++- src/worker/seq_mixed.cpp | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 2742423..d896477 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -317,6 +317,7 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode, // We don't check if it exists, we just create it if flags is set to O_CREAT if(flags & O_CREAT) { + fs_plugin->unlink(filepath); recursive_mkdir(filepath, fs_plugin); } diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 62e2e11..5e7c034 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -50,7 +50,8 @@ mpio_write::operator()() { // We need to open the file and ask size (using fs_plugin) m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - LOGGER_INFO("Worker: Opening {} --> handler {} ", m_input_path, m_input_file->handle()); + remove (m_output_path.c_str()); + LOGGER_INFO("Worker: Opening {} --> handler {} / fileSize {} --- Output {} ", m_input_path, m_input_file->handle(), m_file_size, m_output_path); std::size_t file_size = m_file_size; // compute the number of blocks in the file @@ -180,6 +181,7 @@ mpio_write::progress(int ongoing_index) { m_input_file->close(); // step 2. write buffer data in parallel to the PFS //LOGGER_INFO("START WRITING file {}", m_output_path); + const auto output_file = mpioxx::file::open(m_workers, m_output_path, mpioxx::file_open_mode::create | diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index b18bd4a..6d45f2f 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -73,7 +73,6 @@ seq_mixed_operation::operator()() { m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, block_size); } - m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); -- GitLab From dc2baf642fdbae112a4bab57ecba36c729a8e394 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 13:39:38 +0200 Subject: [PATCH 07/10] fix mtime and mpio_write --- src/master.cpp | 2 +- src/worker/mpio_write.cpp | 83 +++++++++++++++++++++++++++------------ 2 files changed, 59 insertions(+), 26 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index cdb2c41..cc6d50f 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -383,7 +383,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { std::vector v_d_new; std::vector v_size_new; time_t now = time(0); - // now = now - 5; // Threshold for mtime + now = now - 1; // Threshold for mtime for(auto i = 0u; i < pt.m_sources.size(); ++i) { const auto& s = pt.m_sources[i]; diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 5e7c034..26bd3d7 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -41,17 +41,20 @@ mpio_write::operator()() { auto workers_size = m_workers.size(); auto workers_rank = m_workers.rank(); - if (m_single) { + if(m_single) { workers_size = 1; workers_rank = 0; } - + std::size_t block_size = m_kb_size * 1024u; // We need to open the file and ask size (using fs_plugin) m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - remove (m_output_path.c_str()); - LOGGER_INFO("Worker: Opening {} --> handler {} / fileSize {} --- Output {} ", m_input_path, m_input_file->handle(), m_file_size, m_output_path); + remove(m_output_path.c_str()); + LOGGER_INFO( + "Worker: Opening {} --> handler {} / fileSize {} --- Output {} ", + m_input_path, m_input_file->handle(), m_file_size, + m_output_path); std::size_t file_size = m_file_size; // compute the number of blocks in the file @@ -118,7 +121,8 @@ mpio_write::progress(int ongoing_index) { using posix_file::views::strided; // compute the number of blocks in the file - // LOGGER_INFO("Worker: Progress {}/{} --> handler {} ", ongoing_index, m_input_path, m_input_file->handle()); + // LOGGER_INFO("Worker: Progress {}/{} --> handler {} ", ongoing_index, + // m_input_path, m_input_file->handle()); int index = 0; if(ongoing_index == 0) { m_bytes_per_rank = 0; @@ -151,18 +155,20 @@ mpio_write::progress(int ongoing_index) { m_bytes_per_rank += n; + // Do sleep (But be a bit reactive...) auto total_sleep = sleep_value(); auto small_sleep = total_sleep / 100; - if (small_sleep == std::chrono::milliseconds(0)) small_sleep = std::chrono::milliseconds(1); - while( total_sleep > std::chrono::milliseconds(0)) { + if(small_sleep == std::chrono::milliseconds(0)) + small_sleep = std::chrono::milliseconds(1); + while(total_sleep > std::chrono::milliseconds(0)) { std::this_thread::sleep_for(small_sleep); total_sleep -= small_sleep; - if (total_sleep > sleep_value()) { + if(total_sleep > sleep_value()) { break; } } - + auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -180,8 +186,7 @@ mpio_write::progress(int ongoing_index) { } m_input_file->close(); // step 2. write buffer data in parallel to the PFS - //LOGGER_INFO("START WRITING file {}", m_output_path); - + const auto output_file = mpioxx::file::open(m_workers, m_output_path, mpioxx::file_open_mode::create | @@ -218,19 +223,47 @@ mpio_write::progress(int ongoing_index) { return -1; } - // step 3. parallel write data from buffers - if(const auto ec = - MPI_File_write_all(output_file, m_buffer.data(), - static_cast(m_bytes_per_rank), - MPI_BYTE, MPI_STATUS_IGNORE); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_write_all() failed: {}", - mpi::error_string(ec)); - m_status = make_mpi_error(ec); - return -1; + // step 3. parallel write data from buffers in chunks if necessary + const std::size_t max_bytes_per_call = + INT_MAX; // Maximum bytes for one MPI call + std::size_t bytes_written_total = 0; + const char* current_buffer_ptr = m_buffer.data(); + + while(bytes_written_total < m_bytes_per_rank) { + std::size_t bytes_to_write_this_call = + m_bytes_per_rank - bytes_written_total; + if(bytes_to_write_this_call > max_bytes_per_call) { + bytes_to_write_this_call = max_bytes_per_call; + } + + // Ensure the cast is now safe + int count_this_call = static_cast(bytes_to_write_this_call); + + LOGGER_DEBUG("MPI_File_write_all chunk: Writing {} bytes", + count_this_call); + + if(const auto ec = MPI_File_write_all( + output_file, current_buffer_ptr, count_this_call, + MPI_BYTE, MPI_STATUS_IGNORE); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_File_write_all() chunk failed: {}", + mpi::error_string(ec)); + // Consider freeing MPI types before returning/throwing + MPI_Type_free(&block_type); + MPI_Type_free(&file_type); + m_status = make_mpi_error(ec); + return -1; // Or throw an exception + } + + bytes_written_total += bytes_to_write_this_call; + current_buffer_ptr += bytes_to_write_this_call; } - - + + // Don't forget to free the MPI types after they are no longer needed + MPI_Type_free(&block_type); + MPI_Type_free(&file_type); + + } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_mpi_error(e.error_code()); @@ -249,9 +282,9 @@ mpio_write::progress(int ongoing_index) { return -1; } - //LOGGER_INFO("END WRITING file {}", m_output_path); + // LOGGER_INFO("END WRITING file {}", m_output_path); m_status = error_code::success; - + return -1; } -- GitLab From 2859ca281d1b60fdb29e1ded49452d2c65bee58c Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 14:00:26 +0200 Subject: [PATCH 08/10] write only bytes read in sequential --- src/worker/seq_mixed.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index 6d45f2f..38def7a 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -147,7 +147,7 @@ seq_mixed_operation::progress(int ongoing_index) { /* Do write */ m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), - file_range.size()); + n); m_bytes_per_rank += n; -- GitLab From 2ecc30b8501f93e411e5c9de447bb4fded4902b9 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 10 Apr 2025 14:22:14 +0200 Subject: [PATCH 09/10] added mtime parameter --- src/cargo.cpp | 10 +++++++++- src/master.cpp | 11 ++++++----- src/master.hpp | 4 +++- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/cargo.cpp b/src/cargo.cpp index 1a1158a..efe48ef 100644 --- a/src/cargo.cpp +++ b/src/cargo.cpp @@ -51,6 +51,7 @@ struct cargo_config { std::string address; std::uint64_t blocksize; std::string REGEX_file; + int mtime_threshold; }; cargo_config @@ -84,6 +85,13 @@ parse_command_line(int argc, char* argv[]) { ->option_text("BLOCKSIZE") ->default_val(512); + app.add_option("-T,--mtime-threshold", cfg.mtime_threshold, + "Minimum age (in seconds) of a file's modification time " + "to be considered for transfer. Defaults to 1 second.") + ->option_text("SECONDS") + ->default_val(1) + ->check(CLI::NonNegativeNumber); // Ensure it's not negative + app.add_flag_function( "-v,--version", [&](auto /*count*/) { @@ -127,7 +135,7 @@ main(int argc, char* argv[]) { if(const auto rank = world.rank(); rank == 0) { cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize, fs::current_path(), - cfg.blocksize, cfg.REGEX_file}; + cfg.blocksize, cfg.REGEX_file, cfg.mtime_threshold}; if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, diff --git a/src/master.cpp b/src/master.cpp index cc6d50f..bb4ce24 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -101,6 +101,7 @@ namespace cargo { master_server::master_server(std::string name, std::string address, bool daemonize, std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file, + int mtime_threshold, std::optional pidfile) : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size), std::move(pidfile)), @@ -110,12 +111,12 @@ master_server::master_server(std::string name, std::string address, [this]() { mpi_listener_ult(); })), m_ftio_listener_ess(thallium::xstream::create()), m_ftio_listener_ult(m_ftio_listener_ess->make_thread( - [this]() { ftio_scheduling_ult(); })) + [this]() { ftio_scheduling_ult(); })), + m_block_size(block_size), + REGEX_file(std::move(regex_file)), + m_mtime_threshold(mtime_threshold) { - m_block_size = block_size; - REGEX_file = regex_file; - #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); @@ -383,7 +384,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { std::vector v_d_new; std::vector v_size_new; time_t now = time(0); - now = now - 1; // Threshold for mtime + now = now - m_mtime_threshold; // Threshold for mtime for(auto i = 0u; i < pt.m_sources.size(); ++i) { const auto& s = pt.m_sources[i]; diff --git a/src/master.hpp b/src/master.hpp index 6943121..bc70d3f 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -53,7 +53,8 @@ class master_server : public network::server, public network::provider { public: master_server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file, + std::filesystem::path rundir, std::uint64_t block_size, + std::string regex_file, int mtime_threshold, std::optional pidfile = {}); ~master_server(); @@ -121,6 +122,7 @@ private: // Request manager request_manager m_request_manager; std::string REGEX_file; + int m_mtime_threshold; }; } // namespace cargo -- GitLab From e16f4804fd10d23fa6e28f2782ae4a336de0bc51 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 11 Apr 2025 14:21:22 +0200 Subject: [PATCH 10/10] added -T parameter to cargo_ftio --- cli/ftio.cpp | 7 ++++++- src/master.cpp | 7 ++++--- src/master.hpp | 2 +- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cli/ftio.cpp b/cli/ftio.cpp index ddeb25e..b72844c 100644 --- a/cli/ftio.cpp +++ b/cli/ftio.cpp @@ -35,6 +35,7 @@ struct ftio_config { float confidence; float probability; float period; + int mtime; bool run{false}; bool pause{false}; bool resume{false}; @@ -65,6 +66,10 @@ parse_command_line(int argc, char* argv[]) { ->option_text("float") ->default_val("-1.0"); + app.add_option("-T,--mtime", cfg.mtime, "mtime") + ->option_text("int") + ->default_val("1"); + app.add_flag( "--run", cfg.run, "Trigger stage operation to run now. Has no effect when period is set > 0"); @@ -108,7 +113,7 @@ main(int argc, char* argv[]) { const auto& endpoint = result.value(); const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, - cfg.period, cfg.run, cfg.pause, cfg.resume); + cfg.period, cfg.run, cfg.pause, cfg.resume, cfg.mtime); if(retval.has_value()) { diff --git a/src/master.cpp b/src/master.cpp index bb4ce24..bc5a570 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -751,12 +751,13 @@ master_server::transfer_statuses(const network::request& req, void master_server::ftio_int(const network::request& req, float conf, float prob, - float period, bool run, bool pause, bool resume) { + float period, bool run, bool pause, bool resume, int mtime) { using network::get_address; using network::rpc_info; using proto::generic_response; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + m_mtime_threshold = mtime; if(pause) { // send shaping info for(int rank = 1; rank < world.size(); ++rank) { @@ -784,8 +785,8 @@ master_server::ftio_int(const network::request& req, float conf, float prob, m_ftio = true; } LOGGER_INFO( - "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}, pause: {}, resume: {}}}", - rpc, conf, prob, period, run, pause, resume); + "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}, pause: {}, resume: {}, mtime: {}}}", + rpc, conf, prob, period, run, pause, resume, mtime); const auto resp = generic_response{rpc.id(), error_code::success}; diff --git a/src/master.hpp b/src/master.hpp index bc70d3f..cd78b15 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -92,7 +92,7 @@ private: void ftio_int(const network::request& req, float confidence, float probability, - float period, bool run, bool pause, bool resume); + float period, bool run, bool pause, bool resume, int mtime); private: // Dedicated execution stream for the MPI listener ULT -- GitLab