Verified Commit 793bb85f authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

server: Basic master-worker communication loop

parent da593ead
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ target_sources(
          worker.cpp
          worker.hpp
          env.hpp
          message.hpp
)

target_include_directories(
+53 −0
Original line number Diff line number Diff line
@@ -29,6 +29,10 @@

#include <cargo.hpp>
#include <fmt_formatters.hpp>
#include <boost/mpi.hpp>
#include "message.hpp"

using namespace std::literals;

namespace {

@@ -37,6 +41,41 @@ get_address(auto&& req) {
    return req.get_endpoint();
}

std::tuple<std::string, std::string>
split(const std::string& id) {

    constexpr auto delim = "://"sv;
    const auto n = id.find(delim);

    if(n == std::string::npos) {
        return {std::string{}, id};
    }

    return {id.substr(0, n), id.substr(n + delim.length(), id.length())};
}

cargo::transfer_request_message
create_request_message(const cargo::dataset& input,
                       const cargo::dataset& output) {

    cargo::transfer_type tx_type;

    const auto& [input_prefix, input_path] = split(input.id());
    const auto& [output_prefix, output_path] = split(output.id());

    // FIXME: id should offer member functions to retrieve the parent
    // namespace
    if(input_prefix == "lustre") {
        tx_type = cargo::parallel_read;
    } else if(output_prefix == "lustre") {
        tx_type = cargo::parallel_write;
    } else {
        tx_type = cargo::sequential;
    }

    return cargo::transfer_request_message{input_path, output_path, tx_type};
}

} // namespace

using namespace std::literals;
@@ -86,6 +125,20 @@ transfer_datasets(const net::request& req,

    assert(sources.size() == targets.size());

    boost::mpi::communicator world;
    for(auto i = 0u; i < sources.size(); ++i) {

        const auto& input_path = sources[i].id();
        const auto& output_path = targets[i].id();

        const auto m = ::create_request_message(sources[i], targets[i]);

        for(int rank = 1; rank < world.size(); ++rank) {
            world.send(rank, static_cast<int>(cargo::message_tags::transfer),
                       m);
        }
    }

    cargo::transfer tx{42};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "

src/message.hpp

0 → 100644
+135 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#ifndef CARGO_MESSAGE_HPP
#define CARGO_MESSAGE_HPP

#include <fmt/format.h>
#include <filesystem>
#include <boost/archive/binary_oarchive.hpp>
#include <utility>

namespace cargo {

enum transfer_type { parallel_read, parallel_write, sequential };
enum class message_tags { transfer, status, shutdown };

class transfer_request_message {

    friend class boost::serialization::access;

public:
    transfer_request_message() = default;

    transfer_request_message(const std::filesystem::path& input_path,
                             const std::filesystem::path& output_path,
                             transfer_type type)
        : m_input_path(input_path), m_output_path(output_path), m_type(type) {}

    std::filesystem::path
    input_path() const {
        return m_input_path;
    }

    std::filesystem::path
    output_path() const {
        return m_output_path;
    }

    transfer_type
    type() const {
        return m_type;
    }

private:
    template <class Archive>
    void
    serialize(Archive& ar, const unsigned int version) {
        (void) version;

        ar& m_input_path;
        ar& m_output_path;
        ar& m_type;
    }

    std::string m_input_path;
    std::string m_output_path;
    transfer_type m_type;
};

class transfer_status_message {

    friend class boost::serialization::access;

public:
    transfer_status_message() = default;

    explicit transfer_status_message(std::uint64_t transfer_id)
        : m_transfer_id(transfer_id) {}

    std::uint64_t
    transfer_id() const {
        return m_transfer_id;
    }

private:
    template <class Archive>
    void
    serialize(Archive& ar, const unsigned int version) {
        (void) version;

        ar& m_transfer_id;
    }

    std::uint64_t m_transfer_id{};
};

} // namespace cargo

template <>
struct fmt::formatter<cargo::transfer_request_message>
    : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const cargo::transfer_request_message& r, FormatContext& ctx) const {
        const auto str = fmt::format("{{input_path: {}, output_path: {}}}",
                                     r.input_path(), r.output_path());
        return formatter<std::string_view>::format(str, ctx);
    }
};

