Verified Commit 57dd9eb6 authored by Alberto Miranda ♨️
Browse files

scord: Rewrite `ADM_transfer_datasets` RPC

parent 9b32e40e
Loading
Loading
Loading
Loading
+21 −0
Original line number | Diff line number | Diff line
@@ -224,6 +224,7 @@ struct transfer {
        n_to_n = ADM_MAPPING_N_TO_N
    };

    transfer();
    explicit transfer(transfer_id id);
    explicit transfer(ADM_transfer_t transfer);

@@ -239,6 +240,12 @@ struct transfer {
    transfer_id
    id() const;

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
@@ -260,6 +267,7 @@ enum class scope : std::underlying_type<ADM_qos_scope_t>::type {

struct entity {

    entity();
    template <typename T>
    entity(admire::qos::scope s, T&& data);
    explicit entity(ADM_qos_entity_t entity);
@@ -280,6 +288,12 @@ struct entity {
    T
    data() const;

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
@@ -287,6 +301,7 @@ private:

struct limit {

    limit();
    limit(admire::qos::subclass cls, uint64_t value);
    limit(admire::qos::subclass cls, uint64_t value,
          const admire::qos::entity& e);
@@ -310,6 +325,12 @@ struct limit {
    uint64_t
    value() const;

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
+129 −0
Original line number | Diff line number | Diff line
@@ -1308,6 +1308,7 @@ job::serialize<thallium::proc_output_archive<>>(
class transfer::impl {

public:
    impl() = default;
    explicit impl(transfer_id id) : m_id(id) {}

    impl(const impl& rhs) = default;
@@ -1322,10 +1323,24 @@ public:
        return m_id;
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_id));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_id));
    }

private:
    transfer_id m_id;
};

transfer::transfer() = default;

transfer::transfer(transfer_id id)
    : m_pimpl(std::make_unique<transfer::impl>(id)) {}

@@ -1353,6 +1368,32 @@ transfer::id() const {
    return m_pimpl->id();
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
// Serializing the transfer amounts to serializing its PIMPL pointer;
// cereal's std::unique_ptr support (<cereal/types/memory.hpp>, included
// by this commit) presumably forwards to transfer::impl::load()/save()
// — confirm against the cereal smart-pointer docs.
template <class Archive>
inline void
transfer::serialize(Archive& ar) {
    // NVP wrapper names the field for human-readable archives.
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
transfer::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
transfer::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
transfer::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
transfer::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

class dataset::impl {
public:
    impl() = default;
@@ -1965,6 +2006,8 @@ namespace qos {

class entity::impl {
public:
    impl() = default;

    template <typename T>
    impl(const admire::qos::scope& s, T&& data) : m_scope(s), m_data(data) {}

@@ -1990,6 +2033,20 @@ public:
        return std::get<T>(m_data);
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_scope));
        ar(SCORD_SERIALIZATION_NVP(m_data));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_scope));
        ar(SCORD_SERIALIZATION_NVP(m_data));
    }

private:
    static std::variant<dataset, node, job, transfer>
    init_helper(ADM_qos_entity_t entity) {
@@ -2014,6 +2071,8 @@ private:
    std::variant<dataset, node, job, transfer> m_data;
};

entity::entity() = default;

template <typename T>
entity::entity(admire::qos::scope s, T&& data)
    : m_pimpl(std::make_unique<entity::impl>(s, std::forward<T>(data))) {}
@@ -2066,10 +2125,36 @@ entity::data<admire::transfer>() const {
    return m_pimpl->data<admire::transfer>();
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
// Delegates the whole entity state (scope + variant data, see
// entity::impl::load()/save()) to the PIMPL pointer; requires cereal's
// unique_ptr and variant support to be available in the archive.
template <class Archive>
inline void
entity::serialize(Archive& ar) {
    // NVP wrapper names the field for human-readable archives.
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
entity::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
entity::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
entity::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
entity::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

class limit::impl {

public:
    impl() = default;
    impl(admire::qos::subclass cls, uint64_t value, admire::qos::entity e)
        : m_subclass(cls), m_value(value), m_entity(std::move(e)) {}

@@ -2104,12 +2189,30 @@ public:
        return m_value;
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_subclass));
        ar(SCORD_SERIALIZATION_NVP(m_value));
        ar(SCORD_SERIALIZATION_NVP(m_entity));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_subclass));
        ar(SCORD_SERIALIZATION_NVP(m_value));
        ar(SCORD_SERIALIZATION_NVP(m_entity));
    }

