Commit c15313f1 authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'rnou/newdevelop' into 'main'

Optimize small files

Solves #45

See merge request !34
parents ff90c01e 2277bef1
Loading
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -22,9 +22,9 @@
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <cargo.hpp>
#include <fmt_formatters.hpp>
#include <net/serialization.hpp>
#include "cargo.hpp"
#include "fmt_formatters.hpp"
#include "net/serialization.hpp"
#include <iomanip>
#include <logger/logger.hpp>
#include <net/client.hpp>
+20 −6
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ struct cargo_config {
    std::optional<fs::path> output_file;
    std::string address;
    std::uint64_t blocksize;
    std::string REGEX_file;
};

cargo_config
@@ -77,7 +78,8 @@ parse_command_line(int argc, char* argv[]) {
            ->option_text("ADDRESS")
            ->required();

    app.add_option("-b,--blocksize", cfg.blocksize,
    app.add_option(
               "-b,--blocksize", cfg.blocksize,
               "Number of bytes to send in each message (in kb). Defaults to 512(kb).\n")
            ->option_text("BLOCKSIZE")
            ->default_val(512);
@@ -92,6 +94,12 @@ parse_command_line(int argc, char* argv[]) {

    try {
        app.parse(argc, argv);
        const char* REGEX_env = std::getenv(cargo::env::REGEX);
        if(REGEX_env != nullptr) {
            cfg.REGEX_file = REGEX_env;
        } else {
            cfg.REGEX_file = "";
        }
        return cfg;
    } catch(const CLI::ParseError& ex) {
        std::exit(app.exit(ex));
@@ -117,14 +125,20 @@ main(int argc, char* argv[]) {

    try {
        if(const auto rank = world.rank(); rank == 0) {
            cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize,
                                     fs::current_path(), cfg.blocksize};
            cargo::master_server srv{cfg.progname,  cfg.address,
                                     cfg.daemonize, fs::current_path(),
                                     cfg.blocksize, cfg.REGEX_file};

            if(cfg.output_file) {
                srv.configure_logger(logger::logger_type::file,
                                     get_process_output_file(*cfg.output_file));
            }

            if(cfg.REGEX_file != "") {
                fmt::print("{} set to file:{} \n", cargo::env::REGEX,
                           cfg.REGEX_file);
            } else {
                fmt::print("{} not set \n", cargo::env::REGEX);
            }
            return srv.run();
        } else {

+28 −29
Original line number Diff line number Diff line
@@ -49,7 +49,8 @@ namespace {
std::tuple<int, cargo::transfer_message>
make_message(std::uint64_t tid, std::uint32_t seqno,
             const std::vector<cargo::dataset>& input,
             const std::vector<cargo::dataset>& output) {
             const std::vector<cargo::dataset>& output,
             std::vector<std::size_t>& v_size) {

    auto iparallel = input[0].supports_parallel_transfer();
    auto oparallel = output[0].supports_parallel_transfer();
@@ -72,7 +73,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
                static_cast<int>(cargo::tag::pread),
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
                                        static_cast<uint32_t>(otype), v_size});
    }

    if(oparallel) {
@@ -80,14 +81,14 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
                static_cast<int>(cargo::tag::pwrite),
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
                                        static_cast<uint32_t>(otype), v_size});
    }

    return std::make_tuple(
            static_cast<int>(cargo::tag::seq_mixed),
            cargo::transfer_message{tid, seqno, v_input,
                                    static_cast<uint32_t>(itype), v_output,
                                    static_cast<uint32_t>(otype)});
                                    static_cast<uint32_t>(otype), v_size});
}


@@ -99,7 +100,7 @@ namespace cargo {

master_server::master_server(std::string name, std::string address,
                             bool daemonize, std::filesystem::path rundir,
                             std::uint64_t block_size,
                             std::uint64_t block_size, std::string regex_file,
                             std::optional<std::filesystem::path> pidfile)
    : server(std::move(name), std::move(address), daemonize, std::move(rundir),
             std::move(block_size), std::move(pidfile)),
@@ -112,16 +113,9 @@ master_server::master_server(std::string name, std::string address,
              [this]() { ftio_scheduling_ult(); }))

