Verified Commit 565d9892 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

scord: Rewrite `ADM_register_job` RPC

parent 22240684
Loading
Loading
Loading
Loading
+36 −0
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include <fmt/format.h>
#include <utils/ctype_ptr.hpp>
#include <optional>
#include <cereal/access.hpp>
#include "admire_types.h"

namespace admire {
@@ -171,16 +172,24 @@ struct job_requirements;
struct job {

    struct resources {
        resources();
        explicit resources(std::vector<admire::node> nodes);
        explicit resources(ADM_job_resources_t res);

        std::vector<admire::node>
        nodes() const;

        template <typename Archive>
        void
        serialize(Archive&& ar) {
            ar& m_nodes;
        }

    private:
        std::vector<admire::node> m_nodes;
    };

    job();
    job(job_id id, slurm_job_id slurm_id);
    explicit job(ADM_job_t job);
    job(const job&) noexcept;
@@ -200,6 +209,11 @@ struct job {
private:
    class impl;
    std::unique_ptr<impl> m_pimpl;

    friend class cereal::access;
    template <class Archive>
    void
    serialize(Archive& ar);
};

struct transfer {
@@ -305,6 +319,7 @@ private:


struct dataset {
    dataset();
    explicit dataset(std::string id);
    explicit dataset(ADM_dataset_t dataset);
    dataset(const dataset&) noexcept;
@@ -318,6 +333,12 @@ struct dataset {
    std::string
    id() const;

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
@@ -402,6 +423,7 @@ struct adhoc_storage {
        bool m_should_flush;
    };

    adhoc_storage();
    adhoc_storage(enum adhoc_storage::type type, std::string name,
                  std::uint64_t id, execution_mode exec_mode,
                  access_type access_type, adhoc_storage::resources res,
@@ -430,6 +452,12 @@ struct adhoc_storage {
    void
    update(admire::adhoc_storage::ctx new_ctx);

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
@@ -490,6 +518,8 @@ private:

struct job_requirements {

    job_requirements();

    job_requirements(std::vector<admire::dataset> inputs,
                     std::vector<admire::dataset> outputs);

@@ -515,6 +545,12 @@ struct job_requirements {
    std::optional<admire::adhoc_storage>
    adhoc_storage() const;

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
+168 −1
Original line number Diff line number Diff line
@@ -31,6 +31,8 @@
#include <api/convert.hpp>
#include <variant>
#include <optional>
#include <cereal/types/memory.hpp>
#include <thallium/serialization/proc_input_archive.hpp>
#include "admire_types.hpp"
#include "internal_types.hpp"

@@ -1199,6 +1201,7 @@ node::serialize<scord::network::serialization::input_archive>(
class job::impl {

public:
    impl() {}
    impl(job_id id, slurm_job_id slurm_job_id)
        : m_id(id), m_slurm_job_id(slurm_job_id) {}
    impl(const impl& rhs) = default;
@@ -1219,10 +1222,26 @@ public:
    }

private:
    friend class cereal::access;

    template <class Archive>
    void
    load(Archive& ar) {
        ar(CEREAL_NVP(m_id));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(CEREAL_NVP(m_id));
    }

    job_id m_id;
    slurm_job_id m_slurm_job_id;
};

job::resources::resources() = default;

job::resources::resources(std::vector<admire::node> nodes)
    : m_nodes(std::move(nodes)) {}

@@ -1240,6 +1259,8 @@ job::resources::nodes() const {
    return m_nodes;
}

job::job() = default;

job::job(job_id id, slurm_job_id slurm_job_id)
    : m_pimpl(std::make_unique<job::impl>(id, slurm_job_id)) {}

@@ -1271,6 +1292,19 @@ job::slurm_id() const {
    return m_pimpl->slurm_id();
}

template <class Archive>
inline void
job::serialize(Archive& ar) {
    ar(CEREAL_NVP(m_pimpl));
}

template void
job::serialize<thallium::proc_input_archive<>>(thallium::proc_input_archive<>&);
template void
job::serialize<thallium::proc_output_archive<>>(
        thallium::proc_output_archive<>&);


class transfer::impl {

public:
@@ -1321,8 +1355,8 @@ transfer::id() const {

class dataset::impl {
public:
    impl() = default;
    explicit impl(std::string id) : m_id(std::move(id)) {}

    impl(const impl& rhs) = default;
    impl(impl&& rhs) = default;
    impl&
@@ -1336,10 +1370,24 @@ public:
        return m_id;
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_id));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_id));
    }

private:
    std::string m_id;
};

dataset::dataset() = default;

dataset::dataset(std::string id)
    : m_pimpl(std::make_unique<dataset::impl>(std::move(id))) {}

@@ -1366,6 +1414,32 @@ dataset::id() const {
    return m_pimpl->id();
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
template <class Archive>
inline void
dataset::serialize(Archive& ar) {
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
dataset::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
dataset::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
dataset::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
dataset::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

adhoc_storage::resources::resources(std::vector<admire::node> nodes)
    : m_nodes(std::move(nodes)) {}

@@ -1425,6 +1499,7 @@ adhoc_storage::ctx::should_flush() const {
class adhoc_storage::impl {

public:
    impl() = default;
    explicit impl(enum adhoc_storage::type type, std::string name,
                  std::uint64_t id, adhoc_storage::ctx ctx)
        : m_type(type), m_name(std::move(name)), m_id(id),
@@ -1462,6 +1537,25 @@ public:
        m_ctx = std::move(new_ctx);
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_type));
        ar(SCORD_SERIALIZATION_NVP(m_name));
        ar(SCORD_SERIALIZATION_NVP(m_id));
        ar(SCORD_SERIALIZATION_NVP(m_ctx));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_type));
        ar(SCORD_SERIALIZATION_NVP(m_name));
        ar(SCORD_SERIALIZATION_NVP(m_id));
        ar(SCORD_SERIALIZATION_NVP(m_ctx));
    }


private:
    enum type m_type;
    std::string m_name;
@@ -1469,6 +1563,8 @@ private:
    adhoc_storage::ctx m_ctx;
};

adhoc_storage::adhoc_storage() = default;

adhoc_storage::adhoc_storage(enum adhoc_storage::type type, std::string name,
                             std::uint64_t id, execution_mode exec_mode,
                             access_type access_type,
@@ -1527,6 +1623,32 @@ adhoc_storage::update(admire::adhoc_storage::ctx new_ctx) {
    return m_pimpl->update(std::move(new_ctx));
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
template <class Archive>
inline void
adhoc_storage::serialize(Archive& ar) {
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
adhoc_storage::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
adhoc_storage::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
adhoc_storage::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
adhoc_storage::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

adhoc_storage::~adhoc_storage() = default;

pfs_storage::ctx::ctx(std::filesystem::path mount_point)
@@ -1645,6 +1767,7 @@ pfs_storage::update(admire::pfs_storage::ctx new_ctx) {
class job_requirements::impl {

public:
    impl() = default;
    impl(std::vector<admire::dataset> inputs,
         std::vector<admire::dataset> outputs)
        : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)) {}
@@ -1696,6 +1819,22 @@ public:
        return m_adhoc_storage;
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_inputs));
        ar(SCORD_SERIALIZATION_NVP(m_outputs));
        ar(SCORD_SERIALIZATION_NVP(m_adhoc_storage));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_inputs));
        ar(SCORD_SERIALIZATION_NVP(m_outputs));
        ar(SCORD_SERIALIZATION_NVP(m_adhoc_storage));
    }

private:
    std::vector<admire::dataset> m_inputs;
    std::vector<admire::dataset> m_outputs;
@@ -1703,6 +1842,8 @@ private:
};


job_requirements::job_requirements() = default;

job_requirements::job_requirements(std::vector<admire::dataset> inputs,
                                   std::vector<admire::dataset> outputs)
    : m_pimpl(std::make_unique<impl>(std::move(inputs), std::move(outputs))) {}
@@ -1747,6 +1888,32 @@ job_requirements::adhoc_storage() const {
    return m_pimpl->adhoc_storage();
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
template <class Archive>
inline void
job_requirements::serialize(Archive& ar) {
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
job_requirements::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
job_requirements::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
job_requirements::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
job_requirements::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

namespace qos {

class entity::impl {
+29 −36
Original line number Diff line number Diff line
@@ -213,55 +213,48 @@ ping(const server& srv) {

tl::expected<admire::job, admire::error_code>
register_job(const server& srv, const job::resources& job_resources,
             const job_requirements& reqs, admire::slurm_job_id slurm_id) {
             const job_requirements& job_requirements,
             admire::slurm_job_id slurm_id) {

    (void) srv;
    (void) job_resources;
    (void) reqs;
    (void) slurm_id;

    return tl::make_unexpected(admire::error_code::snafu);

#if 0
    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
    scord::network::client rpc_client{srv.protocol()};

    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
    if(const auto lookup_rv = rpc_client.lookup(srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

        LOGGER_INFO(
                "rpc id: {} name: {} from: {} => "
                "body: {{job_resources: {}, job_requirements: {}, slurm_id: "
                "{}}}",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), job_resources, reqs,
                slurm_id);

    auto rpc_job_resources = api::convert(job_resources);
    auto rpc_reqs = api::convert(reqs);
                std::quoted(rpc_client.self_address()), job_resources,
                job_requirements, slurm_id);

    ADM_register_job_in_t in{rpc_job_resources.get(), *rpc_reqs.get(),
                             slurm_id};
    ADM_register_job_out_t out;
        if(const auto call_rv = endp.call("ADM_"s + __FUNCTION__, job_resources,
                                          job_requirements, slurm_id);
           call_rv.has_value()) {

    const auto rpc = endp.call("ADM_register_job", &in, &out);
            const scord::network::response_with_id resp{call_rv.value()};

    if(const auto rv = admire::error_code{out.retval}; !rv) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {} [op_id: {}]",
            LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                        "rpc id: {} name: {} from: {} <= "
                        "body: {{retval: {}, job_id: {}}} [op_id: {}]",
                        rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), rv, out.op_id);
        return tl::make_unexpected(rv);
    }
                        std::quoted(endp.address()), resp.error_code(),
                        resp.value(), resp.op_id());

    const admire::job job = api::convert(out.job);
            if(const auto ec = resp.error_code(); !ec) {
                return tl::make_unexpected(resp.error_code());
            }

    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}, job: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), admire::error_code::success, job,
                out.op_id);
            return admire::job{resp.value(), slurm_id};
        }
    }

    return job;
