Commit a64a02c3 authored by Marc Vef's avatar Marc Vef
Browse files

ifs: daemon startup loop prototype

parent 646579e8
Loading
Loading
Loading
Loading
+8 −1
Original line number Diff line number Diff line
@@ -7,9 +7,16 @@

#include "../../main.hpp"

void daemon_loop(void* arg);
void run_daemon();

void init_environment();
void destroy_enviroment();

bool init_argobots();
void destroy_argobots();
bool init_rpc_server();
bool init_rpc_client();

void init_environment();

#endif //IFS_ADAFS_DAEMON_HPP
+7 −11
Original line number Diff line number Diff line
@@ -26,16 +26,13 @@ int main(int argc, const char* argv[]) {
#endif

    // Parse input
    auto fuse_argc = 1;
    vector<string> fuse_argv;
    fuse_argv.push_back(move(argv[0]));
//    auto fuse_struct = make_unique<tmp_fuse_usr>();

    po::options_description desc("Allowed options");
    desc.add_options()
            ("help,h", "Help message")
            ("foreground,f", "Run Fuse instance in foreground. (Fuse parameter)")
            ("mountdir,m", po::value<string>(), "User Fuse mountdir. (Fuse parameter)")
            ("mountdir,m", po::value<string>(), "User Fuse mountdir.")
            ("rootdir,r", po::value<string>(), "ADA-FS data directory")
            ("hostsfile", po::value<string>(), "Path to the hosts_file for all fs participants")
            ("hosts,h", po::value<string>(), "Comma separated list of hosts_ for all fs participants");
@@ -47,13 +44,7 @@ int main(int argc, const char* argv[]) {
        cout << desc << "\n";
        return 1;
    }

    if (vm.count("foreground")) {
        fuse_argc++;
        fuse_argv.push_back("-f"s);
    }
    if (vm.count("mountdir")) {
        fuse_argc++;
        fuse_argv.push_back(vm["mountdir"].as<string>());
    }
    if (vm.count("rootdir")) {
@@ -103,6 +94,11 @@ int main(int argc, const char* argv[]) {

    init_environment();

    run_daemon(); // blocks here until application loop is exited TODO don't know yet how it'll be closed :D



    destroy_enviroment();

    return 0;

+1 −0
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@ extern "C" {
}
// adafs
#include "include/classes/fs_data.hpp"
#include "include/classes/rpc_data.hpp"

namespace bfs = boost::filesystem;

+192 −7
Original line number Diff line number Diff line
@@ -5,6 +5,87 @@
#include "daemon/adafs_daemon.hpp"
#include "db/db_util.hpp"

void daemon_loop(void* arg) {
    ADAFS_DATA->spdlogger()->info("Starting application loop ...");
    while(true) {
        ADAFS_DATA->spdlogger()->info("sleeping");
        sleep(5);
        ADAFS_DATA->spdlogger()->info("done sleeping. exiting ...");
        break;
    }
}

void run_daemon() {
    ABT_xstream xstream;
    ABT_pool pool;
    ABT_thread loop_thread;

    auto argo_ret = ABT_xstream_self(&xstream); // get the current execution stream (1 core) to use (started with ABT_init())
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error getting the execution stream when starting the daemon.");
        return;
    }
    argo_ret = ABT_xstream_get_main_pools(xstream, 1, &pool); // get the thread pool
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error getting the thread pool when starting the daemon.");
        return;
    }
    argo_ret = ABT_thread_create(pool, daemon_loop, nullptr, ABT_THREAD_ATTR_NULL, &loop_thread);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error creating loop thread");
        return;
    }

    // wait for the daemon to be closed and free the loop thread
    ABT_thread_yield_to(loop_thread);
    argo_ret = ABT_thread_join(loop_thread);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error joining loop thread");
        return;
    }
    argo_ret = ABT_thread_free(&loop_thread);
    if (argo_ret != 0) {
        ADAFS_DATA->spdlogger()->error("Error freeing loop thread.");
        return;
    }

}

void init_environment() {
    // Initialize rocksdb
    auto err = init_rocksdb();
    assert(err);
    err = init_argobots();
    assert(err);
    // init margo
    err = init_rpc_server();
    assert(err);
    err = init_rpc_client();
    assert(err);
}