private:
    admire::qos::subclass m_subclass;
    uint64_t m_value;
    std::optional<admire::qos::entity> m_entity;
};

limit::limit() = default;

limit::limit(admire::qos::subclass cls, uint64_t value)
    : m_pimpl(std::make_unique<limit::impl>(cls, value)) {}

@@ -2150,6 +2253,32 @@ limit::value() const {
    return m_pimpl->value();
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
// Delegates the whole limit state (subclass, value, optional entity,
// see limit::impl::load()/save()) to the PIMPL pointer; the optional
// member relies on cereal's <cereal/types/optional.hpp> support.
template <class Archive>
inline void
limit::serialize(Archive& ar) {
    // NVP wrapper names the field for human-readable archives.
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
limit::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
limit::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
limit::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
limit::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

} // namespace qos

} // namespace admire
+1 −0
Original line number | Diff line number | Diff line
@@ -27,6 +27,7 @@

#include <cereal/cereal.hpp>
#include <cereal/types/optional.hpp>
#include <cereal/types/variant.hpp>
#include <cereal/types/memory.hpp>
#include <thallium/serialization/proc_input_archive.hpp>
#include <thallium/serialization/proc_output_archive.hpp>
+33 −47
Original line number | Diff line number | Diff line
@@ -611,58 +611,44 @@ transfer_datasets(const server& srv, const job& job,
                  const std::vector<qos::limit>& limits,
                  transfer::mapping mapping) {

    (void) srv;
    (void) job;
    (void) sources;
    (void) targets;
    (void) limits;
    (void) mapping;

    return tl::make_unexpected(admire::error_code::snafu);

#if 0
    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
    scord::network::client rpc_client{srv.protocol()};

    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO(
            "rpc id: {} name: {} from: {} => "
            "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
            rpc_id, std::quoted("ADM_"s + __FUNCTION__),
            std::quoted(rpc_client.self_address()), job, sources, targets,
            limits, mapping);
    if(const auto& lookup_rv = rpc_client.lookup(srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

    const auto rpc_job = api::convert(job);
    const auto rpc_sources = api::convert(sources);
    const auto rpc_targets = api::convert(targets);
    const auto rpc_qos_limits = api::convert(limits);
        LOGGER_INFO("rpc id: {} name: {} from: {} => "
                    "body: {{job_id: {}, sources: {}, targets: {}, limits: {}, "
                    "mapping: {}}}",
                    rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                    std::quoted(rpc_client.self_address()), job.id(), sources,
                    targets, limits, mapping);

    ADM_transfer_datasets_in_t in{rpc_job.get(), rpc_sources.get(),
                                  rpc_targets.get(), rpc_qos_limits.get(),
                                  static_cast<ADM_transfer_mapping_t>(mapping)};
    ADM_transfer_datasets_out_t out;
        if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id(),
                                           sources, targets, limits, mapping);
           call_rv.has_value()) {

    [[maybe_unused]] const auto rpc =
            endp.call("ADM_transfer_datasets", &in, &out);
            const scord::network::response_with_id resp{call_rv.value()};

    if(const auto rv = admire::error_code{out.retval}; !rv) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {{retval: {}}} [op_id: {}]",
            LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                        "rpc id: {} name: {} from: {} <= "
                        "body: {{retval: {}, tx_id: {}}} [op_id: {}]",
                        rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), rv, out.op_id);
        return tl::make_unexpected(rv);
                        std::quoted(endp.address()), resp.error_code(),
                        resp.value(), resp.op_id());

            if(const auto ec = resp.error_code(); !ec) {
                return tl::make_unexpected(ec);
            }

    const admire::transfer tx = api::convert(out.tx);
            return admire::transfer{resp.value()};
        }
    }

    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}, transfer: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), admire::error_code::success, tx,
                out.op_id);
    return tx;
#endif
    LOGGER_ERROR("rpc call failed");
    return tl::make_unexpected(admire::error_code::other);
}

} // namespace admire::detail
+36 −79
Original line number | Diff line number | Diff line
@@ -506,6 +506,42 @@ remove_pfs_storage(const request& req, std::uint64_t pfs_id) {
    req.respond(resp);
}

