Skip to content
adafs_daemon.cpp 7.8 KiB
Newer Older
Marc Vef's avatar
Marc Vef committed
//
// Created by evie on 8/31/17.
//

#include "daemon/adafs_daemon.hpp"
#include "db/db_util.hpp"

void daemon_loop(void* arg) {
    ADAFS_DATA->spdlogger()->info("Starting application loop ...");
    while(true) {
        ADAFS_DATA->spdlogger()->info("sleeping");
        sleep(5);
        ADAFS_DATA->spdlogger()->info("done sleeping. exiting ...");
        break;
    }
}

void run_daemon() {
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread loop_thread;

    auto argo_ret = ABT_xstream_self(&xstream); // get the current execution stream (1 core) to use (started with ABT_init())
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error getting the execution stream when starting the daemon.");
        return;
    }
    argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool); // get the thread pool
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error getting the thread pool when starting the daemon.");
        return;
    }
    argo_ret = ABT_thread_create(pool, daemon_loop, nullptr, ABT_THREAD_ATTR_NULL, &loop_thread);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error creating loop thread");
        return;
    }

    // wait for the daemon to be closed and free the loop thread
    ABT_thread_yield_to(loop_thread);
    argo_ret = ABT_thread_join(loop_thread);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error joining loop thread");
        return;
    }
    argo_ret = ABT_thread_free(&loop_thread);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error freeing loop thread.");
        return;
    }

}

void init_environment() {
    // Initialize rocksdb
    auto err = init_rocksdb();
    assert(err);
    err = init_argobots();
    assert(err);
    // init margo
    err = init_rpc_server();
    assert(err);
    err = init_rpc_client();
    assert(err);
}

/**
 * Destroys the margo, argobots, and mercury environments
 */
void destroy_enviroment() {
    ADAFS_DATA->spdlogger()->info("About to finalize the margo server and client");
    margo_finalize(RPC_DATA->client_mid());
    margo_finalize(RPC_DATA->server_mid());
    ADAFS_DATA->spdlogger()->info("Success.");

    destroy_argobots();

    ADAFS_DATA->spdlogger()->info("About to destroy the mercury context");
    HG_Context_destroy(RPC_DATA->client_hg_context());
    HG_Context_destroy(RPC_DATA->server_hg_context());
    ADAFS_DATA->spdlogger()->info("Success.");
    ADAFS_DATA->spdlogger()->info("About to destroy the mercury class");
    HG_Finalize(RPC_DATA->client_hg_class());
    HG_Finalize(RPC_DATA->server_hg_class());
    ADAFS_DATA->spdlogger()->info("Success.");
    ADAFS_DATA->spdlogger()->info("All services shut down.");
}

Marc Vef's avatar
Marc Vef committed
/**
 * Initializes the Argobots environment
 * @return
 */
bool init_argobots() {
    ADAFS_DATA->spdlogger()->info("Initializing Argobots ...");

    // We need no arguments to init
    auto argo_err = ABT_init(0, nullptr);
    if (argo_err != 0) {
        ADAFS_DATA->spdlogger()->error("ABT_init() Failed to init Argobots (client)");
        return false;
    }
    // Set primary execution stream to idle without polling. Normally xstreams cannot sleep. This is what ABT_snoozer does
    argo_err = ABT_snoozer_xstream_self_set();
    if (argo_err != 0) {
        ADAFS_DATA->spdlogger()->error("ABT_snoozer_xstream_self_set()  (client)");
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");
    return true;
}

/**
 * Shuts down Argobots
 */
void destroy_argobots() {
    ADAFS_DATA->spdlogger()->info("About to shut Argobots down"s);
    auto ret = ABT_finalize();
    if (ret == ABT_SUCCESS) {
        ADAFS_DATA->spdlogger()->info("Argobots successfully shutdown.");
    } else {
        ADAFS_DATA->spdlogger()->error("Argobots shutdown FAILED with err code {}", ret);
    }
}

bool init_rpc_server() {
    auto protocol_port = "bmi+tcp://localhost:" + to_string(RPCPORT);
Marc Vef's avatar
Marc Vef committed

    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];

    // Mercury class and context pointer that go into RPC_data class
    hg_class_t* hg_class;
    hg_context_t* hg_context;

    ADAFS_DATA->spdlogger()->info("Initializing Mercury server ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury server layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury server context");
        HG_Finalize(hg_class);
        return false;
    }
    // Below is just for logging purposes
    // Figure out what address this server is listening on (must be freed when finished)
    auto hg_ret = HG_Addr_self(hg_class, &addr_self);
    if (hg_ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve server address");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    // Convert the address to a cstring (with \0 terminator).
    hg_ret = HG_Addr_to_string(hg_class, addr_self_cstring, &addr_self_cstring_sz, addr_self);
    if (hg_ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("HG_Addr_to_string Failed to convert address to cstring");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        HG_Addr_free(hg_class, addr_self);
        return false;
    }
    HG_Addr_free(hg_class, addr_self);

    ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);

    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo server...");
    // Start Margo
    auto mid = margo_init(1, 16, hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo server");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");

    // Put context and class into RPC_data object
    RPC_DATA->server_hg_class(hg_class);
    RPC_DATA->server_hg_context(hg_context);
    RPC_DATA->server_mid(mid);

    // register RPCs
//    register_server_rpcs();
Marc Vef's avatar
Marc Vef committed

    return true;
Marc Vef's avatar
Marc Vef committed
}

bool init_rpc_client() {
    auto protocol_port = "bmi+tcp"s;
    ADAFS_DATA->spdlogger()->info("Initializing Mercury client ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_context_t* hg_context;
    hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury client layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury client context");
        HG_Finalize(hg_class);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");

    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo client ...");
    // Start Margo
    auto mid = margo_init(0, 0,
                          hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->info("[ERR]: margo_init failed to initialize the Margo client");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");

    // Put context and class into RPC_data object
    RPC_DATA->client_hg_class(hg_class);
    RPC_DATA->client_hg_context(hg_context);
    RPC_DATA->client_mid(mid);

//    register_client_rpcs();

    return true;
}