Verified Commit cbea50e9 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

scord: Rewrite `ADM_register_pfs_storage` RPC

parent ddf1a4d6
Loading
Loading
Loading
Loading
+16 −0
Original line number Diff line number Diff line
@@ -472,6 +472,8 @@ struct pfs_storage {

    struct ctx {

        ctx() = default;

        explicit ctx(std::filesystem::path mount_point);

        explicit ctx(ADM_pfs_context_t ctx);
@@ -479,10 +481,18 @@ struct pfs_storage {
        std::filesystem::path
        mount_point() const;

        template <class Archive>
        void
        serialize(Archive&& ar) {
            ar& m_mount_point;
        }

    private:
        std::filesystem::path m_mount_point;
    };

    pfs_storage();

    pfs_storage(enum pfs_storage::type type, std::string name, std::uint64_t id,
                std::filesystem::path mount_point);

@@ -511,6 +521,12 @@ struct pfs_storage {
    void
    update(admire::pfs_storage::ctx new_ctx);

    // The implementation for this must be deferred until
    // after the declaration of the PIMPL class
    template <class Archive>
    void
    serialize(Archive& ar);

private:
    class impl;
    std::unique_ptr<impl> m_pimpl;
+47 −0
Original line number Diff line number Diff line
@@ -1664,6 +1664,7 @@ pfs_storage::ctx::mount_point() const {
class pfs_storage::impl {

public:
    impl() = default;
    explicit impl(enum pfs_storage::type type, std::string name,
                  std::uint64_t id, pfs_storage::ctx ctx)
        : m_type(type), m_name(std::move(name)), m_id(id),
@@ -1701,6 +1702,24 @@ public:
        m_ctx = std::move(new_ctx);
    }

    template <class Archive>
    void
    load(Archive& ar) {
        ar(SCORD_SERIALIZATION_NVP(m_type));
        ar(SCORD_SERIALIZATION_NVP(m_name));
        ar(SCORD_SERIALIZATION_NVP(m_id));
        ar(SCORD_SERIALIZATION_NVP(m_ctx));
    }

    template <class Archive>
    void
    save(Archive& ar) const {
        ar(SCORD_SERIALIZATION_NVP(m_type));
        ar(SCORD_SERIALIZATION_NVP(m_name));
        ar(SCORD_SERIALIZATION_NVP(m_id));
        ar(SCORD_SERIALIZATION_NVP(m_ctx));
    }

private:
    enum type m_type;
    std::string m_name;
@@ -1708,6 +1727,8 @@ private:
    pfs_storage::ctx m_ctx;
};

pfs_storage::pfs_storage() = default;

pfs_storage::pfs_storage(enum pfs_storage::type type, std::string name,
                         std::uint64_t id, std::filesystem::path mount_point)
    : m_pimpl(std::make_unique<impl>(
@@ -1764,6 +1785,32 @@ pfs_storage::update(admire::pfs_storage::ctx new_ctx) {
    return m_pimpl->update(std::move(new_ctx));
}

// since the PIMPL class is fully defined at this point, we can now
// define the serialization function
template <class Archive>
inline void
pfs_storage::serialize(Archive& ar) {
    ar(SCORD_SERIALIZATION_NVP(m_pimpl));
}

//  we must also explicitly instantiate our template functions for
//  serialization in the desired archives
template void
pfs_storage::impl::save<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&) const;

template void
pfs_storage::impl::load<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

template void
pfs_storage::serialize<scord::network::serialization::output_archive>(
        scord::network::serialization::output_archive&);

template void
pfs_storage::serialize<scord::network::serialization::input_archive>(
        scord::network::serialization::input_archive&);

class job_requirements::impl {

public:
+23 −0
Original line number Diff line number Diff line
@@ -33,6 +33,29 @@
#include <thallium/serialization/stl/string.hpp>
#include <thallium/serialization/stl/vector.hpp>

// Cereal does not serialize std::filesystem::path's by default
#include <filesystem>

namespace cereal {

//! Loading for std::filesystem::path
template <class Archive>
inline void
CEREAL_LOAD_FUNCTION_NAME(Archive& ar, std::filesystem::path& out) {
    std::string tmp;
    ar(CEREAL_NVP_("data", tmp));
    out.assign(tmp);
}

//! Saving for std::filesystem::path
template <class Archive>
inline void
CEREAL_SAVE_FUNCTION_NAME(Archive& ar, const std::filesystem::path& in) {
    ar(CEREAL_NVP_("data", in.string()));
}

} // namespace cereal

namespace scord::network::serialization {

#define SCORD_SERIALIZATION_NVP CEREAL_NVP
+26 −35
Original line number Diff line number Diff line
@@ -453,51 +453,42 @@ tl::expected<admire::pfs_storage, admire::error_code>
register_pfs_storage(const server& srv, const std::string& name,
                     enum pfs_storage::type type, const pfs_storage::ctx& ctx) {

    (void) srv;
    (void) name;
    (void) type;
    (void) ctx;

    return tl::make_unexpected(admire::error_code::snafu);

#if 0
    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
    scord::network::client rpc_client{srv.protocol()};

    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    if(const auto& lookup_rv = rpc_client.lookup(srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

        LOGGER_INFO("rpc id: {} name: {} from: {} => "
                    "body: {{name: {}, type: {}, pfs_ctx: {}}}",
                    rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                    std::quoted(rpc_client.self_address()), name, type, ctx);

    const auto rpc_name = name.c_str();
    const auto rpc_type = static_cast<ADM_pfs_storage_type_t>(type);
    const auto rpc_ctx = api::convert(ctx);

    ADM_register_pfs_storage_in_t in{rpc_name, rpc_type, rpc_ctx.get()};
    ADM_register_pfs_storage_out_t out;
        if(const auto& call_rv =
                   endp.call("ADM_"s + __FUNCTION__, name, type, ctx);
           call_rv.has_value()) {

    const auto rpc = endp.call("ADM_register_pfs_storage", &in, &out);
            const scord::network::response_with_id resp{call_rv.value()};

    if(const auto rv = admire::error_code{out.retval}; !rv) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
                     "body: {{retval: {}}} [op_id: {}]",
            LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                        "rpc id: {} name: {} from: {} <= "
                        "body: {{retval: {}, pfs_id: {}}} [op_id: {}]",
                        rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc_client.self_address()), rv, out.op_id);
        return tl::make_unexpected(rv);
    }
                        std::quoted(endp.address()), resp.error_code(),
                        resp.value(), resp.op_id());

    auto rpc_pfs_storage = admire::pfs_storage{type, name, out.id, ctx};
            if(const auto ec = resp.error_code(); !ec) {
                return tl::make_unexpected(ec);
            }

    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}, id: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()),
                admire::error_code::success, out.id, out.op_id);
            return admire::pfs_storage{type, name, resp.value(), ctx};
        }
    }

    return rpc_pfs_storage;
#endif
    LOGGER_ERROR("rpc call failed");
    return tl::make_unexpected(admire::error_code::other);
}

