Verified Commit 79a38579 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

WIP: [scord] Add transfer manager and initiate transfers in Cargo

parent 11f5a5a5
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -26,7 +26,7 @@
add_executable(scord)

target_sources(scord PRIVATE scord.cpp
  job_manager.hpp adhoc_storage_manager.hpp
  job_manager.hpp adhoc_storage_manager.hpp transfer_manager.hpp
  pfs_storage_manager.hpp ${CMAKE_CURRENT_BINARY_DIR}/defaults.hpp
  internal_types.hpp internal_types.cpp rpc_server.hpp rpc_server.cpp)

@@ -51,6 +51,7 @@ target_link_libraries(
  CLI11::CLI11
  RedisPlusPlus::RedisPlusPlus
  ryml::ryml
  cargo::cargo
)

install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR})
+5 −0
Original line number Diff line number Diff line
@@ -79,6 +79,11 @@ adhoc_storage_metadata::controller_address() const {
    return m_adhoc_storage.context().controller_address();
}

std::string const&
adhoc_storage_metadata::data_stager_address() const {
    return m_adhoc_storage.context().data_stager_address();
}

void
adhoc_storage_metadata::update(scord::adhoc_storage::resources new_resources) {
    m_adhoc_storage.update(std::move(new_resources));
+41 −0
Original line number Diff line number Diff line
@@ -81,6 +81,9 @@ struct adhoc_storage_metadata {
    std::string const&
    controller_address() const;

    std::string const&
    data_stager_address() const;

    void
    update(scord::adhoc_storage::resources new_resources);

@@ -114,6 +117,44 @@ struct pfs_storage_metadata {
    std::shared_ptr<scord::internal::job_metadata> m_client_info;
};

template <typename TransferHandle>
struct transfer_metadata {
    transfer_metadata(transfer_id id, TransferHandle&& handle,
                      std::vector<scord::qos::limit> qos)
        : m_id(id), m_handle(handle), m_qos(std::move(qos)) {}

    transfer_id
    id() const {
        return m_id;
    }

    TransferHandle
    transfer() const {
        return m_handle;
    }

    std::vector<scord::qos::limit> const&
    qos() const {
        return m_qos;
    }

    float
    measured_bandwidth() const {
        return m_measured_bandwidth;
    }

    void
    update(float bandwidth) {
        m_measured_bandwidth = bandwidth;
    }

    transfer_id m_id;
    TransferHandle m_handle;
    std::vector<scord::qos::limit> m_qos;
    float m_measured_bandwidth = -1.0;
};


} // namespace scord::internal

#endif // SCORD_INTERNAL_TYPES_HPP
+63 −7
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@
#include <net/endpoint.hpp>
#include <net/serialization.hpp>
#include <net/utilities.hpp>
#include <cargo/cargo.hpp>
#include "rpc_server.hpp"

template <typename T, typename E>
@@ -611,17 +612,72 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id,
                "limits: {}, mapping: {}}}",
                rpc, job_id, sources, targets, limits, mapping);

    scord::error_code ec;
    const auto jm_result = m_job_manager.find(job_id);

    if(!jm_result) {
        LOGGER_ERROR("rpc id: {} error_msg: \"Error finding job: {}\"",
                     rpc.id(), job_id);
        const auto resp = response_with_id{rpc.id(), jm_result.error()};
        LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
        req.respond(resp);
        return;
    }

    const auto& job_metadata_ptr = jm_result.value();

    if(!job_metadata_ptr->adhoc_storage_metadata()) {
        LOGGER_ERROR("rpc id: {} error_msg: \"Job has no adhoc storage\"",
                     rpc.id(), job_id);
        const auto resp = response_with_id{rpc.id(), error_code::no_resources};
        LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code());
        req.respond(resp);
        return;
    }

    const auto data_stager_address =
            job_metadata_ptr->adhoc_storage_metadata()->data_stager_address();

    std::optional<std::uint64_t> tx_id;
    // Transform the `scord::dataset`s into `cargo::dataset`s and contact the
    // Cargo service associated with the job's adhoc storage instance to
    // execute the transfers.
    cargo::server srv{data_stager_address};

    // TODO: generate a global ID for the transfer and contact Cargo to
    // actually request it
    tx_id = 42;
    std::vector<cargo::dataset> inputs;
    std::vector<cargo::dataset> outputs;

    const auto resp = response_with_id{rpc.id(), ec, tx_id};
    // TODO: check type of storage tier to enable parallel transfers
    std::transform(sources.cbegin(), sources.cend(), std::back_inserter(inputs),
                   [](const auto& src) { return cargo::dataset{src.id()}; });

    LOGGER_INFO("rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, ec, tx_id);
    std::transform(targets.cbegin(), targets.cend(),
                   std::back_inserter(outputs),
                   [](const auto& tgt) { return cargo::dataset{tgt.id()}; });

    const auto cargo_tx = cargo::transfer_datasets(srv, inputs, outputs);

    // Register the transfer into the `tranfer_manager`.
    // We embed the generated `cargo::transfer` object into
    // scord's `transfer_metadata` so that we can later query the Cargo
    // service for the transfer's status.
    const auto rv =
            m_transfer_manager.create(cargo_tx, limits)
                    .or_else([&](auto&& ec) {
                        LOGGER_ERROR("rpc id: {} error_msg: \"Error creating "
                                     "transfer: {}\"",
                                     rpc.id(), ec);
                    })
                    .and_then([&](auto&& transfer_metadata_ptr)
                                      -> tl::expected<transfer_id, error_code> {
                        return transfer_metadata_ptr->id();
                    });

    const auto resp =
            rv ? response_with_id{rpc.id(), error_code::success, rv.value()}
               : response_with_id{rpc.id(), rv.error()};

    LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc,
                resp.error_code(), resp.value_or_none());

    req.respond(resp);
}
+6 −0
Original line number Diff line number Diff line
@@ -31,6 +31,11 @@
#include "job_manager.hpp"
#include "adhoc_storage_manager.hpp"
#include "pfs_storage_manager.hpp"
#include "transfer_manager.hpp"

namespace cargo {
class transfer;
}

namespace scord {

@@ -103,6 +108,7 @@ private:
    job_manager m_job_manager;
    adhoc_storage_manager m_adhoc_manager;
    pfs_storage_manager m_pfs_manager;
    transfer_manager<cargo::transfer> m_transfer_manager;
};

} // namespace scord
Loading