Commit d582b132 authored by Ramon Nou's avatar Ramon Nou
Browse files

Added CARGO_REGEX env variable, to specify regex file

parent 925ce74f
Loading
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -212,3 +212,9 @@ If Cargo finds the adhoc fs libraries (we support GekkoFS and dataclay, in this
The CMake command will show which adhocfs are detected.

On the other hand, LD_preload techniques could be used.

## REGEX file 
Cargo can use a file with a regular expression that filters the files inside the `ftio` stage-out directory.
The file is specified in the `CARGO_REGEX` environment variable.

As an example a file with the contents : `^/[a-zA-Z0-9]*turbPipe0\.f\d+` will filter all the files that have turbPipe in their name.
+1 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ namespace cargo::env {

static constexpr auto LOG = ADD_PREFIX("LOG");
static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT");
static constexpr auto REGEX = ADD_PREFIX("REGEX");

} // namespace cargo::env

+71 −57
Original line number Diff line number Diff line
@@ -23,8 +23,8 @@
 *****************************************************************************/

#include <functional>
#include <logger/logger.hpp>
#include <net/server.hpp>
#include "logger/logger.hpp"
#include "net/server.hpp"

#include <cargo.hpp>
#include <fmt_formatters.hpp>
@@ -48,7 +48,8 @@ namespace {
// Vector of input message - Optimization
std::tuple<int, cargo::transfer_message>
make_message(std::uint64_t tid, std::uint32_t seqno,
             const std::vector<cargo::dataset>& input, const std::vector<cargo::dataset>& output) {
             const std::vector<cargo::dataset>& input,
             const std::vector<cargo::dataset>& output) {

    auto iparallel = input[0].supports_parallel_transfer();
    auto oparallel = output[0].supports_parallel_transfer();
@@ -69,8 +70,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
    if(iparallel) {
        return std::make_tuple(
                static_cast<int>(cargo::tag::pread),
                cargo::transfer_message{
                        tid, seqno, v_input,
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
    }
@@ -78,8 +78,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
    if(oparallel) {
        return std::make_tuple(
                static_cast<int>(cargo::tag::pwrite),
                cargo::transfer_message{
                        tid, seqno, v_input,
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
    }
@@ -87,8 +86,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
    return std::make_tuple(
            static_cast<int>(cargo::tag::seq_mixed),
            cargo::transfer_message{tid, seqno, v_input,
                                    static_cast<uint32_t>(itype),
                                    v_output,
                                    static_cast<uint32_t>(itype), v_output,
                                    static_cast<uint32_t>(otype)});
}

@@ -115,6 +113,16 @@ master_server::master_server(std::string name, std::string address,

{

    const char* REGEX_env = std::getenv(cargo::env::REGEX);
    if(REGEX_env != nullptr) {
        REGEX_file = REGEX_env;
        LOGGER_INFO("{} env variable set to: {}", cargo::env::REGEX,
                    REGEX_file);
    } else {
        REGEX_file = "";
        LOGGER_INFO("{} env variable not set", cargo::env::REGEX);
    }

#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
    provider::define(EXPAND(shutdown));
@@ -346,10 +354,11 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {

    // setup regex
    // Read the regular expression pattern from the file
    auto patternFile = "/lustre/project/nhr-admire/shared/nek_regex4cargo.txt";
    std::ifstream file(patternFile);
    std::string patternStr; // default
    auto patternFile = REGEX_file;
    bool filtering = false;
    std::string patternStr; // default
    if(REGEX_file != "") {
        std::ifstream file(patternFile);
        if(file.is_open()) {
            std::getline(file, patternStr);
            file.close();
@@ -358,9 +367,12 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
            LOGGER_ERROR("opening pattern file {}", patternFile);
            // return;
        }
    LOGGER_INFO("Using pattern str '{}' for regex from file '{}'", patternStr, patternFile);
        LOGGER_INFO("Using pattern str '{}' for regex from file '{}'",
                    patternStr, patternFile);
    }
    std::regex pattern;
    if (filtering) pattern.assign(patternStr);
    if(filtering)
        pattern.assign(patternStr);

    mpi::communicator world;
    std::vector<cargo::dataset> v_s_new;
@@ -609,12 +621,14 @@ master_server::transfer_datasets(const network::request& req,
                if(!m_ftio) {
                    // If we are on stage-out

// some sleep here may help ? too many messages to the workers?
// Changed to one message for all the files. seq is 0
                    // some sleep here may help ? too many messages to the
                    // workers? Changed to one message for all the files. seq is
                    // 0
                    if(v_s_new.size() != 0) {
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] = make_message(r.tid(), 0, v_s_new, v_d_new);
                            const auto [t, m] =
                                    make_message(r.tid(), 0, v_s_new, v_d_new);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                            // Wait 1 ms
+5 −2
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include "cargo.hpp"
#include "request_manager.hpp"
#include "parallel_request.hpp"
#include "env.hpp"

namespace cargo {

@@ -42,7 +43,8 @@ public:
    cargo::parallel_request m_p;
    std::vector<cargo::dataset> m_sources;
    std::vector<cargo::dataset> m_targets;
    // Expanded sources and targets (those that are being processed by the worker)
    // Expanded sources and targets (those that are being processed by the
    // worker)
    std::vector<cargo::dataset> m_expanded_sources;
    std::vector<cargo::dataset> m_expanded_targets;
};
@@ -118,6 +120,7 @@ private:
    transfer_dataset_internal(pending_transfer& pt);
    // Request manager
    request_manager m_request_manager;
    std::string REGEX_file;
};

} // namespace cargo
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@
#include <filesystem>
#include <fmt/format.h>
#include <fmt/chrono.h>
#include <logger/logger.hpp>
#include "logger/logger.hpp"

// very simple RAII wrappers for some MPI types + utility functions

Loading