diff --git a/CMakeLists.txt b/CMakeLists.txt index 5ea946c177a7bc2dbfbab15d3817ce2cf7a08c53..5820afec79c58284d1ae2f97cf13713fe011e233 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -213,7 +213,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building {fmt}") FetchContent_Declare( fmt GIT_REPOSITORY https://github.com/fmtlib/fmt - GIT_TAG 10.2.1 + GIT_TAG 11.1.4 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -226,7 +226,7 @@ message(STATUS "[${PROJECT_NAME}] Downloading and building spdlog") FetchContent_Declare( spdlog GIT_REPOSITORY https://github.com/gabime/spdlog - GIT_TAG v1.14.0 + GIT_TAG v1.15.2 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -238,7 +238,7 @@ set_target_properties(spdlog PROPERTIES POSITION_INDEPENDENT_CODE ON) message(STATUS "[${PROJECT_NAME}] Searching for CLI11") FetchContent_Declare(cli11 GIT_REPOSITORY https://github.com/CLIUtils/CLI11 -GIT_TAG 291c58789c031208f08f4f261a858b5b7083e8e2 # v2.3.2 +GIT_TAG v2.5.0 # v2.3.2 GIT_SHALLOW ON GIT_PROGRESS ON ) @@ -293,6 +293,7 @@ endif() ### Threads: required by ASIO find_package(Threads REQUIRED) +find_package(Python COMPONENTS Development) ### ASIO: used for signal handling ### @@ -329,7 +330,7 @@ if (CARGO_BUILD_TESTS) FetchContent_Declare( Catch2 GIT_REPOSITORY https://github.com/catchorg/Catch2.git - GIT_TAG 605a34765aa5d5ecbf476b4598a862ada971b0cc # v3.0.1 + GIT_TAG v3.8.0 # v3.0.1 GIT_SHALLOW ON GIT_PROGRESS ON ) diff --git a/cli/ftio.cpp b/cli/ftio.cpp index ddeb25ec5e113f1ffd28e541eb1dacaa60005973..b72844cef90b4d60eb45836aea05e1bbd02303fc 100644 --- a/cli/ftio.cpp +++ b/cli/ftio.cpp @@ -35,6 +35,7 @@ struct ftio_config { float confidence; float probability; float period; + int mtime; bool run{false}; bool pause{false}; bool resume{false}; @@ -65,6 +66,10 @@ parse_command_line(int argc, char* argv[]) { ->option_text("float") ->default_val("-1.0"); + app.add_option("-T,--mtime", cfg.mtime, "mtime") + ->option_text("int") + ->default_val("1"); + app.add_flag( "--run", cfg.run, "Trigger stage operation to run now. Has no effect when period is set > 0"); @@ -108,7 +113,7 @@ main(int argc, char* argv[]) { const auto& endpoint = result.value(); const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability, - cfg.period, cfg.run, cfg.pause, cfg.resume); + cfg.period, cfg.run, cfg.pause, cfg.resume, cfg.mtime); if(retval.has_value()) { diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index ea292ace24318297a57c592dd17cac74ead81b03..4f3d416226c5a5002ecfd1038bcbb3a8d6a4430f 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -47,6 +47,13 @@ target_link_libraries(cargo PRIVATE net::rpc_client ) +## If Python::Python available. update link libraries +if (Python_FOUND) + target_link_libraries(cargo PRIVATE Python::Python) +endif () + + + ## Install library + targets ################################################### set_target_properties( diff --git a/lib/fmt_formatters.hpp b/lib/fmt_formatters.hpp index 7ab61be285a7b488bfd216e03c2c598f399b436b..288cebf33da0de15035ab102bcc217de07f36fcd 100644 --- a/lib/fmt_formatters.hpp +++ b/lib/fmt_formatters.hpp @@ -34,6 +34,7 @@ #include #include #include +#include #include "cargo/error.hpp" diff --git a/src/cargo.cpp b/src/cargo.cpp index 1a1158af5eec3d3d146348de54cf51a50d5dfca9..efe48ef8b88d0cb65082faea007804c1668bb714 100644 --- a/src/cargo.cpp +++ b/src/cargo.cpp @@ -51,6 +51,7 @@ struct cargo_config { std::string address; std::uint64_t blocksize; std::string REGEX_file; + int mtime_threshold; }; cargo_config @@ -84,6 +85,13 @@ parse_command_line(int argc, char* argv[]) { ->option_text("BLOCKSIZE") ->default_val(512); + app.add_option("-T,--mtime-threshold", cfg.mtime_threshold, + "Minimum age (in seconds) of a file's modification time " + "to be considered for transfer. Defaults to 1 second.") + ->option_text("SECONDS") + ->default_val(1) + ->check(CLI::NonNegativeNumber); // Ensure it's not negative + app.add_flag_function( "-v,--version", [&](auto /*count*/) { @@ -127,7 +135,7 @@ main(int argc, char* argv[]) { if(const auto rank = world.rank(); rank == 0) { cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize, fs::current_path(), - cfg.blocksize, cfg.REGEX_file}; + cfg.blocksize, cfg.REGEX_file, cfg.mtime_threshold}; if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, diff --git a/src/master.cpp b/src/master.cpp index 2b88b7e8b76a1f5bc71ea77309585cb0fe43db75..bc5a57082a01e1a5c34dc9dd2f33d922e1f5caa7 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -101,6 +101,7 @@ namespace cargo { master_server::master_server(std::string name, std::string address, bool daemonize, std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file, + int mtime_threshold, std::optional pidfile) : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size), std::move(pidfile)), @@ -110,12 +111,12 @@ master_server::master_server(std::string name, std::string address, [this]() { mpi_listener_ult(); })), m_ftio_listener_ess(thallium::xstream::create()), m_ftio_listener_ult(m_ftio_listener_ess->make_thread( - [this]() { ftio_scheduling_ult(); })) + [this]() { ftio_scheduling_ult(); })), + m_block_size(block_size), + REGEX_file(std::move(regex_file)), + m_mtime_threshold(mtime_threshold) { - m_block_size = block_size; - REGEX_file = regex_file; - #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); @@ -242,8 +243,9 @@ master_server::ftio_scheduling_ult() { continue; } - LOGGER_INFO("Checking if there is work to do in {}", - m_pending_transfer.m_sources); + LOGGER_INFO("Checking if there is work to do in {} (should be empty : {})", + m_pending_transfer.m_sources, m_pending_transfer.m_expanded_sources ); + m_pending_transfer.m_expanded_sources = {}; m_pending_transfer.m_expanded_targets = {}; @@ -382,7 +384,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { std::vector v_d_new; std::vector v_size_new; time_t now = time(0); - // now = now - 5; // Threshold for mtime + now = now - m_mtime_threshold; // Threshold for mtime for(auto i = 0u; i < pt.m_sources.size(); ++i) { const auto& s = pt.m_sources[i]; @@ -749,12 +751,13 @@ master_server::transfer_statuses(const network::request& req, void master_server::ftio_int(const network::request& req, float conf, float prob, - float period, bool run, bool pause, bool resume) { + float period, bool run, bool pause, bool resume, int mtime) { using network::get_address; using network::rpc_info; using proto::generic_response; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + m_mtime_threshold = mtime; if(pause) { // send shaping info for(int rank = 1; rank < world.size(); ++rank) { @@ -782,8 +785,8 @@ master_server::ftio_int(const network::request& req, float conf, float prob, m_ftio = true; } LOGGER_INFO( - "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}, pause: {}, resume: {}}}", - rpc, conf, prob, period, run, pause, resume); + "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}, pause: {}, resume: {}, mtime: {}}}", + rpc, conf, prob, period, run, pause, resume, mtime); const auto resp = generic_response{rpc.id(), error_code::success}; diff --git a/src/master.hpp b/src/master.hpp index 6943121ad5b81defc5894c0a7f4a8188e7b2e1f4..cd78b154f1511542b4c95170b5af4b131e684d8b 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -53,7 +53,8 @@ class master_server : public network::server, public network::provider { public: master_server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file, + std::filesystem::path rundir, std::uint64_t block_size, + std::string regex_file, int mtime_threshold, std::optional pidfile = {}); ~master_server(); @@ -91,7 +92,7 @@ private: void ftio_int(const network::request& req, float confidence, float probability, - float period, bool run, bool pause, bool resume); + float period, bool run, bool pause, bool resume, int mtime); private: // Dedicated execution stream for the MPI listener ULT @@ -121,6 +122,7 @@ private: // Request manager request_manager m_request_manager; std::string REGEX_file; + int m_mtime_threshold; }; } // namespace cargo diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 8b12b14f0fb627d93fe3f7b85ece0dc44d730f0b..d89647700215596db54f64a35360ad48ed568a59 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -288,7 +288,10 @@ public: return bytes_written; } - + int handle() const noexcept { + return m_handle.native(); + } + protected: const std::filesystem::path m_path; file_handle m_handle; @@ -314,6 +317,7 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode, // We don't check if it exists, we just create it if flags is set to O_CREAT if(flags & O_CREAT) { + fs_plugin->unlink(filepath); recursive_mkdir(filepath, fs_plugin); } diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index ee9c572de15ba36d8c7ef19e38a82da16af95387..67397800f31e3f0c2c2ee07a9ce76efc03e19243 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -128,7 +128,8 @@ mpio_read::operator()() { mpi::error_string(ec)); return make_mpi_error(ec); } - + + // step3. POSIX write data // We need to create the directory if it does not exists (using // FSPlugin) diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 7e2c051a148673575c908a717727939c3fc72fcb..26bd3d7e07ee3a13ca344fff5c09b3530266d766 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -41,16 +41,20 @@ mpio_write::operator()() { auto workers_size = m_workers.size(); auto workers_rank = m_workers.rank(); - if (m_single) { + if(m_single) { workers_size = 1; workers_rank = 0; } - + std::size_t block_size = m_kb_size * 1024u; // We need to open the file and ask size (using fs_plugin) m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - + remove(m_output_path.c_str()); + LOGGER_INFO( + "Worker: Opening {} --> handler {} / fileSize {} --- Output {} ", + m_input_path, m_input_file->handle(), m_file_size, + m_output_path); std::size_t file_size = m_file_size; // compute the number of blocks in the file @@ -117,7 +121,8 @@ mpio_write::progress(int ongoing_index) { using posix_file::views::strided; // compute the number of blocks in the file - + // LOGGER_INFO("Worker: Progress {}/{} --> handler {} ", ongoing_index, + // m_input_path, m_input_file->handle()); int index = 0; if(ongoing_index == 0) { m_bytes_per_rank = 0; @@ -150,18 +155,20 @@ mpio_write::progress(int ongoing_index) { m_bytes_per_rank += n; + // Do sleep (But be a bit reactive...) auto total_sleep = sleep_value(); auto small_sleep = total_sleep / 100; - if (small_sleep == std::chrono::milliseconds(0)) small_sleep = std::chrono::milliseconds(1); - while( total_sleep > std::chrono::milliseconds(0)) { + if(small_sleep == std::chrono::milliseconds(0)) + small_sleep = std::chrono::milliseconds(1); + while(total_sleep > std::chrono::milliseconds(0)) { std::this_thread::sleep_for(small_sleep); total_sleep -= small_sleep; - if (total_sleep > sleep_value()) { + if(total_sleep > sleep_value()) { break; } } - + auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -177,9 +184,9 @@ mpio_write::progress(int ongoing_index) { ++index; } - + m_input_file->close(); // step 2. write buffer data in parallel to the PFS - //LOGGER_INFO("START WRITING file {}", m_output_path); + const auto output_file = mpioxx::file::open(m_workers, m_output_path, mpioxx::file_open_mode::create | @@ -216,17 +223,47 @@ mpio_write::progress(int ongoing_index) { return -1; } - // step 3. parallel write data from buffers - if(const auto ec = - MPI_File_write_all(output_file, m_buffer.data(), - static_cast(m_bytes_per_rank), - MPI_BYTE, MPI_STATUS_IGNORE); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_write_all() failed: {}", - mpi::error_string(ec)); - m_status = make_mpi_error(ec); - return -1; + // step 3. parallel write data from buffers in chunks if necessary + const std::size_t max_bytes_per_call = + INT_MAX; // Maximum bytes for one MPI call + std::size_t bytes_written_total = 0; + const char* current_buffer_ptr = m_buffer.data(); + + while(bytes_written_total < m_bytes_per_rank) { + std::size_t bytes_to_write_this_call = + m_bytes_per_rank - bytes_written_total; + if(bytes_to_write_this_call > max_bytes_per_call) { + bytes_to_write_this_call = max_bytes_per_call; + } + + // Ensure the cast is now safe + int count_this_call = static_cast(bytes_to_write_this_call); + + LOGGER_DEBUG("MPI_File_write_all chunk: Writing {} bytes", + count_this_call); + + if(const auto ec = MPI_File_write_all( + output_file, current_buffer_ptr, count_this_call, + MPI_BYTE, MPI_STATUS_IGNORE); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_File_write_all() chunk failed: {}", + mpi::error_string(ec)); + // Consider freeing MPI types before returning/throwing + MPI_Type_free(&block_type); + MPI_Type_free(&file_type); + m_status = make_mpi_error(ec); + return -1; // Or throw an exception + } + + bytes_written_total += bytes_to_write_this_call; + current_buffer_ptr += bytes_to_write_this_call; } + + // Don't forget to free the MPI types after they are no longer needed + MPI_Type_free(&block_type); + MPI_Type_free(&file_type); + + } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_mpi_error(e.error_code()); @@ -245,7 +282,7 @@ mpio_write::progress(int ongoing_index) { return -1; } - //LOGGER_INFO("END WRITING file {}", m_output_path); + // LOGGER_INFO("END WRITING file {}", m_output_path); m_status = error_code::success; return -1; diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index 766375978a987778c3917e26fbf1db9fc64bee2a..38def7aedbb7a94754a08eaf76b27e8f6a7f93b7 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -73,7 +73,6 @@ seq_mixed_operation::operator()() { m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, block_size); } - m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); @@ -148,7 +147,7 @@ seq_mixed_operation::progress(int ongoing_index) { /* Do write */ m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), - file_range.size()); + n); m_bytes_per_rank += n; @@ -185,6 +184,7 @@ seq_mixed_operation::progress(int ongoing_index) { m_status = error_code::success; + m_input_file->close(); m_output_file->close(); return -1; } diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 42e3b9af6e347003d4e21db1f69d64ffeacf1800..4ddeaf82156ca5f9637f7efd3159ae99baf528d6 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -185,7 +185,7 @@ seq_operation::progress(int ongoing_index) { // step3. POSIX write data // We need to create the directory if it does not exists (using // FSPlugin) - + m_input_file->close(); if(write and ongoing_index == 0) { m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); @@ -230,7 +230,7 @@ seq_operation::progress(int ongoing_index) { ++index; } - + m_output_file->close(); } catch(const posix_file::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_system_error(e.error_code());