LCOV - code coverage report
Current view: top level - src/daemon - daemon.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 286 407 70.3 %
Date: 2024-04-30 13:21:35 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief The main source file to launch the daemon.
      30             :  * @internal
      31             :  * This file includes the daemon's main() function and starts all daemon
      32             :  * subroutines. It deals with user input and waits on a signal to shut it down.
      33             :  * @endinternal
      34             :  */
      35             : 
      36             : #include <daemon/daemon.hpp>
      37             : #include <version.hpp>
      38             : #include <common/log_util.hpp>
      39             : #include <common/env_util.hpp>
      40             : #include <common/rpc/rpc_types.hpp>
      41             : #include <common/rpc/rpc_util.hpp>
      42             : #include <common/statistics/stats.hpp>
      43             : 
      44             : #include <daemon/env.hpp>
      45             : #include <daemon/handler/rpc_defs.hpp>
      46             : #include <daemon/ops/metadentry.hpp>
      47             : #include <daemon/backend/metadata/db.hpp>
      48             : #include <daemon/backend/data/chunk_storage.hpp>
      49             : #include <daemon/util.hpp>
      50             : #include <CLI/CLI.hpp>
      51             : 
      52             : #ifdef GKFS_ENABLE_AGIOS
      53             : #include <daemon/scheduler/agios.hpp>
      54             : #endif
      55             : 
      56             : 
      57             : #include <filesystem>
      58             : #include <iostream>
      59             : #include <fstream>
      60             : #include <csignal>
      61             : #include <condition_variable>
      62             : 
      63             : extern "C" {
      64             : #include <unistd.h>
      65             : #include <cstdlib>
      66             : }
      67             : 
      68             : using namespace std;
      69             : namespace fs = std::filesystem;
      70             : 
      71             : static condition_variable shutdown_please; // handler for shutdown signaling
      72             : static mutex mtx; // mutex to wait on shutdown conditional variable
      73             : static bool keep_rootdir = true;
      74             : 
      75             : struct cli_options {
      76             :     string mountdir;
      77             :     string rootdir;
      78             :     string rootdir_suffix;
      79             :     string metadir;
      80             :     string listen;
      81             :     string hosts_file;
      82             :     string rpc_protocol;
      83             :     string dbbackend;
      84             :     string parallax_size;
      85             :     string stats_file;
      86             :     string prometheus_gateway;
      87             : };
      88             : 
      89             : /**
      90             :  * @brief Initializes the Argobots execution streams for non-blocking I/O
      91             :  * @internal
      92             :  * The corresponding execution streams are defined in
      93             :  * gkfs::config::rpc::daemon_io_xstreams. A FIFO thread pool accomodates these
      94             :  * execution streams. Argobots tasklets are created from these pools during I/O
      95             :  * operations.
      96             :  * @endinternal
      97             :  */
      98             : void
      99          33 : init_io_tasklet_pool() {
     100          33 :     static_assert(gkfs::config::rpc::daemon_io_xstreams >= 0,
     101             :                   "Daemon IO Execution streams must be higher than 0!");
     102          33 :     unsigned int xstreams_num = gkfs::config::rpc::daemon_io_xstreams;
     103             : 
     104             :     // retrieve the pool of the just created scheduler
     105          33 :     ABT_pool pool;
     106          33 :     auto ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
     107             :                                      ABT_TRUE, &pool);
     108          33 :     if(ret != ABT_SUCCESS) {
     109           0 :         throw runtime_error("Failed to create I/O tasks pool");
     110             :     }
     111             : 
     112             :     // create all subsequent xstream and the associated scheduler, all tapping
     113             :     // into the same pool
     114          33 :     vector<ABT_xstream> xstreams(xstreams_num);
     115         297 :     for(unsigned int i = 0; i < xstreams_num; ++i) {
     116         528 :         ret = ABT_xstream_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool,
     117         264 :                                        ABT_SCHED_CONFIG_NULL, &xstreams[i]);
     118         264 :         if(ret != ABT_SUCCESS) {
     119           0 :             throw runtime_error(
     120           0 :                     "Failed to create task execution streams for I/O operations");
     121             :         }
     122             :     }
     123             : 
     124          33 :     RPC_DATA->io_streams(xstreams);
     125          33 :     RPC_DATA->io_pool(pool);
     126          33 : }
     127             : 
/**
 * @brief Registers RPC handlers to a given Margo instance.
 * @internal
 * Registering is done by associating a Margo instance id (mid) with the RPC
 * name and its handler function including defined input/out structs.
 *
 * Each MARGO_REGISTER invocation below names, in this order: the Margo
 * instance, the RPC tag, the input type, the output type, and the handler
 * function. The symlink RPC is only registered when HAS_SYMLINKS is defined.
 * @endinternal
 * @param mid Margo instance id on which all RPCs are registered
 */
