Verified Commit 47bef9b1 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

scord: Rewrite `ADM_update_adhoc_storage` RPC

parent 565d9892
Loading
Loading
Loading
Loading
+25 −33
Original line number Diff line number Diff line
@@ -494,50 +494,42 @@ transfer_datasets(const server& srv, const job& job,
}

admire::error_code
update_adhoc_storage(const server& srv,
                     const adhoc_storage::ctx& adhoc_storage_ctx,
update_adhoc_storage(const server& srv, const adhoc_storage::ctx& new_ctx,
                     const adhoc_storage& adhoc_storage) {

    (void) srv;
    (void) adhoc_storage_ctx;
    (void) adhoc_storage;

    return admire::error_code::snafu;

#if 0
    scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
    scord::network::client rpc_client{srv.protocol()};

    const auto rpc_id = ::api::remote_procedure::new_id();
    auto endp = rpc_client.lookup(srv.address());

    if(const auto& lookup_rv = rpc_client.lookup(srv.address());
       lookup_rv.has_value()) {
        const auto& endp = lookup_rv.value();

        LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{adhoc_storage_id: {}}}",
                    "body: {{adhoc_id: {}, new_ctx: {}}}",
                    rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc_client.self_address()), adhoc_storage.id());
                    std::quoted(rpc_client.self_address()), adhoc_storage.id(),
                    new_ctx);

    const auto rpc_ctx = api::convert(adhoc_storage_ctx);

    ADM_update_adhoc_storage_in_t in{rpc_ctx.get(), adhoc_storage.id()};
    ADM_update_adhoc_storage_out_t out;
        if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__,
                                           adhoc_storage.id(), new_ctx);
           call_rv.has_value()) {

    const auto rpc = endp.call("ADM_update_adhoc_storage", &in, &out);
            const scord::network::generic_response resp{call_rv.value()};

    if(const auto rv = admire::error_code{out.retval}; !rv) {
        LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
            LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                        "rpc id: {} name: {} from: {} <= "
                        "body: {{retval: {}}} [op_id: {}]",
                        rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                     std::quoted(rpc.origin()), rv, out.op_id);
        return rv;
    }
                        std::quoted(endp.address()), resp.error_code(),
                        resp.op_id());

    LOGGER_INFO("rpc id: {} name: {} from: {} <= "
                "body: {{retval: {}}} [op_id: {}]",
                rpc_id, std::quoted("ADM_"s + __FUNCTION__),
                std::quoted(rpc.origin()), admire::error_code::success,
                out.op_id);
            return resp.error_code();
        }
    }

    return admire::error_code::success;
#endif
    LOGGER_ERROR("rpc call failed");
    return admire::error_code::other;
}

admire::error_code
+1 −2
Original line number Diff line number Diff line
@@ -59,8 +59,7 @@ register_adhoc_storage(const server& srv, const std::string& name,
                       const adhoc_storage::ctx& ctx);

admire::error_code
update_adhoc_storage(const server& srv,
                     const adhoc_storage::ctx& adhoc_storage_ctx,
update_adhoc_storage(const server& srv, const adhoc_storage::ctx& new_ctx,
                     const adhoc_storage& adhoc_storage);

admire::error_code
+39 −52
Original line number Diff line number Diff line
@@ -165,6 +165,45 @@ register_adhoc_storage(const request& req, const std::string& name,
    req.respond(resp);
}

void
update_adhoc_storage(const request& req, std::uint64_t adhoc_id,
                     const admire::adhoc_storage::ctx& new_ctx) {

    using scord::network::get_address;

    const auto rpc_name = "ADM_"s + __FUNCTION__;
    const auto rpc_id = remote_procedure::new_id();

    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{adhoc_id: {}, new_ctx: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                adhoc_id, new_ctx);

    auto& adhoc_manager = scord::adhoc_storage_manager::instance();
    const auto ec = adhoc_manager.update(adhoc_id, new_ctx);

    if(!ec) {
        LOGGER_ERROR(
                "rpc id: {} error_msg: \"Error updating adhoc_storage: {}\"",
                rpc_id, ec);
    }

    const auto resp = generic_response{rpc_id, ec};

    LOGGER_INFO("rpc id: {} name: {} to: {} <= "
                "body: {{retval: {}}}",
                rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)),
                ec);

    req.respond(resp);
}
                "body: {{retval: {}}}",
                rpc_id, std::quoted(__FUNCTION__),
                std::quoted(get_address(req)), ec);

    req.respond(resp);
}

} // namespace scord::network::handlers


