Commit 83748e29 authored by Ramon Nou's avatar Ramon Nou
Browse files

Small files optimization, size RPC reduction

parent 8d3e3622
Loading
Loading
Loading
Loading
+10 −13
Original line number Diff line number Diff line
@@ -49,7 +49,8 @@ namespace {
std::tuple<int, cargo::transfer_message>
make_message(std::uint64_t tid, std::uint32_t seqno,
             const std::vector<cargo::dataset>& input,
             const std::vector<cargo::dataset>& output) {
             const std::vector<cargo::dataset>& output,
             std::vector<std::size_t>& v_size) {

    auto iparallel = input[0].supports_parallel_transfer();
    auto oparallel = output[0].supports_parallel_transfer();
@@ -72,7 +73,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
                static_cast<int>(cargo::tag::pread),
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
                                        static_cast<uint32_t>(otype), v_size});
    }

    if(oparallel) {
@@ -80,14 +81,14 @@ make_message(std::uint64_t tid, std::uint32_t seqno,
                static_cast<int>(cargo::tag::pwrite),
                cargo::transfer_message{tid, seqno, v_input,
                                        static_cast<uint32_t>(itype), v_output,
                                        static_cast<uint32_t>(otype)});
                                        static_cast<uint32_t>(otype), v_size});
    }

    return std::make_tuple(
            static_cast<int>(cargo::tag::seq_mixed),
            cargo::transfer_message{tid, seqno, v_input,
                                    static_cast<uint32_t>(itype), v_output,
                                    static_cast<uint32_t>(otype)});
                                    static_cast<uint32_t>(otype), v_size});
}


@@ -378,6 +379,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
    mpi::communicator world;
    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;
    std::vector<std::size_t> v_size_new;
    time_t now = time(0);
    now = now - 5; // Threshold for mtime
    for(auto i = 0u; i < pt.m_sources.size(); ++i) {
@@ -447,6 +449,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
            if(buf.st_mtime < now) {
                v_s_new.push_back(s);
                v_d_new.push_back(d);
                v_size_new.push_back(buf.st_size);
            }
        }
    }
@@ -485,7 +488,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
    // Send message to worker (seq number is 0)
    if(v_s_new.size() != 0) {
        for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) {
            const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new);
            const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new, v_size_new);
            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
            world.send(static_cast<int>(rank), t, m);
        }
@@ -514,7 +517,7 @@ master_server::transfer_datasets(const network::request& req,
    std::vector<cargo::dataset> v_s_new;
    std::vector<cargo::dataset> v_d_new;
    // We ask for the size of the input files.
    std::vector<ssize_t> v_size_new;
    std::vector<std::size_t> v_size_new;

    for(auto i = 0u; i < sources.size(); ++i) {

@@ -628,19 +631,13 @@ master_server::transfer_datasets(const network::request& req,
                // stage-out
                if(!m_ftio) {
                    // If we are on stage-out

                    // some sleep here may help ? too many messages to the
                    // workers? Changed to one message for all the files. seq is
                    // 0
                    if(v_s_new.size() != 0) {
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] =
                                    make_message(r.tid(), 0, v_s_new, v_d_new);
                                    make_message(r.tid(), 0, v_s_new, v_d_new, v_size_new);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                            // Wait 1 ms
                            // std::this_thread::sleep_for(20ms);
                        }
                    }
                } else {
+11 −2
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@
#include <fmt/format.h>
#include <fmt/ranges.h>
#include <filesystem>
#include <cstddef>
#include <boost/archive/binary_oarchive.hpp>
#include <utility>
#include <optional>
@@ -57,10 +58,11 @@ public:

    transfer_message(std::uint64_t tid, std::uint32_t seqno,
                     std::vector<std::string> input_path, std::uint32_t i_type,
                     std::vector<std::string> output_path, std::uint32_t o_type)
                     std::vector<std::string> output_path, std::uint32_t o_type,
                     std::vector<std::size_t> sizes)
        : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)),
          m_i_type(i_type), m_output_path(std::move(output_path)),
          m_o_type(o_type) {}
          m_o_type(o_type), m_sizes(std::move(sizes)) {}

    [[nodiscard]] std::uint64_t
    tid() const {
@@ -93,6 +95,11 @@ public:
        return static_cast<cargo::FSPlugin::type>(m_i_type);
    }

    [[nodiscard]] const std::vector<std::size_t> &
    sizes() const {
        return m_sizes;
    }
    
