From 8d3e36229a036c0cf2cbf89908a4e2ee38cc8e01 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 9 Sep 2024 13:54:18 +0200 Subject: [PATCH 1/7] Some basic changes --- src/master.cpp | 8 ++++++++ src/master.hpp | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/master.cpp b/src/master.cpp index 06c1361..c5bb61f 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -112,6 +112,7 @@ master_server::master_server(std::string name, std::string address, [this]() { ftio_scheduling_ult(); })) { + m_block_size = block_size; const char* REGEX_env = std::getenv(cargo::env::REGEX); if(REGEX_env != nullptr) { @@ -512,6 +513,8 @@ master_server::transfer_datasets(const network::request& req, std::vector v_s_new; std::vector v_d_new; + // We ask for the size of the input files. + std::vector v_size_new; for(auto i = 0u; i < sources.size(); ++i) { @@ -568,11 +571,16 @@ master_server::transfer_datasets(const network::request& req, LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), d_new.path()); + fs->stat(s_new.path(), &buf); + v_size_new.push_back(buf.st_size); v_s_new.push_back(s_new); v_d_new.push_back(d_new); } } else { + // We do not create any optimization for single files + fs->stat(s.path(), &buf); + v_size_new.push_back(buf.st_size); v_s_new.push_back(s); v_d_new.push_back(d); } diff --git a/src/master.hpp b/src/master.hpp index 2e221a0..6870854 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -111,7 +111,7 @@ private: std::uint64_t m_ftio_tid = 0; // FTIO enabled flag, we need to call ftio once. bool m_ftio = false; - + ssize_t m_block_size = 0; pending_transfer m_pending_transfer; -- GitLab From 83748e2955035936f3299759a0b81bf95c025166 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 12 Sep 2024 15:36:30 +0200 Subject: [PATCH 2/7] Small files optimization, size RPC reduction --- src/master.cpp | 23 +++++----- src/proto/mpi/message.hpp | 13 +++++- src/worker/mpio_read.cpp | 15 ++++--- src/worker/mpio_read.hpp | 5 ++- src/worker/mpio_write.cpp | 12 +++-- src/worker/mpio_write.hpp | 10 +++-- src/worker/ops.cpp | 10 ++--- src/worker/ops.hpp | 16 ++++++- src/worker/seq_mixed.cpp | 11 +++-- src/worker/seq_mixed.hpp | 13 ++++-- src/worker/sequential.cpp | 11 +++-- src/worker/sequential.hpp | 8 ++-- src/worker/worker.cpp | 93 ++++++++++++++++++++++++++------------- src/worker/worker.hpp | 2 +- 14 files changed, 164 insertions(+), 78 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index c5bb61f..7c01f7b 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -49,7 +49,8 @@ namespace { std::tuple make_message(std::uint64_t tid, std::uint32_t seqno, const std::vector& input, - const std::vector& output) { + const std::vector& output, + std::vector& v_size) { auto iparallel = input[0].supports_parallel_transfer(); auto oparallel = output[0].supports_parallel_transfer(); @@ -72,7 +73,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno, static_cast(cargo::tag::pread), cargo::transfer_message{tid, seqno, v_input, static_cast(itype), v_output, - static_cast(otype)}); + static_cast(otype), v_size}); } if(oparallel) { @@ -80,14 +81,14 @@ make_message(std::uint64_t tid, std::uint32_t seqno, static_cast(cargo::tag::pwrite), cargo::transfer_message{tid, seqno, v_input, static_cast(itype), v_output, - static_cast(otype)}); + static_cast(otype), v_size}); } return std::make_tuple( static_cast(cargo::tag::seq_mixed), cargo::transfer_message{tid, seqno, v_input, static_cast(itype), v_output, - static_cast(otype)}); + static_cast(otype), v_size}); } @@ -378,6 +379,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { mpi::communicator world; std::vector v_s_new; std::vector v_d_new; + std::vector v_size_new; time_t now = time(0); now = now - 5; // Threshold for mtime for(auto i = 0u; i < pt.m_sources.size(); ++i) { @@ -447,6 +449,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { if(buf.st_mtime < now) { v_s_new.push_back(s); v_d_new.push_back(d); + v_size_new.push_back(buf.st_size); } } } @@ -485,7 +488,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // Send message to worker (seq number is 0) if(v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { - const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new); + const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); } @@ -514,7 +517,7 @@ master_server::transfer_datasets(const network::request& req, std::vector v_s_new; std::vector v_d_new; // We ask for the size of the input files. - std::vector v_size_new; + std::vector v_size_new; for(auto i = 0u; i < sources.size(); ++i) { @@ -628,19 +631,13 @@ master_server::transfer_datasets(const network::request& req, // stage-out if(!m_ftio) { // If we are on stage-out - - // some sleep here may help ? too many messages to the - // workers? Changed to one message for all the files. seq is - // 0 if(v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = - make_message(r.tid(), 0, v_s_new, v_d_new); + make_message(r.tid(), 0, v_s_new, v_d_new, v_size_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); - // Wait 1 ms - // std::this_thread::sleep_for(20ms); } } } else { diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index 4245073..37c63bd 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -57,10 +58,11 @@ public: transfer_message(std::uint64_t tid, std::uint32_t seqno, std::vector input_path, std::uint32_t i_type, - std::vector output_path, std::uint32_t o_type) + std::vector output_path, std::uint32_t o_type, + std::vector sizes) : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)), m_i_type(i_type), m_output_path(std::move(output_path)), - m_o_type(o_type) {} + m_o_type(o_type), m_sizes(std::move(sizes)) {} [[nodiscard]] std::uint64_t tid() const { @@ -93,6 +95,11 @@ public: return static_cast(m_i_type); } + [[nodiscard]] const std::vector & + sizes() const { + return m_sizes; + } + private: template void @@ -105,6 +112,7 @@ private: ar& m_output_path; ar& m_i_type; ar& m_o_type; + ar& m_sizes; } std::uint64_t m_tid{}; @@ -113,6 +121,7 @@ private: std::uint32_t m_i_type{}; std::vector m_output_path; std::uint32_t m_o_type{}; + std::vector m_sizes; }; class status_message { diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 1f5ab73..75bf359 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -34,10 +34,10 @@ mpio_read::mpio_read(mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_i_type, - FSPlugin::type fs_o_type) + FSPlugin::type fs_o_type, std::size_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), - m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type) {} + m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} cargo::error_code mpio_read::operator()() { @@ -51,7 +51,7 @@ mpio_read::operator()() { const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); - mpioxx::offset file_size = input_file.size(); + mpioxx::offset file_size = m_file_size; std::size_t block_size = m_kb_size * 1024u; // create block type @@ -67,8 +67,13 @@ mpio_read::operator()() { ++total_blocks; } - const auto workers_size = m_workers.size(); - const auto workers_rank = m_workers.rank(); + auto workers_size = m_workers.size(); + auto workers_rank = m_workers.rank(); + + if (m_single) { + workers_size = 1; + workers_rank = 0; + } // create file type MPI_Datatype file_type; diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 10984c6..b52d6cd 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -39,7 +39,7 @@ class mpio_read : public operation { public: mpio_read(mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, - FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type); + FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type, std::size_t size, bool single); cargo::error_code operator()() final; @@ -65,6 +65,7 @@ private: cargo::error_code m_status; std::filesystem::path m_input_path{}; std::filesystem::path m_output_path{}; + std::unique_ptr m_output_file; int m_workers_size; int m_workers_rank; @@ -74,6 +75,8 @@ private: std::uint64_t m_kb_size; FSPlugin::type m_fs_i_type; FSPlugin::type m_fs_o_type; + std::size_t m_file_size; + bool m_single; }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 20ac280..7e2c051 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -38,14 +38,20 @@ mpio_write::operator()() { m_status = error_code::transfer_in_progress; try { - const auto workers_size = m_workers.size(); - const auto workers_rank = m_workers.rank(); + auto workers_size = m_workers.size(); + auto workers_rank = m_workers.rank(); + + if (m_single) { + workers_size = 1; + workers_rank = 0; + } + std::size_t block_size = m_kb_size * 1024u; // We need to open the file and ask size (using fs_plugin) m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - std::size_t file_size = m_input_file->size(); + std::size_t file_size = m_file_size; // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 0edf735..514afdd 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -39,11 +39,11 @@ class mpio_write : public operation { public: mpio_write(mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, - FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type, std::size_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), - m_fs_o_type(fs_o_type) {} + m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} cargo::error_code operator()() final; @@ -70,12 +70,12 @@ private: cargo::error_code m_status; std::filesystem::path m_input_path{}; std::filesystem::path m_output_path{}; - + std::unique_ptr m_input_file; int m_workers_size; int m_workers_rank; std::size_t m_block_size; - std::size_t m_file_size; + int m_total_blocks; memory_buffer m_buffer; @@ -84,6 +84,8 @@ private: std::uint64_t m_kb_size; FSPlugin::type m_fs_i_type; FSPlugin::type m_fs_o_type; + std::size_t m_file_size; + bool m_single; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index dd6aeed..124ab96 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -37,26 +37,26 @@ operation::make_operation(cargo::tag t, mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_i_type, - FSPlugin::type fs_o_type) { + FSPlugin::type fs_o_type, std::size_t size, bool single) { using cargo::tag; switch(t) { case tag::pread: return std::make_unique( std::move(workers), std::move(input_path), - std::move(output_path), block_size, fs_i_type, fs_o_type); + std::move(output_path), block_size, fs_i_type, fs_o_type, size, single); case tag::pwrite: return std::make_unique( std::move(workers), std::move(input_path), - std::move(output_path), block_size, fs_i_type, fs_o_type); + std::move(output_path), block_size, fs_i_type, fs_o_type, size, single); case tag::sequential: return std::make_unique( std::move(workers), std::move(input_path), - std::move(output_path), block_size, fs_i_type, fs_o_type); + std::move(output_path), block_size, fs_i_type, fs_o_type, size, single); case tag::seq_mixed: return std::make_unique( std::move(workers), std::move(input_path), - std::move(output_path), block_size, fs_i_type, fs_o_type); + std::move(output_path), block_size, fs_i_type, fs_o_type, size, single); default: return {}; } diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 2acd884..79e0e35 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -39,11 +39,25 @@ namespace cargo { class operation { public: +/** + * @brief + * + * @param t + * @param workers + * @param input_path + * @param output_path + * @param block_size + * @param fs_i_type + * @param fs_o_type + * @param size size of the file gathered in the master to reduce operations + * @param single The file is only processed in this rank + * @return std::unique_ptr + */ static std::unique_ptr make_operation(cargo::tag t, boost::mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, - FSPlugin::type fs_i_type, FSPlugin::type fs_o_type); + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type, std::size_t size, bool single); virtual ~operation() = default; diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index a7e728a..4a30793 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -36,13 +36,18 @@ seq_mixed_operation::operator()() { m_status = error_code::transfer_in_progress; try { - const auto workers_size = m_workers.size(); - const auto workers_rank = m_workers.rank(); + auto workers_size = m_workers.size(); + auto workers_rank = m_workers.rank(); + + if (m_single) { + workers_size = 1; + workers_rank = 0; + } std::size_t block_size = m_kb_size * 1024u; m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - std::size_t file_size = m_input_file->size(); + std::size_t file_size = m_file_size; // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); diff --git a/src/worker/seq_mixed.hpp b/src/worker/seq_mixed.hpp index 45b3ba6..7d6af0f 100644 --- a/src/worker/seq_mixed.hpp +++ b/src/worker/seq_mixed.hpp @@ -42,11 +42,11 @@ public: std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_i_type, - FSPlugin::type fs_o_type) + FSPlugin::type fs_o_type, ssize_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), - m_fs_o_type(fs_o_type) {} + m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} cargo::error_code operator()() final; @@ -66,16 +66,21 @@ public: return m_input_path; } + ssize_t + size() const { + return m_file_size; + } + private: mpi::communicator m_workers; std::filesystem::path m_input_path{}; std::filesystem::path m_output_path{}; std::unique_ptr m_input_file; std::unique_ptr m_output_file; + int m_workers_size; int m_workers_rank; std::size_t m_block_size; - std::size_t m_file_size; int m_total_blocks; memory_buffer m_buffer; @@ -84,7 +89,9 @@ private: std::uint64_t m_kb_size; FSPlugin::type m_fs_i_type; FSPlugin::type m_fs_o_type; + std::size_t m_file_size; cargo::error_code m_status; + bool m_single; bool write{}; }; diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index eebfaea..42e3b9a 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -36,12 +36,17 @@ seq_operation::operator()() { m_status = error_code::transfer_in_progress; try { - const auto workers_size = m_workers.size(); - const auto workers_rank = m_workers.rank(); + auto workers_size = m_workers.size(); + auto workers_rank = m_workers.rank(); + + if (m_single) { + workers_size = 1; + workers_rank = 0; + } std::size_t block_size = m_kb_size * 1024u; m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); - std::size_t file_size = m_input_file->size(); + std::size_t file_size = m_file_size; // compute the number of blocks in the file diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 8073ceb..1273cf6 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -40,11 +40,11 @@ class seq_operation : public operation { public: seq_operation(mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, std::uint64_t block_size, - FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type, std::size_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), - m_fs_o_type(fs_o_type) {} + m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} cargo::error_code operator()() final; @@ -73,7 +73,7 @@ private: int m_workers_size; int m_workers_rank; std::size_t m_block_size; - std::size_t m_file_size; + int m_total_blocks; memory_buffer m_buffer; @@ -82,7 +82,9 @@ private: std::uint64_t m_kb_size; FSPlugin::type m_fs_i_type; FSPlugin::type m_fs_o_type; + std::size_t m_file_size; cargo::error_code m_status; + bool m_single; bool write{}; }; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 0967af5..eeb8525 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -24,7 +24,7 @@ #include #include -#include "logger/logger.hpp" +#include "../logger/logger.hpp" #include #include #include "worker.hpp" @@ -107,11 +107,12 @@ worker::run() { bool done = false; while(!done) { // Always loop pending operations - // TODO: This seems that it is not a good idea, we have a lot of () ongoing + // TODO: This seems that it is not a good idea, we have a lot of () + // ongoing auto I = m_ops.begin(); auto IE = m_ops.end(); - //LOGGER_INFO ("[Status] Pending: {}", m_ops.size()); + // LOGGER_INFO ("[Status] Pending: {}", m_ops.size()); if(I != IE) { auto op = I->second.first.get(); int index = I->second.second; @@ -120,9 +121,9 @@ worker::run() { // operation not started // Print error message /* We avoid this update, that may not come into order...*/ - //update_state(op->source(), op->tid(), op->seqno(), - // op->output_path(), transfer_state::running, - // -1.0f); + // update_state(op->source(), op->tid(), op->seqno(), + // op->output_path(), transfer_state::running, + // -1.0f); cargo::error_code ec = (*op)(); if(ec != cargo::error_code::transfer_in_progress) { update_state(op->source(), op->tid(), op->seqno(), @@ -139,7 +140,7 @@ worker::run() { if(index == -1) { // operation finishe cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), ec ? transfer_state::failed : transfer_state::completed, @@ -183,30 +184,60 @@ worker::run() { transfer_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); - // Iterate over all the vector (input and output) and create a new op per file - for (std::size_t i = 0; i < m.input_path().size(); i++) { - std::string input_path = m.input_path()[i]; - std::string output_path = m.output_path()[i]; - update_state(msg->source(), m.tid(), i, - output_path, transfer_state::pending, -1.0f); - - m_ops.emplace(std::make_pair( - make_pair(input_path, output_path), - make_pair(operation::make_operation( - t, workers, input_path, - output_path, m_block_size, - m.i_type(), m.o_type()), - -1))); - // TODO : Issue 1, seqno is not different from each file -(we use i) - const auto op = - m_ops[make_pair(input_path, output_path)] - .first.get(); - - op->set_comm(msg->source(), m.tid(), i, t); - - - } - break; + // Iterate over all the vector (input and output) and create a + // new op per file + for(std::size_t i = 0; i < m.input_path().size(); i++) { + std::string input_path = m.input_path()[i]; + std::string output_path = m.output_path()[i]; + std::size_t size = m.sizes()[i]; + + if(size <= m_block_size * 1024) { + // Optimize and process only to one worker + // the one that is processing is i%worker == 0 + if(i % (world.rank() - 1) == 0) { + update_state(msg->source(), m.tid(), i, output_path, + transfer_state::pending, -1.0f); + + m_ops.emplace(std::make_pair( + make_pair(input_path, output_path), + make_pair(operation::make_operation( + t, workers, input_path, + output_path, m_block_size, + m.i_type(), m.o_type(), size, true), + -1))); + // TODO : Issue 1, seqno is not different from each file + // -(we use i) + const auto op = + m_ops[make_pair(input_path, output_path)] + .first.get(); + + op->set_comm(msg->source(), m.tid(), i, t); + + } else { + update_state(msg->source(), m.tid(), i, output_path, + transfer_state::completed, -1.0f); + } + } else { + update_state(msg->source(), m.tid(), i, output_path, + transfer_state::pending, -1.0f); + + m_ops.emplace(std::make_pair( + make_pair(input_path, output_path), + make_pair(operation::make_operation( + t, workers, input_path, + output_path, m_block_size, + m.i_type(), m.o_type(), size, false), + -1))); + // TODO : Issue 1, seqno is not different from each file + // -(we use i) + const auto op = + m_ops[make_pair(input_path, output_path)] + .first.get(); + + op->set_comm(msg->source(), m.tid(), i, t); + } + } + break; } case tag::bw_shaping: { shaper_message m; diff --git a/src/worker/worker.hpp b/src/worker/worker.hpp index 84e877a..a4eded9 100644 --- a/src/worker/worker.hpp +++ b/src/worker/worker.hpp @@ -26,7 +26,7 @@ #ifndef CARGO_WORKER_HPP #define CARGO_WORKER_HPP -#include "proto/mpi/message.hpp" +#include "../proto/mpi/message.hpp" #include #include "ops.hpp" namespace cargo { -- GitLab From bc7121759ec4958445eee23157fde09a3f1f0a50 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 12 Sep 2024 16:30:20 +0200 Subject: [PATCH 3/7] Arithmetic bug in the worker --- lib/libcargo.cpp | 6 +++--- src/worker/worker.cpp | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 338dc13..67555e8 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -22,9 +22,9 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include -#include -#include +#include "cargo.hpp" +#include "fmt_formatters.hpp" +#include "net/serialization.hpp" #include #include #include diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index eeb8525..6cc5adb 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -194,7 +194,7 @@ worker::run() { if(size <= m_block_size * 1024) { // Optimize and process only to one worker // the one that is processing is i%worker == 0 - if(i % (world.rank() - 1) == 0) { + if(((i%workers.size()) == (unsigned int)workers.rank())) { update_state(msg->source(), m.tid(), i, output_path, transfer_state::pending, -1.0f); -- GitLab From 96df152d5c8607c83f4f03e06398b76e490ab128 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 13 Sep 2024 13:38:47 +0200 Subject: [PATCH 4/7] MPI (tests) fails yet --- src/cargo.cpp | 26 +++++++-- src/master.cpp | 34 +++++------ src/master.hpp | 2 +- src/worker/mpio_read.cpp | 1 + tests/common.cpp | 3 +- tests/common.hpp | 8 +++ tests/tests.cpp | 122 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 168 insertions(+), 28 deletions(-) diff --git a/src/cargo.cpp b/src/cargo.cpp index 0560aaa..1a1158a 100644 --- a/src/cargo.cpp +++ b/src/cargo.cpp @@ -50,6 +50,7 @@ struct cargo_config { std::optional output_file; std::string address; std::uint64_t blocksize; + std::string REGEX_file; }; cargo_config @@ -76,9 +77,10 @@ parse_command_line(int argc, char* argv[]) { "Check `fi_info` to see the list of available protocols.\n") ->option_text("ADDRESS") ->required(); - - app.add_option("-b,--blocksize", cfg.blocksize, - "Number of bytes to send in each message (in kb). Defaults to 512(kb).\n") + + app.add_option( + "-b,--blocksize", cfg.blocksize, + "Number of bytes to send in each message (in kb). Defaults to 512(kb).\n") ->option_text("BLOCKSIZE") ->default_val(512); @@ -92,6 +94,12 @@ parse_command_line(int argc, char* argv[]) { try { app.parse(argc, argv); + const char* REGEX_env = std::getenv(cargo::env::REGEX); + if(REGEX_env != nullptr) { + cfg.REGEX_file = REGEX_env; + } else { + cfg.REGEX_file = ""; + } return cfg; } catch(const CLI::ParseError& ex) { std::exit(app.exit(ex)); @@ -117,14 +125,20 @@ main(int argc, char* argv[]) { try { if(const auto rank = world.rank(); rank == 0) { - cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize, - fs::current_path(), cfg.blocksize}; + cargo::master_server srv{cfg.progname, cfg.address, + cfg.daemonize, fs::current_path(), + cfg.blocksize, cfg.REGEX_file}; if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, get_process_output_file(*cfg.output_file)); } - + if(cfg.REGEX_file != "") { + fmt::print("{} set to file:{} \n", cargo::env::REGEX, + cfg.REGEX_file); + } else { + fmt::print("{} not set \n", cargo::env::REGEX); + } return srv.run(); } else { diff --git a/src/master.cpp b/src/master.cpp index 7c01f7b..a91562d 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -100,7 +100,7 @@ namespace cargo { master_server::master_server(std::string name, std::string address, bool daemonize, std::filesystem::path rundir, - std::uint64_t block_size, + std::uint64_t block_size, std::string regex_file, std::optional pidfile) : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size), std::move(pidfile)), @@ -114,16 +114,8 @@ master_server::master_server(std::string name, std::string address, { m_block_size = block_size; + REGEX_file = regex_file; - const char* REGEX_env = std::getenv(cargo::env::REGEX); - if(REGEX_env != nullptr) { - REGEX_file = REGEX_env; - LOGGER_INFO("{} env variable set to: {}", cargo::env::REGEX, - REGEX_file); - } else { - REGEX_file = ""; - LOGGER_INFO("{} env variable not set", cargo::env::REGEX); - } #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); @@ -159,6 +151,7 @@ void master_server::mpi_listener_ult() { mpi::communicator world; + while(!m_shutting_down) { auto msg = world.iprobe(); @@ -488,7 +481,8 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // Send message to worker (seq number is 0) if(v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { - const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new); + const auto [t, m] = + make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); } @@ -540,8 +534,9 @@ master_server::transfer_datasets(const network::request& req, auto fs = FSPlugin::make_fs( static_cast(s.get_type())); struct stat buf; - fs->stat(p, &buf); - if(buf.st_mode & S_IFDIR) { + auto rstat = fs->stat(p, &buf); + + if(rstat == 0 and (buf.st_mode & S_IFDIR)) { LOGGER_INFO("Expanding input directory {}", p); files = fs->readdir(p); // As we need to create a new directory, we need to order the files @@ -574,7 +569,8 @@ master_server::transfer_datasets(const network::request& req, LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), d_new.path()); - fs->stat(s_new.path(), &buf); + rstat = fs->stat(s_new.path(), &buf); + if (rstat == 0) v_size_new.push_back(buf.st_size); v_s_new.push_back(s_new); v_d_new.push_back(d_new); @@ -617,13 +613,11 @@ master_server::transfer_datasets(const network::request& req, // Create the directory if it does not exist (only in // parallel transfer) - if(!std::filesystem::path(d.path()) - .parent_path() - .empty() and + if(!std::filesystem::path(d.path()).parent_path().empty() and d.supports_parallel_transfer()) { std::filesystem::create_directories( std::filesystem::path(d.path()).parent_path()); - LOGGER_INFO("Created directory {}", d.path()); + LOGGER_INFO("Created directory {}", std::filesystem::path(d.path()).parent_path()); } } @@ -634,8 +628,8 @@ master_server::transfer_datasets(const network::request& req, if(v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { - const auto [t, m] = - make_message(r.tid(), 0, v_s_new, v_d_new, v_size_new); + const auto [t, m] = make_message( + r.tid(), 0, v_s_new, v_d_new, v_size_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); } diff --git a/src/master.hpp b/src/master.hpp index 6870854..6943121 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -53,7 +53,7 @@ class master_server : public network::server, public network::provider { public: master_server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir, std::uint64_t block_size, + std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file, std::optional pidfile = {}); ~master_server(); diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 75bf359..1504982 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -48,6 +48,7 @@ mpio_read::operator()() { m_status = error_code::transfer_in_progress; try { + std::cout << "Trying to open file " << m_input_path << std::endl; const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); diff --git a/tests/common.cpp b/tests/common.cpp index a6f0036..4338dbc 100644 --- a/tests/common.cpp +++ b/tests/common.cpp @@ -1,5 +1,6 @@ #include #include "common.hpp" +#include std::vector prepare_datasets(cargo::dataset::type type, const std::string& pattern, @@ -7,7 +8,7 @@ prepare_datasets(cargo::dataset::type type, const std::string& pattern, std::vector datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { - datasets.emplace_back(fmt::format(fmt::runtime(pattern), i), type); + datasets.emplace_back(std::filesystem::current_path().string()+"/"+fmt::format(fmt::runtime(pattern), i), type); } return datasets; diff --git a/tests/common.hpp b/tests/common.hpp index 3085ed6..d1090e9 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -87,6 +87,14 @@ public: } }; +/** + * @brief Prepares the dataset, prepend the current dir. We do not support relative directories in cargo. + * + * @param type + * @param pattern + * @param n + * @return std::vector + */ std::vector prepare_datasets(cargo::dataset::type type, const std::string& pattern, size_t n); diff --git a/tests/tests.cpp b/tests/tests.cpp index c8b514a..8d354da 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -328,6 +328,128 @@ SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { } } + +SCENARIO("POSIX reads", "[flex_stager][posix_reads]") { + + random_data_generator rng{catch2_seed, 0, + std::numeric_limits::max() - 1u}; + + GIVEN("Some input datasets from a PFS") { + + REQUIRE(!server_address.empty()); + + cargo::server server{server_address}; + + const auto sources = + prepare_datasets(cargo::dataset::type::posix, + "pr-source-dataset-{}", NDATASETS); + const auto targets = prepare_datasets( + cargo::dataset::type::posix, "pr-target-dataset-{}", NDATASETS); + + static std::vector input_files; + input_files.reserve(sources.size()); + + for(const auto& d : sources) { + input_files.emplace_back( + create_temporary_file(d.path(), 1000, rng)); + } + + // ensure there are no dangling output files from another test run + std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { + std::filesystem::remove(dataset.path()); + }); + + + WHEN("Transferring datasets to a POSIX storage system") { + const auto tx = cargo::transfer_datasets(server, sources, targets); + + // wait for the transfer to complete + auto s = tx.wait(); + + CAPTURE(s.error()); + REQUIRE(s.state() == cargo::transfer_state::completed); + REQUIRE(s.error() == cargo::error_code::success); + + THEN("Output datasets are identical to input datasets") { + + std::vector output_files; + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(output_files), + [](auto&& target) { + return scoped_file{target.path()}; + }); + + for(std::size_t i = 0; i < input_files.size(); ++i) { + REQUIRE(std::filesystem::exists(output_files[i].path())); + REQUIRE(::equal(input_files[i].path(), + output_files[i].path())); + } + } + } + } +} + +SCENARIO("Posix writes", "[flex_stager][posix_writes]") { + + std::size_t file_size = 1000; // GENERATE(1000, 10000); + + [[maybe_unused]] ascii_data_generator ascii_gen{512}; + [[maybe_unused]] random_data_generator rng{ + catch2_seed, 0, std::numeric_limits::max() - 1u}; + + GIVEN("Some input datasets from a POSIX storage system") { + + REQUIRE(!server_address.empty()); + + cargo::server server{server_address}; + + const auto sources = prepare_datasets( + cargo::dataset::type::posix, "pw-source-dataset-{}", NDATASETS); + const auto targets = + prepare_datasets(cargo::dataset::type::parallel, + "pw-target-dataset-{}", NDATASETS); + + static std::vector input_files; + input_files.reserve(sources.size()); + + for(const auto& d : sources) { + input_files.emplace_back( + create_temporary_file(d.path(), file_size, rng)); + } + + // ensure there are no danling output files from another test run + std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { + std::filesystem::remove(dataset.path()); + }); + + WHEN("Transferring datasets to a PFS") { + const auto tx = cargo::transfer_datasets(server, sources, targets); + + // wait for the transfer to complete + auto s = tx.wait(); + + CAPTURE(s.error()); + REQUIRE(s.state() == cargo::transfer_state::completed); + REQUIRE(s.error() == cargo::error_code::success); + + THEN("Output datasets are identical to input datasets") { + + std::vector output_files; + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(output_files), + [](auto&& target) { + return scoped_file{target.path()}; + }); + + for(std::size_t i = 0; i < input_files.size(); ++i) { + REQUIRE(::equal(input_files[i].path(), targets[i].path())); + } + } + } + } +} + + int main(int argc, char* argv[]) { -- GitLab From 5d4647fcd3a1ccfd046e2d1679c6f11f54112b47 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 17 Sep 2024 14:18:09 +0200 Subject: [PATCH 5/7] Solved MPI bug, single rank for small files --- src/mpioxx.hpp | 3 +-- src/worker/mpio_read.cpp | 10 +++---- src/worker/worker.cpp | 56 ++++++++++++++++++++++++++-------------- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 8cb1729..0254968 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -134,8 +134,7 @@ public: const std::filesystem::path& filepath, file_open_mode mode) { MPI_File result; - - + if(const auto ec = MPI_File_open(comm, filepath.c_str(), static_cast(mode), MPI_INFO_NULL, &result); diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 1504982..ee9c572 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -37,7 +37,8 @@ mpio_read::mpio_read(mpi::communicator workers, FSPlugin::type fs_o_type, std::size_t size, bool single) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), - m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {} + m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), + m_single(single) {} cargo::error_code mpio_read::operator()() { @@ -48,13 +49,11 @@ mpio_read::operator()() { m_status = error_code::transfer_in_progress; try { - std::cout << "Trying to open file " << m_input_path << std::endl; const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); mpioxx::offset file_size = m_file_size; std::size_t block_size = m_kb_size * 1024u; - // create block type MPI_Datatype block_type; MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, @@ -71,11 +70,10 @@ mpio_read::operator()() { auto workers_size = m_workers.size(); auto workers_rank = m_workers.rank(); - if (m_single) { + if(m_single) { workers_size = 1; workers_rank = 0; } - // create file type MPI_Datatype file_type; /* @@ -144,7 +142,6 @@ mpio_read::operator()() { m_workers_rank = workers_rank; m_block_size = block_size; - } catch(const mpioxx::io_error& e) { LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); m_status = make_mpi_error(e.error_code()); @@ -175,7 +172,6 @@ mpio_read::progress(int ongoing_index) { try { int index = 0; // TODO : FS not defined... - m_status = error_code::transfer_in_progress; for(const auto& file_range : all_of(posix_file::file{m_input_path, m_fs_i_type}) | diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 6cc5adb..18809d4 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -194,28 +194,45 @@ worker::run() { if(size <= m_block_size * 1024) { // Optimize and process only to one worker // the one that is processing is i%worker == 0 - if(((i%workers.size()) == (unsigned int)workers.rank())) { + if(((i % workers.size()) == + (unsigned int) workers.rank())) { update_state(msg->source(), m.tid(), i, output_path, - transfer_state::pending, -1.0f); - - m_ops.emplace(std::make_pair( - make_pair(input_path, output_path), - make_pair(operation::make_operation( - t, workers, input_path, - output_path, m_block_size, - m.i_type(), m.o_type(), size, true), - -1))); - // TODO : Issue 1, seqno is not different from each file - // -(we use i) - const auto op = - m_ops[make_pair(input_path, output_path)] - .first.get(); - - op->set_comm(msg->source(), m.tid(), i, t); + transfer_state::pending, -1.0f); + std::vector ranks_to_exclude; + // Exclude all + for(auto id = 0; id < workers.size(); id++) { + if(id != workers.rank()) { + ranks_to_exclude.push_back(id); + } + } + + const auto tempworkers = ::make_communicator( + workers, + workers.group().exclude( + ranks_to_exclude.begin(), + ranks_to_exclude.end()), + 0); + + m_ops.emplace(std::make_pair( + make_pair(input_path, output_path), + make_pair(operation::make_operation( + t, tempworkers, + input_path, output_path, + m_block_size, m.i_type(), + m.o_type(), size, true), + -1))); + // TODO : Issue 1, seqno is not different from each + // file + // -(we use i) + const auto op = + m_ops[make_pair(input_path, output_path)] + .first.get(); + + op->set_comm(msg->source(), m.tid(), i, t); } else { update_state(msg->source(), m.tid(), i, output_path, - transfer_state::completed, -1.0f); + transfer_state::completed, -1.0f); } } else { update_state(msg->source(), m.tid(), i, output_path, @@ -226,7 +243,8 @@ worker::run() { make_pair(operation::make_operation( t, workers, input_path, output_path, m_block_size, - m.i_type(), m.o_type(), size, false), + m.i_type(), m.o_type(), size, + false), -1))); // TODO : Issue 1, seqno is not different from each file // -(we use i) -- GitLab From d342ef6b38985d96c9b113e854109460a5ab154c Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 17 Sep 2024 14:55:58 +0200 Subject: [PATCH 6/7] manage multiple tests --- tests/tests.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/tests.cpp b/tests/tests.cpp index 8d354da..42af873 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -342,9 +342,9 @@ SCENARIO("POSIX reads", "[flex_stager][posix_reads]") { const auto sources = prepare_datasets(cargo::dataset::type::posix, - "pr-source-dataset-{}", NDATASETS); + "pxr-source-dataset-{}", NDATASETS); const auto targets = prepare_datasets( - cargo::dataset::type::posix, "pr-target-dataset-{}", NDATASETS); + cargo::dataset::type::posix, "pxr-target-dataset-{}", NDATASETS); static std::vector input_files; input_files.reserve(sources.size()); @@ -404,10 +404,10 @@ SCENARIO("Posix writes", "[flex_stager][posix_writes]") { cargo::server server{server_address}; const auto sources = prepare_datasets( - cargo::dataset::type::posix, "pw-source-dataset-{}", NDATASETS); + cargo::dataset::type::posix, "pxw-source-dataset-{}", NDATASETS); const auto targets = prepare_datasets(cargo::dataset::type::parallel, - "pw-target-dataset-{}", NDATASETS); + "pxw-target-dataset-{}", NDATASETS); static std::vector input_files; input_files.reserve(sources.size()); -- GitLab From 2277bef1aac4b865be09d8bc81339eccfaf649f0 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 17 Sep 2024 15:06:36 +0200 Subject: [PATCH 7/7] Big tests, missing directories yet --- tests/tests.cpp | 119 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 119 insertions(+) diff --git a/tests/tests.cpp b/tests/tests.cpp index 42af873..529ccc5 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -449,6 +449,125 @@ SCENARIO("Posix writes", "[flex_stager][posix_writes]") { } } +SCENARIO("Big POSIX reads", "[flex_stager][Big_posix_reads]") { + + random_data_generator rng{catch2_seed, 0, + std::numeric_limits::max() - 1u}; + + GIVEN("Some input datasets from a PFS") { + + REQUIRE(!server_address.empty()); + + cargo::server server{server_address}; + + const auto sources = + prepare_datasets(cargo::dataset::type::posix, + "pxbr-source-dataset-{}", NDATASETS); + const auto targets = prepare_datasets( + cargo::dataset::type::posix, "pxbr-target-dataset-{}", NDATASETS); + + static std::vector input_files; + input_files.reserve(sources.size()); + + for(const auto& d : sources) { + input_files.emplace_back( + create_temporary_file(d.path(), 720*1024, rng)); + } + + // ensure there are no dangling output files from another test run + std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { + std::filesystem::remove(dataset.path()); + }); + + + WHEN("Transferring datasets to a POSIX storage system") { + const auto tx = cargo::transfer_datasets(server, sources, targets); + + // wait for the transfer to complete + auto s = tx.wait(); + + CAPTURE(s.error()); + REQUIRE(s.state() == cargo::transfer_state::completed); + REQUIRE(s.error() == cargo::error_code::success); + + THEN("Output datasets are identical to input datasets") { + + std::vector output_files; + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(output_files), + [](auto&& target) { + return scoped_file{target.path()}; + }); + + for(std::size_t i = 0; i < input_files.size(); ++i) { + REQUIRE(std::filesystem::exists(output_files[i].path())); + REQUIRE(::equal(input_files[i].path(), + output_files[i].path())); + } + } + } + } +} + +SCENARIO("Big Posix writes", "[flex_stager][big_posix_writes]") { + + std::size_t file_size = 712*1024; // GENERATE(1000, 10000); + + [[maybe_unused]] ascii_data_generator ascii_gen{512}; + [[maybe_unused]] random_data_generator rng{ + catch2_seed, 0, std::numeric_limits::max() - 1u}; + + GIVEN("Some input datasets from a POSIX storage system") { + + REQUIRE(!server_address.empty()); + + cargo::server server{server_address}; + + const auto sources = prepare_datasets( + cargo::dataset::type::posix, "pxbw-source-dataset-{}", NDATASETS); + const auto targets = + prepare_datasets(cargo::dataset::type::parallel, + "pxbw-target-dataset-{}", NDATASETS); + + static std::vector input_files; + input_files.reserve(sources.size()); + + for(const auto& d : sources) { + input_files.emplace_back( + create_temporary_file(d.path(), file_size, rng)); + } + + // ensure there are no danling output files from another test run + std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { + std::filesystem::remove(dataset.path()); + }); + + WHEN("Transferring datasets to a PFS") { + const auto tx = cargo::transfer_datasets(server, sources, targets); + + // wait for the transfer to complete + auto s = tx.wait(); + + CAPTURE(s.error()); + REQUIRE(s.state() == cargo::transfer_state::completed); + REQUIRE(s.error() == cargo::error_code::success); + + THEN("Output datasets are identical to input datasets") { + + std::vector output_files; + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(output_files), + [](auto&& target) { + return scoped_file{target.path()}; + }); + + for(std::size_t i = 0; i < input_files.size(); ++i) { + REQUIRE(::equal(input_files[i].path(), targets[i].path())); + } + } + } + } +} int main(int argc, char* argv[]) { -- GitLab