Commit 81ef7b4f authored by Marc Vef's avatar Marc Vef
Browse files

Margo client now uses margo_init_pool() to start the rpc/ipc client

parent ab8f75d1
Loading
Loading
Loading
Loading
+98 −47
Original line number Diff line number Diff line
@@ -20,6 +20,9 @@

static pthread_once_t init_lib_thread = PTHREAD_ONCE_INIT;

enum class Margo_mode {
    RPC, IPC
};
// Mercury/Margo IPC Client
margo_instance_id margo_ipc_id_;
hg_addr_t daemon_svr_addr_ = HG_ADDR_NULL;
@@ -460,6 +463,29 @@ int dup2(int oldfd, int newfd) __THROW {
    return (reinterpret_cast<decltype(&dup2)>(libc_dup2))(oldfd, newfd);
}

/**
 * Initializes the Argobots environment
 * @return
 */
bool init_ld_argobots() {
    LD_LOG_DEBUG0(debug_fd, "Initializing Argobots ...\n");

    // We need no arguments to init
    auto argo_err = ABT_init(0, nullptr);
    if (argo_err != 0) {
        LD_LOG_DEBUG0(debug_fd, "ABT_init() Failed to init Argobots (client)\n");
        return false;
    }
    // Set primary execution stream to idle without polling. Normally xstreams cannot sleep. This is what ABT_snoozer does
    argo_err = ABT_snoozer_xstream_self_set();
    if (argo_err != 0) {
        LD_LOG_DEBUG0(debug_fd, "ABT_snoozer_xstream_self_set()  (client)\n");
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");
    return true;
}

bool get_addr_by_hostid(const uint64_t hostid, hg_addr_t& svr_addr) {

    if (rpc_address_cache_.tryGet(hostid, svr_addr)) {
@@ -525,23 +551,54 @@ void register_client_rpcs(margo_instance_id mid) {
                                      NULL);
}

bool init_ipc_client() {
    auto protocol_port = "na+sm"s;
    LD_LOG_DEBUG0(debug_fd, "Initializing Margo IPC client ...\n");
bool init_margo_client(Margo_mode mode, const string na_plugin) {

    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_CLIENT_MODE, 0, 0);
    ABT_xstream xstream = ABT_XSTREAM_NULL;
    ABT_pool pool = ABT_POOL_NULL;

    if (mid == MARGO_INSTANCE_NULL) {
        LD_LOG_DEBUG0(debug_fd, "[ERR]: margo_init failed to initialize the Margo IPC client\n");
    // get execution stream and its main pools
    auto ret = ABT_xstream_self(&xstream);
    if (ret != ABT_SUCCESS)
        return false;
    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    if (ret != ABT_SUCCESS) return false;
    if (mode == Margo_mode::IPC)
        LD_LOG_DEBUG0(debug_fd, "Initializing Mercury IPC client ...\n");
    else
        LD_LOG_DEBUG0(debug_fd, "Initializing Mercury RPC client ...\n");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_context_t* hg_context;
    hg_class = HG_Init(na_plugin.c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        LD_LOG_DEBUG0(debug_fd, "HG_Init() Failed to init Mercury client layer\n");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        LD_LOG_DEBUG0(debug_fd, "HG_Context_create() Failed to create Mercury client context\n");
        HG_Finalize(hg_class);
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");

    /* MARGO PART */
    if (mode == Margo_mode::IPC)
        LD_LOG_DEBUG0(debug_fd, "Initializing Margo IPC client ...\n");
    else
        LD_LOG_DEBUG0(debug_fd, "Initializing Margo RPC client ...\n");
    // margo will run in the context of thread
    auto mid = margo_init_pool(pool, pool, hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        LD_LOG_DEBUG0(debug_fd, "[ERR]: margo_init_pool failed to initialize the Margo client\n");
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");

    if (mode == Margo_mode::IPC) {
        margo_ipc_id_ = mid;
//    margo_diag_start(mid);

        auto adafs_daemon_pid = getProcIdByName("adafs_daemon"s);
        if (adafs_daemon_pid == -1) {
            printf("[ERR] ADA-FS daemon not started. Exiting ...\n");
@@ -553,34 +610,13 @@ bool init_ipc_client() {
        margo_addr_lookup(margo_ipc_id_, sm_addr_str.c_str(), &daemon_svr_addr_);

        register_client_ipcs(mid);

//    for (int i = 0; i < 10; ++i) {
//        printf("Running %d iteration\n", i);
//        send_minimal_ipc(minimal_id);
//    }

    return true;
}

bool init_rpc_client() {
    string protocol_port = RPC_PROTOCOL;

    LD_LOG_DEBUG0(debug_fd, "Initializing Margo RPC client ...\n");

    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_CLIENT_MODE, 0, 0);

    if (mid == MARGO_INSTANCE_NULL) {
        LD_LOG_DEBUG0(debug_fd, "[ERR]: margo_init failed to initialize the Margo RPC client\n");
        return false;
    }
    LD_LOG_DEBUG0(debug_fd, "Success.\n");

    } else {
        margo_rpc_id_ = mid;

        register_client_rpcs(mid);
    }
//    margo_diag_start(mid);

//    for (int i = 0; i < 10000; ++i) {
//    for (int i = 0; i < 10; ++i) {
//        printf("Running %d iteration\n", i);
//        send_minimal_ipc(minimal_id);
//    }
@@ -596,7 +632,6 @@ margo_instance_id ld_margo_rpc_id() {
    return margo_rpc_id_;
}


hg_addr_t daemon_addr() {
    return daemon_svr_addr_;
}
@@ -606,11 +641,13 @@ hg_addr_t daemon_addr() {
 */
void init_environment() {
    // init margo client for IPC
    auto err = init_ipc_client();
    auto err = init_ld_argobots();
    assert(err);
    err = init_margo_client(Margo_mode::IPC, "na+sm"s);
    assert(err);
    err = ipc_send_get_fs_config(ipc_config_id); // get fs configurations the daemon was started with.
    assert(err);
    err = init_rpc_client();
    err = init_margo_client(Margo_mode::RPC, RPC_PROTOCOL);
    assert(err);
    is_env_initialized = true;
    LD_LOG_DEBUG0(debug_fd, "Environment initialized.\n");
@@ -680,7 +717,9 @@ void destroy_preload() {
    LD_LOG_DEBUG0(debug_fd, "Freeing Mercury daemon addr ...\n");
    HG_Addr_free(margo_get_class(margo_ipc_id_), daemon_svr_addr_);
    LD_LOG_DEBUG0(debug_fd, "Finalizing Margo IPC client ...\n");
//    margo_finalize(margo_ipc_id_);
    auto mercury_ipc_class = margo_get_class(margo_ipc_id_);
    auto mercury_ipc_context = margo_get_context(margo_ipc_id_);
    margo_finalize(margo_ipc_id_);

    LD_LOG_DEBUG0(debug_fd, "Freeing Mercury RPC addresses ...\n");
    // free all rpc addresses in LRU map and finalize margo rpc
@@ -689,8 +728,20 @@ void destroy_preload() {
    };
    rpc_address_cache_.cwalk(free_all_addr);
    LD_LOG_DEBUG0(debug_fd, "Finalizing Margo RPC client ...\n");
//    margo_finalize(margo_rpc_id_);
    auto mercury_rpc_class = margo_get_class(margo_rpc_id_);
    auto mercury_rpc_context = margo_get_context(margo_rpc_id_);
    margo_finalize(margo_rpc_id_);

    LD_LOG_DEBUG0(debug_fd, "Destroying Mercury context ...\n");
    HG_Context_destroy(mercury_ipc_context);
    HG_Context_destroy(mercury_rpc_context);
    LD_LOG_DEBUG0(debug_fd, "Finalizing Mercury class ...\n");
    HG_Finalize(mercury_ipc_class);
    HG_Finalize(mercury_rpc_class);
    LD_LOG_DEBUG0(debug_fd, "Preload library shut down.\n");

    LD_LOG_DEBUG0(debug_fd, "Finalizing Argobots ...\n");
    ABT_finalize();

    fclose(debug_fd);
}
 No newline at end of file