Commit 2ceeee55 authored by Ramon Nou's avatar Ramon Nou
Browse files

BW Shaping workflow

parent 7436c150
Loading
Loading
Loading
Loading
+18 −1
Original line number Diff line number Diff line
@@ -84,7 +84,24 @@ target_link_libraries(ccp
    cargo
)

install(TARGETS cargo_ping cargo_shutdown ccp
################################################################################
## shaping: A CLI tool to request a Cargo server to slowdown transfers 
add_executable(shaping)

target_sources(shaping
  PRIVATE
    shaping.cpp
)

target_link_libraries(shaping
  PUBLIC
    fmt::fmt
    CLI11::CLI11
    net::rpc_client
    cargo
)

install(TARGETS cargo_ping cargo_shutdown ccp shaping
        RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)

cli/shaping.cpp

0 → 100644
+114 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of Cargo.
 *
 * Cargo is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Cargo is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Cargo.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

#include <fmt/format.h>
#include <cargo.hpp>
#include <filesystem>
#include <CLI/CLI.hpp>
#include <net/client.hpp>
#include <net/endpoint.hpp>

struct shaping_config {
    std::string progname;
    std::string server_address;
    std::int64_t tid;
    std::int16_t shaping;
};

shaping_config
parse_command_line(int argc, char* argv[]) {

    shaping_config cfg;

    cfg.progname = std::filesystem::path{argv[0]}.filename().string();

    CLI::App app{"Cargo shaping client", cfg.progname};

    app.add_option("-s,--server", cfg.server_address, "Server address")
            ->option_text("ADDRESS")
            ->required();

    app.add_option("-i,--tid", cfg.tid, "transfer id")
            ->option_text("integer")
            ->required();

    app.add_option("-b,--bw", cfg.shaping, "bw shaping")
            ->option_text("integer")
            ->required();


    try {
        app.parse(argc, argv);
        return cfg;
    } catch(const CLI::ParseError& ex) {
        std::exit(app.exit(ex));
    }
}

auto
parse_address(const std::string& address) {
    const auto pos = address.find("://");
    if(pos == std::string::npos) {
        throw std::runtime_error(fmt::format("Invalid address: {}", address));
    }

    const auto protocol = address.substr(0, pos);
    return std::make_pair(protocol, address);
}


int
main(int argc, char* argv[]) {

    shaping_config cfg = parse_command_line(argc, argv);

    try {
        const auto [protocol, address] = parse_address(cfg.server_address);
        network::client rpc_client{protocol};

        if(const auto result = rpc_client.lookup(address); result.has_value()) {
            const auto& endpoint = result.value();
            const auto retval = endpoint.call("bw_shaping", cfg.tid, cfg.shaping);

            if(retval.has_value()) {

                auto error_code = int{retval.value()};

                fmt::print("bw_shaping RPC was successful!\n");
                fmt::print("  (server replied with: {})\n", error_code);
                return EXIT_SUCCESS;
            }

            fmt::print(stderr, "bw_shaping RPC failed\n");
            return EXIT_FAILURE;

        } else {
            fmt::print(stderr, "Failed to lookup address: {}\n", address);
            return EXIT_FAILURE;
        }
    } catch(const std::exception& ex) {
        fmt::print(stderr, "Error: {}\n", ex.what());
        return EXIT_FAILURE;
    }
}
+8 −3
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@
#ifndef CARGO_HPP
#define CARGO_HPP

#include <cstdint>
#include <string>
#include <vector>
#include <chrono>
@@ -149,7 +150,7 @@ class transfer_status {
    friend transfer_status
    transfer::status() const;

    transfer_status(transfer_state status, error_code error) noexcept;
    transfer_status(transfer_state status, float bw, error_code error) noexcept;

public:
    /**
@@ -187,8 +188,12 @@ public:
    [[nodiscard]] error_code
    error() const;

    [[nodiscard]] float
    bw() const;

private:
    transfer_state m_state;
    float m_bw;
    error_code m_error;
};

+10 −5
Original line number Diff line number Diff line
@@ -93,7 +93,7 @@ transfer::status() const {
    network::client rpc_client{m_srv.protocol()};
    const auto rpc =
            network::rpc_info::create("transfer_status", m_srv.address());
    using response_type = status_response<transfer_state, error_code>;
    using response_type = status_response<transfer_state, float, error_code>;

    if(const auto lookup_rv = rpc_client.lookup(m_srv.address());
       lookup_rv.has_value()) {
@@ -105,7 +105,7 @@ transfer::status() const {
           call_rv.has_value()) {

            const response_type resp{call_rv.value()};
            const auto& [s, ec] = resp.value();
            const auto& [s, bw, ec] = resp.value();

            LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                        "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc,
@@ -116,16 +116,16 @@ transfer::status() const {
                        fmt::format("rpc call failed: {}", resp.error_code()));
            }

            return transfer_status{s, ec.value_or(error_code::success)};
            return transfer_status{s, bw, ec.value_or(error_code::success)};
        }
    }

    throw std::runtime_error("rpc lookup failed");
}

transfer_status::transfer_status(transfer_state status,
transfer_status::transfer_status(transfer_state status, float bw,
                                 error_code error) noexcept
    : m_state(status), m_error(error) {}
    : m_state(status), m_bw(bw), m_error(error) {}

transfer_state
transfer_status::state() const noexcept {
@@ -142,6 +142,11 @@ transfer_status::failed() const noexcept {
    return m_state == transfer_state::failed;
}

float
transfer_status::bw() const {
    return m_bw;
}

error_code
transfer_status::error() const {
    switch(m_state) {
+32 −5
Original line number Diff line number Diff line
@@ -85,6 +85,7 @@ master_server::master_server(std::string name, std::string address,
    provider::define(EXPAND(shutdown));
    provider::define(EXPAND(transfer_datasets));
    provider::define(EXPAND(transfer_status));
    provider::define(EXPAND(bw_shaping));

#undef EXPAND

@@ -125,7 +126,7 @@ master_server::mpi_listener_ult() {
                            msg->source(), m);

                m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1,
                                         m.state(), m.error_code());
                                         m.state(), m.bw(), m.error_code());
                break;
            }

@@ -169,6 +170,32 @@ master_server::ping(const network::request& req) {
    req.respond(resp);
}


void
master_server::bw_shaping(const network::request& req, std::uint64_t tid,
                          std::int16_t shaping) {
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    LOGGER_INFO("rpc {:>} body: {{tid: {}, shaping: {}}}", rpc, tid, shaping);

    for(int rank = 1; rank < world.size(); ++rank) {
        const auto m = cargo::shaper_message{tid, shaping};
        LOGGER_INFO("msg <= to: {} body: {}", rank, m);
        world.send(static_cast<int>(rank), static_cast<int>(tag::bw_shaping),
                   m);
    }

    const auto resp = generic_response{rpc.id(), error_code::success};

    LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());

    req.respond(resp);
}

void
master_server::shutdown(const network::request& req) {
    using network::get_address;
@@ -232,7 +259,7 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
    using proto::status_response;

    using response_type =
            status_response<cargo::transfer_state, cargo::error_code>;
            status_response<cargo::transfer_state, float, cargo::error_code>;

    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
@@ -248,9 +275,9 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
            .map([&](auto&& rs) {
                LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc,
                            error_code::success, rs);
                req.respond(
                        response_type{rpc.id(), error_code::success,
                                      std::make_pair(rs.state(), rs.error())});
                req.respond(response_type{
                        rpc.id(), error_code::success,
                        std::make_tuple(rs.state(), rs.bw(), rs.error())});
            });
}

Loading