// Server-side handler for the `ADM_transfer_datasets` RPC.
//
// Receives a request to transfer `sources` into `targets` on behalf of
// job `job_id`, subject to the given QoS `limits` and dataset `mapping`
// strategy. Replies with a `response_with_id` carrying the error code
// and an optional transfer ID (the client side constructs an
// `admire::transfer` from this value on success).
void
transfer_datasets(const request& req, admire::job_id job_id,
                  const std::vector<admire::dataset>& sources,
                  const std::vector<admire::dataset>& targets,
                  const std::vector<admire::qos::limit>& limits,
                  enum admire::transfer::mapping mapping) {

    using scord::network::get_address;

    // RPC name ("ADM_transfer_datasets") derived from the function name,
    // plus a fresh ID used to correlate the request/response log lines.
    const auto rpc_name = "ADM_"s + __FUNCTION__;
    const auto rpc_id = remote_procedure::new_id();

    LOGGER_INFO(
            "rpc id: {} name: {} from: {} => "
            "body: {{job_id: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
            rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
            job_id, sources, targets, limits, mapping);

    // NOTE(review): default-constructed error code — presumably denotes
    // success; confirm against admire::error_code's default state.
    admire::error_code ec;

    std::optional<std::uint64_t> tx_id;

    // TODO: generate a global ID for the transfer and contact Cargo to
    // actually request it
    // Placeholder transfer ID until the Cargo integration above lands.
    tx_id = 42;

    const auto resp = response_with_id{rpc_id, ec, tx_id};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, tx_id: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                ec, tx_id);

    req.respond(resp);
}

} // namespace scord::network::handlers

/**
@@ -1069,85 +1105,6 @@ ADM_in_transit_ops(hg_handle_t h) {

DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops)


/**
 * Transfers the dataset identified by the source_name to the storage tier
 * defined by destination_name, and apply the provided constraints during the
 * transfer. This function returns a handle that can be used to track the
 * operation (i.e., get statistics, or status).
 *
 * @param in.source A source_location identifying the source dataset/s in the
 * source storage tier.
 * @param in.destination A destination_location identifying the destination
 * dataset/s in its desired location in a storage tier.
 * @param in.qos_constraints A list of qos_constraints that must be applied to
 * the transfer. These may not exceed the global ones set at node, application,
 * or resource level (see Section 3.4).
 * @param in.distribution A distribution strategy for data (e.g. one-to-one,
 * one-to-many, many-to-many)
 * @param in.job_id A job_id identifying the originating job.
 * @param out.transfer_handle A transfer_handle allowing clients to interact
 * with the transfer (e.g. wait for its completion, query its status, cancel it,
 * etc.
 * @return out.ret Returns if the remote procedure has been completed
 * successfully or not.
 */
// Legacy Margo/Mercury C handler for `ADM_transfer_datasets` (removed by
// this commit in favor of the C++ handler above). Decodes the C input
// struct, converts it to the C++ wrapper types, produces a (currently
// hard-coded) transfer handle and responds with the converted C output.
static void
ADM_transfer_datasets(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_transfer_datasets_in_t in;
    ADM_transfer_datasets_out_t out;

    [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);

    // Deserialize the RPC input; errors are only assert-checked here.
    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    // Convert the C API structs into their C++ wrapper equivalents.
    const admire::job job{in.job};
    const std::vector<admire::dataset> sources =
            admire::api::convert(in.sources);
    const std::vector<admire::dataset> targets =
            admire::api::convert(in.targets);
    const std::vector<admire::qos::limit> limits =
            admire::api::convert(in.qos_limits);
    const auto mapping = static_cast<admire::transfer::mapping>(in.mapping);

    const auto id = remote_procedure::new_id();
    LOGGER_INFO(
            "rpc id: {} name: {} from: {} => "
            "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
            id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job,
            sources, targets, limits, mapping);

    admire::error_code ec;

    // Placeholder transfer handle (hard-coded ID 42); the real transfer
    // machinery was never implemented in this handler.
    const auto transfer = admire::transfer{42};

    out.op_id = id;
    out.retval = ec;
    // NOTE(review): ownership of the converted C struct is released into
    // `out.tx` and never freed after margo_respond() below — confirm this
    // is not a leak (it may be why this handler was rewritten).
    out.tx = admire::api::convert(transfer).release();

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, transfer: {}}}",
                id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), ec,
                transfer);

    // Send the response, then release the decoded input and the handle.
    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);
}

DEFINE_MARGO_RPC_HANDLER(ADM_transfer_datasets)

/**
 * Sets information for the dataset identified by resource_id.
 *
Loading