LCOV - code coverage report
Current view: top level - src/daemon - daemon.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 286 405 70.6 %
Date: 2024-04-23 00:09:24 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief The main source file to launch the daemon.
      30             :  * @internal
      31             :  * This file includes the daemon's main() function and starts all daemon
      32             :  * subroutines. It deals with user input and waits on a signal to shut it down.
      33             :  * @endinternal
      34             :  */
      35             : 
      36             : #include <daemon/daemon.hpp>
      37             : #include <version.hpp>
      38             : #include <common/log_util.hpp>
      39             : #include <common/env_util.hpp>
      40             : #include <common/rpc/rpc_types.hpp>
      41             : #include <common/rpc/rpc_util.hpp>
      42             : #include <common/statistics/stats.hpp>
      43             : 
      44             : #include <daemon/env.hpp>
      45             : #include <daemon/handler/rpc_defs.hpp>
      46             : #include <daemon/ops/metadentry.hpp>
      47             : #include <daemon/backend/metadata/db.hpp>
      48             : #include <daemon/backend/data/chunk_storage.hpp>
      49             : #include <daemon/util.hpp>
      50             : #include <CLI/CLI.hpp>
      51             : 
      52             : #ifdef GKFS_ENABLE_AGIOS
      53             : #include <daemon/scheduler/agios.hpp>
      54             : #endif
      55             : 
      56             : 
      57             : #include <filesystem>
      58             : #include <iostream>
      59             : #include <fstream>
      60             : #include <csignal>
      61             : #include <condition_variable>
      62             : 
      63             : extern "C" {
      64             : #include <unistd.h>
      65             : #include <cstdlib>
      66             : }
      67             : 
      68             : using namespace std;
      69             : namespace fs = std::filesystem;
      70             : 
      71             : static condition_variable shutdown_please; // notified by shutdown_handler() to signal shutdown
      72             : static mutex mtx; // mutex to wait on the shutdown condition variable
      73             : static bool keep_rootdir = true; // set to false by --clean-rootdir-finish; destroy_enviroment() then removes rootdir and metadir
      74             : 
      75             : struct cli_options { // raw string values of the command line options, read by parse_input()
      76             :     string mountdir; // value of --mountdir
      77             :     string rootdir; // value of --rootdir
      78             :     string rootdir_suffix; // value of --rootdir-suffix
      79             :     string metadir;
      80             :     string listen; // value of --listen
      81             :     string hosts_file; // value of --hosts-file
      82             :     string rpc_protocol; // value of --rpc-protocol
      83             :     string dbbackend;
      84             :     string parallax_size;
      85             :     string stats_file;
      86             :     string prometheus_gateway;
      87             : }; // members without a note: their use is not visible in this excerpt
      88             : 
      89             : /**
      90             :  * @brief Initializes the Argobots execution streams for non-blocking I/O
      91             :  * @internal
      92             :  * The number of execution streams is defined by
      93             :  * gkfs::config::rpc::daemon_io_xstreams. A FIFO thread pool accommodates these
      94             :  * execution streams. Argobots tasklets are created from this pool during I/O
      95             :  * operations.
      96             :  * @endinternal
                     :  *
                     :  * On success, the created execution streams and the pool are stored in
                     :  * RPC_DATA (io_streams() and io_pool()).
                     :  *
                     :  * The number of streams must be strictly positive: with zero streams the
                     :  * loop below would create no execution stream for the pool at all.
                     :  *
                     :  * NOTE(review): when creating an execution stream fails, the pool and the
                     :  * streams created so far are not released before the exception is thrown.
                     :  *
                     :  * @throws std::runtime_error if the pool or any execution stream cannot be
                     :  * created
      97             :  */
      98             : void
      99          33 : init_io_tasklet_pool() {
     100          33 :     static_assert(gkfs::config::rpc::daemon_io_xstreams > 0,
     101             :                   "Daemon IO Execution streams must be higher than 0!");
     102          33 :     unsigned int xstreams_num = gkfs::config::rpc::daemon_io_xstreams;
     103             : 
     104             :     // create the pool that is shared by all I/O execution streams
     105          33 :     ABT_pool pool;
     106          33 :     auto ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
     107             :                                      ABT_TRUE, &pool);
     108          33 :     if(ret != ABT_SUCCESS) {
     109           0 :         throw runtime_error("Failed to create I/O tasks pool");
     110             :     }
     111             : 
     112             :     // create all execution streams and their associated schedulers, all
     113             :     // tapping into the same pool
     114          33 :     vector<ABT_xstream> xstreams(xstreams_num);
     115         297 :     for(unsigned int i = 0; i < xstreams_num; ++i) {
     116         528 :         ret = ABT_xstream_create_basic(ABT_SCHED_BASIC_WAIT, 1, &pool,
     117         264 :                                        ABT_SCHED_CONFIG_NULL, &xstreams[i]);
     118         264 :         if(ret != ABT_SUCCESS) {
     119           0 :             throw runtime_error(
     120           0 :                     "Failed to create task execution streams for I/O operations");
     121             :         }
     122             :     }
     123             : 
     124          33 :     RPC_DATA->io_streams(xstreams);
     125          33 :     RPC_DATA->io_pool(pool);
     126          33 : }
     127             : 
     128             : /**
     129             :  * @brief Registers RPC handlers to a given Margo instance.
     130             :  * @internal
     131             :  * Registering is done by associating a Margo instance id (mid) with the RPC
     132             :  * name and its handler function including defined input/out structs
                     :  *
                     :  * The mk_symlink RPC is only registered when HAS_SYMLINKS is defined.
     133             :  * @endinternal
     134             :  * @param mid Margo instance id the RPC handlers are registered with
     135             :  */
     136             : void
     137          33 : register_server_rpcs(margo_instance_id mid) {
     138          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::fs_config, void, rpc_config_out_t,
     139          33 :                    rpc_srv_get_fs_config);
     140          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::create, rpc_mk_node_in_t, rpc_err_out_t,
     141          33 :                    rpc_srv_create);
     142          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::stat, rpc_path_only_in_t,
     143          33 :                    rpc_stat_out_t, rpc_srv_stat);
     144          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::decr_size, rpc_trunc_in_t,
     145          33 :                    rpc_err_out_t, rpc_srv_decr_size);
     146          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::remove_metadata, rpc_rm_node_in_t,
     147          33 :                    rpc_rm_metadata_out_t, rpc_srv_remove_metadata);
     148          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::remove_data, rpc_rm_node_in_t,
     149          33 :                    rpc_err_out_t, rpc_srv_remove_data);
     150          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry,
     151             :                    rpc_update_metadentry_in_t, rpc_err_out_t,
     152          33 :                    rpc_srv_update_metadentry);
     153          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::get_metadentry_size, rpc_path_only_in_t,
     154          33 :                    rpc_get_metadentry_size_out_t, rpc_srv_get_metadentry_size);
     155          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::update_metadentry_size,
     156             :                    rpc_update_metadentry_size_in_t,
     157             :                    rpc_update_metadentry_size_out_t,
     158          33 :                    rpc_srv_update_metadentry_size);
     159          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents, rpc_get_dirents_in_t,
     160          33 :                    rpc_get_dirents_out_t, rpc_srv_get_dirents);
     161          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::get_dirents_extended,
     162             :                    rpc_get_dirents_in_t, rpc_get_dirents_out_t,
     163          33 :                    rpc_srv_get_dirents_extended);
     164             : #ifdef HAS_SYMLINKS
     165          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::mk_symlink, rpc_mk_symlink_in_t,
     166          33 :                    rpc_err_out_t, rpc_srv_mk_symlink);
     167             : #endif
     168          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::write, rpc_write_data_in_t,
     169          33 :                    rpc_data_out_t, rpc_srv_write);
     170          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::read, rpc_read_data_in_t,
     171          33 :                    rpc_data_out_t, rpc_srv_read);
     172          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::truncate, rpc_trunc_in_t, rpc_err_out_t,
     173          33 :                    rpc_srv_truncate);
     174          33 :     MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
     175          33 :                    rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat);
     176          33 : }
     177             : 
     178             : /**
     179             :  * @brief Initializes the daemon RPC server.
                     :  * @internal
                     :  * Starts a Margo instance in server mode on GKFS_DATA->bind_addr(), stores
                     :  * the server's own address string and the Margo instance id in RPC_DATA, and
                     :  * registers all RPC handlers via register_server_rpcs().
                     :  *
                     :  * If retrieving or converting the own address fails, the Margo instance is
                     :  * finalized again before the exception is thrown.
                     :  * @endinternal
     180             :  * @throws std::runtime_error if Margo cannot be initialized, or if the server
                     :  * address cannot be retrieved or converted to a string
     181             :  */
     182             : void
     183          33 : init_rpc_server() {
     184          33 :     hg_addr_t addr_self = nullptr;
     185          33 :     hg_size_t addr_self_cstring_sz = 128;
     186          33 :     char addr_self_cstring[128];
     187          33 :     struct hg_init_info hg_options = HG_INIT_INFO_INITIALIZER;
     188          33 :     hg_options.auto_sm = GKFS_DATA->use_auto_sm() ? HG_TRUE : HG_FALSE;
     189          33 :     hg_options.stats = HG_FALSE;
     190          33 :     if(gkfs::rpc::protocol::ofi_psm2 == GKFS_DATA->rpc_protocol())
     191           0 :         hg_options.na_init_info.progress_mode = NA_NO_BLOCK;
     192             :     // Start Margo (this will also initialize Argobots and Mercury internally)
     193          33 :     auto margo_config = fmt::format(
     194             :             R"({{ "use_progress_thread" : true, "rpc_thread_count" : {} }})",
     195          33 :             gkfs::config::rpc::daemon_handler_xstreams);
     196          33 :     struct margo_init_info args = {nullptr};
     197          33 :     args.json_config = margo_config.c_str();
     198          33 :     args.hg_init_info = &hg_options;
     199          33 :     auto* mid = margo_init_ext(GKFS_DATA->bind_addr().c_str(),
     200             :                                MARGO_SERVER_MODE, &args);
     201             : 
     202          33 :     if(mid == MARGO_INSTANCE_NULL) {
     203           0 :         throw runtime_error("Failed to initialize the Margo RPC server");
     204             :     }
     205             :     // Figure out what address this server is listening on (must be freed when
     206             :     // finished)
     207          33 :     auto hret = margo_addr_self(mid, &addr_self);
     208          33 :     if(hret != HG_SUCCESS) {
     209           0 :         margo_finalize(mid);
     210           0 :         throw runtime_error("Failed to retrieve server RPC address");
     211             :     }
     212             :     // Convert the address to a cstring (with \0 terminator).
     213          33 :     hret = margo_addr_to_string(mid, addr_self_cstring, &addr_self_cstring_sz,
     214             :                                 addr_self);
     215          33 :     if(hret != HG_SUCCESS) {
     216           0 :         margo_addr_free(mid, addr_self);
     217           0 :         margo_finalize(mid);
     218           0 :         throw runtime_error("Failed to convert server RPC address to string");
     219             :     }
     220          33 :     margo_addr_free(mid, addr_self);
     221             : 
     222          66 :     std::string addr_self_str(addr_self_cstring);
     223          33 :     RPC_DATA->self_addr_str(addr_self_str);
     224             : 
     225          33 :     GKFS_DATA->spdlogger()->info("{}() Accepting RPCs on address {}", __func__,
     226          33 :                                  addr_self_cstring);
     227             : 
     228             :     // Store the Margo instance id in the RPC_DATA object
     229          33 :     RPC_DATA->server_rpc_mid(mid);
     230             : 
     231             :     // register RPCs
     232          33 :     register_server_rpcs(mid);
     233          33 : }
     234             : 
     235             : /**
     236             :  * @brief Initializes the daemon environment and sets up its subroutines.
     237             :  * @internal
     238             :  * The steps are executed in this order:
     239             :  *  1. metadata DB below <metadir>/<gkfs::config::metadata::dir>
     240             :  *  2. AGIOS scheduler (only when built with GKFS_ENABLE_AGIOS)
     241             :  *  3. statistics (only when stats or chunk stats are enabled)
     242             :  *  4. chunk storage backend below <rootdir>/<gkfs::config::data::chunk_dir>
     243             :  *  5. RPC server, see init_rpc_server()
                     :  *  6. Argobots I/O pool, see init_io_tasklet_pool()
                     :  *  7. metadata feature flags taken from gkfs::config::metadata
                     :  *  8. root metadentry "/" (an already existing entry is not an error)
                     :  *  9. hosts file entry (only when a hosts file is configured)
                     :  *
                     :  * Exceptions raised in steps 1, 2 and 4-6 are logged and rethrown unchanged.
     244             :  * @endinternal
     245             :  * @throws std::runtime_error if the root metadentry cannot be written; any
                     :  * std::exception raised by one of the other steps is propagated as is
     246             :  */
     247             : void
     248          33 : init_environment() {
     249             :     // Initialize metadata db
     250          33 :     auto metadata_path = fmt::format("{}/{}", GKFS_DATA->metadir(),
     251          33 :                                      gkfs::config::metadata::dir);
     252          33 :     fs::create_directories(metadata_path);
     253          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing metadata DB: '{}'",
     254          33 :                                   __func__, metadata_path);
     255          33 :     try {
     256          33 :         GKFS_DATA->mdb(std::make_shared<gkfs::metadata::MetadataDB>(
     257          66 :                 metadata_path, GKFS_DATA->dbbackend()));
     258           0 :     } catch(const std::exception& e) {
     259           0 :         GKFS_DATA->spdlogger()->error(
     260             :                 "{}() Failed to initialize metadata DB: {}", __func__,
     261           0 :                 e.what());
     262           0 :         throw;
     263             :     }
     264             : 
     265             : #ifdef GKFS_ENABLE_AGIOS
     266             :     // Initialize AGIOS scheduler
     267             :     GKFS_DATA->spdlogger()->debug("{}() Initializing AGIOS scheduler: '{}'",
     268             :                                   __func__, "/tmp/agios.conf");
     269             :     try {
     270             :         agios_initialize();
     271             :     } catch(const std::exception& e) {
     272             :         GKFS_DATA->spdlogger()->error(
     273             :                 "{}() Failed to initialize AGIOS scheduler: {}", __func__,
     274             :                 e.what());
     275             :         throw;
     276             :     }
     277             : #endif
     278             : 
     279             :     // Initialize Stats
     280          33 :     if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
     281          33 :         GKFS_DATA->stats(std::make_shared<gkfs::utils::Stats>(
     282          66 :                 GKFS_DATA->enable_chunkstats(), GKFS_DATA->enable_prometheus(),
     283          66 :                 GKFS_DATA->stats_file(), GKFS_DATA->prometheus_gateway()));
     284             : 
     285             :     // Initialize data backend
     286          33 :     auto chunk_storage_path = fmt::format("{}/{}", GKFS_DATA->rootdir(),
     287          66 :                                           gkfs::config::data::chunk_dir);
     288          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing storage backend: '{}'",
     289          33 :                                   __func__, chunk_storage_path);
     290          33 :     fs::create_directories(chunk_storage_path);
     291          33 :     try {
     292          33 :         GKFS_DATA->storage(std::make_shared<gkfs::data::ChunkStorage>(
     293          33 :                 chunk_storage_path, gkfs::config::rpc::chunksize));
     294           0 :     } catch(const std::exception& e) {
     295           0 :         GKFS_DATA->spdlogger()->error(
     296             :                 "{}() Failed to initialize storage backend: {}", __func__,
     297           0 :                 e.what());
     298           0 :         throw;
     299             :     }
     300             : 
     301             :     // Init margo for RPC
     302          33 :     GKFS_DATA->spdlogger()->debug("{}() Initializing RPC server: '{}'",
     303          33 :                                   __func__, GKFS_DATA->bind_addr());
     304          33 :     try {
     305          33 :         init_rpc_server();
     306           0 :     } catch(const std::exception& e) {
     307           0 :         GKFS_DATA->spdlogger()->error(
     308           0 :                 "{}() Failed to initialize RPC server: {}", __func__, e.what());
     309           0 :         throw;
     310             :     }
     311             : 
     312             :     // Init Argobots ESs to drive IO
     313          33 :     try {
     314          33 :         GKFS_DATA->spdlogger()->debug("{}() Initializing I/O pool", __func__);
     315          33 :         init_io_tasklet_pool();
     316           0 :     } catch(const std::exception& e) {
     317           0 :         GKFS_DATA->spdlogger()->error(
     318             :                 "{}() Failed to initialize Argobots pool for I/O: {}", __func__,
     319           0 :                 e.what());
     320           0 :         throw;
     321             :     }
     322             : 
     323             :     // TODO set metadata configurations. these have to go into a user
     324             :     // configurable file that is parsed here
     325          33 :     GKFS_DATA->atime_state(gkfs::config::metadata::use_atime);
     326          33 :     GKFS_DATA->mtime_state(gkfs::config::metadata::use_mtime);
     327          33 :     GKFS_DATA->ctime_state(gkfs::config::metadata::use_ctime);
     328          33 :     GKFS_DATA->link_cnt_state(gkfs::config::metadata::use_link_cnt);
     329          33 :     GKFS_DATA->blocks_state(gkfs::config::metadata::use_blocks);
     330             :     // Create metadentry for root directory
     331          66 :     gkfs::metadata::Metadata root_md{S_IFDIR | S_IRWXU | S_IRWXG | S_IRWXO}; // directory with rwx for user, group and others
     332          33 :     try {
     333          66 :         gkfs::metadata::create("/", root_md);
     334           0 :     } catch(const gkfs::metadata::ExistsException& e) {
     335             :         // launched on existing directory which is no error
     336           0 :     } catch(const std::exception& e) {
     337           0 :         throw runtime_error("Failed to write root metadentry to KV store: "s +
     338           0 :                             e.what());
     339             :     }
     340             :     // setup hostfile to let clients know that a daemon is running on this host
     341          33 :     if(!GKFS_DATA->hosts_file().empty()) {
     342          33 :         gkfs::utils::populate_hosts_file();
     343             :     }
     344          66 :     GKFS_DATA->spdlogger()->info("Startup successful. Daemon is ready.");
     345          33 : }
     345             : 
     346             : #ifdef GKFS_ENABLE_AGIOS
     347             : /**
     348             :  * @brief Initialize the AGIOS scheduling library
                     :  * @internal
                     :  * The scheduler is initialized from the configuration file /tmp/agios.conf.
                     :  * On failure the error is logged and agios_exit() is called before an
                     :  * exception is thrown. A real exception object is thrown on purpose: a bare
                     :  * `throw;` outside of a catch handler would call std::terminate() instead of
                     :  * reaching the std::exception handler of the caller.
                     :  * @endinternal
                     :  * @throws std::runtime_error if agios_init() reports a failure
     349             :  */
     350             : void
     351             : agios_initialize() {
     352             :     char configuration[] = "/tmp/agios.conf";
     353             : 
     354             :     if(!agios_init(NULL, NULL, configuration, 0)) {
     355             :         GKFS_DATA->spdlogger()->error(
     356             :                 "{}() Failed to initialize AGIOS scheduler: '{}'", __func__,
     357             :                 configuration);
     358             : 
     359             :         agios_exit();
     360             : 
     361             :         throw runtime_error("Failed to initialize AGIOS scheduler");
     362             :     }
     363             : }
     364             : #endif
     365             : 
     366             : /**
     367             :  * @brief Destroys the daemon environment and gracefully shuts down all
     368             :  * subroutines.
     369             :  * @internal
     370             :  * The steps are executed in this order: removing the mount directory, joining
     371             :  * and freeing the Argobots I/O execution streams, removing the hosts file (if
                     :  * one is configured), finalizing the Margo RPC server (if it was started),
                     :  * closing the metadata DB, removing metadir and rootdir (only when
                     :  * keep_rootdir is false), and closing the statistics.
                     :  *
                     :  * Errors of fs::remove_all() are collected in a std::error_code that is not
                     :  * inspected, and the return values of ABT_xstream_join() and
                     :  * ABT_xstream_free() are not checked.
     372             :  * @endinternal
     373             :  */
     374             : void
     375          33 : destroy_enviroment() {
     376          33 :     GKFS_DATA->spdlogger()->debug("{}() Removing mount directory", __func__);
     377          33 :     std::error_code ecode;
     378          33 :     fs::remove_all(GKFS_DATA->mountdir(), ecode);
     379          33 :     GKFS_DATA->spdlogger()->debug("{}() Freeing I/O executions streams",
     380          33 :                                   __func__);
     381         297 :     for(unsigned int i = 0; i < RPC_DATA->io_streams().size(); i++) {
     382         264 :         ABT_xstream_join(RPC_DATA->io_streams().at(i));
     383         264 :         ABT_xstream_free(&RPC_DATA->io_streams().at(i));
     384             :     }
     385             : 
     386          33 :     if(!GKFS_DATA->hosts_file().empty()) {
     387          33 :         GKFS_DATA->spdlogger()->debug("{}() Removing hosts file", __func__);
     388          33 :         try {
     389          33 :             gkfs::utils::destroy_hosts_file();
     390           0 :         } catch(const fs::filesystem_error& e) {
     391           0 :             GKFS_DATA->spdlogger()->debug("{}() hosts file not found",
     392           0 :                                           __func__);
     393             :         }
     394             :     }
     395             : 
     396          33 :     if(RPC_DATA->server_rpc_mid() != nullptr) {
     397          33 :         GKFS_DATA->spdlogger()->debug("{}() Finalizing margo RPC server",
     398          33 :                                       __func__);
     399          33 :         margo_finalize(RPC_DATA->server_rpc_mid());
     400             :     }
     401             : 
     402          33 :     GKFS_DATA->spdlogger()->info("{}() Closing metadata DB", __func__);
     403          33 :     GKFS_DATA->close_mdb();
     404             : 
     405             : 
     406             :     // Delete rootdir/metadir if requested (--clean-rootdir-finish)
     407          33 :     if(!keep_rootdir) {
     408           0 :         GKFS_DATA->spdlogger()->info("{}() Removing rootdir and metadir ...",
     409           0 :                                      __func__);
     410           0 :         fs::remove_all(GKFS_DATA->metadir(), ecode);
     411           0 :         fs::remove_all(GKFS_DATA->rootdir(), ecode);
     412             :     }
     413          33 :     GKFS_DATA->close_stats();
     414          33 : }
     415             : 
     416             : /**
     417             :  * @brief Handler for daemon shutdown signal handling.
     418             :  * @internal
     419             :  * Logs the received signal and notifies all threads waiting on the
                     :  * shutdown_please condition variable (the waiting thread in main()).
                     :  *
                     :  * NOTE(review): if this function runs as an asynchronous signal handler,
                     :  * neither the logger call nor condition_variable::notify_all() is
                     :  * async-signal-safe -- confirm how it is installed in main().
     420             :  * @endinternal
     421             :  * @param dummy number of the received signal; only used for the log message
     422             :  */
     423             : void
     424          33 : shutdown_handler(int dummy) {
     425          33 :     GKFS_DATA->spdlogger()->info("{}() Received signal: '{}'", __func__,
     426          33 :                                  strsignal(dummy));
     427          33 :     shutdown_please.notify_all();
     428          33 : }
     429             : 
     430             : /**
     431             :  * @brief Initializes the daemon logging environment.
     432             :  * @internal
     433             :  * Reads user input via environment variables regarding the
     434             :  * log path and log level. The variable names are DAEMON_ENV_PREFIX followed
                     :  * by "LOG_PATH" and "LOG_LEVEL". If a variable is not set, the defaults
                     :  * gkfs::config::log::daemon_log_path and
                     :  * gkfs::config::log::daemon_log_level are used.
     435             :  * @endinternal
     436             :  * Initializes three loggers: main, metadata module, and data module
     437             :  */
     438             : void
     439          33 : initialize_loggers() {
     440          33 :     std::string path = gkfs::config::log::daemon_log_path;
     441             :     // Try to get log path from env variable
     442          66 :     std::string env_path_key = DAEMON_ENV_PREFIX;
     443          33 :     env_path_key += "LOG_PATH";
     444          33 :     char* env_path = getenv(env_path_key.c_str());
     445          33 :     if(env_path != nullptr) {
     446          33 :         path = env_path;
     447             :     }
     448             : 
     449          33 :     spdlog::level::level_enum level =
     450          33 :             gkfs::log::get_level(gkfs::config::log::daemon_log_level);
     451             :     // Try to get log level from env variable
     452          66 :     std::string env_level_key = DAEMON_ENV_PREFIX;
     453          33 :     env_level_key += "LOG_LEVEL";
     454          33 :     char* env_level = getenv(env_level_key.c_str());
     455          33 :     if(env_level != nullptr) {
     456          33 :         level = gkfs::log::get_level(env_level);
     457             :     }
     458             : 
     459          33 :     auto logger_names = std::vector<std::string>{
     460             :             "main",
     461             :             "MetadataModule",
     462             :             "DataModule",
     463         165 :     };
     464             : 
     465             :     // all loggers share the same level and log path
     466          33 :     gkfs::log::setup(logger_names, level, path);
     467          33 : }
     467             : 
     468             : /**
     469             :  * @brief Parses command line arguments from user
     470             :  *
     471             :  * @param opts CLI values
     472             :  * @param desc CLI allowed options
     473             :  * @throws std::runtime_error
     474             :  */
     475             : void
     476          33 : parse_input(const cli_options& opts, const CLI::App& desc) {
     477          33 :     auto rpc_protocol = string(gkfs::rpc::protocol::ofi_sockets);
     478          33 :     if(desc.count("--rpc-protocol")) {
     479           0 :         rpc_protocol = opts.rpc_protocol;
     480           0 :         if(rpc_protocol != gkfs::rpc::protocol::ofi_verbs &&
     481           0 :            rpc_protocol != gkfs::rpc::protocol::ofi_sockets &&
     482           0 :            rpc_protocol != gkfs::rpc::protocol::ofi_psm2) {
     483           0 :             throw runtime_error(fmt::format(
     484             :                     "Given RPC protocol '{}' not supported. Check --help for supported protocols.",
     485           0 :                     rpc_protocol));
     486             :         }
     487             :     }
     488             : 
     489          33 :     auto use_auto_sm = desc.count("--auto-sm") != 0;
     490          33 :     GKFS_DATA->use_auto_sm(use_auto_sm);
     491          33 :     GKFS_DATA->spdlogger()->debug(
     492             :             "{}() Shared memory (auto_sm) for intra-node communication (IPCs) set to '{}'.",
     493          33 :             __func__, use_auto_sm);
     494             : 
     495          66 :     string addr{};
     496          33 :     if(desc.count("--listen")) {
     497          33 :         addr = opts.listen;
     498             :         // ofi+verbs requires an empty addr to bind to the ib interface
     499          33 :         if(rpc_protocol == gkfs::rpc::protocol::ofi_verbs) {
     500             :             /*
     501             :              * FI_VERBS_IFACE : The prefix or the full name of the network
     502             :              * interface associated with the verbs device (default: ib) Mercury
     503             :              * does not allow to bind to an address when ofi+verbs is used
     504             :              */
     505           0 :             if(!secure_getenv("FI_VERBS_IFACE"))
     506           0 :                 setenv("FI_VERBS_IFACE", addr.c_str(), 1);
     507           0 :             addr = ""s;
     508             :         }
     509             :     } else {
     510           0 :         if(rpc_protocol != gkfs::rpc::protocol::ofi_verbs)
     511           0 :             addr = gkfs::rpc::get_my_hostname(true);
     512             :     }
     513             : 
     514          33 :     GKFS_DATA->rpc_protocol(rpc_protocol);
     515          66 :     GKFS_DATA->bind_addr(fmt::format("{}://{}", rpc_protocol, addr));
     516             : 
     517          66 :     string hosts_file;
     518          33 :     if(desc.count("--hosts-file")) {
     519           0 :         hosts_file = opts.hosts_file;
     520             :     } else {
     521          66 :         hosts_file = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
     522          66 :                                         gkfs::config::hostfile_path);
     523             :     }
     524          33 :     GKFS_DATA->hosts_file(hosts_file);
     525             : 
     526          66 :     assert(desc.count("--mountdir"));
     527          66 :     auto mountdir = opts.mountdir;
     528             :     // Create mountdir. We use this dir to get some information on the
     529             :     // underlying fs with statfs in gkfs_statfs
     530          33 :     fs::create_directories(mountdir);
     531          66 :     GKFS_DATA->mountdir(fs::canonical(mountdir).native());
     532             : 
     533          66 :     assert(desc.count("--rootdir"));
     534          66 :     auto rootdir = opts.rootdir;
     535             : 
     536          66 :     auto rootdir_path = fs::path(rootdir);
     537          33 :     if(desc.count("--rootdir-suffix")) {
     538           0 :         if(opts.rootdir_suffix == gkfs::config::data::chunk_dir ||
     539           0 :            opts.rootdir_suffix == gkfs::config::metadata::dir)
     540           0 :             throw runtime_error(fmt::format(
     541             :                     "rootdir_suffix '{}' is reserved and not allowed.",
     542           0 :                     opts.rootdir_suffix));
     543           0 :         if(opts.rootdir_suffix.find('#') != string::npos)
     544           0 :             throw runtime_error(fmt::format(
     545           0 :                     "The character '#' in the rootdir_suffix is reserved and not allowed."));
     546             :         // append path with a directory separator
     547           0 :         rootdir_path /= opts.rootdir_suffix;
     548           0 :         GKFS_DATA->rootdir_suffix(opts.rootdir_suffix);
     549             :     }
     550             : 
     551          33 :     if(desc.count("--clean-rootdir")) {
     552             :         // may throw exception (caught in main)
     553           0 :         GKFS_DATA->spdlogger()->debug("{}() Cleaning rootdir '{}' ...",
     554           0 :                                       __func__, rootdir_path.native());
     555           0 :         fs::remove_all(rootdir_path.native());
     556           0 :         GKFS_DATA->spdlogger()->info("{}() rootdir cleaned.", __func__);
     557             :     }
     558             : 
     559          33 :     if(desc.count("--clean-rootdir-finish")) {
     560           0 :         keep_rootdir = false;
     561             :     }
     562             : 
     563          33 :     GKFS_DATA->spdlogger()->debug("{}() Root directory: '{}'", __func__,
     564          33 :                                   rootdir_path.native());
     565          33 :     fs::create_directories(rootdir_path);
     566          33 :     GKFS_DATA->rootdir(rootdir_path.native());
     567             : 
     568          33 :     if(desc.count("--enable-forwarding")) {
     569           0 :         GKFS_DATA->enable_forwarding(true);
     570           0 :         GKFS_DATA->spdlogger()->info("{}() Forwarding mode enabled", __func__);
     571             :     }
     572             : 
     573          33 :     if(desc.count("--metadir")) {
     574          66 :         auto metadir = opts.metadir;
     575             : 
     576             : 
     577          66 :         auto metadir_path = fs::path(metadir);
     578          33 :         if(GKFS_DATA->enable_forwarding()) {
     579             :             // As we store normally he metadata to the pfs, we need to put each
     580             :             // daemon in a separate directory.
     581           0 :             metadir_path = fs::path(metadir) / fmt::format_int(getpid()).str();
     582             :         }
     583             : 
     584             : 
     585          33 :         if(desc.count("--clean-rootdir")) {
     586             :             // may throw exception (caught in main)
     587           0 :             GKFS_DATA->spdlogger()->debug("{}() Cleaning metadir '{}' ...",
     588           0 :                                           __func__, metadir_path.native());
     589           0 :             fs::remove_all(metadir_path.native());
     590           0 :             GKFS_DATA->spdlogger()->info("{}() metadir cleaned.", __func__);
     591             :         }
     592          33 :         fs::create_directories(metadir_path);
     593          33 :         GKFS_DATA->metadir(fs::canonical(metadir_path).native());
     594             : 
     595          66 :         GKFS_DATA->spdlogger()->debug("{}() Meta directory: '{}'", __func__,
     596          33 :                                       metadir_path.native());
     597             :     } else {
     598             :         // use rootdir as metadata dir
     599           0 :         auto metadir = opts.rootdir;
     600             : 
     601             : 
     602           0 :         if(GKFS_DATA->enable_forwarding()) {
     603             :             // As we store normally he metadata to the pfs, we need to put each
     604             :             // daemon in a separate directory.
     605           0 :             auto metadir_path =
     606           0 :                     fs::path(metadir) / fmt::format_int(getpid()).str();
     607           0 :             fs::create_directories(metadir_path);
     608           0 :             GKFS_DATA->metadir(fs::canonical(metadir_path).native());
     609             :         } else
     610           0 :             GKFS_DATA->metadir(GKFS_DATA->rootdir());
     611             :     }
     612             : 
     613          33 :     if(desc.count("--dbbackend")) {
     614          33 :         if(opts.dbbackend == gkfs::metadata::rocksdb_backend ||
     615           0 :            opts.dbbackend == gkfs::metadata::parallax_backend) {
     616             : #ifndef GKFS_ENABLE_PARALLAX
     617          33 :             if(opts.dbbackend == gkfs::metadata::parallax_backend) {
     618           0 :                 throw runtime_error(fmt::format(
     619             :                         "dbbackend '{}' was not compiled and is disabled. "
     620             :                         "Pass -DGKFS_ENABLE_PARALLAX:BOOL=ON to CMake to enable.",
     621           0 :                         opts.dbbackend));
     622             :             }
     623             : #endif
     624             : #ifndef GKFS_ENABLE_ROCKSDB
     625             :             if(opts.dbbackend == gkfs::metadata::rocksdb_backend) {
     626             :                 throw runtime_error(fmt::format(
     627             :                         "dbbackend '{}' was not compiled and is disabled. "
     628             :                         "Pass -DGKFS_ENABLE_ROCKSDB:BOOL=ON to CMake to enable.",
     629             :                         opts.dbbackend));
     630             :             }
     631             : #endif
     632          33 :             GKFS_DATA->dbbackend(opts.dbbackend);
     633             :         } else {
     634           0 :             throw runtime_error(
     635           0 :                     fmt::format("dbbackend '{}' is not valid. Consult `--help`",
     636           0 :                                 opts.dbbackend));
     637             :         }
     638             : 
     639             :     } else
     640           0 :         GKFS_DATA->dbbackend(gkfs::metadata::rocksdb_backend);
     641             : 
     642          33 :     if(desc.count("--parallaxsize")) { // Size in GB
     643           0 :         GKFS_DATA->parallax_size_md(stoi(opts.parallax_size));
     644             :     }
     645             : 
     646             :     /*
     647             :      * Statistics collection arguments
     648             :      */
     649          33 :     if(desc.count("--enable-collection")) {
     650          33 :         GKFS_DATA->enable_stats(true);
     651          66 :         GKFS_DATA->spdlogger()->info("{}() Statistic collection enabled",
     652          33 :                                      __func__);
     653             :     }
     654          33 :     if(desc.count("--enable-chunkstats")) {
     655          33 :         GKFS_DATA->enable_chunkstats(true);
     656          66 :         GKFS_DATA->spdlogger()->info("{}() Chunk statistic collection enabled",
     657          33 :                                      __func__);
     658             :     }
     659             : 
     660             : #ifdef GKFS_ENABLE_PROMETHEUS
     661          33 :     if(desc.count("--enable-prometheus")) {
     662           0 :         GKFS_DATA->enable_prometheus(true);
     663           0 :         if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
     664           0 :             GKFS_DATA->spdlogger()->info(
     665           0 :                     "{}() Statistics output to Prometheus enabled", __func__);
     666             :         else
     667           0 :             GKFS_DATA->spdlogger()->warn(
     668             :                     "{}() Prometheus statistic output enabled but no stat collection is enabled. There will be no output to Prometheus",
     669           0 :                     __func__);
     670             :     }
     671             : 
     672          33 :     if(desc.count("--prometheus-gateway")) {
     673           0 :         auto gateway = opts.prometheus_gateway;
     674           0 :         GKFS_DATA->prometheus_gateway(gateway);
     675           0 :         if(GKFS_DATA->enable_prometheus())
     676           0 :             GKFS_DATA->spdlogger()->info("{}() Prometheus gateway set to '{}'",
     677           0 :                                          __func__, gateway);
     678             :         else
     679           0 :             GKFS_DATA->spdlogger()->warn(
     680             :                     "{}() Prometheus gateway was set but Prometheus is disabled.");
     681             :     }
     682             : #endif
     683             : 
     684          33 :     if(desc.count("--output-stats")) {
     685          66 :         auto stats_file = opts.stats_file;
     686          33 :         GKFS_DATA->stats_file(stats_file);
     687          33 :         if(GKFS_DATA->enable_stats() || GKFS_DATA->enable_chunkstats())
     688          66 :             GKFS_DATA->spdlogger()->info(
     689             :                     "{}() Statistics are written to file '{}'", __func__,
     690          33 :                     stats_file);
     691             :         else
     692           0 :             GKFS_DATA->spdlogger()->warn(
     693             :                     "{}() --output-stats argument used but no stat collection is enabled. There will be no output to file '{}'",
     694           0 :                     __func__, stats_file);
     695             :     } else {
     696           0 :         GKFS_DATA->stats_file("");
     697           0 :         GKFS_DATA->spdlogger()->debug("{}() Statistics output disabled",
     698           0 :                                       __func__);
     699             :     }
     700          33 : }
     701             : 
     702             : /**
     703             :  * @brief The initial function called when launching the daemon.
     704             :  * @internal
     705             :  * Launches all subroutines and waits on a conditional variable to shut it down.
     706             :  * Daemon will react to the following signals:
     707             :  *
     708             :  * SIGINT - Interrupt from keyboard (ctrl-c)
     709             :  * SIGTERM - Termination signal (kill <daemon_pid>
     710             :  * SIGKILL - Kill signal (kill -9 <daemon_pid>
     711             :  * @endinternal
     712             :  * @param argc number of command line arguments
     713             :  * @param argv list of the command line arguments
     714             :  * @return exit status: EXIT_SUCCESS (0) or EXIT_FAILURE (1)
     715             :  */
     716             : int
     717          33 : main(int argc, const char* argv[]) {
     718          99 :     CLI::App desc{"Allowed options"};
     719          33 :     cli_options opts{};
     720             :     // clang-format off
     721          33 :     desc.add_option("--mountdir,-m", opts.mountdir,
     722          66 :                     "Virtual mounting directory where GekkoFS is available.")
     723          33 :                     ->required();
     724          66 :     desc.add_option(
     725             :                     "--rootdir,-r", opts.rootdir,
     726          66 :                     "Local data directory where GekkoFS data for this daemon is stored.")
     727          33 :                     ->required();
     728          66 :     desc.add_option(
     729             :                     "--rootdir-suffix,-s", opts.rootdir_suffix,
     730          66 :                     "Creates an additional directory within the rootdir, allowing multiple daemons on one node.");
     731          66 :     desc.add_option(
     732             :                     "--metadir,-i", opts.metadir,
     733          66 :                     "Metadata directory where GekkoFS RocksDB data directory is located. If not set, rootdir is used.");
     734          66 :     desc.add_option(
     735             :                     "--listen,-l", opts.listen,
     736             :                     "Address or interface to bind the daemon to. Default: local hostname.\n"
     737             :                     "When used with ofi+verbs the FI_VERBS_IFACE environment variable is set accordingly "
     738             :                     "which associates the verbs device with the network interface. In case FI_VERBS_IFACE "
     739          66 :                     "is already defined, the argument is ignored. Default 'ib'.");
     740          66 :     desc.add_option("--hosts-file,-H", opts.hosts_file,
     741             :                     "Shared file used by deamons to register their "
     742          66 :                     "endpoints. (default './gkfs_hosts.txt')");
     743          66 :     desc.add_option(
     744             :                     "--rpc-protocol,-P", opts.rpc_protocol,
     745             :                     "Used RPC protocol for inter-node communication.\n"
     746             :                     "Available: {ofi+sockets, ofi+verbs, ofi+psm2} for TCP, Infiniband, "
     747             :                     "and Omni-Path, respectively. (Default ofi+sockets)\n"
     748          66 :                     "Libfabric must have enabled support verbs or psm2.");
     749          33 :     desc.add_flag(
     750             :                 "--auto-sm",
     751             :                 "Enables intra-node communication (IPCs) via the `na+sm` (shared memory) protocol, "
     752          66 :                 "instead of using the RPC protocol. (Default off)");
     753          33 :     desc.add_flag(
     754             :                 "--clean-rootdir",
     755          66 :                 "Cleans Rootdir >before< launching the deamon");
     756          33 :     desc.add_flag(
     757             :                 "--clean-rootdir-finish,-c",
     758          66 :                 "Cleans Rootdir >after< the deamon finishes");
     759          66 :     desc.add_option(
     760             :                 "--dbbackend,-d", opts.dbbackend,
     761             :                 "Metadata database backend to use. Available: {rocksdb, parallaxdb}\n"
     762             :                 "RocksDB is default if not set. Parallax support is experimental.\n"
     763          66 :                 "Note, parallaxdb creates a file called rocksdbx with 8GB created in metadir.");
     764          66 :     desc.add_option("--parallaxsize", opts.parallax_size,
     765             :                     "parallaxdb - metadata file size in GB (default 8GB), "
     766          66 :                     "used only with new files");
     767          33 :     desc.add_flag(
     768             :                 "--enable-collection",
     769             :                 "Enables collection of general statistics. "
     770          66 :                 "Output requires either the --output-stats or --enable-prometheus argument.");
     771          33 :     desc.add_flag(
     772             :                 "--enable-chunkstats",
     773             :                 "Enables collection of data chunk statistics in I/O operations."
     774          66 :                 "Output requires either the --output-stats or --enable-prometheus argument.");
     775          66 :     desc.add_option(
     776             :                 "--output-stats", opts.stats_file,
     777          66 :                 "Creates a thread that outputs the server stats each 10s to the specified file.");
     778          33 :     desc.add_flag(
     779             :                 "--enable-forwarding",
     780          66 :                 "Enables forwarding mode, so the metadata is stored in a separate directory (pid).");
     781             :     #ifdef GKFS_ENABLE_PROMETHEUS
     782          33 :     desc.add_flag(
     783             :                 "--enable-prometheus",
     784          66 :                 "Enables prometheus output and a corresponding thread.");
     785             : 
     786          66 :     desc.add_option(
     787             :                 "--prometheus-gateway", opts.prometheus_gateway,
     788          66 :                 "Defines the prometheus gateway <ip:port> (Default 127.0.0.1:9091).");
     789             :     #endif
     790             : 
     791          66 :     desc.add_flag("--version", "Print version and exit.");
     792             :     // clang-format on
     793          33 :     try {
     794          33 :         desc.parse(argc, argv);
     795           0 :     } catch(const CLI::ParseError& e) {
     796           0 :         return desc.exit(e);
     797             :     }
     798             : 
     799             : 
     800          33 :     if(desc.count("--version")) {
     801           0 :         cout << GKFS_VERSION_STRING << endl;
     802             : #ifndef NDEBUG
     803           0 :         cout << "Debug: ON" << endl;
     804             : #else
     805             :         cout << "Debug: OFF" << endl;
     806             : #endif
     807             : #if GKFS_CREATE_CHECK_PARENTS
     808           0 :         cout << "Create check parents: ON" << endl;
     809             : #else
     810             :         cout << "Create check parents: OFF" << endl;
     811             : #endif
     812           0 :         cout << "Chunk size: " << gkfs::config::rpc::chunksize << " bytes"
     813          33 :              << endl;
     814             :         return EXIT_SUCCESS;
     815             :     }
     816             :     // intitialize logging framework
     817          33 :     initialize_loggers();
     818          66 :     GKFS_DATA->spdlogger(spdlog::get("main"));
     819             : 
     820             :     // parse all input parameters and populate singleton structures
     821          33 :     try {
     822          33 :         parse_input(opts, desc);
     823           0 :     } catch(const std::exception& e) {
     824           0 :         cerr << fmt::format("Parsing arguments failed: '{}'. Exiting.",
     825           0 :                             e.what());
     826           0 :         return EXIT_FAILURE;
     827             :     }
     828             : 
     829             :     /*
     830             :      * Initialize environment and start daemon. Wait until signaled to cancel
     831             :      * before shutting down
     832             :      */
     833          33 :     try {
     834          33 :         GKFS_DATA->spdlogger()->info("{}() Initializing environment", __func__);
     835          33 :         init_environment();
     836           0 :     } catch(const std::exception& e) {
     837           0 :         auto emsg =
     838           0 :                 fmt::format("Failed to initialize environment: {}", e.what());
     839           0 :         GKFS_DATA->spdlogger()->error(emsg);
     840           0 :         cerr << emsg << endl;
     841           0 :         destroy_enviroment();
     842           0 :         return EXIT_FAILURE;
     843             :     }
     844             : 
     845          33 :     signal(SIGINT, shutdown_handler);
     846          33 :     signal(SIGTERM, shutdown_handler);
     847          33 :     signal(SIGKILL, shutdown_handler);
     848             : 
     849          66 :     unique_lock<mutex> lk(mtx);
     850             :     // Wait for shutdown signal to initiate shutdown protocols
     851          33 :     shutdown_please.wait(lk);
     852          33 :     GKFS_DATA->spdlogger()->info("{}() Shutting down...", __func__);
     853          33 :     destroy_enviroment();
     854          33 :     GKFS_DATA->spdlogger()->info("{}() Complete. Exiting...", __func__);
     855          33 :     return EXIT_SUCCESS;
     856             : }

Generated by: LCOV version 1.16