Commit eb481551 authored by David Auer's avatar David Auer
Browse files

Fix RPC client lifecycle

parent 8f775362
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ private:
    // Margo IDs. They can also be used to retrieve the Mercury classes and
    // contexts that were created at init time
    margo_instance_id server_rpc_mid_;
    margo_instance_id client_rpc_mid_;

    // Argobots I/O pools and execution streams
    ABT_pool io_pool_;
@@ -51,9 +52,15 @@ public:
    margo_instance*
    server_rpc_mid();

    margo_instance*
    client_rpc_mid();

    void
    server_rpc_mid(margo_instance* server_rpc_mid);

    void
    client_rpc_mid(margo_instance* client_rpc_mid);

    ABT_pool
    io_pool() const;

+11 −0
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ extern "C" {
#include <margo.h>
}

#include <cassert>
#include <string>

namespace gkfs::rpc {
@@ -135,6 +136,16 @@ margo_client_cleanup(hg_handle_t* handle, OutputType* output,
    return margo_client_cleanup(handle, output, mid, addr, nullptr);
}

inline hg_id_t
get_rpc_id(const margo_instance_id& mid, const std::string& tag) {
    hg_id_t id{};
    hg_bool_t exists = HG_FALSE;
    auto ret = margo_registered_name(mid, tag.c_str(), &id, &exists);
    assert(ret == HG_SUCCESS);
    assert(exists == HG_TRUE);
    return id;
}

} // namespace gkfs::rpc


+10 −0
Original line number Diff line number Diff line
@@ -30,6 +30,16 @@ RPCData::server_rpc_mid(margo_instance* server_rpc_mid) {
    RPCData::server_rpc_mid_ = server_rpc_mid;
}

margo_instance*
RPCData::client_rpc_mid() {
    return client_rpc_mid_;
}

void
RPCData::client_rpc_mid(margo_instance* client_rpc_mid) {
    RPCData::client_rpc_mid_ = client_rpc_mid;
}

ABT_pool
RPCData::io_pool() const {
    return io_pool_;
+34 −0
Original line number Diff line number Diff line
@@ -186,6 +186,22 @@ init_rpc_server() {
    register_server_rpcs(mid);
}

void
init_rpc_client() {
    auto mid = margo_init(GKFS_DATA->bind_addr().c_str(), MARGO_CLIENT_MODE,
                          HG_TRUE, 0);
    if(mid == MARGO_INSTANCE_NULL) {
        throw runtime_error("Failed to initialize the Margo RPC client");
    }

    // Register all server rpcs also in the client to avoid code duplication.
    // IDs can be pulled later via rpc_util.hpp:get_rpc_id
    register_server_rpcs(mid);

    // Put context and class into RPC_data object
    RPC_DATA->client_rpc_mid(mid);
}

void
init_environment() {
    // Initialize metadata db
@@ -245,6 +261,17 @@ init_environment() {
        throw;
    }

    if(gkfs::config::dynamic_placement) {
        try {
            init_rpc_client();
        } catch(const std::exception& e) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Failed to initialize RPC client: {}", __func__,
                    e.what());
            throw;
        }
    }

    // Init Argobots ESs to drive IO
    try {
        GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__);
@@ -329,6 +356,12 @@ destroy_enviroment() {
        margo_finalize(RPC_DATA->server_rpc_mid());
    }

    if(RPC_DATA->client_rpc_mid() != nullptr) {
        GKFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC client",
                                      __func__);
        margo_finalize(RPC_DATA->client_rpc_mid());
    }

    GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__);
    GKFS_DATA->close_mdb();
}
@@ -589,6 +622,7 @@ main(int argc, const char* argv[]) {
    signal(SIGTERM, shutdown_handler);
    signal(SIGKILL, shutdown_handler);


    unique_lock<mutex> lk(mtx);
    // Wait for shutdown signal to initiate shutdown protocols
    shutdown_please.wait(lk);
+0 −1
Original line number Diff line number Diff line
@@ -107,7 +107,6 @@ rpc_srv_relocate_chunk(hg_handle_t handle) {
    return gkfs::rpc::cleanup_respond(&handle, &in, &out, &local_bulk);
}


/**
 * Receive the signal to start relocation.
 */
Loading