Verified Commit 3f65380c authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add a `request_manager` to `master_server`

parent 20dd602e
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -39,6 +39,11 @@ target_sources(
          env.hpp
          mpioxx.hpp
          message.hpp
          request.cpp
          request.hpp
          request_manager.cpp
          request_manager.hpp
          shared_mutex.hpp
          proto/rpc/response.hpp
)

+29 −24
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@
#include "net/utilities.hpp"
#include "net/request.hpp"
#include "proto/rpc/response.hpp"
#include "request.hpp"

using namespace std::literals;
namespace mpi = boost::mpi;
@@ -43,7 +44,7 @@ namespace mpi = boost::mpi;
namespace {

cargo::transfer_request
make_request(const cargo::dataset& input, const cargo::dataset& output) {
make_message(const cargo::dataset& input, const cargo::dataset& output) {

    static std::uint64_t id = 0;
    cargo::transfer_type tx_type;
@@ -152,35 +153,39 @@ master_server::transfer_datasets(const network::request& req,
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
    using proto::response_with_id;

    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources,
                targets);

    const auto resp = generic_response{rpc.id(), error_code{0}};

    m_request_manager.create(world.size() - 1, sources, targets)
            .or_else([&](auto&& ec) {
                LOGGER_ERROR("Failed to create request: {}", ec);
                LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);
                req.respond(generic_response{rpc.id(), ec});
            })
            .map([&](auto&& r) {
                assert(sources.size() == targets.size());

    mpi::communicator world;
                for(auto i = 0u; i < sources.size(); ++i) {
                    const auto& s = sources[i];
                    const auto& d = targets[i];

        const auto& input_path = sources[i].path();
        const auto& output_path = targets[i].path();

        const auto m = ::make_request(sources[i], targets[i]);

        for(int rank = 1; rank < world.size(); ++rank) {
            world.send(rank, static_cast<int>(tag::transfer), m);
                    for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) {
                        world.send(static_cast<int>(rank),
                                   static_cast<int>(tag::transfer),
                                   make_message(s, d));
                    }
                }

    transfer tx{42};

    LOGGER_INFO("rpc {:<} body: {{retval: {}, transfer: {}}}", rpc,
                resp.error_code(), tx);

    req.respond(resp);
                LOGGER_INFO("rpc {:<} body: {{retval: {}, transfer_id: {}}}",
                            rpc, error_code::success, r.id());
                req.respond(response_with_id{rpc.id(), error_code::success,
                                             r.id()});
            });
}

} // namespace cargo
+3 −0
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@

#include "net/server.hpp"
#include "cargo.hpp"
#include "request_manager.hpp"

namespace cargo {

@@ -56,6 +57,8 @@ private:
    thallium::managed<thallium::xstream> m_mpi_listener_ess;
    // ULT for the MPI listener
    thallium::managed<thallium::thread> m_mpi_listener_ult;
    // Request manager
    request_manager m_request_manager;
};

} // namespace cargo

src/request.cpp

0 → 100644
+43 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include "cargo.hpp"
#include "request.hpp"

namespace cargo {

request::request(std::uint64_t tid, std::size_t nworkers)
    : m_tid(tid), m_nworkers(nworkers) {}

[[nodiscard]] std::uint64_t
request::tid() const {
    return m_tid;
}

[[nodiscard]] std::size_t
request::nworkers() const {
    return m_nworkers;
}

} // namespace cargo
 No newline at end of file

src/request.hpp

0 → 100644
+58 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#ifndef CARGO_REQUEST_HPP
#define CARGO_REQUEST_HPP

#include <cstdint>
#include <vector>

namespace cargo {

class dataset;

class request {

public:
    request(std::uint64_t id, std::size_t nworkers);

    [[nodiscard]] std::uint64_t
    tid() const;

    [[nodiscard]] std::size_t
    nworkers() const;

private:
    /** Unique identifier for the request */
    std::uint64_t m_tid;
    /** Number of workers to be used for the request */
    std::size_t m_nworkers;
};

enum class part_status { pending, running, completed, failed };
enum class request_status { pending, running, completed, failed };

} // namespace cargo

#endif // CARGO_REQUEST_HPP
Loading