Commit ff90c01e authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'marc/nek5000' into 'main'

Draft: Added Nek changes, fixing things

There is a waiting/sleep issue, detected and corrected by Marc

See merge request !33
parents 1dbd5331 d582b132
Loading
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -212,3 +212,9 @@ If Cargo finds the adhoc fs libraries (we support GekkoFS and dataclay, in this
The CMake command will show which adhocfs are detected.

On the other hand, LD_preload techniques could be used.

## REGEX file 
Cargo can use a file with a regular expression that filters the files inside the `ftio` stage-out directory.
The file is specified in the `CARGO_REGEX` environment variable.

As an example a file with the contents : `^/[a-zA-Z0-9]*turbPipe0\.f\d+` will filter all the files that have turbPipe in their name.
+1 −1
Original line number Diff line number Diff line
@@ -307,7 +307,7 @@ transfer::wait() const {
    auto s = status();

    while(!s.done() && !s.failed()) {
        s = wait_for(150ms);
        s = wait_for(1000ms);
    }

    return s;
+1 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ namespace cargo::env {

static constexpr auto LOG = ADD_PREFIX("LOG");
static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT");
static constexpr auto REGEX = ADD_PREFIX("REGEX");

} // namespace cargo::env

+106 −33
Original line number Diff line number Diff line
@@ -23,8 +23,8 @@
 *****************************************************************************/

#include <functional>
#include <logger/logger.hpp>
#include <net/server.hpp>
#include "logger/logger.hpp"
#include "net/server.hpp"

#include <cargo.hpp>
#include <fmt_formatters.hpp>
@@ -37,42 +37,60 @@
#include "proto/rpc/response.hpp"
#include "proto/mpi/message.hpp"
#include "parallel_request.hpp"
#include <regex>
#include <fstream>

using namespace std::literals;
namespace mpi = boost::mpi;

namespace {

// Vector of input message - Optimization
std::tuple<int, cargo::transfer_message>
make_message(std::uint64_t tid, std::uint32_t seqno,
             const cargo::dataset& input, const cargo::dataset& output) {
             const std::vector<cargo::dataset>& input,
             const std::vector<cargo::dataset>& output) {

    if(input.supports_parallel_transfer()) {
    auto iparallel = input[0].supports_parallel_transfer();
    auto oparallel = output[0].supports_parallel_transfer();
    auto itype = input[0].get_type();
    auto otype = output[0].get_type();

    // convert dataset to path vectors
    std::vector<std::string> v_input;
    std::vector<std::string> v_output;

    // convert input to v_input
    for(auto i : input) {
        v_input.push_back(i.path());
    }
    for(auto o : output) {
        v_output.push_back(o.path());
    }
    if(iparallel) {
        return std::make_tuple(
                static_cast<int>(cargo::tag::pread),
                cargo::transfer_message{
                        tid, seqno, input.path(),
                        static_cast<uint32_t>(input.get_type()), output.path(),
                        static_cast<uint32_t>(output.get_type())});
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
    }

    if(output.supports_parallel_transfer()) {
    if(oparallel) {
        return std::make_tuple(
                static_cast<int>(cargo::tag::pwrite),
                cargo::transfer_message{
                        tid, seqno, input.path(),
                        static_cast<uint32_t>(input.get_type()), output.path(),
                        static_cast<uint32_t>(output.get_type())});
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
    }

    return std::make_tuple(
            static_cast<int>(cargo::tag::seq_mixed),
            cargo::transfer_message{tid, seqno, input.path(),
                                    static_cast<uint32_t>(input.get_type()),
                                    output.path(),
                                    static_cast<uint32_t>(output.get_type())});
            cargo::transfer_message{tid, seqno, v_input,
                                    static_cast<uint32_t>(itype), v_output,
                                    static_cast<uint32_t>(otype)});
}


} // namespace

using namespace std::literals;
@@ -95,6 +113,16 @@ master_server::master_server(std::string name, std::string address,

{

    const char* REGEX_env = std::getenv(cargo::env::REGEX);
    if(REGEX_env != nullptr) {
        REGEX_file = REGEX_env;
        LOGGER_INFO("{} env variable set to: {}", cargo::env::REGEX,
                    REGEX_file);
    } else {
        REGEX_file = "";
        LOGGER_INFO("{} env variable not set", cargo::env::REGEX);
    }

#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
    provider::define(EXPAND(shutdown));
@@ -134,7 +162,7 @@ master_server::mpi_listener_ult() {
        auto msg = world.iprobe();

        if(!msg) {
            std::this_thread::sleep_for(1000us);
            std::this_thread::sleep_for(10ms);
            // thallium::thread::self().sleep(m_network_engine, 10);
            continue;
        }
@@ -324,6 +352,28 @@ master_server::shutdown(const network::request& req) {
void
master_server::transfer_dataset_internal(pending_transfer& pt) {

    // setup regex
    // Read the regular expression pattern from the file
    auto patternFile = REGEX_file;
    bool filtering = false;
    std::string patternStr; // default
    if(REGEX_file != "") {
        std::ifstream file(patternFile);
        if(file.is_open()) {
            std::getline(file, patternStr);
            file.close();
            filtering = true;
        } else {
            LOGGER_ERROR("opening pattern file {}", patternFile);
            // return;
        }
        LOGGER_INFO("Using pattern str '{}' for regex from file '{}'",
                    patternStr, patternFile);
    }
    std::regex pattern;
    if(filtering)
        pattern.assign(patternStr);

    mpi::communicator world;
    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;
@@ -367,6 +417,13 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
                // We need to get filename from the original root
                // path (d.path) plus the path from f, removing the
                // initial path p (taking care of the trailing /)
                // LOGGER_INFO("GKFS file {} checking ...", s_new.path());
                if(filtering) {
                    if(!std::regex_match(s_new.path(), pattern)) {
                        LOGGER_INFO("GKFS file {} IGNORED", s_new.path());
                        continue;
                    }
                }
                auto leading = p.size();
                if(leading > 0 and p.back() == '/') {
                    leading--;
@@ -382,6 +439,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
                    v_s_new.push_back(s_new);
                    v_d_new.push_back(d_new);
                }
                // break;
            }
        } else {
            fs->stat(s.path(), &buf);
@@ -411,7 +469,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {

    // For all the transfers
    for(std::size_t i = 0; i < v_s_new.size(); ++i) {
        const auto& s = v_s_new[i];
        // const auto& s = v_s_new[i];
        const auto& d = v_d_new[i];

        // Create the directory if it does not exist (only in
@@ -421,11 +479,12 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
            std::filesystem::create_directories(
                    std::filesystem::path(d.path()).parent_path());
        }
    }


        // Send message to worker
    // Send message to worker (seq number is 0)
    if(v_s_new.size() != 0) {
        for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) {
            const auto [t, m] = make_message(pt.m_p.tid(), i, s, d);
            const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new);
            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
            world.send(static_cast<int>(rank), t, m);
        }
@@ -479,6 +538,11 @@ master_server::transfer_datasets(const network::request& req,
        if(buf.st_mode & S_IFDIR) {
            LOGGER_INFO("Expanding input directory {}", p);
            files = fs->readdir(p);
            // As we need to create a new directory, we need to order the files
            // so that directories are created in the correct order

            // Order the files alphabetically
            std::sort(files.begin(), files.end());


            /*
@@ -536,8 +600,8 @@ master_server::transfer_datasets(const network::request& req,
                    }
                }
                // For all the transfers
                for(std::size_t i = 0; i < v_s_new.size(); ++i) {
                    const auto& s = v_s_new[i];
                for(std::size_t i = 0; i < v_d_new.size(); ++i) {
                    // const auto& s = v_s_new[i];
                    const auto& d = v_d_new[i];

                    // Create the directory if it does not exist (only in
@@ -548,6 +612,8 @@ master_server::transfer_datasets(const network::request& req,
                       d.supports_parallel_transfer()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                        LOGGER_INFO("Created directory {}", d.path());
                    }
                }

                // If we are not using ftio start transfer if we are on
@@ -555,17 +621,24 @@ master_server::transfer_datasets(const network::request& req,
                if(!m_ftio) {
                    // If we are on stage-out


                    // some sleep here may help ? too many messages to the
                    // workers? Changed to one message for all the files. seq is
                    // 0
                    if(v_s_new.size() != 0) {
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] = make_message(r.tid(), i, s, d);
                            const auto [t, m] =
                                    make_message(r.tid(), 0, v_s_new, v_d_new);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                            // Wait 1 ms
                            // std::this_thread::sleep_for(20ms);
                        }
                    }
                } else {
                    m_ftio_tid = r.tid();
                }
                }

                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                            error_code::success, r.tid());
                req.respond(response_with_id{rpc.id(), error_code::success,
@@ -587,7 +660,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid);
    LOGGER_DEBUG("rpc {:>} body: {{tid: {}}}", rpc, tid);

    m_request_manager.lookup(tid)
            .or_else([&](auto&& ec) {
+5 −2
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include "cargo.hpp"
#include "request_manager.hpp"
#include "parallel_request.hpp"
#include "env.hpp"

namespace cargo {

@@ -42,7 +43,8 @@ public:
    cargo::parallel_request m_p;
    std::vector<cargo::dataset> m_sources;
    std::vector<cargo::dataset> m_targets;
    // Expanded sources and targets (those that are being processed by the worker)
    // Expanded sources and targets (those that are being processed by the
    // worker)
    std::vector<cargo::dataset> m_expanded_sources;
    std::vector<cargo::dataset> m_expanded_targets;
};
@@ -118,6 +120,7 @@ private:
    transfer_dataset_internal(pending_transfer& pt);
    // Request manager
    request_manager m_request_manager;
    std::string REGEX_file;
};

} // namespace cargo
Loading