Commit 577a01a1 authored by Ramon Nou's avatar Ramon Nou
Browse files

Add statuses (WIP)

parent bd0b9cc3
Loading
Loading
Loading
Loading
Loading
+24 −0
Original line number Diff line number Diff line
@@ -125,6 +125,15 @@ public:
    status() const;


    /**
     * @brief Get all the statuses of the associated transfer.
     * 
     * @return std::vector<transfer_status> 
     */
    [[nodiscard]] std::vector<transfer_status>
    statuses() const;


    /**
     * @brief updates the bw control of the transfer
     * 
@@ -165,9 +174,21 @@ class transfer_status {
    friend transfer_status
    transfer::status() const;
    

    transfer_status(transfer_state status, float bw, error_code error) noexcept;

public:

    transfer_status(std::string name, transfer_state status, float bw, error_code error) noexcept;
    
    /**
     * Get the name of the associated dataset.
     *
     * @return The name of the dataset.
     */
    [[nodiscard]] std::string
    name() const noexcept;

    /**
     * Get the current status of the associated transfer.
     *
@@ -207,11 +228,14 @@ public:
    bw() const;

private:
    std::string m_name;
    transfer_state m_state;
    float m_bw;
    error_code m_error;
};



/**
 * Request the transfer of a dataset collection.
 *
+57 −1
Original line number Diff line number Diff line
@@ -133,6 +133,53 @@ transfer::status() const {
    throw std::runtime_error("rpc lookup failed");
}

std::vector<transfer_status>
transfer::statuses() const {
    using proto::statuses_response;

    network::client rpc_client{m_srv.protocol()};
    const auto rpc =
            network::rpc_info::create("transfer_statuses", m_srv.address());

    using response_type =
            statuses_response<std::string, transfer_state, float, error_code>;

    if(const auto lookup_rv = rpc_client.lookup(m_srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

        LOGGER_INFO("rpc {:<} body: {{tid: {}}}", rpc, m_id);

        if(const auto call_rv = endp.call(rpc.name(), m_id);
           call_rv.has_value()) {

            const response_type resp{call_rv.value()};
            const auto& v = resp.value();

            LOGGER_EVAL(resp.error_code(), ERROR, INFO,
                        "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc,
                        resp.error_code(), resp.op_id());

            if(resp.error_code()) {
                throw std::runtime_error(
                        fmt::format("rpc call failed: {}", resp.error_code()));
            }
            // convert vector of tuples to vector of transfer_status
            // (for some reason it asks for a public constructor)

            std::vector<transfer_status> v_statuses;
            for(const auto& [name, s, bw, ec] : v) {
                v_statuses.emplace_back(transfer_status{
                        name, s, bw, ec.value_or(error_code::success)});
            }

            return v_statuses;
        }
    }

    throw std::runtime_error("rpc lookup failed");
}

void
transfer::bw_control(std::int16_t bw_control) const {

@@ -170,13 +217,22 @@ transfer::bw_control(std::int16_t bw_control) const {

transfer_status::transfer_status(transfer_state status, float bw,
                                 error_code error) noexcept
    : m_state(status), m_bw(bw), m_error(error) {}
    : m_name(""), m_state(status), m_bw(bw), m_error(error) {}

transfer_status::transfer_status(std::string name, transfer_state status,
                                 float bw, error_code error) noexcept
    : m_name(name), m_state(status), m_bw(bw), m_error(error) {}

transfer_state
transfer_status::state() const noexcept {
    return m_state;
}

std::string
transfer_status::name() const noexcept {
    return m_name;
}

bool
transfer_status::done() const noexcept {
    return m_state == transfer_state::completed;
+5 −0
Original line number Diff line number Diff line
@@ -109,6 +109,11 @@ using status_response =
        response_with_value<std::tuple<Status, Bw, std::optional<Error>>,
                            Error>;

template <typename Name, typename Status, typename Bw, typename Error>
using statuses_response =
        response_with_value<std::vector< std::tuple<Name, Status, Bw, std::optional<Error> > >,
                            Error>;

} // namespace cargo::proto

#endif // CARGO_PROTO_RPC_RESPONSE_HPP