#endif
    LOGGER_ERROR("rpc call failed");
    return tl::make_unexpected(admire::error_code::other);
}

admire::error_code
+2 −1
Original line number Diff line number Diff line
@@ -36,7 +36,8 @@ ping(const server& srv);

tl::expected<admire::job, admire::error_code>
register_job(const server& srv, const job::resources& job_resources,
             const job_requirements& reqs, admire::slurm_job_id slurm_id);
             const job_requirements& job_requirements,
             admire::slurm_job_id slurm_id);

admire::error_code
update_job(const server& srv, const job& job,
+51 −72
Original line number Diff line number Diff line
@@ -73,6 +73,57 @@ ping(const scord::network::request& req) {
    req.respond(resp);
}

void
register_job(const scord::network::request& req,
             const admire::job::resources& job_resources,
             const admire::job_requirements& job_requirements,
             admire::slurm_job_id slurm_id) {

    using scord::network::get_address;

    const auto rpc_name = "ADM_"s + __FUNCTION__;
    const auto rpc_id = remote_procedure::new_id();

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{job_resources: {}, job_requirements: {}, slurm_id: "
                "{}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                job_resources, job_requirements, slurm_id);

    admire::error_code ec;
    std::optional<admire::job_id> job_id;
    auto& jm = scord::job_manager::instance();

    if(const auto jm_result =
               jm.create(slurm_id, job_resources, job_requirements);
       jm_result.has_value()) {

        const auto& job_info = jm_result.value();

        // if the job requires an adhoc storage instance, inform the appropriate
        // adhoc_storage instance (if registered)
        if(job_requirements.adhoc_storage()) {
            const auto adhoc_id = job_requirements.adhoc_storage()->id();
            auto& adhoc_manager = scord::adhoc_storage_manager::instance();
            ec = adhoc_manager.add_client_info(adhoc_id, job_info);
        }

        job_id = job_info->job().id();
    } else {
        LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc_id,
                     jm_result.error());
        ec = jm_result.error();
    }

    const auto resp = response_with_id{rpc_id, ec, job_id};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, job_id: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                ec, job_id);

    req.respond(resp);
}

