LCOV - code coverage report
Current view: top level - src/client - preload.cpp (source / functions)
Test: coverage.info
Date: 2024-04-30 13:21:35
               Hit   Total   Coverage
Lines:          64     128     50.0 %
Functions:       5       9     55.6 %
Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS' POSIX interface.
      12             : 
      13             :   GekkoFS' POSIX interface is free software: you can redistribute it and/or
      14             :   modify it under the terms of the GNU Lesser General Public License as
      15             :   published by the Free Software Foundation, either version 3 of the License,
      16             :   or (at your option) any later version.
      17             : 
      18             :   GekkoFS' POSIX interface is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU Lesser General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU Lesser General Public License
      24             :   along with GekkoFS' POSIX interface.  If not, see
      25             :   <https://www.gnu.org/licenses/>.
      26             : 
      27             :   SPDX-License-Identifier: LGPL-3.0-or-later
      28             : */
      29             : 
#include <client/preload.hpp>
#include <client/path.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_management.hpp>
#include <client/preload_util.hpp>
#include <client/intercept.hpp>

#include <common/rpc/distributor.hpp>
#include <common/common_defs.hpp>

#include <pthread.h>

#include <algorithm>
#include <ctime>
#include <cstdlib>
#include <fstream>
#include <iterator>
#include <stdexcept>
#include <string>