void
register_server_rpcs(margo_instance_id mid) {
    // file system configuration (no input struct)
    MARGO_REGISTER(mid, gkfs::rpc::tag::fs_config, void, rpc_config_out_t,
                   rpc_srv_get_fs_config);
    // metadata related RPCs
    MARGO_REGISTER(mid, gkfs::rpc::tag::create, rpc_mk_node_in_t, rpc_err_out_t,
                   rpc_srv_create);
    MARGO_REGISTER(mid, gkfs::rpc::tag::stat, rpc_path_only_in_t,
                   rpc_stat_out_t, rpc_srv_stat);
    MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t,
                   rpc_err_out_t, rpc_srv_decr_size);
    MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t,
                   rpc_rm_metadata_out_t, rpc_srv_remove_metadata);
    MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t,
                   rpc_err_out_t, rpc_srv_remove_data);
    MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry,
                   rpc_update_metadentry_in_t, rpc_err_out_t,
                   rpc_srv_update_metadentry);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_metadentry_size, rpc_path_only_in_t,
                   rpc_get_metadentry_size_out_t, rpc_srv_get_metadentry_size);
    MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size,
                   rpc_update_metadentry_size_in_t,
                   rpc_update_metadentry_size_out_t,
                   rpc_srv_update_metadentry_size);
    // directory listing RPCs (both share the same input/output structs)
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents, rpc_get_dirents_in_t,
                   rpc_get_dirents_out_t, rpc_srv_get_dirents);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended,
                   rpc_get_dirents_in_t, rpc_get_dirents_out_t,
                   rpc_srv_get_dirents_extended);
#ifdef HAS_SYMLINKS
    MARGO_REGISTER(mid, gkfs::rpc::tag::mk_symlink, rpc_mk_symlink_in_t,
                   rpc_err_out_t, rpc_srv_mk_symlink);
