Commit b32a29ca authored by Marc Vef's avatar Marc Vef
Browse files

Add RPC client to daemon

parent 318e5c76
Loading
Loading
Loading
Loading
+16 −1
Original line number Diff line number Diff line
@@ -41,6 +41,10 @@ class Distributor;

namespace daemon {

struct margo_client_ids {
    hg_id_t test_rpc_id;
};

class RPCData {

private:
@@ -50,6 +54,8 @@ private:
    // contexts that were created at init time
    margo_instance_id server_rpc_mid_;
    margo_instance_id proxy_server_rpc_mid_;
    margo_instance_id client_rpc_mid_;
    margo_client_ids rpc_client_ids_{};

    // Argobots I/O pools and execution streams
    ABT_pool io_pool_;
@@ -83,7 +89,16 @@ public:
    proxy_server_rpc_mid();

    void
    proxy_server_rpc_mid(margo_instance* proxy_server_rpc_mid);
    proxy_server_rpc_mid(margo_instance* client_rpc_mid);

    margo_instance*
    client_rpc_mid();

    void
    client_rpc_mid(margo_instance* client_rpc_mid);

    margo_client_ids&
    rpc_client_ids();

    ABT_pool
    io_pool() const;
+15 −0
Original line number Diff line number Diff line
@@ -54,6 +54,21 @@ RPCData::proxy_server_rpc_mid(margo_instance* proxy_server_rpc_mid) {
    RPCData::proxy_server_rpc_mid_ = proxy_server_rpc_mid;
}

margo_instance*
RPCData::client_rpc_mid() {
    return client_rpc_mid_;
}

void
RPCData::client_rpc_mid(margo_instance* client_rpc_mid) {
    RPCData::client_rpc_mid_ = client_rpc_mid;
}

margo_client_ids&
RPCData::rpc_client_ids() {
    return rpc_client_ids_;
}

ABT_pool
RPCData::io_pool() const {
    return io_pool_;
+64 −0
Original line number Diff line number Diff line
@@ -235,6 +235,52 @@ init_rpc_server() {
    register_server_rpcs(mid);
}

/**
 * @brief Registers RPC handlers to a given Margo instance.
 * @internal
 * Registering is done by associating a Margo instance id (mid) with the RPC
 * name and its handler function including defined input/out structs
 * @endinternal
 * @param margo_instance_id
 */
void
register_client_rpcs(margo_instance_id mid) {
    // TODO
    RPC_DATA->rpc_client_ids().test_rpc_id = MARGO_REGISTER(
            mid, gkfs::rpc::tag::fs_config, void, rpc_config_out_t, NULL);
}

/**
 * @brief Initializes the daemon RPC client.
 * @throws std::runtime_error on failure
 */
void
init_rpc_client() {
    struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
    hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE;
    hg_options.stats = HG_FALSE;
    if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol())
        hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto margo_config = "{}";
    struct margo_init_info args = {nullptr};
    args.json_config = margo_config;
    args.hg_init_info = &hg_options;
    auto* mid = margo_init_ext(GKFS_DATA->bind_addr().c_str(),
                               MARGO_CLIENT_MODE, &args);

    if(mid == MARGO_INSTANCE_NULL) {
        throw runtime_error("Failed to initialize the Margo RPC client");
    }

    GKFS_DATA->spdlogger()->info(
            "{}() RPC client initialization successful for protocol {}",
            __func__, GKFS_DATA->bind_addr());

    RPC_DATA->client_rpc_mid(mid);
    register_client_rpcs(mid);
}

void
register_proxy_server_rpcs(margo_instance_id mid) {
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
@@ -464,6 +510,18 @@ init_environment() {
    if(!GKFS_DATA->hosts_file().empty()) {
        gkfs::utils::populate_hosts_file();
    }

    // Init margo client
    GKFS_DATA->spdlogger()->debug("{}() Initializing RPC client: '{}'",
                                  __func__, GKFS_DATA->bind_addr());
    try {
        init_rpc_client();
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to initialize RPC client: {}", __func__, e.what());
        throw;
    }

    GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
}

@@ -524,6 +582,12 @@ destroy_enviroment() {
    GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__);
    GKFS_DATA->close_mdb();

    if(RPC_DATA->client_rpc_mid() != nullptr) {
        GKFS_DATA->spdlogger()->info("{}() Finalizing margo RPC client ...",
                                     __func__);
        margo_finalize(RPC_DATA->client_rpc_mid());
    }


    // Delete rootdir/metadir if requested
    if(!keep_rootdir) {