admire::error_code
+19 −41
Original line number Diff line number Diff line
@@ -404,69 +404,47 @@ deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) {
    req.respond(resp);
}

} // namespace scord::network::handlers


static void
ADM_register_pfs_storage(hg_handle_t h) {

    using admire::pfs_storage;
    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_register_pfs_storage_in_t in;
    ADM_register_pfs_storage_out_t out;

    [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);
void
register_pfs_storage(const request& req, const std::string& name,
                     enum admire::pfs_storage::type type,
                     const admire::pfs_storage::ctx& ctx) {

    const std::string pfs_name{in.name};
    const auto pfs_type = static_cast<enum pfs_storage::type>(in.type);
    const pfs_storage::ctx pfs_ctx{in.ctx};
    using scord::network::get_address;

    const auto rpc_name = "ADM_"s + __FUNCTION__;
    const auto rpc_id = remote_procedure::new_id();

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{name: {}, type: {}, pfs_ctx: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                pfs_name, pfs_type, pfs_ctx);
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                name, type, ctx);

    admire::error_code ec;
    std::uint64_t out_pfs_id = 0;
    std::optional<std::uint64_t> pfs_id = 0;
    auto& pfs_manager = scord::pfs_storage_manager::instance();

    if(const auto pm_result = pfs_manager.create(pfs_type, pfs_name, pfs_ctx);
    if(const auto pm_result = pfs_manager.create(type, name, ctx);
       pm_result.has_value()) {
        const auto& adhoc_storage_info = pm_result.value();
        out_pfs_id = adhoc_storage_info->pfs_storage().id();
        pfs_id = adhoc_storage_info->pfs_storage().id();
    } else {
        LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"",
                     rpc_id, pm_result.error());
        ec = pm_result.error();
    }

    out.op_id = rpc_id;
    out.retval = ec;
    out.id = out_pfs_id;
    const auto resp = response_with_id{rpc_id, ec, pfs_id};

    LOGGER_INFO("rpc id: {} name: {} to: {} => "
                "body: {{retval: {}, id: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                ec, out.id);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);
                "body: {{retval: {}, pfs_id: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                ec, pfs_id);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);
    req.respond(resp);
}

DEFINE_MARGO_RPC_HANDLER(ADM_register_pfs_storage);
} // namespace scord::network::handlers


static void
ADM_update_pfs_storage(hg_handle_t h) {
Loading