#include <hermes.hpp>
      45             : 
      46             : 
      47             : using namespace std;
      48             : 
      49             : std::unique_ptr<hermes::async_engine> ld_network_service; // extern variable
      50             : 
      51             : namespace {
      52             : 
      53             : #ifdef GKFS_ENABLE_FORWARDING
      54             : pthread_t mapper;
      55             : bool forwarding_running;
      56             : 
      57             : pthread_mutex_t remap_mutex;
      58             : pthread_cond_t remap_signal;
      59             : #endif
      60             : 
      61             : inline void
      62           0 : exit_error_msg(int errcode, const string& msg) {
      63             : 
      64           0 :     LOG_ERROR("{}", msg);
      65           0 :     gkfs::log::logger::log_message(stderr, "{}\n", msg);
      66             : 
      67             :     // if we don't disable interception before calling ::exit()
      68             :     // syscall hooks may find an inconsistent in shared state
      69             :     // (e.g. the logger) and thus, crash
      70           0 :     gkfs::preload::stop_interception();
      71           0 :     CTX->disable_interception();
      72           0 :     ::exit(errcode);
      73             : }
      74             : 
      75             : /**
      76             :  * Initializes the Hermes client for a given transport prefix
      77             :  * @return true if successfully initialized; false otherwise
      78             :  */
      79             : bool
      80         248 : init_hermes_client() {
      81             : 
      82         248 :     try {
      83             : 
      84         248 :         hermes::engine_options opts{};
      85             : 
      86         248 :         if(CTX->auto_sm())
      87           0 :             opts |= hermes::use_auto_sm;
      88         248 :         if(gkfs::rpc::protocol::ofi_psm2 == CTX->rpc_protocol()) {
      89           0 :             opts |= hermes::force_no_block_progress;
      90             :         }
      91             : 
      92         248 :         opts |= hermes::process_may_fork;
      93             : 
      94         744 :         ld_network_service = std::make_unique<hermes::async_engine>(
      95         496 :                 hermes::get_transport_type(CTX->rpc_protocol()), opts);
      96         248 :         ld_network_service->run();
      97           0 :     } catch(const std::exception& ex) {
      98           0 :         fmt::print(stderr, "Failed to initialize Hermes RPC client {}\n",
      99           0 :                    ex.what());
     100           0 :         return false;
     101             :     }
     102         248 :     return true;
     103             : }
     104             : 
     105             : #ifdef GKFS_ENABLE_FORWARDING
     106             : void*
     107           0 : forwarding_mapper(void* p) {
     108           0 :     struct timespec timeout;
     109           0 :     clock_gettime(CLOCK_REALTIME, &timeout);
     110           0 :     timeout.tv_sec += 10; // 10 seconds
     111             : 
     112           0 :     int previous = -1;
     113             : 
     114           0 :     while(forwarding_running) {
     115           0 :         try {
     116           0 :             gkfs::utils::load_forwarding_map();
     117             : 
     118           0 :             if(previous != CTX->fwd_host_id()) {
     119           0 :                 LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
     120             : 
     121           0 :                 previous = CTX->fwd_host_id();
     122             :             }
     123           0 :         } catch(std::exception& e) {
     124           0 :             exit_error_msg(EXIT_FAILURE,
     125           0 :                            fmt::format("Unable set the forwarding host '{}'",
     126           0 :                                        e.what()));
     127             :         }
     128             : 
     129           0 :         pthread_mutex_lock(&remap_mutex);
     130           0 :         pthread_cond_timedwait(&remap_signal, &remap_mutex, &timeout);
     131           0 :         pthread_mutex_unlock(&remap_mutex);
     132             :     }
     133             : 
     134           0 :     return nullptr;
     135             : }
     136             : #endif
     137             : 
     138             : #ifdef GKFS_ENABLE_FORWARDING
     139             : void
     140           0 : init_forwarding_mapper() {
     141           0 :     forwarding_running = true;
     142             : 
     143           0 :     pthread_create(&mapper, NULL, forwarding_mapper, NULL);
     144           0 : }
     145             : #endif
     146             : 
     147             : #ifdef GKFS_ENABLE_FORWARDING
     148             : void
     149           0 : destroy_forwarding_mapper() {
     150           0 :     forwarding_running = false;
     151             : 
     152           0 :     pthread_cond_signal(&remap_signal);
     153             : 
     154           0 :     pthread_join(mapper, NULL);
     155           0 : }
     156             : #endif
     157             : 
     158             : void
     159         248 : log_prog_name() {
     160         248 :     std::string line;
     161         496 :     std::ifstream cmdline("/proc/self/cmdline");
     162         248 :     if(!cmdline.is_open()) {
     163           0 :         LOG(ERROR, "Unable to open cmdline file");
     164           0 :         throw std::runtime_error("Unable to open cmdline file");
     165             :     }
     166         248 :     if(!getline(cmdline, line)) {
     167           0 :         throw std::runtime_error("Unable to read cmdline file");
     168             :     }
     169         248 :     std::replace(line.begin(), line.end(), '\0', ' ');
     170         248 :     line.erase(line.length() - 1, line.length());
     171         248 :     LOG(INFO, "Process cmdline: '{}'", line);
     172         248 :     cmdline.close();
     173         248 : }
     174             : 
     175             : } // namespace
     176             : 
     177             : namespace gkfs::preload {
     178             : 
     179             : /**
     180             :  * This function is only called in the preload constructor and initializes
     181             :  * the file system client
     182             :  */
     183             : void
     184         248 : init_environment() {
     185             : 
     186         248 :     vector<pair<string, string>> hosts{};
     187         248 :     try {
     188         248 :         LOG(INFO, "Loading peer addresses...");
     189         496 :         hosts = gkfs::utils::read_hosts_file();
     190           0 :     } catch(const std::exception& e) {
     191           0 :         exit_error_msg(EXIT_FAILURE,
     192           0 :                        "Failed to load hosts addresses: "s + e.what());
     193             :     }
     194             : 
     195             :     // initialize Hermes interface to Mercury
     196         248 :     LOG(INFO, "Initializing RPC subsystem...");
     197             : 
     198         248 :     if(!init_hermes_client()) {
     199           0 :         exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
     200             :     }
     201             : 
     202         248 :     try {
     203         248 :         gkfs::utils::connect_to_hosts(hosts);
     204           0 :     } catch(const std::exception& e) {
     205           0 :         exit_error_msg(EXIT_FAILURE,
     206           0 :                        "Failed to connect to hosts: "s + e.what());
     207             :     }
     208             : 
     209             :     /* Setup distributor */
     210             : #ifdef GKFS_ENABLE_FORWARDING
     211           0 :     try {
     212           0 :         gkfs::utils::load_forwarding_map();
     213             : 
     214           0 :         LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
     215           0 :     } catch(std::exception& e) {
     216           0 :         exit_error_msg(
     217             :                 EXIT_FAILURE,
     218           0 :                 fmt::format("Unable set the forwarding host '{}'", e.what()));
     219             :     }
     220             : 
     221           0 :     auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(
     222           0 :             CTX->fwd_host_id(), CTX->hosts().size());
     223           0 :     CTX->distributor(forwarder_dist);
     224             : #else
     225             : #ifdef GKFS_USE_GUIDED_DISTRIBUTION
     226         248 :     auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
     227         496 :             CTX->local_host_id(), CTX->hosts().size());
     228             : #else
     229             :     auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
     230             :             CTX->local_host_id(), CTX->hosts().size());
     231             : #endif
     232         496 :     CTX->distributor(distributor);
     233             : #endif
     234             : 
     235             : 
     236         248 :     LOG(INFO, "Retrieving file system configuration...");
     237             : 
     238         248 :     if(!gkfs::rpc::forward_get_fs_config()) {
     239           0 :         exit_error_msg(
     240             :                 EXIT_FAILURE,
     241           0 :                 "Unable to fetch file system configurations from daemon process through RPC.");
     242             :     }
     243             :     // Initialize random number generator and seed for replica selection
     244             :     // in case of failure, a new replica will be selected
     245         248 :     if(CTX->get_replicas() > 0) {
     246           0 :         srand(time(nullptr));
     247             :     }
     248             : 
     249         248 :     LOG(INFO, "Environment initialization successful.");
     250         248 : }
     251             : 
     252             : } // namespace gkfs::preload
     253             : 
