// NOTE: the original #include list was lost in formatting. The headers below cover the
// standard, Boost, Margo, Argobots, and spdlog facilities this file clearly uses (the
// spdlog sink header is an assumption, see initialize_loggers()). The project-specific
// headers (ADAFS_DATA/RPC_DATA singletons, MetadataDB, ChunkStorage, the RPC/IPC handler
// declarations, configuration constants, and the fmt header providing fmt::FormatInt)
// must additionally be included from their original locations.
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <list>
#include <map>
#include <mutex>
#include <condition_variable>
#include <csignal>
#include <cassert>
#include <unistd.h>
#include <sys/stat.h>
#include <boost/program_options.hpp>
#include <boost/filesystem.hpp>
#include <boost/tokenizer.hpp>
#include <spdlog/spdlog.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <margo.h>
#include <abt-snoozer.h>

using namespace std;
namespace po = boost::program_options;
namespace bfs = boost::filesystem;

static condition_variable shutdown_please;
static mutex mtx;

bool init_environment() {
    // Register daemon to system
    if (!register_daemon_proc()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to register the daemon process to the system.", __func__);
        return false;
    }
    // Initialize metadata DB
    std::string metadata_path = ADAFS_DATA->metadir() + "/rocksdb"s;
    try {
        // Template argument assumed to be the project's MetadataDB class
        ADAFS_DATA->mdb(std::make_shared<MetadataDB>(metadata_path));
    } catch (const std::exception& e) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize metadata DB: {}", __func__, e.what());
        // Without a metadata DB the daemon cannot run
        return false;
    }
    // Initialize data backend
    std::string chunk_storage_path = ADAFS_DATA->rootdir() + "/data/chunks"s;
    ADAFS_DATA->spdlogger()->debug("{}() Creating chunk storage directory: '{}'", __func__, chunk_storage_path);
    bfs::create_directories(chunk_storage_path);
    try {
        // Template argument assumed to be the project's ChunkStorage class
        ADAFS_DATA->storage(std::make_shared<ChunkStorage>(chunk_storage_path));
    } catch (const std::exception& e) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize storage backend: {}", __func__, e.what());
        // Without a storage backend the daemon cannot run
        return false;
    }
    // Init Margo for RPC
    if (!init_rpc_server()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Margo RPC server.", __func__);
        return false;
    }
    // Init Margo for IPC
    if (!init_ipc_server()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Margo IPC server.", __func__);
        return false;
    }
    // Init Argobots ESs to drive I/O
    if (!init_io_tasklet_pool()) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to initialize Argobots pool for I/O.", __func__);
        return false;
    }
    // TODO set metadata configurations. These have to go into a user-configurable file that is parsed here.
    ADAFS_DATA->atime_state(MDATA_USE_ATIME);
    ADAFS_DATA->mtime_state(MDATA_USE_MTIME);
    ADAFS_DATA->ctime_state(MDATA_USE_CTIME);
    ADAFS_DATA->uid_state(MDATA_USE_UID);
    ADAFS_DATA->gid_state(MDATA_USE_GID);
    ADAFS_DATA->inode_no_state(MDATA_USE_INODE_NO);
    ADAFS_DATA->link_cnt_state(MDATA_USE_LINK_CNT);
    ADAFS_DATA->blocks_state(MDATA_USE_BLOCKS);
    // Create metadentry for root directory (mode must be octal 0777, not decimal 777)
    try {
        create_metadentry("/", S_IFDIR | 0777);
    } catch (const std::exception& e) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to write root metadentry to KV store: {}", __func__, e.what());
        return false;
    }
    ADAFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
    return true;
}
/**
 * Destroys the Margo, Argobots, and Mercury environments
 */
void destroy_environment() {
#ifdef MARGODIAG
    cout << "\n####################\n\nMargo IPC server stats: " << endl;
    margo_diag_dump(RPC_DATA->server_ipc_mid(), "-", 0);
    cout << "\n####################\n\nMargo RPC server stats: " << endl;
    margo_diag_dump(RPC_DATA->server_rpc_mid(), "-", 0);
#endif
    bfs::remove_all(ADAFS_DATA->mountdir());
    for (unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) {
        ABT_xstream_join(RPC_DATA->io_streams().at(i));
        ABT_xstream_free(&RPC_DATA->io_streams().at(i));
    }
    ADAFS_DATA->spdlogger()->info("{}() Freeing I/O execution streams successful", __func__);
    if (!deregister_daemon_proc())
        ADAFS_DATA->spdlogger()->warn("{}() Unable to clean up auxiliary files", __func__);
    else
        ADAFS_DATA->spdlogger()->debug("{}() Cleaning auxiliary files successful", __func__);
    // The shutdown order matters: because the RPC server was started first, it has to be stopped last due to Argobots
    if (RPC_DATA->server_ipc_mid() != nullptr) {
        margo_finalize(RPC_DATA->server_ipc_mid());
        ADAFS_DATA->spdlogger()->info("{}() Margo IPC server shut down successfully", __func__);
    }
    if (RPC_DATA->server_rpc_mid() != nullptr) {
        margo_finalize(RPC_DATA->server_rpc_mid());
        ADAFS_DATA->spdlogger()->info("{}() Margo RPC server shut down successfully", __func__);
    }
    ADAFS_DATA->spdlogger()->info("{}() Closing DB ...", __func__);
    ADAFS_DATA->close_mdb();
    ADAFS_DATA->spdlogger()->info("All services shut down. ADA-FS shutdown complete.");
}

bool init_io_tasklet_pool() {
    // Element type assumed to be ABT_xstream, matching the ABT_snoozer_xstream_create() call below
    vector<ABT_xstream> io_streams_tmp(DAEMON_IO_XSTREAMS);
    ABT_pool io_pools_tmp;
    auto ret = ABT_snoozer_xstream_create(DAEMON_IO_XSTREAMS, &io_pools_tmp, io_streams_tmp.data());
    if (ret != ABT_SUCCESS) {
        ADAFS_DATA->spdlogger()->error(
                "{}() ABT_snoozer_xstream_create() failed to initialize ABT_pool for I/O operations", __func__);
        return false;
    }
    RPC_DATA->io_streams(io_streams_tmp);
    RPC_DATA->io_pool(io_pools_tmp);
    return true;
}
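/*
 * Illustrative sketch (not part of the original source): I/O work is meant to be driven
 * by the pool created above. Assuming RPC_DATA exposes a matching io_pool() getter, a
 * handler could offload work onto it by spawning an Argobots tasklet, e.g.:
 *
 *     ABT_task task;
 *     ABT_task_create(RPC_DATA->io_pool(), io_work_fn, io_work_arg, &task);
 *
 * io_work_fn and io_work_arg are hypothetical names for the worker function (taking a
 * single void* argument) and its argument.
 */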
bool init_ipc_server() {
    auto protocol_port = "na+sm://"s;
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
    ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo IPC server ...", __func__);
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, DAEMON_IPC_HANDLER_XSTREAMS);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("{}() margo_init() failed to initialize the Margo IPC server", __func__);
        return false;
    }
#ifdef MARGODIAG
    margo_diag_start(mid);
#endif
    // Figure out what address this server is listening on (must be freed when finished)
    auto hret = margo_addr_self(mid, &addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_self() failed to retrieve server IPC address", __func__);
        margo_finalize(mid);
        return false;
    }
    // Convert the address to a cstring (with \0 terminator)
    hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_to_string() failed to convert address to cstring", __func__);
        margo_addr_free(mid, addr_self);
        margo_finalize(mid);
        return false;
    }
    margo_addr_free(mid, addr_self);
    ADAFS_DATA->spdlogger()->info("{}() Margo IPC server initialized. Accepting IPCs on PID {}", __func__,
                                  addr_self_cstring);
    // Put context and class into the RPC_data object
    RPC_DATA->server_ipc_mid(mid);
    // Register RPCs
    register_server_rpcs(mid);
    return true;
}

bool init_rpc_server() {
    auto protocol_port = RPC_PROTOCOL + "://localhost:"s + to_string(RPC_PORT);
    hg_addr_t addr_self;
    hg_size_t addr_self_cstring_sz = 128;
    char addr_self_cstring[128];
    ADAFS_DATA->spdlogger()->debug("{}() Initializing Margo RPC server ...", __func__);
    // Start Margo (this will also initialize Argobots and Mercury internally)
    auto mid = margo_init(protocol_port.c_str(), MARGO_SERVER_MODE, 1, DAEMON_RPC_HANDLER_XSTREAMS);
    if (mid == MARGO_INSTANCE_NULL) {
        ADAFS_DATA->spdlogger()->error("{}() margo_init() failed to initialize the Margo RPC server", __func__);
        return false;
    }
#ifdef MARGODIAG
    margo_diag_start(mid);
#endif
    // Figure out what address this server is listening on (must be freed when finished)
    auto hret = margo_addr_self(mid, &addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_self() failed to retrieve server RPC address", __func__);
        margo_finalize(mid);
        return false;
    }
    // Convert the address to a cstring (with \0 terminator)
    hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz, addr_self);
    if (hret != HG_SUCCESS) {
        ADAFS_DATA->spdlogger()->error("{}() margo_addr_to_string() failed to convert address to cstring", __func__);
        margo_addr_free(mid, addr_self);
        margo_finalize(mid);
        return false;
    }
    margo_addr_free(mid, addr_self);
    ADAFS_DATA->spdlogger()->info("{}() Margo RPC server initialized. Accepting RPCs on address {}", __func__,
                                  addr_self_cstring);
    // Put context and class into the RPC_data object
    RPC_DATA->server_rpc_mid(mid);
    // Register RPCs
    register_server_rpcs(mid);
    return true;
}

/**
 * Registers RPC handlers with a Margo instance
 * @param mid
 */
void register_server_rpcs(margo_instance_id mid) {
    if (RPC_DATA->server_ipc_mid() == mid)
        MARGO_REGISTER(mid, hg_tag::fs_config, ipc_config_in_t, ipc_config_out_t, ipc_srv_fs_config);
    MARGO_REGISTER(mid, hg_tag::minimal, rpc_minimal_in_t, rpc_minimal_out_t, rpc_minimal);
    MARGO_REGISTER(mid, hg_tag::create, rpc_mk_node_in_t, rpc_err_out_t, rpc_srv_mk_node);
    MARGO_REGISTER(mid, hg_tag::access, rpc_access_in_t, rpc_err_out_t, rpc_srv_access);
    MARGO_REGISTER(mid, hg_tag::stat, rpc_path_only_in_t, rpc_stat_out_t, rpc_srv_stat);
    MARGO_REGISTER(mid, hg_tag::remove, rpc_rm_node_in_t, rpc_err_out_t, rpc_srv_rm_node);
    MARGO_REGISTER(mid, hg_tag::update_metadentry, rpc_update_metadentry_in_t, rpc_err_out_t,
                   rpc_srv_update_metadentry);
    MARGO_REGISTER(mid, hg_tag::get_metadentry_size, rpc_path_only_in_t, rpc_get_metadentry_size_out_t,
                   rpc_srv_get_metadentry_size);
    MARGO_REGISTER(mid, hg_tag::update_metadentry_size, rpc_update_metadentry_size_in_t,
                   rpc_update_metadentry_size_out_t, rpc_srv_update_metadentry_size);
    MARGO_REGISTER(mid, hg_tag::get_dirents, rpc_get_dirents_in_t, rpc_get_dirents_out_t, rpc_srv_get_dirents);
    MARGO_REGISTER(mid, hg_tag::write_data, rpc_write_data_in_t, rpc_data_out_t, rpc_srv_write_data);
    MARGO_REGISTER(mid, hg_tag::read_data, rpc_read_data_in_t, rpc_data_out_t, rpc_srv_read_data);
}
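/*
 * Illustrative sketch (not part of the original source): register_daemon_proc() below
 * writes a small auxiliary file whose first line is the daemon's PID and whose second
 * line is the mount directory. A client process on the same node could read it roughly
 * like this, assuming daemon_pid_path() resolves to the same path on both sides:
 *
 *     ifstream ifs(daemon_pid_path());
 *     string pid_line, mountdir_line;
 *     getline(ifs, pid_line);      // PID of the running daemon
 *     getline(ifs, mountdir_line); // mount directory served by that daemon
 */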
/**
 * Registers the daemon process with the system.
 * This will create a file with additional information for clients started on the same node.
 * @return
 */
bool register_daemon_proc() {
    auto daemon_aux_path = DAEMON_AUX_PATH;
    if (!bfs::exists(daemon_aux_path) && !bfs::create_directories(daemon_aux_path)) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to create adafs auxiliary directory in {}", __func__,
                                       daemon_aux_path);
        return false;
    }
    auto pid_file = daemon_pid_path();
    // Check if a pid file exists from another adafs_daemon
    if (bfs::exists(pid_file)) {
        // Check if another daemon is still running
        ifstream ifs(pid_file, ::ifstream::in);
        if (ifs) {
            string running_pid;
            // First line is the pid of the daemon
            if (getline(ifs, running_pid) && !running_pid.empty()) {
                // Check if the pid is running and kill it
                if (kill(::stoi(running_pid), 0) == 0) {
                    ADAFS_DATA->spdlogger()->warn("{}() Daemon with pid {} is already running. Proceeding to force kill",
                                                  __func__, running_pid);
                    kill(::stoi(running_pid), SIGKILL);
                    sleep(1);
                    if (kill(::stoi(running_pid), 0) == 0) {
                        ADAFS_DATA->spdlogger()->error("{}() Running daemon with pid {} cannot be killed. Exiting ...",
                                                       __func__, running_pid);
                        ifs.close();
                        return false;
                    } else
                        ADAFS_DATA->spdlogger()->info("{}() Kill successful", __func__);
                }
            }
        }
        ifs.close();
    }
    auto my_pid = getpid();
    if (my_pid == -1) {
        ADAFS_DATA->spdlogger()->error("{}() Unable to get pid", __func__);
        return false;
    }
    // First line: pid of this daemon; second line: mountdir it serves
    ofstream ofs(pid_file, ::ofstream::trunc);
    if (ofs) {
        ofs << to_string(my_pid);
        ofs << "\n";
        ofs << ADAFS_DATA->mountdir();
    } else {
        cerr << "Unable to create daemon pid file at " << pid_file << endl;
        ADAFS_DATA->spdlogger()->error("{}() Unable to create daemon pid file at {}. No permissions?", __func__,
                                       pid_file);
        return false;
    }
    ofs.close();
    return true;
}

bool deregister_daemon_proc() {
    return bfs::remove(daemon_pid_path());
}

/**
 * Returns the machine's hostname
 * @return
 */
string get_my_hostname(bool short_hostname) {
    char hostname[1024];
    auto ret = gethostname(hostname, 1024);
    if (ret == 0) {
        string hostname_s(hostname);
        if (!short_hostname)
            return hostname_s;
        // Get short hostname
        auto pos = hostname_s.find("."s);
        if (pos != std::string::npos)
            hostname_s = hostname_s.substr(0, pos);
        return hostname_s;
    } else
        return ""s;
}

void shutdown_handler(int dummy) {
    shutdown_please.notify_all();
}

void initialize_loggers() {
    auto logger_names = std::vector<std::string>{
            "main",
            "MetadataDB",
            "ChunkStorage",
    };
    /* Create common sink (the concrete sink type was lost in formatting;
     * a thread-safe file sink, spdlog::sinks::basic_file_sink_mt, is assumed here) */
    auto file_sink = std::make_shared<spdlog::sinks::basic_file_sink_mt>(LOG_DAEMON_PATH);
    /* Create and configure loggers */
    auto loggers = std::list<std::shared_ptr<spdlog::logger>>();
    for (const auto& name : logger_names) {
        auto logger = std::make_shared<spdlog::logger>(name, file_sink);
        logger->flush_on(spdlog::level::trace);
        loggers.push_back(logger);
    }
    /* Register loggers */
    for (const auto& logger : loggers) {
        spdlog::register_logger(logger);
    }
    // Set logger format
    spdlog::set_pattern("[%C-%m-%d %H:%M:%S.%f] %P [%L][%n] %v");
#if defined(LOG_TRACE)
    spdlog::set_level(spdlog::level::trace);
#elif defined(LOG_DEBUG)
    spdlog::set_level(spdlog::level::debug);
#elif defined(LOG_INFO)
    spdlog::set_level(spdlog::level::info);
#else
    spdlog::set_level(spdlog::level::off);
#endif
}
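/*
 * Illustrative sketch (not part of the original source): the --hostfile option parsed in
 * main() below expects one participant per line; lines starting with '#' are skipped and
 * only the first whitespace-separated token of each line is used. Hostnames here are
 * placeholders:
 *
 *     # ADA-FS participants
 *     node01 extra-columns-are-ignored
 *     node02
 *     node03
 */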
"Comma separated list of hosts_ for all fs participants"); po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); if (vm.count("help")) { cout << desc << "\n"; return 1; } try { po::notify(vm); } catch (po::required_option& e) { std::cerr << "Error: " << e.what() << "\n"; return 1; } if (vm.count("mountdir")) { ADAFS_DATA->mountdir(vm["mountdir"].as()); } if (vm.count("rootdir")) { ADAFS_DATA->rootdir(vm["rootdir"].as()); } if (vm.count("metadir")) { ADAFS_DATA->metadir(vm["metadir"].as()); } else if (vm.count("rootdir")) { ADAFS_DATA->metadir(vm["rootdir"].as()); } // parse host parameters vector hosts{}; if (vm.count("hostfile")) { auto host_path = vm["hostfile"].as(); fstream host_file(host_path); if (host_file.is_open()) { for (string line; getline(host_file, line);) { if (line.at(0) != '#') { auto subline = line.substr(0, line.find(' ')); hosts.push_back(subline); } } } else { cerr << "Hostfile path does not exist. Exiting ..." << endl; ADAFS_DATA->spdlogger()->error("{}() Hostfile path does not exist. Exiting ...", __func__); assert(host_file.is_open()); } } else if (vm.count("hosts")) { // split comma separated host string boost::char_separator sep(","); boost::tokenizer> tok(vm["hosts"].as(), sep); for (auto&& s : tok) { hosts.push_back(s); } } // convert host parameters into datastructures std::map hostmap; auto hosts_raw = ""s; if (!hosts.empty()) { auto i = static_cast(0); auto found_hostname = false; auto hostname = get_my_hostname(true); if (hostname.empty()) { cerr << "Unable to read the machine's hostname" << endl; ADAFS_DATA->spdlogger()->error("{}() Unable to read the machine's hostname", __func__); assert(!hostname.empty()); } for (auto&& host : hosts) { hostmap[i] = host; hosts_raw += host + ","s; if (hostname == host) { ADAFS_DATA->host_id(i); found_hostname = true; } i++; } if (!found_hostname) { ADAFS_DATA->spdlogger()->error("{}() Hostname was not found in given parameters. Exiting ...", __func__); cerr << "Hostname was not found in given parameters. Exiting ..." << endl; assert(found_hostname); } hosts_raw = hosts_raw.substr(0, hosts_raw.size() - 1); } else { // single node mode ADAFS_DATA->spdlogger()->info("{}() Single node mode set to self", __func__); auto hostname = get_my_hostname(false); hostmap[0] = hostname; hosts_raw = hostname; ADAFS_DATA->host_id(0); } ADAFS_DATA->hosts(hostmap); ADAFS_DATA->host_size(hostmap.size()); ADAFS_DATA->rpc_port(fmt::FormatInt(RPC_PORT).str()); ADAFS_DATA->hosts_raw(hosts_raw); ADAFS_DATA->spdlogger()->info("{}() Initializing environment. Hold on ...", __func__); // Make sure directory structure exists bfs::create_directories(ADAFS_DATA->metadir()); // Create mountdir. We use this dir to get some information on the underlying fs with statfs in adafs_statfs bfs::create_directories(ADAFS_DATA->mountdir()); if (init_environment()) { signal(SIGINT, shutdown_handler); signal(SIGTERM, shutdown_handler); signal(SIGKILL, shutdown_handler); unique_lock lk(mtx); // Wait for shutdown signal to initiate shutdown protocols shutdown_please.wait(lk); ADAFS_DATA->spdlogger()->info("{}() Shutting done signal encountered. Shutting down ...", __func__); } else { ADAFS_DATA->spdlogger()->info("{}() Starting up daemon environment failed. Shutting down ...", __func__); } destroy_enviroment(); return 0; }