@@ -284,58 +323,6 @@ ADM_remove_job(hg_handle_t h) {

DEFINE_MARGO_RPC_HANDLER(ADM_remove_job);

static void
ADM_update_adhoc_storage(hg_handle_t h) {

    using scord::network::utils::get_address;

    [[maybe_unused]] hg_return_t ret;

    ADM_update_adhoc_storage_in_t in;
    ADM_update_adhoc_storage_out_t out;

    [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);

    ret = margo_get_input(h, &in);
    assert(ret == HG_SUCCESS);

    const admire::adhoc_storage::ctx adhoc_storage_ctx(in.adhoc_storage_ctx);
    const std::uint64_t server_id(in.server_id);

    const auto rpc_id = remote_procedure::new_id();
    LOGGER_INFO("rpc id: {} name: {} from: {} => "
                "body: {{adhoc_storage_id: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                server_id);

    auto& adhoc_manager = scord::adhoc_storage_manager::instance();
    const auto ec = adhoc_manager.update(server_id, adhoc_storage_ctx);

    if(!ec) {
        LOGGER_ERROR(
                "rpc id: {} error_msg: \"Error updating adhoc_storage: {}\"",
                rpc_id, ec);
    }

    out.op_id = rpc_id;
    out.retval = ec;

    LOGGER_INFO("rpc id: {} name: {} to: {} => "
                "body: {{retval: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                ec);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);

    ret = margo_free_input(h, &in);
    assert(ret == HG_SUCCESS);

    ret = margo_destroy(h);
    assert(ret == HG_SUCCESS);
}

DEFINE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage);

static void
ADM_remove_adhoc_storage(hg_handle_t h) {
+10 −8
Original line number Diff line number Diff line
@@ -22,7 +22,6 @@
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/

// clang-format off
#ifndef SCORD_RPC_HANDLERS_HPP
#define SCORD_RPC_HANDLERS_HPP

@@ -31,9 +30,15 @@

namespace scord::network::handlers {

void ping(const scord::network::request& req);
void register_adhoc_storage(const request& req, const std::string& name,
enum admire::adhoc_storage::type type, const admire::adhoc_storage::ctx& ctx);
void
ping(const scord::network::request& req);
void
register_adhoc_storage(const request& req, const std::string& name,
                       enum admire::adhoc_storage::type type,
                       const admire::adhoc_storage::ctx& ctx);
void
update_adhoc_storage(const request& req, std::uint64_t adhoc_id,
                     const admire::adhoc_storage::ctx& new_ctx);

void
register_job(const scord::network::request& req,
@@ -41,7 +46,7 @@ register_job(const scord::network::request& req,
             const admire::job_requirements& job_requirements,
             admire::slurm_job_id slurm_id);

}
} // namespace scord::network::handlers

#include <margo.h>

@@ -58,9 +63,6 @@ DECLARE_MARGO_RPC_HANDLER(ADM_update_job);
/// ADM_remove_job
DECLARE_MARGO_RPC_HANDLER(ADM_remove_job);

/// ADM_update_adhoc_storage
DECLARE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage);

/// ADM_remove_adhoc_storage
DECLARE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage);

+2 −0
Original line number Diff line number Diff line
@@ -184,6 +184,8 @@ main(int argc, char* argv[]) {
        daemon.set_handler("ADM_ping"s, scord::network::handlers::ping);
        daemon.set_handler("ADM_register_adhoc_storage"s,
                           scord::network::handlers::register_adhoc_storage);
        daemon.set_handler("ADM_update_adhoc_storage"s,
                           scord::network::handlers::update_adhoc_storage);
        daemon.set_handler("ADM_register_job"s,
                           scord::network::handlers::register_job);