Commit 56540bc3 authored by Ramon Nou's avatar Ramon Nou
Browse files

First consolidation version

parent d5ada770
Loading
Loading
Loading
Loading
Loading
+42 −23
Original line number Diff line number Diff line
@@ -43,36 +43,54 @@ namespace mpi = boost::mpi;

namespace {

// Vector of input message - Optimization
std::tuple<int, cargo::transfer_message>
make_message(std::uint64_t tid, std::uint32_t seqno,
             const cargo::dataset& input, const cargo::dataset& output) {
             const std::vector<cargo::dataset>& input, const std::vector<cargo::dataset>& output) {

    if(input.supports_parallel_transfer()) {
    auto iparallel = input[0].supports_parallel_transfer();
    auto oparallel = output[0].supports_parallel_transfer();
    auto itype = input[0].get_type();
    auto otype = output[0].get_type();

    //convert dataset to path vectors
    std::vector<std::string> v_input;
    std::vector<std::string> v_output;

    // convert input to v_input
    for (auto i : input) {
        v_input.push_back(i.path());
    }
    for (auto o : output) {
        v_output.push_back(o.path());
    }
    if(iparallel) {
        return std::make_tuple(
                static_cast<int>(cargo::tag::pread),
                cargo::transfer_message{
                        tid, seqno, input.path(),
                        static_cast<uint32_t>(input.get_type()), output.path(),
                        static_cast<uint32_t>(output.get_type())});
                        tid, seqno, v_input,
                        static_cast<uint32_t>(itype), v_output,
                        static_cast<uint32_t>(otype)});
    }

    if(output.supports_parallel_transfer()) {
    if(oparallel) {
        return std::make_tuple(
                static_cast<int>(cargo::tag::pwrite),
                cargo::transfer_message{
                        tid, seqno, input.path(),
                        static_cast<uint32_t>(input.get_type()), output.path(),
                        static_cast<uint32_t>(output.get_type())});
                        tid, seqno, v_input,
                        static_cast<uint32_t>(itype), v_output,
                        static_cast<uint32_t>(otype)});
    }

    return std::make_tuple(
            static_cast<int>(cargo::tag::seq_mixed),
            cargo::transfer_message{tid, seqno, input.path(),
                                    static_cast<uint32_t>(input.get_type()),
                                    output.path(),
                                    static_cast<uint32_t>(output.get_type())});
            cargo::transfer_message{tid, seqno, v_input,
                                    static_cast<uint32_t>(itype),
                                    v_output,
                                    static_cast<uint32_t>(otype)});
}


} // namespace

using namespace std::literals;
@@ -411,7 +429,7 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {

    // For all the transfers
    for(std::size_t i = 0; i < v_s_new.size(); ++i) {
        const auto& s = v_s_new[i];
       // const auto& s = v_s_new[i];
        const auto& d = v_d_new[i];

        // Create the directory if it does not exist (only in
@@ -421,16 +439,15 @@ master_server::transfer_dataset_internal(pending_transfer& pt) {
            std::filesystem::create_directories(
                    std::filesystem::path(d.path()).parent_path());
        }
    }


        // Send message to worker
        // Send message to worker (seq number is 0)
        for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) {
            const auto [t, m] = make_message(pt.m_p.tid(), i, s, d);
            const auto [t, m] = make_message(pt.m_p.tid(), 0, v_s_new, v_d_new);
            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
            world.send(static_cast<int>(rank), t, m);
        }
}
}

void
master_server::transfer_datasets(const network::request& req,
@@ -541,8 +558,8 @@ master_server::transfer_datasets(const network::request& req,
                    }
                }
                // For all the transfers
                for(std::size_t i = 0; i < v_s_new.size(); ++i) {
                    const auto& s = v_s_new[i];
                for(std::size_t i = 0; i < v_d_new.size(); ++i) {
                   // const auto& s = v_s_new[i];
                    const auto& d = v_d_new[i];

                    // Create the directory if it does not exist (only in
@@ -555,6 +572,7 @@ master_server::transfer_datasets(const network::request& req,
                                std::filesystem::path(d.path()).parent_path());
                        LOGGER_INFO("Created directory {}", d.path());
                    }
                }

                    // If we are not using ftio start transfer if we are on
                    // stage-out