/**
 * Called initially ONCE when the preload library is used with the LD_PRELOAD
 * environment variable.
 *
 * Start-up sequence:
 *   - enables interception and self-interception, then initializes logging,
 *   - occupies the user fd range while internal subsystems are initialized
 *     (see the comment before protect_user_fds()),
 *   - logs the command line, initializes the cwd and runs
 *     gkfs::preload::init_environment(),
 *   - releases the user fd range, starts the forwarding mapper thread when
 *     GKFS_ENABLE_FORWARDING is defined, and starts syscall interception.
 * The caller's errno is saved on entry and restored on exit. Exceptions
 * thrown by the called helpers (e.g. log_prog_name()) are not caught here.
 */
void
init_preload() {
    // The original errno value will be restored after initialization to not
    // leak internal error codes
    auto oerrno = errno;

    CTX->enable_interception();
    gkfs::preload::start_self_interception();

    CTX->init_logging();
    // from here onwards it is safe to print messages
    LOG(DEBUG, "Logging subsystem initialized");

    // Kernel modules such as ib_uverbs may create fds in kernel space and pass
    // them to user-space processes using ioctl()-like interfaces. if this
    // happens during our internal initialization, there's no way for us to
    // control this creation and the fd will be created in the
    // [0, MAX_USER_FDS) range rather than in our private
    // [MAX_USER_FDS, GKFS_MAX_OPEN_FDS) range.
    // with MAX_USER_FDS = GKFS_MAX_OPEN_FDS - GKFS_MAX_INTERNAL_FDS
    // To prevent this for our internal
    // initialization code, we forcefully occupy the user fd range to force
    // such modules to create fds in our private range.
    CTX->protect_user_fds();

    log_prog_name();
    gkfs::path::init_cwd();

    LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
    LOG(DEBUG, "Number of replicas : '{}'", CTX->get_replicas());
    gkfs::preload::init_environment();
    // NOTE(review): interception was already enabled at the top of this
    // function; confirm whether this second call is intentional.
    CTX->enable_interception();

    // Internal initialization is done; give the user fd range back.
    CTX->unprotect_user_fds();

#ifdef GKFS_ENABLE_FORWARDING
    init_forwarding_mapper();
#endif

    gkfs::preload::start_interception();
    errno = oerrno;
}
     300             : 
/**
 * Called last when the preload library is used with the LD_PRELOAD
 * environment variable.
 *
 * Teardown, in order: stop and join the forwarding mapper thread (only when
 * GKFS_ENABLE_FORWARDING is defined), clear the host list, destroy the RPC
 * engine and finally stop syscall interception.
 */
void
destroy_preload() {
#ifdef GKFS_ENABLE_FORWARDING
    destroy_forwarding_mapper();
#endif

    CTX->clear_hosts();
    LOG(DEBUG, "Peer information deleted");

    // Destroys the Hermes engine created in init_hermes_client().
    ld_network_service.reset();
    LOG(DEBUG, "RPC subsystem shut down");

    gkfs::preload::stop_interception();
    CTX->disable_interception();
    LOG(DEBUG, "Syscall interception stopped");

    LOG(INFO, "All subsystems shut down. Client shutdown complete.");
}

Generated by: LCOV version 1.16