void
register_adhoc_storage(const request& req, const std::string& name,
@@ -117,78 +168,6 @@ register_adhoc_storage(const request& req, const std::string& name,
} // namespace scord::network::handlers


static void
ADM_register_job(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_register_job_in_t in;
    ADM_register_job_out_t out;

    [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    const admire::job_requirements reqs(&in.reqs);
    const admire::job::resources job_resources(in.job_resources);
    const admire::slurm_job_id slurm_id = in.slurm_job_id;

    const auto rpc_id = remote_procedure::new_id();
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{job_resources: {}, job_requirements: {}, slurm_id: "
                "{}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                job_resources, reqs, slurm_id);

    admire::error_code ec = admire::error_code::success;
    std::optional<admire::job> out_job;
    auto& jm = scord::job_manager::instance();

    if(const auto jm_result = jm.create(slurm_id, job_resources, reqs);
       jm_result.has_value()) {

        const auto& job_info = jm_result.value();

        // if the job requires an adhoc storage instance, inform the appropriate
        // adhoc_storage instance (if registered)
        if(reqs.adhoc_storage()) {
            const auto adhoc_id = reqs.adhoc_storage()->id();
            auto& adhoc_manager = scord::adhoc_storage_manager::instance();
            ec = adhoc_manager.add_client_info(adhoc_id, job_info);
        }

        out_job = job_info->job();
    } else {
        LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc_id,
                     jm_result.error());
        ec = jm_result.error();
    }

    out.op_id = rpc_id;
    out.retval = ec;
    out.job = out_job ? admire::api::convert(*out_job).release() : nullptr;

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}, job: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                ec, out_job);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);
}

DEFINE_MARGO_RPC_HANDLER(ADM_register_job);


static void
ADM_update_job(hg_handle_t h) {

Loading