/**
 * Destroys the margo, argobots, and mercury environments
 */
void destroy_enviroment() {
    ADAFS_DATA->spdlogger()->info("About to finalize the margo server and client");
    margo_finalize(RPC_DATA->client_mid());
    margo_finalize(RPC_DATA->server_mid());
    ADAFS_DATA->spdlogger()->info("Success.");

    destroy_argobots();

    ADAFS_DATA->spdlogger()->info("About to destroy the mercury context");
    HG_Context_destroy(RPC_DATA->client_hg_context());
    HG_Context_destroy(RPC_DATA->server_hg_context());
    ADAFS_DATA->spdlogger()->info("Success.");
    ADAFS_DATA->spdlogger()->info("About to destroy the mercury class");
    HG_Finalize(RPC_DATA->client_hg_class());
    HG_Finalize(RPC_DATA->server_hg_class());
    ADAFS_DATA->spdlogger()->info("Success.");
    ADAFS_DATA->spdlogger()->info("All services shut down.");
}

/**
 * Initializes the Argobots environment
 * @return
@@ -41,13 +122,117 @@ void destroy_argobots() {
    }
}

void init_environment() {
    // Initialize rocksdb
    auto err = init_rocksdb();
    assert(err);
    err = init_argobots();
    assert(err);
bool init_rpc_server() {
    auto protocol_port = "bmi+tcp://localhost:" + to_string(RPCPORT);

    destroy_argobots();
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];

    // Mercury class and context pointer that go into RPC_data class
    hg_class_t* hg_class;
    hg_context_t* hg_context;

    ADAFS_DATA->spdlogger()->info("Initializing Mercury server ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class = HG_Init(protocol_port.c_str(), HG_TRUE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury server layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury server context");
        HG_Finalize(hg_class);
        return false;
    }
    // Below is just for logging purposes
    // Figure out what address this server is listening on (must be freed when finished)
    auto hg_ret = HG_Addr_self(hg_class, &addr_self);
    if (hg_ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("HG_Addr_self() Failed to retrieve server address");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    // Convert the address to a cstring (with \0 terminator).
    hg_ret = HG_Addr_to_string(hg_class, addr_self_cstring, &addr_self_cstring_sz, addr_self);
    if (hg_ret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("HG_Addr_to_string Failed to convert address to cstring");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        HG_Addr_free(hg_class, addr_self);
        return false;
    }
    HG_Addr_free(hg_class, addr_self);

    ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);

    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo server...");
    // Start Margo
    auto mid = margo_init(1, 16, hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo server");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");

    // Put context and class into RPC_data object
    RPC_DATA->server_hg_class(hg_class);
    RPC_DATA->server_hg_context(hg_context);
    RPC_DATA->server_mid(mid);

    // register RPCs
//    register_server_rpcs();

    return true;
}

bool init_rpc_client() {
    auto protocol_port = "bmi+tcp"s;
    ADAFS_DATA->spdlogger()->info("Initializing Mercury client ...");
    /* MERCURY PART */
    // Init Mercury layer (must be finalized when finished)
    hg_class_t* hg_class;
    hg_context_t* hg_context;
    hg_class = HG_Init(protocol_port.c_str(), HG_FALSE);
    if (hg_class == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Init() Failed to init Mercury client layer");
        return false;
    }
    // Create a new Mercury context (must be destroyed when finished)
    hg_context = HG_Context_create(hg_class);
    if (hg_context == nullptr) {
        ADAFS_DATA->spdlogger()->error("HG_Context_create() Failed to create Mercury client context");
        HG_Finalize(hg_class);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");

    /* MARGO PART */
    ADAFS_DATA->spdlogger()->info("Initializing Margo client ...");
    // Start Margo
    auto mid = margo_init(0, 0,
                          hg_context);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->info("[ERR]: margo_init failed to initialize the Margo client");
        HG_Context_destroy(hg_context);
        HG_Finalize(hg_class);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Success.");

    // Put context and class into RPC_data object
    RPC_DATA->client_hg_class(hg_class);
    RPC_DATA->client_hg_context(hg_context);
    RPC_DATA->client_mid(mid);

//    register_client_rpcs();

    return true;
}
 No newline at end of file