template <>
struct fmt::formatter<cargo::transfer_status_message>
    : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const cargo::transfer_status_message& s, FormatContext& ctx) const {
        const auto str = fmt::format("{{transfer_id: {}}}", s.transfer_id());
        return formatter<std::string_view>::format(str, ctx);
    }
};

#endif // CARGO_MESSAGE_HPP
+125 −1
Original line number Diff line number Diff line
@@ -22,5 +22,129 @@
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <fmt/format.h>
#include <logger/logger.hpp>
#include <boost/mpi.hpp>
#include <boost/mpi/error_string.hpp>
#include "message.hpp"

namespace mpi = boost::mpi;
using namespace std::chrono_literals;


namespace {

// boost MPI doesn't have a communicator constructor that uses
// MPI_Comm_create_group()
mpi::communicator
make_communicator(const mpi::communicator& comm, const mpi::group& group,
                  int tag) {
    MPI_Comm newcomm;
    if(const auto ec = MPI_Comm_create_group(comm, group, tag, &newcomm);
       ec != MPI_SUCCESS) {
        LOGGER_ERROR("MPI_Comm_create_group() failed: {}",
                     boost::mpi::error_string(ec));
        MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
    }
    return mpi::communicator{newcomm, boost::mpi::comm_take_ownership};
}


} // namespace

void
mpio_read(const mpi::communicator& workers,
          const std::filesystem::path& input_path,
          const std::filesystem::path& output_path) {

    (void) workers;
    (void) input_path;
    (void) output_path;

    LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
}

void
mpio_write(const mpi::communicator& workers,
           const std::filesystem::path& input_path,
           const std::filesystem::path& output_path) {

    (void) workers;
    (void) input_path;
    (void) output_path;

    LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
}

void
worker() {}
sequential_transfer(const std::filesystem::path& input_path,
                    const std::filesystem::path& output_path) {
    (void) input_path;
    (void) output_path;

    LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
}

void
worker() {

    // Create a separate communicator only for worker processes
    const mpi::communicator world;
    const auto ranks_to_exclude = std::array<int, 1>{0};
    const auto workers =
            ::make_communicator(world,
                                world.group().exclude(ranks_to_exclude.begin(),
                                                      ranks_to_exclude.end()),
                                0);

    LOGGER_INIT(fmt::format("worker_{:03}", world.rank()), "console color");

    // Initialization finished
    LOGGER_INFO("Staging process initialized (world_rank {}, workers_rank: {})",
                world.rank(), workers.rank());

    bool done = false;

    while(!done) {

        auto msg = world.iprobe();

        if(!msg) {
            // FIXME: sleep time should be configurable
            std::this_thread::sleep_for(150ms);
            continue;
        }

        switch(static_cast<cargo::message_tags>(msg->tag())) {
            case cargo::message_tags::transfer: {
                cargo::transfer_request_message m;
                world.recv(mpi::any_source, msg->tag(), m);
                LOGGER_DEBUG("Transfer request received!: {}", m);

                switch(m.type()) {
                    case cargo::parallel_read:
                        ::mpio_read(workers, m.input_path(), m.output_path());
                        break;
                    case cargo::parallel_write:
                        ::mpio_write(workers, m.input_path(), m.output_path());
                        break;
                    case cargo::sequential:
                        ::sequential_transfer(m.input_path(), m.output_path());
                        break;
                }
                break;
            }

            case cargo::message_tags::status: {
                cargo::transfer_status_message m;
                world.recv(mpi::any_source, msg->tag(), m);
                LOGGER_DEBUG("Transfer status query received!: {}", m);
                break;
            }

            case cargo::message_tags::shutdown:
                done = true;
                break;
        }
    }
}