diff --git a/README.md b/README.md index b19c751928edc61ff2957f7aa35bd2c6cd5e06c8..032e2f7c506c68ca98ae5607219d1bd566194ba1 100644 --- a/README.md +++ b/README.md @@ -211,4 +211,10 @@ cargo_ftio --server tcp://127.0.0.1:62000 -c -1 -p -1 -t 25 If Cargo finds the adhoc fs libraries (we support GekkoFS and dataclay, in this release), it will automatically use them. The CMake command will show which adhocfs are detected. -On the other hand, LD_preload techniques could be used. \ No newline at end of file +On the other hand, LD_PRELOAD techniques could be used. + +## REGEX file +Cargo can read a regular expression from a file and use it to filter the files inside the `ftio` stage-out directory: only files whose path matches the expression are transferred, the rest are ignored. +The path of that file is specified in the `CARGO_REGEX` environment variable; only its first line is read. + +As an example, a file with the contents `^/[a-zA-Z0-9]*turbPipe0\.f\d+` will keep only the files whose whole path matches that pattern (e.g. `/turbPipe0.f00001`). diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index d11f02f23c0d7dd20352640e3def81826592fedf..338dc13d097b648a063e4606bb4358ae513da902 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -307,7 +307,7 @@ transfer::wait() const { auto s = status(); while(!s.done() && !s.failed()) { - s = wait_for(150ms); + s = wait_for(1000ms); } return s; diff --git a/src/env.hpp b/src/env.hpp index d204d2078442bc94cd33470c05969e23e8c62a45..78ccc3fe239148d97ee27d60469bb80ad6ba6336 100644 --- a/src/env.hpp +++ b/src/env.hpp @@ -32,6 +32,7 @@ namespace cargo::env { static constexpr auto LOG = ADD_PREFIX("LOG"); static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); +static constexpr auto REGEX = ADD_PREFIX("REGEX"); } // namespace cargo::env diff --git a/src/master.cpp b/src/master.cpp index 76610181b79f8d13b52904359af4805a228fcc19..06c1361a797a39bb33f28c6c95b1d94cab84dae6 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -23,8 +23,8 @@ *****************************************************************************/ #include -#include -#include +#include "logger/logger.hpp" +#include 
"net/server.hpp" #include #include @@ -37,42 +37,60 @@ #include "proto/rpc/response.hpp" #include "proto/mpi/message.hpp" #include "parallel_request.hpp" +#include +#include using namespace std::literals; namespace mpi = boost::mpi; namespace { +// Vector of input message - Optimization std::tuple make_message(std::uint64_t tid, std::uint32_t seqno, - const cargo::dataset& input, const cargo::dataset& output) { + const std::vector& input, + const std::vector& output) { - if(input.supports_parallel_transfer()) { + auto iparallel = input[0].supports_parallel_transfer(); + auto oparallel = output[0].supports_parallel_transfer(); + auto itype = input[0].get_type(); + auto otype = output[0].get_type(); + + // convert dataset to path vectors + std::vector v_input; + std::vector v_output; + + // convert input to v_input + for(auto i : input) { + v_input.push_back(i.path()); + } + for(auto o : output) { + v_output.push_back(o.path()); + } + if(iparallel) { return std::make_tuple( static_cast(cargo::tag::pread), - cargo::transfer_message{ - tid, seqno, input.path(), - static_cast(input.get_type()), output.path(), - static_cast(output.get_type())}); + cargo::transfer_message{tid, seqno, v_input, + static_cast(itype), v_output, + static_cast(otype)}); } - if(output.supports_parallel_transfer()) { + if(oparallel) { return std::make_tuple( static_cast(cargo::tag::pwrite), - cargo::transfer_message{ - tid, seqno, input.path(), - static_cast(input.get_type()), output.path(), - static_cast(output.get_type())}); + cargo::transfer_message{tid, seqno, v_input, + static_cast(itype), v_output, + static_cast(otype)}); } return std::make_tuple( static_cast(cargo::tag::seq_mixed), - cargo::transfer_message{tid, seqno, input.path(), - static_cast(input.get_type()), - output.path(), - static_cast(output.get_type())}); + cargo::transfer_message{tid, seqno, v_input, + static_cast(itype), v_output, + static_cast(otype)}); } + } // namespace using namespace std::literals; @@ -95,6 +113,16 @@ 
master_server::master_server(std::string name, std::string address, { + const char* REGEX_env = std::getenv(cargo::env::REGEX); + if(REGEX_env != nullptr) { + REGEX_file = REGEX_env; + LOGGER_INFO("{} env variable set to: {}", cargo::env::REGEX, + REGEX_file); + } else { + REGEX_file = ""; + LOGGER_INFO("{} env variable not set", cargo::env::REGEX); + } + #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); provider::define(EXPAND(shutdown)); @@ -134,7 +162,7 @@ master_server::mpi_listener_ult() { auto msg = world.iprobe(); if(!msg) { - std::this_thread::sleep_for(1000us); + std::this_thread::sleep_for(10ms); // thallium::thread::self().sleep(m_network_engine, 10); continue; } @@ -324,6 +352,28 @@ master_server::shutdown(const network::request& req) { void master_server::transfer_dataset_internal(pending_transfer& pt) { + // setup regex + // Read the regular expression pattern from the file + auto patternFile = REGEX_file; + bool filtering = false; + std::string patternStr; // default + if(REGEX_file != "") { + std::ifstream file(patternFile); + if(file.is_open()) { + std::getline(file, patternStr); + file.close(); + filtering = true; + } else { + LOGGER_ERROR("opening pattern file {}", patternFile); + // return; + } + LOGGER_INFO("Using pattern str '{}' for regex from file '{}'", + patternStr, patternFile); + } + std::regex pattern; + if(filtering) + pattern.assign(patternStr); + mpi::communicator world; std::vector v_s_new; std::vector v_d_new; @@ -367,6 +417,13 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // We need to get filename from the original root // path (d.path) plus the path from f, removing the // initial path p (taking care of the trailing /) + // LOGGER_INFO("GKFS file {} checking ...", s_new.path()); + if(filtering) { + if(!std::regex_match(s_new.path(), pattern)) { + LOGGER_INFO("GKFS file {} IGNORED", s_new.path()); + continue; + } + } auto leading = p.size(); if(leading > 0 and 
p.back() == '/') { leading--; @@ -382,6 +439,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { v_s_new.push_back(s_new); v_d_new.push_back(d_new); } + // break; } } else { fs->stat(s.path(), &buf); @@ -411,7 +469,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { // For all the transfers for(std::size_t i = 0; i < v_s_new.size(); ++i) { - const auto& s = v_s_new[i]; + // const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; // Create the directory if it does not exist (only in @@ -421,11 +479,12 @@ master_server::transfer_dataset_internal(pending_transfer& pt) { std::filesystem::create_directories( std::filesystem::path(d.path()).parent_path()); } + } - - // Send message to worker + // Send message to worker (seq number is 0) + if(v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { - const auto [t, m] = make_message(pt.m_p.tid(), i, s, d); + const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); } @@ -479,6 +538,11 @@ master_server::transfer_datasets(const network::request& req, if(buf.st_mode & S_IFDIR) { LOGGER_INFO("Expanding input directory {}", p); files = fs->readdir(p); + // As we need to create a new directory, we need to order the files + // so that directories are created in the correct order + + // Order the files alphabetically + std::sort(files.begin(), files.end()); /* @@ -536,8 +600,8 @@ master_server::transfer_datasets(const network::request& req, } } // For all the transfers - for(std::size_t i = 0; i < v_s_new.size(); ++i) { - const auto& s = v_s_new[i]; + for(std::size_t i = 0; i < v_d_new.size(); ++i) { + // const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; // Create the directory if it does not exist (only in @@ -548,24 +612,33 @@ master_server::transfer_datasets(const network::request& req, d.supports_parallel_transfer()) { std::filesystem::create_directories( 
std::filesystem::path(d.path()).parent_path()); + LOGGER_INFO("Created directory {}", d.path()); } + } - // If we are not using ftio start transfer if we are on - // stage-out - if(!m_ftio) { - // If we are on stage-out - + // If we are not using ftio start transfer if we are on + // stage-out + if(!m_ftio) { + // If we are on stage-out + // some sleep here may help ? too many messages to the + // workers? Changed to one message for all the files. seq is + // 0 + if(v_s_new.size() != 0) { for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { - const auto [t, m] = make_message(r.tid(), i, s, d); + const auto [t, m] = + make_message(r.tid(), 0, v_s_new, v_d_new); LOGGER_INFO("msg <= to: {} body: {}", rank, m); world.send(static_cast(rank), t, m); + // Wait 1 ms + // std::this_thread::sleep_for(20ms); } - } else { - m_ftio_tid = r.tid(); } + } else { + m_ftio_tid = r.tid(); } + LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, error_code::success, r.tid()); req.respond(response_with_id{rpc.id(), error_code::success, @@ -587,7 +660,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); + LOGGER_DEBUG("rpc {:>} body: {{tid: {}}}", rpc, tid); m_request_manager.lookup(tid) .or_else([&](auto&& ec) { diff --git a/src/master.hpp b/src/master.hpp index 612ae8aab232a75f46ce0ea9b5ac78a44f6b1ddf..2e221a08bb417895447d660b2487e3f1cd1c8577 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -29,12 +29,13 @@ #include "cargo.hpp" #include "request_manager.hpp" #include "parallel_request.hpp" +#include "env.hpp" namespace cargo { class pending_transfer { public: - pending_transfer():m_p(cargo::parallel_request(0,0,0)) { + pending_transfer() : m_p(cargo::parallel_request(0, 0, 0)) { m_work = false; } @@ -42,7 +43,8 @@ public: cargo::parallel_request m_p; std::vector m_sources; std::vector m_targets; 
- // Expanded sources and targets (those that are being processed by the worker) + // Expanded sources and targets (those that are being processed by the + // worker) std::vector m_expanded_sources; std::vector m_expanded_targets; }; @@ -118,6 +120,7 @@ private: transfer_dataset_internal(pending_transfer& pt); // Request manager request_manager m_request_manager; + std::string REGEX_file; }; } // namespace cargo diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 756653483d06d477906e5dfb8bcad98477f32188..8cb17299626874436bb0ec70426f3ce9aefa7f37 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -31,7 +31,7 @@ #include #include #include -#include +#include "logger/logger.hpp" // very simple RAII wrappers for some MPI types + utility functions diff --git a/src/net/server.cpp b/src/net/server.cpp index a49bc1a803f8eb0e9abd66e2e06b42ff3d47a5f1..bdd526b80d2b5407e4861d86820e717a2dc76b67 100644 --- a/src/net/server.cpp +++ b/src/net/server.cpp @@ -343,6 +343,7 @@ server::run() { LOGGER_INFO("LIBGKFS_HOSTS_FILE env variable set to: {}", gkfs_hosts); } + LOGGER_INFO(""); LOGGER_INFO("[[ Start up successful, awaiting requests... 
]]"); diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 0eb65d9195a0c5913f3858566ec81950e7985f03..8b12b14f0fb627d93fe3f7b85ece0dc44d730f0b 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -69,7 +69,8 @@ public: file_handle& operator=(const file_handle& other) = delete; - explicit operator bool() const noexcept { + explicit + operator bool() const noexcept { return valid(); } @@ -161,7 +162,8 @@ public: explicit file(std::filesystem::path filepath, cargo::FSPlugin::type t) noexcept - : m_path(std::move(filepath)), m_fs_plugin(cargo::FSPlugin::make_fs(t)) {} + : m_path(std::move(filepath)), + m_fs_plugin(cargo::FSPlugin::make_fs(t)) {} file(std::filesystem::path filepath, int fd, std::shared_ptr fs_plugin) noexcept @@ -293,6 +295,14 @@ protected: std::shared_ptr m_fs_plugin; }; +static void +recursive_mkdir(const std::filesystem::path& path, + std::shared_ptr fs_plugin) { + if(path.has_parent_path() and path != "/") { + recursive_mkdir(path.parent_path(), fs_plugin); + fs_plugin->mkdir(path.parent_path().c_str(), 0755); + } +} static inline file open(const std::filesystem::path& filepath, int flags, ::mode_t mode, @@ -304,7 +314,7 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode, // We don't check if it exists, we just create it if flags is set to O_CREAT if(flags & O_CREAT) { - fs_plugin->mkdir(filepath.parent_path().c_str(), 0755); + recursive_mkdir(filepath, fs_plugin); } int fd = fs_plugin->open(filepath.c_str(), flags, mode); diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index aff3eacfe00782ff02638b3450bf79774afb72b1..9b070e839dd57e2c43bbb42a70c62fc5fe3a4c7e 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -75,26 +75,29 @@ gekko_plugin::readdir(const std::string& path) { std::vector files; std::vector 
final_list; files = gkfs::syscall::gkfs_get_file_list(path); - + for(auto& file : files) { - struct stat buf; - stat("/" + file, &buf); + struct stat buf; + std::string correct_path = file; + if(path.size() != 1) { + correct_path = path + "/" + file; + } else { + correct_path = "/" + file; + } + + stat(correct_path, &buf); + if(S_ISDIR(buf.st_mode)) { - std::vector subfiles = readdir("/" + file); + std::vector subfiles = readdir(correct_path); final_list.insert(final_list.end(), subfiles.begin(), subfiles.end()); } else { - - if(path.size() != 1) { - final_list.push_back(path + "/" + file); - } else { - final_list.push_back("/" + file); - } + final_list.push_back(correct_path); } } - + return final_list; } diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index b61705a3af9e7427e34995395117cec8d7e8a30d..4245073ea53cabb6e4266e645a8d2ceaa214638d 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -26,12 +26,14 @@ #define CARGO_PROTO_MPI_MESSAGE_HPP #include +#include #include #include #include #include #include "cargo.hpp" #include "boost_serialization_std_optional.hpp" +#include #include "posix_file/file.hpp" namespace cargo { @@ -54,8 +56,8 @@ public: transfer_message() = default; transfer_message(std::uint64_t tid, std::uint32_t seqno, - std::string input_path, std::uint32_t i_type, - std::string output_path, std::uint32_t o_type) + std::vector input_path, std::uint32_t i_type, + std::vector output_path, std::uint32_t o_type) : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)), m_i_type(i_type), m_output_path(std::move(output_path)), m_o_type(o_type) {} @@ -70,12 +72,12 @@ public: return m_seqno; } - [[nodiscard]] const std::string& + [[nodiscard]] const std::vector & input_path() const { return m_input_path; } - [[nodiscard]] const std::string& + [[nodiscard]] const std::vector & output_path() const { return m_output_path; } @@ -107,9 +109,9 @@ private: std::uint64_t m_tid{}; std::uint32_t m_seqno{}; - 
std::string m_input_path; + std::vector m_input_path; std::uint32_t m_i_type{}; - std::string m_output_path; + std::vector m_output_path; std::uint32_t m_o_type{}; }; @@ -244,6 +246,7 @@ struct fmt::formatter : formatter { } }; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. diff --git a/src/request_manager.cpp b/src/request_manager.cpp index 4d697dfc3341d50f08e24743c2e0a931f0cc753b..7f278461a5b33da4e58d5edee45ce5ac50435f4e 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -114,7 +114,7 @@ request_manager::lookup(std::uint64_t tid) { } } // TODO : completed should have the name of the file if its not found - return request_status{"", transfer_state::completed, 0.0f}; + return request_status{"", transfer_state::completed, 0.0f, error_code::success}; } LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index ab401d018ebbb914bae07e09856c0f12fec6b207..20ac280344b4cee125fa42b858b7a9eb93faaa94 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -173,6 +173,7 @@ mpio_write::progress(int ongoing_index) { } // step 2. 
write buffer data in parallel to the PFS + //LOGGER_INFO("START WRITING file {}", m_output_path); const auto output_file = mpioxx::file::open(m_workers, m_output_path, mpioxx::file_open_mode::create | @@ -238,6 +239,7 @@ mpio_write::progress(int ongoing_index) { return -1; } + //LOGGER_INFO("END WRITING file {}", m_output_path); m_status = error_code::success; return -1; diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 9a3cdf4195b692329f5f1b6b38cf15a5e1769e68..dd6aeed28fa1bd13c2f33f9fd6410f53897c0b59 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -113,7 +113,7 @@ operation::set_comm(int rank, std::uint64_t tid, std::uint32_t seqno, cargo::error_code operation::progress() const { - return error_code::other; + return error_code::success; } } // namespace cargo diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index e0ec8bc303279a5be00cf680f61041e3c847e4e4..a7e728ae335c2f1d3b2abc7722efe02458637cbc 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -39,6 +39,7 @@ seq_mixed_operation::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; + m_input_file = std::make_unique( posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); std::size_t file_size = m_input_file->size(); @@ -68,7 +69,6 @@ seq_mixed_operation::operator()() { block_size); } - m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 8a6252732513a5badc9c144716c35838dbb0e68f..0967af5574282cceff2924d51bc628300922f51a 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -24,10 +24,9 @@ #include #include -#include +#include "logger/logger.hpp" #include #include - #include "worker.hpp" #include "fmt_formatters.hpp" @@ -108,9 +107,11 @@ worker::run() { bool done = false; while(!done) { // Always loop pending operations + // TODO: 
This seems that it is not a good idea, we have a lot of () ongoing auto I = m_ops.begin(); auto IE = m_ops.end(); + //LOGGER_INFO ("[Status] Pending: {}", m_ops.size()); if(I != IE) { auto op = I->second.first.get(); int index = I->second.second; @@ -118,9 +119,10 @@ worker::run() { if(index == -1) { // operation not started // Print error message - update_state(op->source(), op->tid(), op->seqno(), - op->output_path(), transfer_state::running, - -1.0f); + /* We avoid this update, that may not come into order...*/ + //update_state(op->source(), op->tid(), op->seqno(), + // op->output_path(), transfer_state::running, + // -1.0f); cargo::error_code ec = (*op)(); if(ec != cargo::error_code::transfer_in_progress) { update_state(op->source(), op->tid(), op->seqno(), @@ -135,9 +137,9 @@ worker::run() { // Operation in progress index = op->progress(index); if(index == -1) { - // operation finished + // operation finished cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), + update_state(op->source(), op->tid(), op->seqno(), op->output_path(), ec ? 
transfer_state::failed : transfer_state::completed, @@ -153,7 +155,8 @@ worker::run() { op->bw()); } I->second.second = index; - ++I; + // If we have ++I we go through another file + //++I; } } } @@ -180,25 +183,31 @@ worker::run() { transfer_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + // Iterate over the input and output vectors and create a new op per file + for (std::size_t i = 0; i < m.input_path().size(); i++) { + std::string input_path = m.input_path()[i]; + std::string output_path = m.output_path()[i]; + update_state(msg->source(), m.tid(), i, + output_path, transfer_state::pending, -1.0f); + m_ops.emplace(std::make_pair( - make_pair(m.input_path(), m.output_path()), + make_pair(input_path, output_path), make_pair(operation::make_operation( - t, workers, m.input_path(), - m.output_path(), m_block_size, + t, workers, input_path, + output_path, m_block_size, m.i_type(), m.o_type()), -1))); - + // TODO : Issue 1, seqno is not different for each file (we use i) const auto op = - m_ops[make_pair(m.input_path(), m.output_path())] + m_ops[make_pair(input_path, output_path)] .first.get(); - op->set_comm(msg->source(), m.tid(), m.seqno(), t); + op->set_comm(msg->source(), m.tid(), i, t); - update_state(op->source(), op->tid(), op->seqno(), - op->output_path(), transfer_state::pending, -1.0f); - break; + + } + break; } - case tag::bw_shaping: { shaper_message m; world.recv(msg->source(), msg->tag(), m);