{
    m_block_size = block_size;
    REGEX_file = regex_file;

    const char* REGEX_env = std::getenv(cargo::env::REGEX);
    if(REGEX_env != nullptr) {
        REGEX_file = REGEX_env;
        LOGGER_INFO("{} env variable set to: {}", cargo::env::REGEX,
                    REGEX_file);
    } else {
        REGEX_file = "";
        LOGGER_INFO("{} env variable not set", cargo::env::REGEX);
    }

#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
@@ -157,6 +151,7 @@ void
master_server::mpi_listener_ult() {

    mpi::communicator world;
   
    while(!m_shutting_down) {

        auto msg = world.iprobe();
@@ -377,6 +372,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
    mpi::communicator world;
    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;
    std::vector<std::size_t> v_size_new;
    time_t now = time(0);
    now = now - 5; // Threshold for mtime
    for(auto i = 0u; i < pt.m_sources.size(); ++i) {
@@ -446,6 +442,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
            if(buf.st_mtime < now) {
                v_s_new.push_back(s);
                v_d_new.push_back(d);
                v_size_new.push_back(buf.st_size);
            }
        }
    }
@@ -484,7 +481,8 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
    // Send message to worker (seq number is 0)
    if(v_s_new.size() != 0) {
        for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) {
            const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new);
            const auto [t, m] =
                    make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new);
            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
            world.send(static_cast<int>(rank), t, m);
        }
@@ -512,6 +510,8 @@ master_server::transfer_datasets(const network::request& req,

    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;
    // We ask for the size of the input files.
    std::vector<std::size_t> v_size_new;

    for(auto i = 0u; i < sources.size(); ++i) {

@@ -534,8 +534,9 @@ master_server::transfer_datasets(const network::request& req,
        auto fs = FSPlugin::make_fs(
                static_cast<cargo::FSPlugin::type>(s.get_type()));
        struct stat buf;
        fs->stat(p, &buf);
        if(buf.st_mode & S_IFDIR) {
        auto rstat = fs->stat(p, &buf);
        
        if(rstat == 0 and (buf.st_mode & S_IFDIR)) {
            LOGGER_INFO("Expanding input directory {}", p);
            files = fs->readdir(p);
            // As we need to create a new directory, we need to order the files
@@ -568,11 +569,17 @@ master_server::transfer_datasets(const network::request& req,

                LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(),
                             d_new.path());
                rstat = fs->stat(s_new.path(), &buf);
                if (rstat == 0)
                v_size_new.push_back(buf.st_size);
                v_s_new.push_back(s_new);
                v_d_new.push_back(d_new);
            }

        } else {
            // We do not create any optimization for single files
            fs->stat(s.path(), &buf);
            v_size_new.push_back(buf.st_size);
            v_s_new.push_back(s);
            v_d_new.push_back(d);
        }
@@ -606,13 +613,11 @@ master_server::transfer_datasets(const network::request& req,

                    // Create the directory if it does not exist (only in
                    // parallel transfer)
                    if(!std::filesystem::path(d.path())
                                .parent_path()
                                .empty() and
                    if(!std::filesystem::path(d.path()).parent_path().empty() and
                       d.supports_parallel_transfer()) {
                        std::filesystem::create_directories(
                                std::filesystem::path(d.path()).parent_path());
                        LOGGER_INFO("Created directory {}", d.path());
                        LOGGER_INFO("Created directory {}", std::filesystem::path(d.path()).parent_path());
                    }
                }

@@ -620,19 +625,13 @@ master_server::transfer_datasets(const network::request& req,
                // stage-out
                if(!m_ftio) {
                    // If we are on stage-out

                    // some sleep here may help ? too many messages to the
                    // workers? Changed to one message for all the files. seq is
                    // 0
                    if(v_s_new.size() != 0) {
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] =
                                    make_message(r.tid(), 0, v_s_new, v_d_new);
                            const auto [t, m] = make_message(
                                    r.tid(), 0, v_s_new, v_d_new, v_size_new);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                            // Wait 1 ms
                            // std::this_thread::sleep_for(20ms);
                        }
                    }
                } else {
+2 −2
Original line number Diff line number Diff line
@@ -53,7 +53,7 @@ class master_server : public network::server,
                      public network::provider<master_server> {
public:
    master_server(std::string name, std::string address, bool daemonize,
                  std::filesystem::path rundir, std::uint64_t block_size,
                  std::filesystem::path rundir, std::uint64_t block_size, std::string regex_file,
                  std::optional<std::filesystem::path> pidfile = {});

    ~master_server();
@@ -111,7 +111,7 @@ private:
    std::uint64_t m_ftio_tid = 0;
    // FTIO enabled flag, we need to call ftio once.
    bool m_ftio = false;

    ssize_t m_block_size = 0;

    pending_transfer m_pending_transfer;

+1 −2
Original line number Diff line number Diff line
@@ -135,7 +135,6 @@ public:

        MPI_File result;
   
    
        if(const auto ec =
                   MPI_File_open(comm, filepath.c_str(), static_cast<int>(mode),
                                 MPI_INFO_NULL, &result);
Loading