#endif
    // data related RPCs
    MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t,
                   rpc_data_out_t, rpc_srv_write);
    MARGO_REGISTER(mid, gkfs::rpc::tag::read, rpc_read_data_in_t,
                   rpc_data_out_t, rpc_srv_read);
    MARGO_REGISTER(mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t,
                   rpc_srv_truncate);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
                   rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat);
}
     177             : 
     178             : /**
     179             :  * @brief Initializes the daemon RPC server.
     180             :  * @throws std::runtime_error on failure
     181             :  */
     182             : void
     183          33 : init_rpc_server() {
     184          33 :     hg_addr_t addr_self = nullptr;
     185          33 :     hg_size_t addr_self_cstring_sz = 128;
     186          33 :     char addr_self_cstring[128];
     187          33 :     struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
     188          33 :     hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE;
     189          33 :     hg_options.stats = HG_FALSE;
     190          33 :     if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol())
     191           0 :         hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
     192             :     // Start Margo (this will also initialize Argobots and Mercury internally)
     193          33 :     auto margo_config = fmt::format(
     194             :             R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})",
     195          33 :             gkfs::config::rpc::daemon_handler_xstreams);
     196          33 :     struct margo_init_info args = {nullptr};
     197          33 :     args.json_config = margo_config.c_str();
     198          33 :     args.hg_init_info = &hg_options;
     199          33 :     auto* mid = margo_init_ext(GKFS_DATA->bind_addr().c_str(),
     200             :                                MARGO_SERVER_MODE, &args);
     201             : 
     202          33 :     if(mid == MARGO_INSTANCE_NULL) {
     203           0 :         throw runtime_error("Failed to initialize the Margo RPC server");
     204             :     }
     205             :     // Figure out what address this server is listening on (must be freed when
     206             :     // finished)
     207          33 :     auto hret = margo_addr_self(mid, &addr_self);
     208          33 :     if(hret != HG_SUCCESS) {
     209           0 :         margo_finalize(mid);
     210           0 :         throw runtime_error("Failed to retrieve server RPC address");
     211             :     }
     212             :     // Convert the address to a cstring (with \0 terminator).
     213          33 :     hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz,
     214             :                                 addr_self);
     215          33 :     if(hret != HG_SUCCESS) {
     216           0 :         margo_addr_free(mid, addr_self);
     217           0 :         margo_finalize(mid);
     218           0 :         throw runtime_error("Failed to convert server RPC address to string");
     219             :     }
     220          33 :     margo_addr_free(mid, addr_self);
     221             : 
     222          66 :     std::string addr_self_str(addr_self_cstring);
     223          33 :     RPC_DATA->self_addr_str(addr_self_str);
     224             : 
     225          33 :     GKFS_DATA->spdlogger()->info("{}() Accepting RPCs on address {}", __func__,
     226          33 :                                  addr_self_cstring);
     227             : 
     228             :     // Put context and class into RPC_data object
     229          33 :     RPC_DATA->server_rpc_mid(mid);
     230             : 
     231             :     // register RPCs
     232          33 :     register_server_rpcs(mid);
     233          33 : }
     234             : 
     235             : /**
     236             :  * @brief Initializes the daemon environment and setting up its subroutines.
     237             :  * @internal
     238             :  * This includes connecting to the KV store, starting the Argobots I/O execution
     239             :  * streams, initializing the metadata and data backends, and starting the RPC
     240             :  * server.
     241             :  *
     242             :  * Finally, the root metadata entry is created.
     243             :  * @endinternal
     244             :  * @throws std::runtime_error if any step fails
     245             :  */
     246             : void
     247          33 : init_environment() {
     248             :     // Initialize metadata db
     249          33 :     auto metadata_path = fmt::format("{}/{}", GKFS_DATA->metadir(),
     250          33 :                                      gkfs::config::metadata::dir);
     251          33 :     fs::create_directories(metadata_path);
     252          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'",
     253          33 :                                   __func__, metadata_path);
     254          33 :     try {
     255          33 :         GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(
     256          66 :                 metadata_path, GKFS_DATA->dbbackend()));
     257           0 :     } catch(const std::exception& e) {
     258           0 :         GKFS_DATA->spdlogger()->error(
     259             :                 "{}() Failed to initialize metadata DB: {}", __func__,
     260           0 :                 e.what());
     261           0 :         throw;
     262             :     }
     263             : 
     264          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing Distributor ", __func__);
     265          33 :     try {
     266             : #ifdef GKFS_USE_GUIDED_DISTRIBUTION
     267          33 :         auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>();
     268             : #else
     269             :         auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>();
     270             : #endif
     271          99 :         RPC_DATA->distributor(distributor);
     272           0 :     } catch(const std::exception& e) {
     273           0 :         GKFS_DATA->spdlogger()->error(
     274             :                 "{}() Failed to initialize Distributor: {}", __func__,
     275           0 :                 e.what());
     276           0 :         throw;
     277             :     }
     278             : 
     279             : #ifdef GKFS_ENABLE_FORWARDING
     280           0 :     GKFS_DATA->spdlogger()->debug("{}() Enable I/O forwarding mode", __func__);
     281             : #endif
     282             : 
     283             : #ifdef GKFS_ENABLE_AGIOS
     284             :     // Initialize AGIOS scheduler
     285             :     GKFS_DATA->spdlogger()->debug("{}() Initializing AGIOS scheduler: '{}'",
     286             :                                   __func__, "/tmp/agios.conf");
     287             :     try {
     288             :         agios_initialize();
     289             :     } catch(const std::exception& e) {
     290             :         GKFS_DATA->spdlogger()->error(
     291             :                 "{}() Failed to initialize AGIOS scheduler: {}", __func__,
     292             :                 e.what());
     293             :         throw;
     294             :     }
     295             : #endif
     296             : 
     297             :     // Initialize Stats
     298          33 :     if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
     299          33 :         GKFS_DATA->stats(std::make_shared<gkfs::utils::Stats>(
     300          66 :                 GKFS_DATA->enable_chunkstats(), GKFS_DATA->enable_prometheus(),
     301          66 :                 GKFS_DATA->stats_file(), GKFS_DATA->prometheus_gateway()));
     302             : 
     303             :     // Initialize data backend
     304          33 :     auto chunk_storage_path = fmt::format("{}/{}", GKFS_DATA->rootdir(),
     305          66 :                                           gkfs::config::data::chunk_dir);
     306          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'",
     307          33 :                                   __func__, chunk_storage_path);
     308          33 :     fs::create_directories(chunk_storage_path);
     309          33 :     try {
     310          33 :         GKFS_DATA->storage(std::make_shared<gkfs::data::ChunkStorage>(
     311          33 :                 chunk_storage_path, gkfs::config::rpc::chunksize));
     312           0 :     } catch(const std::exception& e) {
     313           0 :         GKFS_DATA->spdlogger()->error(
     314             :                 "{}() Failed to initialize storage backend: {}", __func__,
     315           0 :                 e.what());
     316           0 :         throw;
     317             :     }
     318             : 
     319             :     // Init margo for RPC
     320          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'",
     321          33 :                                   __func__, GKFS_DATA->bind_addr());
     322          33 :     try {
     323          33 :         init_rpc_server();
     324           0 :     } catch(const std::exception& e) {
     325           0 :         GKFS_DATA->spdlogger()->error(
     326           0 :                 "{}() Failed to initialize RPC server: {}", __func__, e.what());
     327           0 :         throw;
     328             :     }
     329             : 
     330             :     // Init Argobots ESs to drive IO
     331          33 :     try {
     332          33 :         GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__);
     333          33 :         init_io_tasklet_pool();
     334           0 :     } catch(const std::exception& e) {
     335           0 :         GKFS_DATA->spdlogger()->error(
     336             :                 "{}() Failed to initialize Argobots pool for I/O: {}", __func__,
     337           0 :                 e.what());
     338           0 :         throw;
     339             :     }
     340             : 
     341             :     // TODO set metadata configurations. these have to go into a user
     342             :     // configurable file that is parsed here
     343          33 :     GKFS_DATA->atime_state(gkfs::config::metadata::use_atime);
     344          33 :     GKFS_DATA->mtime_state(gkfs::config::metadata::use_mtime);
     345          33 :     GKFS_DATA->ctime_state(gkfs::config::metadata::use_ctime);
     346          33 :     GKFS_DATA->link_cnt_state(gkfs::config::metadata::use_link_cnt);
     347          33 :     GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks);
     348             :     // Create metadentry for root directory
     349          66 :     gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO};
     350          33 :     try {
     351          66 :         gkfs::metadata::create("/", root_md);
     352           0 :     } catch(const gkfs::metadata::ExistsException& e) {
     353             :         // launched on existing directory which is no error
     354           0 :     } catch(const std::exception& e) {
     355           0 :         throw runtime_error("Failed to write root metadentry to KV store: "s +
     356           0 :                             e.what());
     357             :     }
     358             :     // setup hostfile to let clients know that a daemon is running on this host
     359          33 :     if(!GKFS_DATA->hosts_file().empty()) {
     360          33 :         gkfs::utils::populate_hosts_file();
     361             :     }
     362          66 :     GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
     363          33 : }
     364             : 
     365             : #ifdef GKFS_ENABLE_AGIOS
     366             : /**
     367             :  * @brief Initialize the AGIOS scheduling library
     368             :  */
     369             : void
     370             : agios_initialize() {
     371             :     char configuration[] = "/tmp/agios.conf";
     372             : 
     373             :     if(!agios_init(NULL, NULL, configuration, 0)) {
     374             :         GKFS_DATA->spdlogger()->error(
     375             :                 "{}() Failed to initialize AGIOS scheduler: '{}'", __func__,
     376             :                 configuration);
     377             : 
     378             :         agios_exit();
     379             : 
     380             :         throw;
     381             :     }
     382             : }
     383             : #endif
     384             : 
     385             : /**
     386             :  * @brief Destroys the daemon environment and gracefully shuts down all
     387             :  * subroutines.
     388             :  * @internal
     389             :  * Shutting down includes freeing Argobots execution streams, cleaning
     390             :  * hostsfile, and shutting down the Mercury RPC server.
     391             :  * @endinternal
     392             :  */
     393             : void
     394          33 : destroy_enviroment() {
     395          33 :     GKFS_DATA->spdlogger()->debug("{}() Removing mount directory", __func__);
     396          33 :     std::error_code ecode;
     397          33 :     fs::remove_all(GKFS_DATA->mountdir(), ecode);
     398          33 :     GKFS_DATA->spdlogger()->debug("{}() Freeing I/O executions streams",
     399          33 :                                   __func__);
     400         297 :     for(unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) {
     401         264 :         ABT_xstream_join(RPC_DATA->io_streams().at(i));
     402         264 :         ABT_xstream_free(&RPC_DATA->io_streams().at(i));
     403             :     }
     404             : 
     405          33 :     if(!GKFS_DATA->hosts_file().empty()) {
     406          33 :         GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__);
     407          33 :         try {
     408          33 :             gkfs::utils::destroy_hosts_file();
     409           0 :         } catch(const fs::filesystem_error& e) {
     410           0 :             GKFS_DATA->spdlogger()->debug("{}() hosts file not found",
     411           0 :                                           __func__);
     412             :         }
     413             :     }
     414             : 
     415          33 :     if(RPC_DATA->server_rpc_mid() != nullptr) {
     416          33 :         GKFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC server",
     417          33 :                                       __func__);
     418          33 :         margo_finalize(RPC_DATA->server_rpc_mid());
     419             :     }
     420             : 
     421          33 :     GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__);
     422          33 :     GKFS_DATA->close_mdb();
     423             : 
     424             : 
     425             :     // Delete rootdir/metadir if requested
     426          33 :     if(!keep_rootdir) {
     427           0 :         GKFS_DATA->spdlogger()->info("{}() Removing rootdir and metadir ...",
     428           0 :                                      __func__);
     429           0 :         fs::remove_all(GKFS_DATA->metadir(), ecode);
     430           0 :         fs::remove_all(GKFS_DATA->rootdir(), ecode);
     431             :     }
     432          33 :     GKFS_DATA->close_stats();
     433          33 : }
     434             : 
     435             : /**
     436             :  * @brief Handler for daemon shutdown signal handling.
     437             :  * @internal
     438             :  * Notifies the waiting thread in main() to wake up.
     439             :  * @endinternal
     440             :  * @param dummy unused but required by signal() called in main()
     441             :  */
     442             : void
     443          33 : shutdown_handler(int dummy) {
     444          33 :     GKFS_DATA->spdlogger()->info("{}() Received signal: '{}'", __func__,
     445          33 :                                  strsignal(dummy));
     446          33 :     shutdown_please.notify_all();
     447          33 : }
     448             : 
     449             : /**
     450             :  * @brief Initializes the daemon logging environment.
     451             :  * @internal
     452             :  * Reads user input via environment variables regarding the
     453             :  * log path and log level.
     454             :  * @endinternal
     455             :  * Initializes three loggers: main, metadata module, and data module
     456             :  */
     457             : void
     458          33 : initialize_loggers() {
     459          33 :     std::string path = gkfs::config::log::daemon_log_path;
     460             :     // Try to get log path from env variable
     461          66 :     std::string env_path_key = DAEMON_ENV_PREFIX;
     462          33 :     env_path_key += "LOG_PATH";
     463          33 :     char* env_path = getenv(env_path_key.c_str());
     464          33 :     if(env_path != nullptr) {
     465          33 :         path = env_path;
     466             :     }
     467             : 
     468          33 :     spdlog::level::level_enum level =
     469          33 :             gkfs::log::get_level(gkfs::config::log::daemon_log_level);
     470             :     // Try to get log path from env variable
     471          66 :     std::string env_level_key = DAEMON_ENV_PREFIX;
     472          33 :     env_level_key += "LOG_LEVEL";
     473          33 :     char* env_level = getenv(env_level_key.c_str());
     474          33 :     if(env_level != nullptr) {
     475          33 :         level = gkfs::log::get_level(env_level);
     476             :     }
     477             : 
     478          33 :     auto logger_names = std::vector<std::string>{
     479             :             "main",
     480             :             "MetadataModule",
     481             :             "DataModule",
     482         165 :     };
     483             : 
     484          33 :     gkfs::log::setup(logger_names, level, path);
     485          33 : }
     486             : 
     487             : /**
     488             :  * @brief Parses command line arguments from user
     489             :  *
     490             :  * @param opts CLI values
     491             :  * @param desc CLI allowed options
     492             :  * @throws std::runtime_error
     493             :  */
     494             : void
     495          33 : parse_input(const cli_options& opts, const CLI::App& desc) {
     496          33 :     auto rpc_protocol = string(gkfs::rpc::protocol::ofi_sockets);
     497          33 :     if(desc.count("--rpc-protocol")) {
     498           0 :         rpc_protocol = opts.rpc_protocol;
     499           0 :         if(rpc_protocol != gkfs::rpc::protocol::ofi_verbs &&
     500           0 :            rpc_protocol != gkfs::rpc::protocol::ofi_sockets &&
     501           0 :            rpc_protocol != gkfs::rpc::protocol::ofi_psm2) {
     502           0 :             throw runtime_error(fmt::format(
     503             :                     "Given RPC protocol '{}' not supported. Check --help for supported protocols.",
     504           0 :                     rpc_protocol));
     505             :         }
     506             :     }
     507             : 
     508          33 :     auto use_auto_sm = desc.count("--auto-sm") != 0;
     509          33 :     GKFS_DATA->use_auto_sm(use_auto_sm);
     510          33 :     GKFS_DATA->spdlogger()->debug(
     511             :             "{}() Shared memory (auto_sm) for intra-node communication (IPCs) set to '{}'.",
     512          33 :             __func__, use_auto_sm);
     513             : 
     514          66 :     string addr{};
     515          33 :     if(desc.count("--listen")) {
     516          33 :         addr = opts.listen;
     517             :         // ofi+verbs requires an empty addr to bind to the ib interface
     518          33 :         if(rpc_protocol == gkfs::rpc::protocol::ofi_verbs) {
     519             :             /*
     520             :              * FI_VERBS_IFACE : The prefix or the full name of the network
     521             :              * interface associated with the verbs device (default: ib) Mercury
     522             :              * does not allow to bind to an address when ofi+verbs is used
     523             :              */
     524           0 :             if(!secure_getenv("FI_VERBS_IFACE"))
     525           0 :                 setenv("FI_VERBS_IFACE", addr.c_str(), 1);
     526           0 :             addr = ""s;
     527             :         }
     528             :     } else {
     529           0 :         if(rpc_protocol != gkfs::rpc::protocol::ofi_verbs)
     530           0 :             addr = gkfs::rpc::get_my_hostname(true);
     531             :     }
     532             : 
     533          33 :     GKFS_DATA->rpc_protocol(rpc_protocol);
     534          66 :     GKFS_DATA->bind_addr(fmt::format("{}://{}", rpc_protocol, addr));
     535             : 
     536          66 :     string hosts_file;
     537          33 :     if(desc.count("--hosts-file")) {
     538           0 :         hosts_file = opts.hosts_file;
     539             :     } else {
     540          66 :         hosts_file = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
     541          66 :                                         gkfs::config::hostfile_path);
     542             :     }
     543          33 :     GKFS_DATA->hosts_file(hosts_file);
     544             : 
     545          66 :     assert(desc.count("--mountdir"));
     546          66 :     auto mountdir = opts.mountdir;
     547             :     // Create mountdir. We use this dir to get some information on the
     548             :     // underlying fs with statfs in gkfs_statfs
     549          33 :     fs::create_directories(mountdir);
     550          66 :     GKFS_DATA->mountdir(fs::canonical(mountdir).native());
     551             : 
     552          66 :     assert(desc.count("--rootdir"));
     553          66 :     auto rootdir = opts.rootdir;
     554             : 
     555             : #ifdef GKFS_ENABLE_FORWARDING
     556             :     // In forwarding mode, the backend is shared
     557           0 :     auto rootdir_path = fs::path(rootdir);
     558             : #else
     559          66 :     auto rootdir_path = fs::path(rootdir);
     560          33 :     if(desc.count("--rootdir-suffix")) {
     561           0 :         if(opts.rootdir_suffix == gkfs::config::data::chunk_dir ||
     562           0 :            opts.rootdir_suffix == gkfs::config::metadata::dir)
     563           0 :             throw runtime_error(fmt::format(
     564             :                     "rootdir_suffix '{}' is reserved and not allowed.",
     565           0 :                     opts.rootdir_suffix));
     566           0 :         if(opts.rootdir_suffix.find('#') != string::npos)
     567           0 :             throw runtime_error(fmt::format(
     568           0 :                     "The character '#' in the rootdir_suffix is reserved and not allowed."));
     569             :         // append path with a directory separator
     570           0 :         rootdir_path /= opts.rootdir_suffix;
     571           0 :         GKFS_DATA->rootdir_suffix(opts.rootdir_suffix);
     572             :     }
     573             : #endif
     574             : 
     575          33 :     if(desc.count("--clean-rootdir")) {
     576             :         // may throw exception (caught in main)
     577           0 :         GKFS_DATA->spdlogger()->debug("{}() Cleaning rootdir '{}' ...",
     578           0 :                                       __func__, rootdir_path.native());
     579           0 :         fs::remove_all(rootdir_path.native());
     580           0 :         GKFS_DATA->spdlogger()->info("{}() rootdir cleaned.", __func__);
     581             :     }
     582             : 
     583          33 :     if(desc.count("--clean-rootdir-finish")) {
     584           0 :         keep_rootdir = false;
     585             :     }
     586             : 
     587          33 :     GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'", __func__,
     588          33 :                                   rootdir_path.native());
     589          33 :     fs::create_directories(rootdir_path);
     590          33 :     GKFS_DATA->rootdir(rootdir_path.native());
     591             : 
     592          33 :     if(desc.count("--metadir")) {
     593          66 :         auto metadir = opts.metadir;
     594             : 
     595             : #ifdef GKFS_ENABLE_FORWARDING
     596           0 :         auto metadir_path = fs::path(metadir) / fmt::format_int(getpid()).str();
     597             : #else
     598          66 :         auto metadir_path = fs::path(metadir);
     599             : #endif
     600          33 :         if(desc.count("--clean-rootdir")) {
     601             :             // may throw exception (caught in main)
     602           0 :             GKFS_DATA->spdlogger()->debug("{}() Cleaning metadir '{}' ...",
     603           0 :                                           __func__, metadir_path.native());
     604           0 :             fs::remove_all(metadir_path.native());
     605           0 :             GKFS_DATA->spdlogger()->info("{}() metadir cleaned.", __func__);
     606             :         }
     607          33 :         fs::create_directories(metadir_path);
     608          33 :         GKFS_DATA->metadir(fs::canonical(metadir_path).native());
     609             : 
     610          66 :         GKFS_DATA->spdlogger()->debug("{}() Meta directory: '{}'", __func__,
     611          33 :                                       metadir_path.native());
     612             :     } else {
     613             :         // use rootdir as metadata dir
     614           0 :         auto metadir = opts.rootdir;
     615             : 
     616             : #ifdef GKFS_ENABLE_FORWARDING
     617           0 :         auto metadir_path = fs::path(metadir) / fmt::format_int(getpid()).str();
     618           0 :         fs::create_directories(metadir_path);
     619           0 :         GKFS_DATA->metadir(fs::canonical(metadir_path).native());
     620             : #else
     621           0 :         GKFS_DATA->metadir(GKFS_DATA->rootdir());
     622             : #endif
     623             :     }
     624             : 
     625          33 :     if(desc.count("--dbbackend")) {
     626          33 :         if(opts.dbbackend == gkfs::metadata::rocksdb_backend ||
     627           0 :            opts.dbbackend == gkfs::metadata::parallax_backend) {
     628             : #ifndef GKFS_ENABLE_PARALLAX
     629          33 :             if(opts.dbbackend == gkfs::metadata::parallax_backend) {
     630           0 :                 throw runtime_error(fmt::format(
     631             :                         "dbbackend '{}' was not compiled and is disabled. "
     632             :                         "Pass -DGKFS_ENABLE_PARALLAX:BOOL=ON to CMake to enable.",
     633           0 :                         opts.dbbackend));
     634             :             }
     635             : #endif
     636             : #ifndef GKFS_ENABLE_ROCKSDB
     637             :             if(opts.dbbackend == gkfs::metadata::rocksdb_backend) {
     638             :                 throw runtime_error(fmt::format(
     639             :                         "dbbackend '{}' was not compiled and is disabled. "
     640             :                         "Pass -DGKFS_ENABLE_ROCKSDB:BOOL=ON to CMake to enable.",
     641             :                         opts.dbbackend));
     642             :             }
     643             : #endif
     644          33 :             GKFS_DATA->dbbackend(opts.dbbackend);
     645             :         } else {
     646           0 :             throw runtime_error(
     647           0 :                     fmt::format("dbbackend '{}' is not valid. Consult `--help`",
     648           0 :                                 opts.dbbackend));
     649             :         }
     650             : 
     651             :     } else
     652           0 :         GKFS_DATA->dbbackend(gkfs::metadata::rocksdb_backend);
     653             : 
     654          33 :     if(desc.count("--parallaxsize")) { // Size in GB
     655           0 :         GKFS_DATA->parallax_size_md(stoi(opts.parallax_size));
     656             :     }
     657             : 
     658             :     /*
     659             :      * Statistics collection arguments
     660             :      */
     661          33 :     if(desc.count("--enable-collection")) {
     662          33 :         GKFS_DATA->enable_stats(true);
     663          66 :         GKFS_DATA->spdlogger()->info("{}() Statistic collection enabled",
     664          33 :                                      __func__);
     665             :     }
     666          33 :     if(desc.count("--enable-chunkstats")) {
     667          33 :         GKFS_DATA->enable_chunkstats(true);
     668          66 :         GKFS_DATA->spdlogger()->info("{}() Chunk statistic collection enabled",
     669          33 :                                      __func__);
     670             :     }
     671             : 
     672             : #ifdef GKFS_ENABLE_PROMETHEUS
     673          33 :     if(desc.count("--enable-prometheus")) {
     674           0 :         GKFS_DATA->enable_prometheus(true);
     675           0 :         if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
     676           0 :             GKFS_DATA->spdlogger()->info(
     677           0 :                     "{}() Statistics output to Prometheus enabled", __func__);
     678             :         else
     679           0 :             GKFS_DATA->spdlogger()->warn(
     680             :                     "{}() Prometheus statistic output enabled but no stat collection is enabled. There will be no output to Prometheus",
     681           0 :                     __func__);
     682             :     }
     683             : 
     684          33 :     if(desc.count("--prometheus-gateway")) {
     685           0 :         auto gateway = opts.prometheus_gateway;
     686           0 :         GKFS_DATA->prometheus_gateway(gateway);
     687           0 :         if(GKFS_DATA->enable_prometheus())
     688           0 :             GKFS_DATA->spdlogger()->info("{}() Prometheus gateway set to '{}'",
     689           0 :                                          __func__, gateway);
     690             :         else
     691           0 :             GKFS_DATA->spdlogger()->warn(
     692             :                     "{}() Prometheus gateway was set but Prometheus is disabled.");
     693             :     }
     694             : #endif
     695             : 
     696          33 :     if(desc.count("--output-stats")) {
     697          66 :         auto stats_file = opts.stats_file;
     698          33 :         GKFS_DATA->stats_file(stats_file);
     699          33 :         if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
     700          66 :             GKFS_DATA->spdlogger()->info(
     701             :                     "{}() Statistics are written to file '{}'", __func__,
     702          33 :                     stats_file);
     703             :         else
     704           0 :             GKFS_DATA->spdlogger()->warn(
     705             :                     "{}() --output-stats argument used but no stat collection is enabled. There will be no output to file '{}'",
     706           0 :                     __func__, stats_file);
     707             :     } else {
     708           0 :         GKFS_DATA->stats_file("");
     709           0 :         GKFS_DATA->spdlogger()->debug("{}() Statistics output disabled",
     710           0 :                                       __func__);
     711             :     }
     712          33 : }
     713             : 
     714             : /**
     715             :  * @brief The initial function called when launching the daemon.
     716             :  * @internal
     717             :  * Launches all subroutines and waits on a conditional variable to shut it down.
     718             :  * Daemon will react to the following signals:
     719             :  *
     720             :  * SIGINT - Interrupt from keyboard (ctrl-c)
     721             :  * SIGTERM - Termination signal (kill <daemon_pid>
     722             :  * SIGKILL - Kill signal (kill -9 <daemon_pid>
     723             :  * @endinternal
     724             :  * @param argc number of command line arguments
     725             :  * @param argv list of the command line arguments
     726             :  * @return exit status: EXIT_SUCCESS (0) or EXIT_FAILURE (1)
     727             :  */
     728             : int
     729          33 : main(int argc, const char* argv[]) {
     730          99 :     CLI::App desc{"Allowed options"};
     731          33 :     cli_options opts{};
     732             :     // clang-format off
     733          33 :     desc.add_option("--mountdir,-m", opts.mountdir,
     734          66 :                     "Virtual mounting directory where GekkoFS is available.")
     735          33 :                     ->required();
     736          66 :     desc.add_option(
     737             :                     "--rootdir,-r", opts.rootdir,
     738          66 :                     "Local data directory where GekkoFS data for this daemon is stored.")
     739          33 :                     ->required();
     740          66 :     desc.add_option(
     741             :                     "--rootdir-suffix,-s", opts.rootdir_suffix,
     742          66 :                     "Creates an additional directory within the rootdir, allowing multiple daemons on one node.");
     743          66 :     desc.add_option(
     744             :                     "--metadir,-i", opts.metadir,
     745          66 :                     "Metadata directory where GekkoFS RocksDB data directory is located. If not set, rootdir is used.");
     746          66 :     desc.add_option(
     747             :                     "--listen,-l", opts.listen,
     748             :                     "Address or interface to bind the daemon to. Default: local hostname.\n"
     749             :                     "When used with ofi+verbs the FI_VERBS_IFACE environment variable is set accordingly "
     750             :                     "which associates the verbs device with the network interface. In case FI_VERBS_IFACE "
     751          66 :                     "is already defined, the argument is ignored. Default 'ib'.");
     752          66 :     desc.add_option("--hosts-file,-H", opts.hosts_file,
     753             :                     "Shared file used by deamons to register their "
     754          66 :                     "endpoints. (default './gkfs_hosts.txt')");
     755          66 :     desc.add_option(
     756             :                     "--rpc-protocol,-P", opts.rpc_protocol,
     757             :                     "Used RPC protocol for inter-node communication.\n"
     758             :                     "Available: {ofi+sockets, ofi+verbs, ofi+psm2} for TCP, Infiniband, "
     759             :                     "and Omni-Path, respectively. (Default ofi+sockets)\n"
     760          66 :                     "Libfabric must have enabled support verbs or psm2.");
     761          33 :     desc.add_flag(
     762             :                 "--auto-sm",
     763             :                 "Enables intra-node communication (IPCs) via the `na+sm` (shared memory) protocol, "
     764          66 :                 "instead of using the RPC protocol. (Default off)");
     765          33 :     desc.add_flag(
     766             :                 "--clean-rootdir",
     767          66 :                 "Cleans Rootdir >before< launching the deamon");
     768          33 :     desc.add_flag(
     769             :                 "--clean-rootdir-finish,-c",
     770          66 :                 "Cleans Rootdir >after< the deamon finishes");
     771          66 :     desc.add_option(
     772             :                 "--dbbackend,-d", opts.dbbackend,
     773             :                 "Metadata database backend to use. Available: {rocksdb, parallaxdb}\n"
     774             :                 "RocksDB is default if not set. Parallax support is experimental.\n"
     775          66 :                 "Note, parallaxdb creates a file called rocksdbx with 8GB created in metadir.");
     776          66 :     desc.add_option("--parallaxsize", opts.parallax_size,
     777             :                     "parallaxdb - metadata file size in GB (default 8GB), "
     778          66 :                     "used only with new files");
     779          33 :     desc.add_flag(
     780             :                 "--enable-collection",
     781             :                 "Enables collection of general statistics. "
     782          66 :                 "Output requires either the --output-stats or --enable-prometheus argument.");
     783          33 :     desc.add_flag(
     784             :                 "--enable-chunkstats",
     785             :                 "Enables collection of data chunk statistics in I/O operations."
     786          66 :                 "Output requires either the --output-stats or --enable-prometheus argument.");
     787          66 :     desc.add_option(
     788             :                 "--output-stats", opts.stats_file,
     789          66 :                 "Creates a thread that outputs the server stats each 10s to the specified file.");
     790             :     #ifdef GKFS_ENABLE_PROMETHEUS
     791          33 :     desc.add_flag(
     792             :                 "--enable-prometheus",
     793          66 :                 "Enables prometheus output and a corresponding thread.");
     794             : 
     795          66 :     desc.add_option(
     796             :                 "--prometheus-gateway", opts.prometheus_gateway,
     797          66 :                 "Defines the prometheus gateway <ip:port> (Default 127.0.0.1:9091).");
     798             :     #endif
     799             : 
     800          66 :     desc.add_flag("--version", "Print version and exit.");
     801             :     // clang-format on
     802          33 :     try {
     803          33 :         desc.parse(argc, argv);
     804           0 :     } catch(const CLI::ParseError& e) {
     805           0 :         return desc.exit(e);
     806             :     }
     807             : 
     808             : 
     809          33 :     if(desc.count("--version")) {
     810           0 :         cout << GKFS_VERSION_STRING << endl;
     811             : #ifndef NDEBUG
     812           0 :         cout << "Debug: ON" << endl;
     813             : #else
     814             :         cout << "Debug: OFF" << endl;
     815             : #endif
     816             : #if GKFS_CREATE_CHECK_PARENTS
     817           0 :         cout << "Create check parents: ON" << endl;
     818             : #else
     819             :         cout << "Create check parents: OFF" << endl;
     820             : #endif
     821           0 :         cout << "Chunk size: " << gkfs::config::rpc::chunksize << " bytes"
     822          33 :              << endl;
     823             :         return EXIT_SUCCESS;
     824             :     }
     825             :     // intitialize logging framework
     826          33 :     initialize_loggers();
     827          66 :     GKFS_DATA->spdlogger(spdlog::get("main"));
     828             : 
     829             :     // parse all input parameters and populate singleton structures
     830          33 :     try {
     831          33 :         parse_input(opts, desc);
     832           0 :     } catch(const std::exception& e) {
     833           0 :         cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.",
     834           0 :                             e.what());
     835           0 :         return EXIT_FAILURE;
     836             :     }
     837             : 
     838             :     /*
     839             :      * Initialize environment and start daemon. Wait until signaled to cancel
     840             :      * before shutting down
     841             :      */
     842          33 :     try {
     843          33 :         GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__);
     844          33 :         init_environment();
     845           0 :     } catch(const std::exception& e) {
     846           0 :         auto emsg =
     847           0 :                 fmt::format("Failed to initialize environment: {}", e.what());
     848           0 :         GKFS_DATA->spdlogger()->error(emsg);
     849           0 :         cerr << emsg << endl;
     850           0 :         destroy_enviroment();
     851           0 :         return EXIT_FAILURE;
     852             :     }
     853             : 
     854          33 :     signal(SIGINT, shutdown_handler);
     855          33 :     signal(SIGTERM, shutdown_handler);
     856          33 :     signal(SIGKILL, shutdown_handler);
     857             : 
     858          66 :     unique_lock<mutex> lk(mtx);
     859             :     // Wait for shutdown signal to initiate shutdown protocols
     860          33 :     shutdown_please.wait(lk);
     861          33 :     GKFS_DATA->spdlogger()->info("{}() Shutting down...", __func__);
     862          33 :     destroy_enviroment();
     863          33 :     GKFS_DATA->spdlogger()->info("{}() Complete. Exiting...", __func__);
     864          33 :     return EXIT_SUCCESS;
     865             : }

Generated by: LCOV version 1.16