Commit 4a84a83c authored by Ramon Nou's avatar Ramon Nou
Browse files

FTIO interface created

parent a362795c
Loading
Loading
Loading
Loading
Loading
+20 −1
Original line number Diff line number Diff line
@@ -101,7 +101,26 @@ target_link_libraries(shaping
    cargo
)

install(TARGETS cargo_ping cargo_shutdown ccp shaping

################################################################################
## ftio: A CLI tool to send the ftio info to a Cargo server 
add_executable(cargo_ftio)

target_sources(cargo_ftio
  PRIVATE
    ftio.cpp
)

target_link_libraries(cargo_ftio
  PUBLIC
    fmt::fmt
    CLI11::CLI11
    net::rpc_client
    cargo
)


install(TARGETS cargo_ping cargo_shutdown ccp shaping cargo_ftio
        RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)

cli/ftio.cpp

0 → 100644
+114 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <fmt/format.h>
#include <cargo.hpp>
#include <filesystem>
#include <CLI/CLI.hpp>
#include <net/client.hpp>
#include <net/endpoint.hpp>

struct ftio_config {
    std::string progname;
    std::string server_address;
    float confidence;
    float probability;
};

ftio_config
parse_command_line(int argc, char* argv[]) {

    ftio_config cfg;

    cfg.progname = std::filesystem::path{argv[0]}.filename().string();

    CLI::App app{"Cargo ftio client", cfg.progname};

    app.add_option("-s,--server", cfg.server_address, "Server address")
            ->option_text("ADDRESS")
            ->required();

    app.add_option("-c,--conf", cfg.confidence, "confidence")
            ->option_text("float")
            ->required();

    app.add_option("-p,--probability", cfg.probability, "probability")
            ->option_text("float")
            ->default_str("-1.0");


    try {
        app.parse(argc, argv);
        return cfg;
    } catch(const CLI::ParseError& ex) {
        std::exit(app.exit(ex));
    }
}

auto
parse_address(const std::string& address) {
    const auto pos = address.find("://");
    if(pos == std::string::npos) {
        throw std::runtime_error(fmt::format("Invalid address: {}", address));
    }

    const auto protocol = address.substr(0, pos);
    return std::make_pair(protocol, address);
}


int
main(int argc, char* argv[]) {

    ftio_config cfg = parse_command_line(argc, argv);

    try {
        const auto [protocol, address] = parse_address(cfg.server_address);
        network::client rpc_client{protocol};

        if(const auto result = rpc_client.lookup(address); result.has_value()) {
            const auto& endpoint = result.value();
            const auto retval = endpoint.call("ftio_int", cfg.confidence, cfg.probability);

            if(retval.has_value()) {

                auto error_code = int{retval.value()};

                fmt::print("ftio_int RPC was successful!\n");
                fmt::print("  (server replied with: {})\n", error_code);
                return EXIT_SUCCESS;
            }

            fmt::print(stderr, "ftio_int RPC failed\n");
            return EXIT_FAILURE;

        } else {
            fmt::print(stderr, "Failed to lookup address: {}\n", address);
            return EXIT_FAILURE;
        }
    } catch(const std::exception& ex) {
        fmt::print(stderr, "Error: {}\n", ex.what());
        return EXIT_FAILURE;
    }
}
+65 −2
Original line number Diff line number Diff line
@@ -88,7 +88,12 @@ master_server::master_server(std::string name, std::string address,
      provider(m_network_engine, 0),
      m_mpi_listener_ess(thallium::xstream::create()),
      m_mpi_listener_ult(m_mpi_listener_ess->make_thread(
              [this]() { mpi_listener_ult(); })) {
              [this]() { mpi_listener_ult(); })),
      m_ftio_listener_ess(thallium::xstream::create()),
      m_ftio_listener_ult(m_ftio_listener_ess->make_thread(
              [this]() { ftio_scheduling_ult(); }))

{

#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
@@ -97,6 +102,7 @@ master_server::master_server(std::string name, std::string address,
    provider::define(EXPAND(transfer_status));
    provider::define(EXPAND(bw_control));
    provider::define(EXPAND(transfer_statuses));
    provider::define(EXPAND(ftio_int));

#undef EXPAND

@@ -110,6 +116,10 @@ master_server::master_server(std::string name, std::string address,
        m_mpi_listener_ult = thallium::managed<thallium::thread>{};
        m_mpi_listener_ess->join();
        m_mpi_listener_ess = thallium::managed<thallium::xstream>{};
        m_ftio_listener_ult->join();
        m_ftio_listener_ult = thallium::managed<thallium::thread>{};
        m_ftio_listener_ess->join();
        m_ftio_listener_ess = thallium::managed<thallium::xstream>{};
    });
}

@@ -163,6 +173,28 @@ master_server::mpi_listener_ult() {
    LOGGER_INFO("Exit");
}


void
master_server::ftio_scheduling_ult() {


    while(!m_shutting_down) {

        std::this_thread::sleep_for(1000ms);
        // thallium::thread::self().sleep(m_network_engine, 10);


        // Do something with the confidence and probability
        if (ftio_changed) {
            ftio_changed = false;
        LOGGER_INFO("Confidence is {}, probability is {}", confidence,
                    probability);
        }
    }

    LOGGER_INFO("Shutting down.");
}

#define RPC_NAME() (__FUNCTION__)

void
@@ -417,4 +449,35 @@ master_server::transfer_statuses(const network::request& req,
            });
}


void
master_server::ftio_int(const network::request& req, float conf,
                        float prob) {
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    confidence = conf;
    probability = prob;
    ftio_changed = true;
    LOGGER_INFO("rpc {:>} body: {{confidence: {}, probability: {}}}", rpc,
                conf, prob);

    // do the magic here

    // 1. Update the confidence and probability values inside cargo

    // Scheduling thread should be running and waiting for them

    //

    const auto resp = generic_response{rpc.id(), error_code::success};

    LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());

    req.respond(resp);
}

} // namespace cargo
+15 −0
Original line number Diff line number Diff line
@@ -44,6 +44,9 @@ private:
    void
    mpi_listener_ult();

    void
    ftio_scheduling_ult();

    void
    ping(const network::request& req);

@@ -66,11 +69,23 @@ private:
    void
    bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping);


    void 
    ftio_int(const network::request& req, float confidence, float probability);

private:
    // Dedicated execution stream for the MPI listener ULT
    thallium::managed<thallium::xstream> m_mpi_listener_ess;
    // ULT for the MPI listener
    thallium::managed<thallium::thread> m_mpi_listener_ult;
    // Dedicated execution stream for the ftio scheduler
    thallium::managed<thallium::xstream> m_ftio_listener_ess;
    // ULT for the ftio scheduler
    thallium::managed<thallium::thread> m_ftio_listener_ult;
    // FTIO decision values (below 0, implies not used)
    float confidence = -1.0f;
    float probability = -1.0f;
    bool ftio_changed = true;
    // Request manager
    request_manager m_request_manager;
};