Commit 96df152d authored by Ramon Nou's avatar Ramon Nou
Browse files

MPI (tests) fails yet

parent bc712175
Loading
Loading
Loading
Loading
Loading
+20 −6
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ struct cargo_config {
    std::optional<fs::path> output_file;
    std::string address;
    std::uint64_t blocksize;
    std::string REGEX_file;
};

cargo_config
@@ -77,7 +78,8 @@ parse_command_line(int argc, char* argv[]) {
            ->option_text("ADDRESS")
            ->required();

    app.add_option("-b,--blocksize", cfg.blocksize,
    app.add_option(
               "-b,--blocksize", cfg.blocksize,
               "Number of bytes to send in each message (in kb). Defaults to 512(kb).\n")
            ->option_text("BLOCKSIZE")
            ->default_val(512);
@@ -92,6 +94,12 @@ parse_command_line(int argc, char* argv[]) {

    try {
        app.parse(argc, argv);
        const char* REGEX_env = std::getenv(cargo::env::REGEX);
        if(REGEX_env != nullptr) {
            cfg.REGEX_file = REGEX_env;
        } else {
            cfg.REGEX_file = "";
        }
        return cfg;
    } catch(const CLI::ParseError& ex) {
        std::exit(app.exit(ex));
@@ -117,14 +125,20 @@ main(int argc, char* argv[]) {

    try {
        if(const auto rank = world.rank(); rank == 0) {
            cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize,
                                     fs::current_path(), cfg.blocksize};
            cargo::master_server srv{cfg.progname,  cfg.address,
                                     cfg.daemonize, fs::current_path(),
                                     cfg.blocksize, cfg.REGEX_file};

            if(cfg.output_file) {
                srv.configure_logger(logger::logger_type::file,
                                     get_process_output_file(*cfg.output_file));
            }

            if(cfg.REGEX_file != "") {
                fmt::print("{} set to file:{} \n", cargo::env::REGEX,
                           cfg.REGEX_file);
            } else {
                fmt::print("{} not set \n", cargo::env::REGEX);
            }
            return srv.run();
        } else {

+14 −20
Original line number Diff line number Diff line
@@ -100,7 +100,7 @@ namespace cargo {

master_server::master_server(std::string name, std::string address,
                             bool daemonize, std::filesystem::path rundir,
                             std::uint64_t block_size,
                             std::uint64_t block_size, std::string regex_file,
                             std::optional<std::filesystem::path> pidfile)
    : server(std::move(name), std::move(address), daemonize, std::move(rundir),
             std::move(block_size), std::move(pidfile)),
@@ -114,16 +114,8 @@ master_server::master_server(std::string name, std::string address,

{
    m_block_size = block_size;
    REGEX_file = regex_file;

    const char* REGEX_env = std::getenv(cargo::env::REGEX);
    if(REGEX_env != nullptr) {
        REGEX_file = REGEX_env;
        LOGGER_INFO("{} env variable set to: {}", cargo::env::REGEX,
                    REGEX_file);
    } else {
        REGEX_file = "";
        LOGGER_INFO("{} env variable not set", cargo::env::REGEX);
    }

#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
@@ -159,6 +151,7 @@ void
master_server::mpi_listener_ult() {

    mpi::communicator world;
   
    while(!m_shutting_down) {

        auto msg = world.iprobe();
@@ -488,7 +481,8 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
    // Send message to worker (seq number is 0)
    if(v_s_new.size() != 0) {
        for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) {
            const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new);
            const auto [t, m] =
                    make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new);
            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
            world.send(static_cast<int>(rank), t, m);
        }
@@ -540,8 +534,9 @@ master_server::transfer_datasets(const network::request& req,
        auto fs = FSPlugin::make_fs(
                static_cast<cargo::FSPlugin::type>(s.get_type()));
        struct stat buf;
        fs->stat(p, &buf);
        if(buf.st_mode & S_IFDIR) {
        auto rstat = fs->stat(p, &buf);
        
        if(rstat == 0 and (buf.st_mode & S_IFDIR)) {
            LOGGER_INFO("Expanding input directory {}", p);
            files = fs->readdir(p);
            // As we need to create a new directory, we need to order the files
@@ -574,7 +569,8 @@ master_server::transfer_datasets(const network::request& req,

                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                             d_new.path());
                fs->stat(s_new.path(), &buf);
                rstat = fs->stat(s_new.path(), &buf);
                if (rstat == 0)
                v_size_new.push_back(buf.st_size);
                v_s_new.push_back(s_new);
                v_d_new.push_back(d_new);
@@ -617,13 +613,11 @@ master_server::transfer_datasets(const network::request& req,

                    // Create the directory if it does not exist (only in
                    // parallel transfer)
                    if(!std::filesystem::path(d.path())
                                .parent_path()
                                .empty() and
                    if(!std::filesystem::path(d.path()).parent_path().empty() and
                       d.supports_parallel_transfer()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                        LOGGER_INFO("Created directory {}", d.path());
                        LOGGER_INFO("Created directory {}", std::filesystem::path(d.path()).parent_path());
                    }
                }

@@ -634,8 +628,8 @@ master_server::transfer_datasets(const network::request& req,
                    if(v_s_new.size() != 0) {
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] =
                                    make_message(r.tid(), 0, v_s_new, v_d_new, v_size_new);
                            const auto [t, m] = make_message(
                                    r.tid(), 0, v_s_new, v_d_new, v_size_new);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                        }
+1 −1
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ class master_server : public network::server,
                      public network::provider<master_server> {
public:
    master_server(std::string name, std::string address, bool daemonize,
                  std::filesystem::path rundir, std::uint64_t block_size,
                  std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file,
                  std::optional<std::filesystem::path> pidfile = {});

    ~master_server();
+1 −0
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ mpio_read::operator()() {
    m_status = error_code::transfer_in_progress;
    try {

        std::cout << "Trying to open file " << m_input_path << std::endl;
        const auto input_file = mpioxx::file::open(
                m_workers, m_input_path, mpioxx::file_open_mode::rdonly);

+2 −1
Original line number Diff line number Diff line
#include <fmt/format.h>
#include "common.hpp"
#include <filesystem>

std::vector<cargo::dataset>
prepare_datasets(cargo::dataset::type type, const std::string& pattern,
@@ -7,7 +8,7 @@ prepare_datasets(cargo::dataset::type type, const std::string& pattern,
    std::vector<cargo::dataset> datasets;
    datasets.reserve(n);
    for(size_t i = 0; i < n; ++i) {
        datasets.emplace_back(fmt::format(fmt::runtime(pattern), i), type);
        datasets.emplace_back(std::filesystem::current_path().string()+"/"+fmt::format(fmt::runtime(pattern), i), type);
    }

    return datasets;
Loading