private:
    template <class Archive>
    void
@@ -105,6 +112,7 @@ private:
        ar& m_output_path;
        ar& m_i_type;
        ar& m_o_type;
        ar& m_sizes;
    }

    std::uint64_t m_tid{};
@@ -113,6 +121,7 @@ private:
    std::uint32_t m_i_type{};
    std::vector<std::string> m_output_path;
    std::uint32_t m_o_type{};
    std::vector<std::size_t> m_sizes;
};

class status_message {
+10 −5
Original line number Diff line number Diff line
@@ -34,10 +34,10 @@ mpio_read::mpio_read(mpi::communicator workers,
                     std::filesystem::path input_path,
                     std::filesystem::path output_path,
                     std::uint64_t block_size, FSPlugin::type fs_i_type,
                     FSPlugin::type fs_o_type)
                     FSPlugin::type fs_o_type, std::size_t size, bool single)
    : m_workers(std::move(workers)), m_input_path(std::move(input_path)),
      m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)),
      m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type) {}
      m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type), m_file_size(size), m_single(single) {}

cargo::error_code
mpio_read::operator()() {
@@ -51,7 +51,7 @@ mpio_read::operator()() {
        const auto input_file = mpioxx::file::open(
                m_workers, m_input_path, mpioxx::file_open_mode::rdonly);

        mpioxx::offset file_size = input_file.size();
        mpioxx::offset file_size = m_file_size;
        std::size_t block_size = m_kb_size * 1024u;

        // create block type
@@ -67,8 +67,13 @@ mpio_read::operator()() {
            ++total_blocks;
        }

        const auto workers_size = m_workers.size();
        const auto workers_rank = m_workers.rank();
        auto workers_size = m_workers.size();
        auto workers_rank = m_workers.rank();

        if (m_single) {
            workers_size = 1;
            workers_rank = 0;
        }

        // create file type
        MPI_Datatype file_type;
+4 −1
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ class mpio_read : public operation {
public:
    mpio_read(mpi::communicator workers, std::filesystem::path input_path,
              std::filesystem::path output_path, std::uint64_t block_size,
              FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type);
              FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type, std::size_t size, bool single);

    cargo::error_code
    operator()() final;
@@ -65,6 +65,7 @@ private:
    cargo::error_code m_status;
    std::filesystem::path m_input_path{};
    std::filesystem::path m_output_path{};
    
    std::unique_ptr<posix_file::file> m_output_file;
    int m_workers_size;
    int m_workers_rank;
@@ -74,6 +75,8 @@ private:
    std::uint64_t m_kb_size;
    FSPlugin::type m_fs_i_type;
    FSPlugin::type m_fs_o_type;
    std::size_t m_file_size;
    bool m_single;
};

} // namespace cargo
+9 −3
Original line number Diff line number Diff line
@@ -38,14 +38,20 @@ mpio_write::operator()() {
    m_status = error_code::transfer_in_progress;
    try {

        const auto workers_size = m_workers.size();
        const auto workers_rank = m_workers.rank();
        auto workers_size = m_workers.size();
        auto workers_rank = m_workers.rank();

        if (m_single) {
            workers_size = 1;
            workers_rank = 0;
        }
        
        std::size_t block_size = m_kb_size * 1024u;
        // We need to open the file and ask size (using fs_plugin)
        m_input_file = std::make_unique<posix_file::file>(
                posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type));

        std::size_t file_size = m_input_file->size();
        std::size_t file_size = m_file_size;

        // compute the number of blocks in the file
        int total_blocks = static_cast<int>(file_size / block_size);
Loading