@@ -562,19 +580,20 @@ master_server::transfer_datasets(const network::request& req,
                        // If we are on stage-out

// some sleep here may help ? too many messages to the workers?
// Changed to one message for all the files. seq is 0
                        for(std::size_t rank = 1; rank <= r.nworkers();
                            ++rank) {
                            const auto [t, m] = make_message(r.tid(), i, s, d);
                            const auto [t, m] = make_message(r.tid(), 0, v_s_new, v_d_new);
                            LOGGER_INFO("msg <= to: {} body: {}", rank, m);
                            world.send(static_cast<int>(rank), t, m);
                            // Wait 1 ms
                            std::this_thread::sleep_for(20ms);
                            // std::this_thread::sleep_for(20ms);

                        }
                    } else {
                        m_ftio_tid = r.tid();
                    }
                }
                
                LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc,
                            error_code::success, r.tid());
                req.respond(response_with_id{rpc.id(), error_code::success,
+9 −6
Original line number Diff line number Diff line
@@ -26,12 +26,14 @@
#define CARGO_PROTO_MPI_MESSAGE_HPP

#include <fmt/format.h>
#include <fmt/ranges.h>
#include <filesystem>
#include <boost/archive/binary_oarchive.hpp>
#include <utility>
#include <optional>
#include "cargo.hpp"
#include "boost_serialization_std_optional.hpp"
#include <boost/serialization/vector.hpp>
#include "posix_file/file.hpp"

namespace cargo {
@@ -54,8 +56,8 @@ public:
    transfer_message() = default;

    transfer_message(std::uint64_t tid, std::uint32_t seqno,
                     std::string input_path, std::uint32_t i_type,
                     std::string output_path, std::uint32_t o_type)
                     std::vector<std::string> input_path, std::uint32_t i_type,
                     std::vector<std::string> output_path, std::uint32_t o_type)
        : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)),
          m_i_type(i_type), m_output_path(std::move(output_path)),
          m_o_type(o_type) {}
@@ -70,12 +72,12 @@ public:
        return m_seqno;
    }

    [[nodiscard]] const std::string&
    [[nodiscard]] const std::vector<std::string> &
    input_path() const {
        return m_input_path;
    }

    [[nodiscard]] const std::string&
    [[nodiscard]] const std::vector<std::string> &
    output_path() const {
        return m_output_path;
    }
@@ -107,9 +109,9 @@ private:

    std::uint64_t m_tid{};
    std::uint32_t m_seqno{};
    std::string m_input_path;
    std::vector<std::string> m_input_path;
    std::uint32_t m_i_type{};
    std::string m_output_path;
    std::vector<std::string> m_output_path;
    std::uint32_t m_o_type{};
};

@@ -244,6 +246,7 @@ struct fmt::formatter<cargo::transfer_message> : formatter<std::string_view> {
    }
};


template <>
struct fmt::formatter<cargo::status_message> : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
+15 −7
Original line number Diff line number Diff line
@@ -108,6 +108,7 @@ worker::run() {
    bool done = false;
    while(!done) {
        // Always loop pending operations
        // TODO: This seems that it is not a good idea, we have a lot of () ongoing

        auto I = m_ops.begin();
        auto IE = m_ops.end();
@@ -153,7 +154,8 @@ worker::run() {
                                     op->bw());
                    }
                    I->second.second = index;
                    ++I;
                    // If we have ++I we go trhu another file
                    //++I;
                }
            }
        }
@@ -180,22 +182,28 @@ worker::run() {
                transfer_message m;
                world.recv(msg->source(), msg->tag(), m);
                LOGGER_INFO("msg => from: {} body: {}", msg->source(), m);
                // Iterate over all the vector (input and output) and create a new op per file
                for (std::size_t i = 0; i < m.input_path().size(); i++) {
                std::string input_path = m.input_path()[i];
                std::string output_path = m.output_path()[i];
                m_ops.emplace(std::make_pair(
                        make_pair(m.input_path(), m.output_path()),
                        make_pair(input_path, output_path),
                        make_pair(operation::make_operation(
                                          t, workers, m.input_path(),
                                          m.output_path(), m_block_size,
                                          t, workers, input_path,
                                          output_path, m_block_size,
                                          m.i_type(), m.o_type()),
                                  -1)));

                // TODO : Issue 1, seqno is not different from each file -(we use i)
                const auto op =
                        m_ops[make_pair(m.input_path(), m.output_path())]
                        m_ops[make_pair(input_path, output_path)]
                                .first.get();

                op->set_comm(msg->source(), m.tid(), m.seqno(), t);
                op->set_comm(msg->source(), m.tid(), i, t);

                update_state(op->source(), op->tid(), op->seqno(),
                             op->output_path(), transfer_state::pending, -1.0f);
                }

                break;
            }