Commit c93ace29 authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'rnou/38-increase-transfer-status-to-see-the-status-of-all-the-files' into 'main'

Resolve "Increase transfer status to see the status of all the files"

Closes #38

See merge request !26
parents bd0b9cc3 4c8cb068
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19)

project(
  cargo
  VERSION 0.3.3
  VERSION 0.3.4
  LANGUAGES C CXX
)

+7 −3
Original line number Diff line number Diff line
@@ -185,9 +185,13 @@ There are a few utility command line programs that can be used to interact with
cli/ccp --server ofi+tcp://127.0.0.1:62000 --input /directory/subdir --output /directorydst/subdirdst --if <method> --of <method> 
```
`--input` and `--output` are required arguments, and can be a directory or a file path.
`--if` and `--of`select the specific transfer method, on V0.3.2 there are only to possibilities:
`--if` and `--of`select the specific transfer method, on V0.4.0 there are many combinations:

`--if mpio`  (It will read in parallel from i.e. lustre using MPI, and write using posix calls.)
`--of mpio`  (It will read using posix calls, and write using MPI (i.e. to lustre))
`--if or --of` can be: posix, gekkofs, hercules, dataclay, expand and parallel (for MPIIO requests, but only one side is allowed).

Typically you should use posix or parallel and then one specialized adhocfs. Posix is also able to be used with LD_PRELOAD, however
higher performance and flexibility can be obtained using the specific configuration.

On the other hand, MPIIO (parallel) uses normally file locking so there is a performance imapact, and posix is faster (we supose no external modifications are done).

Other commands are `ping`, `shutdown` and `shaping` (for bw control).
 No newline at end of file
+24 −0
Original line number Diff line number Diff line
@@ -125,6 +125,15 @@ public:
    status() const;


    /**
     * @brief Get all the statuses of the associated transfer.
     * 
     * @return std::vector<transfer_status> 
     */
    [[nodiscard]] std::vector<transfer_status>
    statuses() const;


    /**
     * @brief updates the bw control of the transfer
     * 
@@ -165,9 +174,21 @@ class transfer_status {
    friend transfer_status
    transfer::status() const;
    

    transfer_status(transfer_state status, float bw, error_code error) noexcept;

public:

    transfer_status(std::string name, transfer_state status, float bw, error_code error) noexcept;
    
    /**
     * Get the name of the associated dataset.
     *
     * @return The name of the dataset.
     */
    [[nodiscard]] std::string
    name() const noexcept;

    /**
     * Get the current status of the associated transfer.
     *
@@ -207,11 +228,14 @@ public:
    bw() const;

private:
    std::string m_name;
    transfer_state m_state;
    float m_bw;
    error_code m_error;
};



/**
 * Request the transfer of a dataset collection.
 *
+57 −1
Original line number Diff line number Diff line
@@ -133,6 +133,53 @@ transfer::status() const {
    throw std::runtime_error("rpc lookup failed");
}

std::vector<transfer_status>
transfer::statuses() const {
    using proto::statuses_response;

    network::client rpc_client{m_srv.protocol()};
    const auto rpc =
            network::rpc_info::create("transfer_statuses", m_srv.address());

    using response_type =
            statuses_response<std::string, transfer_state, float, error_code>;

    if(const auto lookup_rv = rpc_client.lookup(m_srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

        LOGGER_INFO("rpc {:<} body: {{tid: {}}}", rpc, m_id);

        if(const auto call_rv = endp.call(rpc.name(), m_id);
           call_rv.has_value()) {

            const response_type resp{call_rv.value()};
            const auto& v = resp.value();

            LOGGER_EVAL(resp.error_code(), ERROR, INFO,
                        "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc,
                        resp.error_code(), resp.op_id());

            if(resp.error_code()) {
                throw std::runtime_error(
                        fmt::format("rpc call failed: {}", resp.error_code()));
            }
            // convert vector of tuples to vector of transfer_status
            // (for some reason it asks for a public constructor)

            std::vector<transfer_status> v_statuses;
            for(const auto& [name, s, bw, ec] : v) {
                v_statuses.emplace_back(transfer_status{
                        name, s, bw, ec.value_or(error_code::success)});
            }

            return v_statuses;
        }
    }

    throw std::runtime_error("rpc lookup failed");
}

void
transfer::bw_control(std::int16_t bw_control) const {

@@ -170,13 +217,22 @@ transfer::bw_control(std::int16_t bw_control) const {

transfer_status::transfer_status(transfer_state status, float bw,
                                 error_code error) noexcept
    : m_state(status), m_bw(bw), m_error(error) {}
    : m_name(""), m_state(status), m_bw(bw), m_error(error) {}

transfer_status::transfer_status(std::string name, transfer_state status,
                                 float bw, error_code error) noexcept
    : m_name(name), m_state(status), m_bw(bw), m_error(error) {}

transfer_state
transfer_status::state() const noexcept {
    return m_state;
}

std::string
transfer_status::name() const noexcept {
    return m_name;
}

bool
transfer_status::done() const noexcept {
    return m_state == transfer_state::completed;
+1 −1
Original line number Diff line number Diff line
@@ -39,7 +39,7 @@ class Cargo(CMakePackage):
    version("0.3.1", sha256="613485354e24c4b97cb6d045657569f94dc1d9bbb391b5a166f8d18b3595428b")
    version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960")
    version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd")
    version("0.3.4", branch="rnou/fallocate")
    
    # build variants
    variant('build_type',
            default='Release',
Loading