Commit 5857af5a authored by Marc Vef's avatar Marc Vef
Browse files

Added: register daemon to system for storing auxiliary files + better daemon...

Added: register daemon to system for storing auxiliary files + better daemon startup and shutdown handling
parent 36e14292
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -4,7 +4,7 @@

#include "../../main.hpp"

void init_environment();
bool init_environment();
void destroy_enviroment();

bool init_ipc_server();
@@ -12,4 +12,10 @@ bool init_rpc_server();

void register_server_rpcs(margo_instance_id mid);

std::string daemon_register_path();

bool register_daemon_proc();

bool deregister_daemon_proc();

#endif //IFS_ADAFS_DAEMON_HPP
+11 −11
Original line number Diff line number Diff line
@@ -152,7 +152,7 @@ int main(int argc, const char* argv[]) {
    ADAFS_DATA->chunk_path(ADAFS_DATA->rootdir() + "/data/chunks"s);
    ADAFS_DATA->mgmt_path(ADAFS_DATA->rootdir() + "/mgmt"s);

    ADAFS_DATA->spdlogger()->info("adafs_ll_init() enter"s);
    ADAFS_DATA->spdlogger()->info("{}() Initializing environment. Hold on ...", __func__);

    // Make sure directory structure exists
    bfs::create_directories(ADAFS_DATA->dentry_path());
@@ -160,8 +160,7 @@ int main(int argc, const char* argv[]) {
    bfs::create_directories(ADAFS_DATA->chunk_path());
    bfs::create_directories(ADAFS_DATA->mgmt_path());

    init_environment();

    if (init_environment()) {
        signal(SIGINT, shutdown_handler);
        signal(SIGTERM, shutdown_handler);
        signal(SIGKILL, shutdown_handler);
@@ -169,9 +168,10 @@ int main(int argc, const char* argv[]) {
        while (!shutdown_please) {
            sleep(1);
        }

    ADAFS_DATA->spdlogger()->info("Shutting done signal encountered. Shutting down ...");

        ADAFS_DATA->spdlogger()->info("{}() Shutting done signal encountered. Shutting down ...", __func__);
    } else {
        ADAFS_DATA->spdlogger()->info("{}() Starting up daemon environment failed. Shutting down ...", __func__);
    }

    destroy_enviroment();

+91 −24
Original line number Diff line number Diff line
@@ -6,15 +6,29 @@
#include <preload/ipc_types.hpp>
#include <adafs_ops/metadentry.hpp>

void init_environment() {
static const string daemon_aux_path = "/tmp/adafs"s;


bool init_environment() {
    // Initialize rocksdb
    auto err = init_rocksdb();
    assert(err);
    if (!init_rocksdb()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize RocksDB.", __func__);
        return false;
    }
    // init margo
    err = init_rpc_server();
    assert(err);
    err = init_ipc_server();
    assert(err);
    if (!init_rpc_server()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Margo RPC server.", __func__);
        return false;
    }
    if (!init_ipc_server()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Margo IPC server.", __func__);
        return false;
    }
    // Register daemon to system
    if (!register_daemon_proc()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to register the daemon process to the system.", __func__);
        return false;
    }
    // TODO set metadata configurations. these have to go into a user configurable file that is parsed here
    ADAFS_DATA->atime_state(MDATA_USE_ATIME);
    ADAFS_DATA->mtime_state(MDATA_USE_MTIME);
@@ -25,7 +39,12 @@ void init_environment() {
    ADAFS_DATA->link_cnt_state(MDATA_USE_LINK_CNT);
    ADAFS_DATA->blocks_state(MDATA_USE_BLOCKS);
    // Create metadentry for root directory
    create_metadentry(ADAFS_DATA->mountdir(), S_IFDIR | 777);
    if (create_metadentry(ADAFS_DATA->mountdir(), S_IFDIR | 777) != 0) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to write root metadentry to KV store.", __func__);
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
    return true;
}

/**
@@ -38,12 +57,20 @@ void destroy_enviroment() {
    cout << "\n####################\n\nMargo RPC server stats: " << endl;
    margo_diag_dump(RPC_DATA->server_rpc_mid(), "-", 0);
#endif
    ADAFS_DATA->spdlogger()->info("About to finalize the margo server");
    ADAFS_DATA->spdlogger()->info("{}() Shutting down daemon ...", __func__);
    if (!deregister_daemon_proc())
        ADAFS_DATA->spdlogger()->warn("{}() Unable to clean up auxiliary files", __func__);
    else
        ADAFS_DATA->spdlogger()->info("{}() Cleaning auxiliary files successful", __func__);
    // The shutdown order is important because the RPC server is started first, it has to be stopped last due to Argobots
    if (RPC_DATA->server_ipc_mid() != nullptr) {
        margo_finalize(RPC_DATA->server_ipc_mid());
    ADAFS_DATA->spdlogger()->info("Shut down Margo IPC server successful");
        ADAFS_DATA->spdlogger()->info("{}() Margo IPC server shut down successful", __func__);
    }
    if (RPC_DATA->server_rpc_mid() != nullptr) {
        margo_finalize(RPC_DATA->server_rpc_mid());
    ADAFS_DATA->spdlogger()->info("Shut down Margo RPC server successful");
        ADAFS_DATA->spdlogger()->info("{}() Margo RPC server shut down successful", __func__);
    }
    ADAFS_DATA->spdlogger()->info("All services shut down. ADA-FS shutdown complete.");
}

@@ -54,12 +81,12 @@ bool init_ipc_server() {
    char addr_self_cstring[128];


    ADAFS_DATA->spdlogger()->info("Initializing Margo IPC server...");
    ADAFS_DATA->spdlogger()->info("{}() Initializing Margo IPC server...", __func__);
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, 16);

    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo IPC server");
        ADAFS_DATA->spdlogger()->error("{}() margo_init() failed to initialize the Margo IPC server", __func__);
        return false;
    }
#ifdef MARGODIAG
@@ -68,21 +95,22 @@ bool init_ipc_server() {
    // Figure out what address this server is listening on (must be freed when finished)
    auto hret = margo_addr_self(mid, &addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("margo_addr_self() Failed to retrieve server IPC address");
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_self() Failed to retrieve server IPC address", __func__);
        margo_finalize(mid);
        return false;
    }
    // Convert the address to a cstring (with \0 terminator).
    hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("margo_addr_to_string Failed to convert address to cstring");
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_to_string() Failed to convert address to cstring", __func__);
        margo_addr_free(mid, addr_self);
        margo_finalize(mid);
        return false;
    }
    margo_addr_free(mid, addr_self);

    ADAFS_DATA->spdlogger()->info("Success. Accepting IPCs on PID {}", addr_self_cstring);
    ADAFS_DATA->spdlogger()->info("{}() Margo IPC server initialized. Accepting IPCs on PID {}", __func__,
                                  addr_self_cstring);

    // Put context and class into RPC_data object
    RPC_DATA->server_ipc_mid(mid);
@@ -98,11 +126,11 @@ bool init_rpc_server() {
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
    ADAFS_DATA->spdlogger()->info("Initializing Margo RPC server...");
    ADAFS_DATA->spdlogger()->info("{}() Initializing Margo RPC server...", __func__);
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, 16);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("margo_init failed to initialize the Margo RPC server");
        ADAFS_DATA->spdlogger()->error("{}() margo_init failed to initialize the Margo RPC server", __func__);
        return false;
    }
#ifdef MARGODIAG
@@ -111,14 +139,14 @@ bool init_rpc_server() {
    // Figure out what address this server is listening on (must be freed when finished)
    auto hret = margo_addr_self(mid, &addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("margo_addr_self() Failed to retrieve server RPC address");
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_self() Failed to retrieve server RPC address", __func__);
        margo_finalize(mid);
        return false;
    }
    // Convert the address to a cstring (with \0 terminator).
    hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("margo_addr_to_string Failed to convert address to cstring");
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_to_string Failed to convert address to cstring", __func__);
        margo_addr_free(mid, addr_self);
        margo_finalize(mid);
        return false;
@@ -126,7 +154,8 @@ bool init_rpc_server() {
    margo_addr_free(mid, addr_self);


    ADAFS_DATA->spdlogger()->info("Success. Accepting RPCs on address {}", addr_self_cstring);
    ADAFS_DATA->spdlogger()->info("{}() Margo RPC server initialized. Accepting RPCs on address {}", __func__,
                                  addr_self_cstring);

    // Put context and class into RPC_data object
    RPC_DATA->server_rpc_mid(mid);
@@ -138,7 +167,7 @@ bool init_rpc_server() {
}

/**
 *
 * Registers RPC handlers to Margo instance
 * @param hg_class
 */
void register_server_rpcs(margo_instance_id mid) {
@@ -156,3 +185,41 @@ void register_server_rpcs(margo_instance_id mid) {
    MARGO_REGISTER(mid, hg_tag::write_data, rpc_write_data_in_t, rpc_data_out_t, rpc_srv_write_data);
    MARGO_REGISTER(mid, hg_tag::read_data, rpc_read_data_in_t, rpc_data_out_t, rpc_srv_read_data);
}

/**
 * Returns the path where daemon process writes information for the running clients
 * @return string
 */
string daemon_register_path() {
    return ("/tmp/adafs/daemon_"s + to_string(getpid()) + ".run"s);
}

/**
 * Registers the daemon process to the system.
 * This will create a file with additional information for clients started on the same node.
 * @return
 */
bool register_daemon_proc() {
    auto ret = false;
    if (!bfs::exists(daemon_aux_path) && !bfs::create_directories(daemon_aux_path)) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to create adafs auxiliary directory in {}", __func__,
                                       daemon_aux_path);
        return false;
    }
    ofstream ofs(daemon_register_path().c_str(), ::ofstream::trunc);
    if (ofs) {
        ofs << ADAFS_DATA->mountdir();
        ret = true;
    }
    if (ofs.bad()) {
        perror("Error opening file to register daemon process");
        ADAFS_DATA->spdlogger()->error("{}() Error opening file to register daemon process", __func__);
        return false;
    }
    ofs.close();
    return ret;
}

bool deregister_daemon_proc() {
    return bfs::remove(daemon_register_path());
}
 No newline at end of file
+3 −3
Original line number Diff line number Diff line
@@ -28,7 +28,7 @@ bool init_rocksdb() {
//    rocksdb::OptimisticTransactionDB* txn_db;
//    rocksdb::OptimisticTransactionOptions txn_options{};
//    ADAFS_DATA->txn_rdb_options(txn_options);
    ADAFS_DATA->spdlogger()->info("RocksDB options set. About to connect...");
    ADAFS_DATA->spdlogger()->info("{}() RocksDB options set. About to connect...", __func__);
    // open DB
//    auto s = rocksdb::OptimisticTransactionDB::Open(ADAFS_DATA->rdb_options(), ADAFS_DATA->rdb_path(), &txn_db);
    auto s = rocksdb::DB::Open(ADAFS_DATA->rdb_options(), ADAFS_DATA->rdb_path(), &db);
@@ -39,10 +39,10 @@ bool init_rocksdb() {
        ADAFS_DATA->rdb(s_db);
//        shared_ptr<rocksdb::OptimisticTransactionDB> s_txn_db(txn_db);
//        ADAFS_DATA->txn_rdb(s_txn_db);
        ADAFS_DATA->spdlogger()->info("RocksDB connection established.");
        ADAFS_DATA->spdlogger()->info("{}() RocksDB connection established.", __func__);
        return true;
    } else {
        ADAFS_DATA->spdlogger()->info("[ERROR] RocksDB connection FAILURE. Exiting...");
        ADAFS_DATA->spdlogger()->error("{}() RocksDB connection FAILURE. Exiting...", __func__);
        return false;
    }
}
+1 −1
Original line number Diff line number Diff line
@@ -124,7 +124,7 @@ void init_passthrough_() {
    spdlog::set_level(spdlog::level::off);
#endif

    ld_logger->info("Passthrough initialized.");
    ld_logger->info("{}() Passthrough initialized.", __func__);
}

void init_passthrough_if_needed() {
Loading