Loading include/daemon/main.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -32,11 +32,11 @@ extern "C" { #define ADAFS_DATA (static_cast<FsData*>(FsData::getInstance())) #define RPC_DATA (static_cast<RPCData*>(RPCData::getInstance())) bool init_environment(); void init_environment(); void destroy_enviroment(); bool init_io_tasklet_pool(); bool init_rpc_server(); void init_io_tasklet_pool(); void init_rpc_server(const std::string& protocol); void register_server_rpcs(margo_instance_id mid); Loading src/daemon/main.cpp +60 −63 Original line number Diff line number Diff line Loading @@ -41,36 +41,45 @@ namespace bfs = boost::filesystem; static condition_variable shutdown_please; static mutex mtx; bool init_environment() { void init_environment() { // Initialize metadata db std::string metadata_path = ADAFS_DATA->metadir() + "/rocksdb"s; ADAFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path); try { ADAFS_DATA->mdb(std::make_shared<MetadataDB>(metadata_path)); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() unable to initialize metadata DB: {}", __func__, e.what()); return false; ADAFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what()); throw; } // Initialize data backend std::string chunk_storage_path = ADAFS_DATA->rootdir() + "/data/chunks"s; ADAFS_DATA->spdlogger()->debug("{}() Creating chunk storage directory: '{}'", __func__, chunk_storage_path); ADAFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); bfs::create_directories(chunk_storage_path); try { ADAFS_DATA->storage(std::make_shared<ChunkStorage>(chunk_storage_path, CHUNKSIZE)); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() unable to initialize storage backend: {}", __func__, e.what()); ADAFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what()); throw; } // Init margo for RPC if (!init_rpc_server()) { ADAFS_DATA->spdlogger()->error("{}() unable to initialize margo rpc server.", __func__); return false; auto protocol_port = fmt::format("{}://{}:{}", RPC_PROTOCOL, ADAFS_DATA->rpc_addr(), ADAFS_DATA->rpc_port()); ADAFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", __func__, protocol_port); try { init_rpc_server(protocol_port); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); throw; } // Init Argobots ESs to drive IO if (!init_io_tasklet_pool()) { ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Argobots pool for I/O.", __func__); return false; try { ADAFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__); init_io_tasklet_pool(); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what()); throw; } /* Setup distributor */ Loading @@ -88,17 +97,16 @@ bool init_environment() { try { create_metadentry("/", root_md); } catch (const std::exception& e ) { ADAFS_DATA->spdlogger()->error("{}() Unable to write root metadentry to KV store: {}", __func__, e.what()); return false; throw runtime_error("Failed to write root metadentry to KV store: "s + e.what()); } try { // Register daemon to system ADAFS_DATA->spdlogger()->debug("{}() Creating daemon pid file", __func__); try { register_daemon_proc(); } catch (const std::exception& e ) { cout << "ERROR: failed to register daemon process: " << e.what() << endl; ADAFS_DATA->spdlogger()->error("{}() Unable to register the daemon process to the system: {}", __func__, e.what()); return false; ADAFS_DATA->spdlogger()->error("Failed to register the daemon process to the system: {}", __func__, e.what()); throw; } try { Loading @@ -106,46 +114,44 @@ bool init_environment() { populate_lookup_file(); } } catch (const std::exception& e ) { cout << "ERROR: failed to populate lookup file: " << e.what() << endl; ADAFS_DATA->spdlogger()->error("{}() failed to populate lookup file: {}", __func__, e.what()); return false; ADAFS_DATA->spdlogger()->error("{}() Failed to populate lookup file: {}", __func__, e.what()); throw; } ADAFS_DATA->spdlogger()->info("Startup successful. Daemon is ready."); return true; } /** * Destroys the margo, argobots, and mercury environments */ void destroy_enviroment() { ADAFS_DATA->spdlogger()->debug("{}() Removing mount directory", __func__); bfs::remove_all(ADAFS_DATA->mountdir()); ADAFS_DATA->spdlogger()->debug("{}() Freeing I/O executions streams", __func__); for (unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) { ABT_xstream_join(RPC_DATA->io_streams().at(i)); ABT_xstream_free(&RPC_DATA->io_streams().at(i)); } ADAFS_DATA->spdlogger()->info("{}() Freeing I/O executions streams successful", __func__); if (!deregister_daemon_proc()) ADAFS_DATA->spdlogger()->warn("{}() Unable to clean up auxiliary files", __func__); else ADAFS_DATA->spdlogger()->debug("{}() Cleaning auxiliary files successful", __func__); ADAFS_DATA->spdlogger()->debug("{}() Removing pid file", __func__); if (!deregister_daemon_proc()) { ADAFS_DATA->spdlogger()->warn("{}() Failed to remove pid file", __func__); } if (!ADAFS_DATA->lookup_file().empty()) { ADAFS_DATA->spdlogger()->debug("{}() Removing lookup file", __func__); destroy_lookup_file(); } if (RPC_DATA->server_rpc_mid() != nullptr) { ADAFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC server", __func__); margo_finalize(RPC_DATA->server_rpc_mid()); ADAFS_DATA->spdlogger()->info("{}() Margo RPC server shut down successful", __func__); } ADAFS_DATA->spdlogger()->info("{}() Closing DB...", __func__); ADAFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__); ADAFS_DATA->close_mdb(); ADAFS_DATA->spdlogger()->info("Shutdown complete"); } bool init_io_tasklet_pool() { void init_io_tasklet_pool() { assert(DAEMON_IO_XSTREAMS >= 0); unsigned int xstreams_num = DAEMON_IO_XSTREAMS; Loading @@ -153,8 +159,7 @@ bool init_io_tasklet_pool() { ABT_pool pool; auto ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &pool); if (ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() Failed to create I/O tasks pool", __func__); return false; throw runtime_error("Failed to create I/O tasks pool"); } //create all subsequent xstream and the associated scheduler, all tapping into the same pool Loading @@ -163,22 +168,18 @@ bool init_io_tasklet_pool() { ret = ABT_xstream_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool, ABT_SCHED_CONFIG_NULL, &xstreams[i]); if (ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() Failed to create task execution streams for I/O operations", __func__); return false; throw runtime_error("Failed to create task execution streams for I/O operations"); } } RPC_DATA->io_streams(xstreams); RPC_DATA->io_pool(pool); return true; } bool init_rpc_server() { auto protocol_port = fmt::format("{}://{}:{}", RPC_PROTOCOL, ADAFS_DATA->rpc_addr(), ADAFS_DATA->rpc_port()); void init_rpc_server(const string & protocol_port) { hg_addr_t addr_self; hg_size_t addr_self_cstring_sz = 128; char addr_self_cstring[128]; ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo RPC server...", __func__); // IMPORTANT: this struct needs to be zeroed before use struct hg_init_info hg_options = {}; hg_options.auto_sm = HG_TRUE; Loading @@ -191,39 +192,33 @@ bool init_rpc_server() { HG_TRUE, DAEMON_RPC_HANDLER_XSTREAMS); if (mid == MARGO_INSTANCE_NULL) { ADAFS_DATA->spdlogger()->error("{}() margo_init failed to initialize the Margo RPC server", __func__); return false; throw runtime_error("Failed to initialize the Margo RPC server"); } // Figure out what address this server is listening on (must be freed when finished) auto hret = margo_addr_self(mid, &addr_self); if (hret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() margo_addr_self() Failed to retrieve server RPC address", __func__); margo_finalize(mid); return false; throw runtime_error("Failed to retrieve server RPC address"); } // Convert the address to a cstring (with \0 terminator). hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self); if (hret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() margo_addr_to_string Failed to convert address to cstring", __func__); margo_addr_free(mid, addr_self); margo_finalize(mid); return false; throw runtime_error("Failed to convert server RPC address to string"); } margo_addr_free(mid, addr_self); std::string addr_self_str(addr_self_cstring); RPC_DATA->self_addr_str(addr_self_str); ADAFS_DATA->spdlogger()->info("{}() Margo RPC server initialized. Accepting RPCs on address {}", __func__, addr_self_cstring); ADAFS_DATA->spdlogger()->info("{}() Accepting RPCs on address {}", __func__, addr_self_cstring); // Put context and class into RPC_data object RPC_DATA->server_rpc_mid(mid); // register RPCs register_server_rpcs(mid); return true; } /** Loading Loading @@ -294,6 +289,7 @@ bool deregister_daemon_proc() { } void populate_lookup_file() { ADAFS_DATA->spdlogger()->debug("{}() Populating lookup file: '{}'", __func__, ADAFS_DATA->lookup_file()); ofstream lfstream(ADAFS_DATA->lookup_file(), ios::out | ios::app); lfstream << get_my_hostname(true) + " "s + RPC_DATA->self_addr_str() << std::endl; Loading @@ -304,6 +300,7 @@ void destroy_lookup_file() { } void shutdown_handler(int dummy) { ADAFS_DATA->spdlogger()->info("{}() Received signal: '{}'", __func__, strsignal(dummy)); shutdown_please.notify_all(); } Loading Loading @@ -459,7 +456,7 @@ int main(int argc, const char* argv[]) { ADAFS_DATA->host_size(hostmap.size()); ADAFS_DATA->hosts_raw(hosts_raw); ADAFS_DATA->spdlogger()->info("{}() Initializing environment. Hold on ...", __func__); ADAFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); assert(vm.count("mountdir")); auto mountdir = vm["mountdir"].as<string>(); Loading @@ -481,7 +478,12 @@ int main(int argc, const char* argv[]) { ADAFS_DATA->metadir(ADAFS_DATA->rootdir()); } if (init_environment()) { try { init_environment(); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() Failed to initialize environment: {}", __func__, e.what()); } signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); signal(SIGKILL, shutdown_handler); Loading @@ -489,13 +491,8 @@ int main(int argc, const char* argv[]) { unique_lock<mutex> lk(mtx); // Wait for shutdown signal to initiate shutdown protocols shutdown_please.wait(lk); ADAFS_DATA->spdlogger()->info("{}() Shutting done signal encountered. Shutting down ...", __func__); } else { ADAFS_DATA->spdlogger()->info("{}() Starting up daemon environment failed. Shutting down ...", __func__); } ADAFS_DATA->spdlogger()->info("{}() Shutting down", __func__); destroy_enviroment(); ADAFS_DATA->spdlogger()->info("{}() Exiting", __func__); return 0; } Loading
include/daemon/main.hpp +3 −3 Original line number Diff line number Diff line Loading @@ -32,11 +32,11 @@ extern "C" { #define ADAFS_DATA (static_cast<FsData*>(FsData::getInstance())) #define RPC_DATA (static_cast<RPCData*>(RPCData::getInstance())) bool init_environment(); void init_environment(); void destroy_enviroment(); bool init_io_tasklet_pool(); bool init_rpc_server(); void init_io_tasklet_pool(); void init_rpc_server(const std::string& protocol); void register_server_rpcs(margo_instance_id mid); Loading
src/daemon/main.cpp +60 −63 Original line number Diff line number Diff line Loading @@ -41,36 +41,45 @@ namespace bfs = boost::filesystem; static condition_variable shutdown_please; static mutex mtx; bool init_environment() { void init_environment() { // Initialize metadata db std::string metadata_path = ADAFS_DATA->metadir() + "/rocksdb"s; ADAFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'", __func__, metadata_path); try { ADAFS_DATA->mdb(std::make_shared<MetadataDB>(metadata_path)); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() unable to initialize metadata DB: {}", __func__, e.what()); return false; ADAFS_DATA->spdlogger()->error("{}() Failed to initialize metadata DB: {}", __func__, e.what()); throw; } // Initialize data backend std::string chunk_storage_path = ADAFS_DATA->rootdir() + "/data/chunks"s; ADAFS_DATA->spdlogger()->debug("{}() Creating chunk storage directory: '{}'", __func__, chunk_storage_path); ADAFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'", __func__, chunk_storage_path); bfs::create_directories(chunk_storage_path); try { ADAFS_DATA->storage(std::make_shared<ChunkStorage>(chunk_storage_path, CHUNKSIZE)); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() unable to initialize storage backend: {}", __func__, e.what()); ADAFS_DATA->spdlogger()->error("{}() Failed to initialize storage backend: {}", __func__, e.what()); throw; } // Init margo for RPC if (!init_rpc_server()) { ADAFS_DATA->spdlogger()->error("{}() unable to initialize margo rpc server.", __func__); return false; auto protocol_port = fmt::format("{}://{}:{}", RPC_PROTOCOL, ADAFS_DATA->rpc_addr(), ADAFS_DATA->rpc_port()); ADAFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'", __func__, protocol_port); try { init_rpc_server(protocol_port); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() Failed to initialize RPC server: {}", __func__, e.what()); throw; } // Init Argobots ESs to drive IO if (!init_io_tasklet_pool()) { ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Argobots pool for I/O.", __func__); return false; try { ADAFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__); init_io_tasklet_pool(); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() Failed to initialize Argobots pool for I/O: {}", __func__, e.what()); throw; } /* Setup distributor */ Loading @@ -88,17 +97,16 @@ bool init_environment() { try { create_metadentry("/", root_md); } catch (const std::exception& e ) { ADAFS_DATA->spdlogger()->error("{}() Unable to write root metadentry to KV store: {}", __func__, e.what()); return false; throw runtime_error("Failed to write root metadentry to KV store: "s + e.what()); } try { // Register daemon to system ADAFS_DATA->spdlogger()->debug("{}() Creating daemon pid file", __func__); try { register_daemon_proc(); } catch (const std::exception& e ) { cout << "ERROR: failed to register daemon process: " << e.what() << endl; ADAFS_DATA->spdlogger()->error("{}() Unable to register the daemon process to the system: {}", __func__, e.what()); return false; ADAFS_DATA->spdlogger()->error("Failed to register the daemon process to the system: {}", __func__, e.what()); throw; } try { Loading @@ -106,46 +114,44 @@ bool init_environment() { populate_lookup_file(); } } catch (const std::exception& e ) { cout << "ERROR: failed to populate lookup file: " << e.what() << endl; ADAFS_DATA->spdlogger()->error("{}() failed to populate lookup file: {}", __func__, e.what()); return false; ADAFS_DATA->spdlogger()->error("{}() Failed to populate lookup file: {}", __func__, e.what()); throw; } ADAFS_DATA->spdlogger()->info("Startup successful. Daemon is ready."); return true; } /** * Destroys the margo, argobots, and mercury environments */ void destroy_enviroment() { ADAFS_DATA->spdlogger()->debug("{}() Removing mount directory", __func__); bfs::remove_all(ADAFS_DATA->mountdir()); ADAFS_DATA->spdlogger()->debug("{}() Freeing I/O executions streams", __func__); for (unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) { ABT_xstream_join(RPC_DATA->io_streams().at(i)); ABT_xstream_free(&RPC_DATA->io_streams().at(i)); } ADAFS_DATA->spdlogger()->info("{}() Freeing I/O executions streams successful", __func__); if (!deregister_daemon_proc()) ADAFS_DATA->spdlogger()->warn("{}() Unable to clean up auxiliary files", __func__); else ADAFS_DATA->spdlogger()->debug("{}() Cleaning auxiliary files successful", __func__); ADAFS_DATA->spdlogger()->debug("{}() Removing pid file", __func__); if (!deregister_daemon_proc()) { ADAFS_DATA->spdlogger()->warn("{}() Failed to remove pid file", __func__); } if (!ADAFS_DATA->lookup_file().empty()) { ADAFS_DATA->spdlogger()->debug("{}() Removing lookup file", __func__); destroy_lookup_file(); } if (RPC_DATA->server_rpc_mid() != nullptr) { ADAFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC server", __func__); margo_finalize(RPC_DATA->server_rpc_mid()); ADAFS_DATA->spdlogger()->info("{}() Margo RPC server shut down successful", __func__); } ADAFS_DATA->spdlogger()->info("{}() Closing DB...", __func__); ADAFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__); ADAFS_DATA->close_mdb(); ADAFS_DATA->spdlogger()->info("Shutdown complete"); } bool init_io_tasklet_pool() { void init_io_tasklet_pool() { assert(DAEMON_IO_XSTREAMS >= 0); unsigned int xstreams_num = DAEMON_IO_XSTREAMS; Loading @@ -153,8 +159,7 @@ bool init_io_tasklet_pool() { ABT_pool pool; auto ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC, ABT_TRUE, &pool); if (ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() Failed to create I/O tasks pool", __func__); return false; throw runtime_error("Failed to create I/O tasks pool"); } //create all subsequent xstream and the associated scheduler, all tapping into the same pool Loading @@ -163,22 +168,18 @@ bool init_io_tasklet_pool() { ret = ABT_xstream_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool, ABT_SCHED_CONFIG_NULL, &xstreams[i]); if (ret != ABT_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() Failed to create task execution streams for I/O operations", __func__); return false; throw runtime_error("Failed to create task execution streams for I/O operations"); } } RPC_DATA->io_streams(xstreams); RPC_DATA->io_pool(pool); return true; } bool init_rpc_server() { auto protocol_port = fmt::format("{}://{}:{}", RPC_PROTOCOL, ADAFS_DATA->rpc_addr(), ADAFS_DATA->rpc_port()); void init_rpc_server(const string & protocol_port) { hg_addr_t addr_self; hg_size_t addr_self_cstring_sz = 128; char addr_self_cstring[128]; ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo RPC server...", __func__); // IMPORTANT: this struct needs to be zeroed before use struct hg_init_info hg_options = {}; hg_options.auto_sm = HG_TRUE; Loading @@ -191,39 +192,33 @@ bool init_rpc_server() { HG_TRUE, DAEMON_RPC_HANDLER_XSTREAMS); if (mid == MARGO_INSTANCE_NULL) { ADAFS_DATA->spdlogger()->error("{}() margo_init failed to initialize the Margo RPC server", __func__); return false; throw runtime_error("Failed to initialize the Margo RPC server"); } // Figure out what address this server is listening on (must be freed when finished) auto hret = margo_addr_self(mid, &addr_self); if (hret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() margo_addr_self() Failed to retrieve server RPC address", __func__); margo_finalize(mid); return false; throw runtime_error("Failed to retrieve server RPC address"); } // Convert the address to a cstring (with \0 terminator). hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self); if (hret != HG_SUCCESS) { ADAFS_DATA->spdlogger()->error("{}() margo_addr_to_string Failed to convert address to cstring", __func__); margo_addr_free(mid, addr_self); margo_finalize(mid); return false; throw runtime_error("Failed to convert server RPC address to string"); } margo_addr_free(mid, addr_self); std::string addr_self_str(addr_self_cstring); RPC_DATA->self_addr_str(addr_self_str); ADAFS_DATA->spdlogger()->info("{}() Margo RPC server initialized. Accepting RPCs on address {}", __func__, addr_self_cstring); ADAFS_DATA->spdlogger()->info("{}() Accepting RPCs on address {}", __func__, addr_self_cstring); // Put context and class into RPC_data object RPC_DATA->server_rpc_mid(mid); // register RPCs register_server_rpcs(mid); return true; } /** Loading Loading @@ -294,6 +289,7 @@ bool deregister_daemon_proc() { } void populate_lookup_file() { ADAFS_DATA->spdlogger()->debug("{}() Populating lookup file: '{}'", __func__, ADAFS_DATA->lookup_file()); ofstream lfstream(ADAFS_DATA->lookup_file(), ios::out | ios::app); lfstream << get_my_hostname(true) + " "s + RPC_DATA->self_addr_str() << std::endl; Loading @@ -304,6 +300,7 @@ void destroy_lookup_file() { } void shutdown_handler(int dummy) { ADAFS_DATA->spdlogger()->info("{}() Received signal: '{}'", __func__, strsignal(dummy)); shutdown_please.notify_all(); } Loading Loading @@ -459,7 +456,7 @@ int main(int argc, const char* argv[]) { ADAFS_DATA->host_size(hostmap.size()); ADAFS_DATA->hosts_raw(hosts_raw); ADAFS_DATA->spdlogger()->info("{}() Initializing environment. Hold on ...", __func__); ADAFS_DATA->spdlogger()->info("{}() Initializing environment", __func__); assert(vm.count("mountdir")); auto mountdir = vm["mountdir"].as<string>(); Loading @@ -481,7 +478,12 @@ int main(int argc, const char* argv[]) { ADAFS_DATA->metadir(ADAFS_DATA->rootdir()); } if (init_environment()) { try { init_environment(); } catch (const std::exception & e) { ADAFS_DATA->spdlogger()->error("{}() Failed to initialize environment: {}", __func__, e.what()); } signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); signal(SIGKILL, shutdown_handler); Loading @@ -489,13 +491,8 @@ int main(int argc, const char* argv[]) { unique_lock<mutex> lk(mtx); // Wait for shutdown signal to initiate shutdown protocols shutdown_please.wait(lk); ADAFS_DATA->spdlogger()->info("{}() Shutting done signal encountered. Shutting down ...", __func__); } else { ADAFS_DATA->spdlogger()->info("{}() Starting up daemon environment failed. Shutting down ...", __func__); } ADAFS_DATA->spdlogger()->info("{}() Shutting down", __func__); destroy_enviroment(); ADAFS_DATA->spdlogger()->info("{}() Exiting", __func__); return 0; }