Verified Commit 414fc9da authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Replace `master` function with `master_server` class

RPC handlers are now member functions of `master_server`
parent 4744b45f
Loading
Loading
Loading
Loading
+11 −1
Original line number Diff line number Diff line
@@ -105,7 +105,17 @@ main(int argc, char* argv[]) {

    try {
        if(world.rank() == 0) {
            master(cfg);

            master_server srv{cfg.progname, cfg.address, cfg.daemonize,
                              fs::current_path()};

            if(cfg.output_file) {
                srv.configure_logger(logger::logger_type::file,
                                     *cfg.output_file);
            }

            return srv.run();

        } else {
            worker();
        }
+44 −52
Original line number Diff line number Diff line
@@ -29,17 +29,17 @@
#include <cargo.hpp>
#include <fmt_formatters.hpp>
#include <boost/mpi.hpp>
#include <utility>
#include "message.hpp"
#include "master.hpp"
#include "net/utilities.hpp"
#include "net/request.hpp"
#include "proto/rpc/response.hpp"

using namespace std::literals;

namespace {

std::string
get_address(auto&& req) {
    return req.get_endpoint();
}

cargo::transfer_request_message
create_request_message(const cargo::dataset& input,
                       const cargo::dataset& output) {
@@ -62,48 +62,55 @@ create_request_message(const cargo::dataset& input,

using namespace std::literals;

struct remote_procedure {
    static std::uint64_t
    new_id() {
        static std::atomic_uint64_t current_id;
        return current_id++;
master_server::master_server(std::string name, std::string address,
                             bool daemonize, std::filesystem::path rundir,
                             std::optional<std::filesystem::path> pidfile)
    : server(std::move(name), std::move(address), daemonize, std::move(rundir),
             std::move(pidfile)),
      provider(m_network_engine, 0) {

#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
    provider::define(EXPAND(transfer_datasets));

#undef EXPAND
}
};

namespace handlers {
#define RPC_NAME() ("ADM_"s + __FUNCTION__)

void
ping(const thallium::request& req) {
master_server::ping(const network::request& req) {

    const auto rpc_id = remote_procedure::new_id();
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)));
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    const auto retval = cargo::error_code{0};
    LOGGER_INFO("rpc {:>} body: {{}}", rpc);

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)), retval);
    const auto resp = generic_response{rpc.id(), cargo::error_code{0}};

    req.respond(retval);
    LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());

    req.respond(resp);
}

void
transfer_datasets(const network::request& req,
master_server::transfer_datasets(const network::request& req,
                                 const std::vector<cargo::dataset>& sources,
                                 const std::vector<cargo::dataset>& targets) {

    const auto rpc_id = remote_procedure::new_id();
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{sources: {}, targets: {}}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)), sources, targets);
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;

    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources,
                targets);

    const auto retval = cargo::error_code{0};
    const auto resp = generic_response{rpc.id(), cargo::error_code{0}};

    assert(sources.size() == targets.size());

@@ -123,23 +130,8 @@ transfer_datasets(const network::request& req,

    cargo::transfer tx{42};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, transfer: {}}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)), retval, tx);

    req.respond(retval);
}


} // namespace handlers

void
master(const config::settings& cfg) {

    network::server daemon(cfg);
    LOGGER_INFO("rpc {:<} body: {{retval: {}, transfer: {}}}", rpc,
                resp.error_code(), tx);

    daemon.set_handler("ping"s, handlers::ping);
    daemon.set_handler("transfer_datasets"s, handlers::transfer_datasets);
    daemon.run();
    req.respond(resp);
}
+20 −0
Original line number Diff line number Diff line
@@ -25,6 +25,26 @@
#ifndef CARGO_MASTER_HPP
#define CARGO_MASTER_HPP

#include "net/server.hpp"
#include "cargo.hpp"

class master_server : public network::server,
                      public network::provider<master_server> {
public:
    master_server(std::string name, std::string address, bool daemonize,
                  std::filesystem::path rundir,
                  std::optional<std::filesystem::path> pidfile = {});

private:
    void
    ping(const network::request& req);

    void
    transfer_datasets(const network::request& req,
                      const std::vector<cargo::dataset>& sources,
                      const std::vector<cargo::dataset>& targets);
};

namespace config {
struct settings;
} // namespace config
+99 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#ifndef CARGO_PROTO_RPC_RESPONSE_HPP
#define CARGO_PROTO_RPC_RESPONSE_HPP

#include <cstdint>
#include <optional>

namespace proto {

template <typename Error>
class generic_response {

public:
    constexpr generic_response() noexcept = default;

    constexpr generic_response(std::uint64_t op_id, Error&& ec) noexcept
        : m_op_id(op_id), m_error_code(ec) {}

    [[nodiscard]] constexpr std::uint64_t
    op_id() const noexcept {
        return m_op_id;
    }

    [[nodiscard]] constexpr Error
    error_code() const noexcept {
        return m_error_code;
    }

    template <typename Archive>
    constexpr void
    serialize(Archive&& ar) {
        ar & m_op_id;
        ar & m_error_code;
    }

private:
    std::uint64_t m_op_id = 0;
    Error m_error_code{};
};

template <typename Error, typename Value>
class response_with_value : public generic_response<Error> {

public:
    constexpr response_with_value() noexcept = default;

    constexpr response_with_value(std::uint64_t op_id, Error&& ec,
                                  std::optional<Value> value) noexcept
        : generic_response<Error>(op_id, ec), m_value(std::move(value)) {}

    constexpr auto
    value() const noexcept {
        return m_value.value();
    }

    constexpr auto
    has_value() const noexcept {
        return m_value.has_value();
    }

    template <typename Archive>
    constexpr void
    serialize(Archive&& ar) {
        ar(cereal::base_class<generic_response<Error>>(this), m_value);
    }

private:
    std::optional<Value> m_value;
};

template <typename Error>
using response_with_id = response_with_value<Error, std::uint64_t>;

} // namespace proto

#endif // CARGO_PROTO_RPC_RESPONSE_HPP