LCOV - code coverage report
Current view: top level - src/client - preload.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 106 152 69.7 %
Date: 2024-04-23 00:09:24 Functions: 8 11 72.7 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS' POSIX interface.
      12             : 
      13             :   GekkoFS' POSIX interface is free software: you can redistribute it and/or
      14             :   modify it under the terms of the GNU Lesser General Public License as
      15             :   published by the Free Software Foundation, either version 3 of the License,
      16             :   or (at your option) any later version.
      17             : 
      18             :   GekkoFS' POSIX interface is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU Lesser General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU Lesser General Public License
      24             :   along with GekkoFS' POSIX interface.  If not, see
      25             :   <https://www.gnu.org/licenses/>.
      26             : 
      27             :   SPDX-License-Identifier: LGPL-3.0-or-later
      28             : */
      29             : 
      30             : #include <client/preload.hpp>
      31             : #include <client/path.hpp>
      32             : #include <client/logging.hpp>
      33             : #include <client/rpc/forward_management.hpp>
      34             : #include <client/preload_util.hpp>
      35             : #include <client/intercept.hpp>
      36             : 
      37             : #include <common/rpc/distributor.hpp>
      38             : #include <common/common_defs.hpp>
      39             : 
      40             : #include <ctime>
      41             : #include <cstdlib>
      42             : #include <fstream>
      43             : 
      44             : #include <hermes.hpp>
      45             : 
      46             : 
      47             : using namespace std;
      48             : 
      49             : std::unique_ptr<hermes::async_engine> ld_network_service; // extern variable
      50             : 
      51             : namespace {
      52             : 
      53             : // FORWARDING
      54             : pthread_t mapper;
      55             : bool forwarding_running;
      56             : 
      57             : pthread_mutex_t remap_mutex;
      58             : pthread_cond_t remap_signal;
      59             : // END FORWARDING
      60             : 
      61             : inline void
      62           0 : exit_error_msg(int errcode, const string& msg) {
      63             : 
      64           0 :     LOG_ERROR("{}", msg);
      65           0 :     gkfs::log::logger::log_message(stderr, "{}\n", msg);
      66             : 
      67             :     // if we don't disable interception before calling ::exit()
       68             :     // syscall hooks may find an inconsistency in shared state
      69             :     // (e.g. the logger) and thus, crash
      70           0 :     if(CTX->interception_enabled()) {
      71           0 :         gkfs::preload::stop_interception();
      72           0 :         CTX->disable_interception();
      73             :     }
      74           0 :     ::exit(errcode);
      75             : }
      76             : 
      77             : /**
      78             :  * Initializes the Hermes client for a given transport prefix
      79             :  * @return true if successfully initialized; false otherwise
      80             :  */
      81             : bool
      82         269 : init_hermes_client() {
      83             : 
      84         269 :     try {
      85             : 
      86         269 :         hermes::engine_options opts{};
      87             : 
      88         269 :         if(CTX->auto_sm())
      89           0 :             opts |= hermes::use_auto_sm;
      90         269 :         if(gkfs::rpc::protocol::ofi_psm2 == CTX->rpc_protocol()) {
      91           0 :             opts |= hermes::force_no_block_progress;
      92             :         }
      93             : 
      94         269 :         opts |= hermes::process_may_fork;
      95             : 
      96         807 :         ld_network_service = std::make_unique<hermes::async_engine>(
      97         538 :                 hermes::get_transport_type(CTX->rpc_protocol()), opts);
      98         269 :         ld_network_service->run();
      99           0 :     } catch(const std::exception& ex) {
     100           0 :         fmt::print(stderr, "Failed to initialize Hermes RPC client {}\n",
     101           0 :                    ex.what());
     102           0 :         return false;
     103             :     }
     104         269 :     return true;
     105             : }
     106             : 
     107             : void*
     108          21 : forwarding_mapper(void* p) {
     109          21 :     struct timespec timeout;
     110          21 :     clock_gettime(CLOCK_REALTIME, &timeout);
     111          21 :     timeout.tv_sec += 10; // 10 seconds
     112             : 
     113          21 :     int previous = -1;
     114             : 
     115          42 :     while(forwarding_running) {
     116          21 :         try {
     117          21 :             gkfs::utils::load_forwarding_map();
     118             : 
     119          21 :             if(previous != (int64_t) CTX->fwd_host_id()) {
     120          21 :                 LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
     121             : 
     122          21 :                 previous = CTX->fwd_host_id();
     123             :             }
     124           0 :         } catch(std::exception& e) {
     125           0 :             exit_error_msg(EXIT_FAILURE,
     126           0 :                            fmt::format("Unable set the forwarding host '{}'",
     127           0 :                                        e.what()));
     128             :         }
     129             : 
     130          21 :         pthread_mutex_lock(&remap_mutex);
     131          21 :         pthread_cond_timedwait(&remap_signal, &remap_mutex, &timeout);
     132          21 :         pthread_mutex_unlock(&remap_mutex);
     133             :     }
     134             : 
     135          21 :     return nullptr;
     136             : }
     137             : 
     138             : void
     139          21 : init_forwarding_mapper() {
     140          21 :     forwarding_running = true;
     141             : 
     142          21 :     pthread_create(&mapper, NULL, forwarding_mapper, NULL);
     143          21 : }
     144             : 
     145             : void
     146          21 : destroy_forwarding_mapper() {
     147          21 :     forwarding_running = false;
     148             : 
     149          21 :     pthread_cond_signal(&remap_signal);
     150             : 
     151          21 :     pthread_join(mapper, NULL);
     152          21 : }
     153             : 
     154             : void
     155         269 : log_prog_name() {
     156         269 :     std::string line;
     157         538 :     std::ifstream cmdline("/proc/self/cmdline");
     158         269 :     if(!cmdline.is_open()) {
     159           0 :         LOG(ERROR, "Unable to open cmdline file");
     160           0 :         throw std::runtime_error("Unable to open cmdline file");
     161             :     }
     162         269 :     if(!getline(cmdline, line)) {
     163           0 :         throw std::runtime_error("Unable to read cmdline file");
     164             :     }
     165         269 :     std::replace(line.begin(), line.end(), '\0', ' ');
     166         269 :     line.erase(line.length() - 1, line.length());
     167         269 :     LOG(INFO, "Process cmdline: '{}'", line);
     168         269 :     cmdline.close();
     169         269 : }
     170             : 
     171             : } // namespace
     172             : 
     173             : namespace gkfs::preload {
     174             : 
     175             : /**
     176             :  * This function is only called in the preload constructor and initializes
     177             :  * the file system client
     178             :  */
     179             : void
     180         269 : init_environment() {
     181             : 
     182         269 :     vector<pair<string, string>> hosts{};
     183         269 :     try {
     184         269 :         LOG(INFO, "Loading peer addresses...");
     185         538 :         hosts = gkfs::utils::read_hosts_file();
     186           0 :     } catch(const std::exception& e) {
     187           0 :         exit_error_msg(EXIT_FAILURE,
     188           0 :                        "Failed to load hosts addresses: "s + e.what());
     189             :     }
     190             : 
     191             :     // initialize Hermes interface to Mercury
     192         269 :     LOG(INFO, "Initializing RPC subsystem...");
     193             : 
     194         269 :     if(!init_hermes_client()) {
     195           0 :         exit_error_msg(EXIT_FAILURE, "Unable to initialize RPC subsystem");
     196             :     }
     197             : 
     198         269 :     try {
     199         269 :         gkfs::utils::connect_to_hosts(hosts);
     200           0 :     } catch(const std::exception& e) {
     201           0 :         exit_error_msg(EXIT_FAILURE,
     202           0 :                        "Failed to connect to hosts: "s + e.what());
     203             :     }
     204             : 
     205             :     /* Setup distributor */
     206         269 :     auto forwarding_map_file = gkfs::env::get_var(
     207         807 :             gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
     208             : 
     209         269 :     if(!forwarding_map_file.empty()) {
     210          21 :         try {
     211          21 :             gkfs::utils::load_forwarding_map();
     212             : 
     213          21 :             LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
     214           0 :         } catch(std::exception& e) {
     215           0 :             exit_error_msg(EXIT_FAILURE,
     216           0 :                            fmt::format("Unable set the forwarding host '{}'",
     217           0 :                                        e.what()));
     218             :         }
     219             : 
     220          21 :         auto forwarder_dist = std::make_shared<gkfs::rpc::ForwarderDistributor>(
     221          42 :                 CTX->fwd_host_id(), CTX->hosts().size());
     222          63 :         CTX->distributor(forwarder_dist);
     223             :     } else {
     224             : 
     225             : #ifdef GKFS_USE_GUIDED_DISTRIBUTION
     226         248 :         auto distributor = std::make_shared<gkfs::rpc::GuidedDistributor>(
     227         496 :                 CTX->local_host_id(), CTX->hosts().size());
     228             : #else
     229             :         auto distributor = std::make_shared<gkfs::rpc::SimpleHashDistributor>(
     230             :                 CTX->local_host_id(), CTX->hosts().size());
     231             : #endif
     232         744 :         CTX->distributor(distributor);
     233             :     }
     234             : 
     235         269 :     LOG(INFO, "Retrieving file system configuration...");
     236             : 
     237         269 :     if(!gkfs::rpc::forward_get_fs_config()) {
     238           0 :         exit_error_msg(
     239             :                 EXIT_FAILURE,
     240           0 :                 "Unable to fetch file system configurations from daemon process through RPC.");
     241             :     }
     242             :     // Initialize random number generator and seed for replica selection
     243             :     // in case of failure, a new replica will be selected
     244         269 :     if(CTX->get_replicas() > 0) {
     245           0 :         srand(time(nullptr));
     246             :     }
     247             : 
     248         269 :     LOG(INFO, "Environment initialization successful.");
     249         269 : }
     250             : 
     251             : } // namespace gkfs::preload
     252             : 
     253             : /**
     254             :  * Called initially ONCE when preload library is used with the LD_PRELOAD
     255             :  * environment variable
     256             :  */
     257             : void
     258         269 : init_preload() {
     259             :     // The original errno value will be restored after initialization to not
     260             :     // leak internal error codes
     261         269 :     auto oerrno = errno;
     262             : 
     263         269 :     CTX->enable_interception();
     264         269 :     gkfs::preload::start_self_interception();
     265             : 
     266         269 :     CTX->init_logging();
      267             :     // from here onwards it is safe to print messages
     268         269 :     LOG(DEBUG, "Logging subsystem initialized");
     269             : 
     270             :     // Kernel modules such as ib_uverbs may create fds in kernel space and pass
     271             :     // them to user-space processes using ioctl()-like interfaces. if this
     272             :     // happens during our internal initialization, there's no way for us to
     273             :     // control this creation and the fd will be created in the
     274             :     // [0, MAX_USER_FDS) range rather than in our private
     275             :     // [MAX_USER_FDS, GKFS_MAX_OPEN_FDS) range.
     276             :     // with MAX_USER_FDS = GKFS_MAX_OPEN_FDS - GKFS_MAX_INTERNAL_FDS
     277             :     // To prevent this for our internal
     278             :     // initialization code, we forcefully occupy the user fd range to force
     279             :     // such modules to create fds in our private range.
     280         269 :     CTX->protect_user_fds();
     281             : 
     282         269 :     log_prog_name();
     283         269 :     gkfs::path::init_cwd();
     284             : 
     285         269 :     LOG(DEBUG, "Current working directory: '{}'", CTX->cwd());
     286         269 :     LOG(DEBUG, "Number of replicas : '{}'", CTX->get_replicas());
     287         269 :     gkfs::preload::init_environment();
     288         269 :     CTX->enable_interception();
     289             : 
     290         269 :     CTX->unprotect_user_fds();
     291             : 
     292         269 :     auto forwarding_map_file = gkfs::env::get_var(
     293         538 :             gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
     294         269 :     if(!forwarding_map_file.empty()) {
     295          21 :         init_forwarding_mapper();
     296             :     }
     297             : 
     298         269 :     gkfs::preload::start_interception();
     299         269 :     errno = oerrno;
     300         269 : }
     301             : 
     302             : /**
     303             :  * Called last when preload library is used with the LD_PRELOAD environment
     304             :  * variable
     305             :  */
     306             : void
     307         269 : destroy_preload() {
     308         269 :     auto forwarding_map_file = gkfs::env::get_var(
     309         538 :             gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
     310         269 :     if(!forwarding_map_file.empty()) {
     311          21 :         destroy_forwarding_mapper();
     312             :     }
     313         269 :     CTX->clear_hosts();
     314         269 :     LOG(DEBUG, "Peer information deleted");
     315             : 
     316         269 :     ld_network_service.reset();
     317         269 :     LOG(DEBUG, "RPC subsystem shut down");
     318             : 
     319         269 :     if(CTX->interception_enabled()) {
     320         269 :         gkfs::preload::stop_interception();
     321         269 :         CTX->disable_interception();
     322         269 :         LOG(DEBUG, "Syscall interception stopped");
     323             :     }
     324             : 
     325         269 :     LOG(INFO, "All subsystems shut down. Client shutdown complete.");
     326         269 : }
     327             : 
     328             : 
     329             : /**
     330             :  * @brief External functions to call linking the library
     331             :  *
     332             :  */
     333             : extern "C" int
     334           0 : gkfs_init() {
     335           0 :     CTX->init_logging();
     336             : 
      337             :     // from here onwards it is safe to print messages
     338           0 :     LOG(DEBUG, "Logging subsystem initialized");
     339             : 
     340           0 :     gkfs::preload::init_environment();
     341             : 
     342           0 :     return 0;
     343             : }
     344             : 
     345             : 
     346             : extern "C" int
     347           0 : gkfs_end() {
     348           0 :     CTX->clear_hosts();
     349           0 :     LOG(DEBUG, "Peer information deleted");
     350             : 
     351           0 :     ld_network_service.reset();
     352           0 :     LOG(DEBUG, "RPC subsystem shut down");
     353             : 
     354           0 :     LOG(INFO, "All subsystems shut down. Client shutdown complete.");
     355             : 
     356           0 :     return 0;
     357             : }

Generated by: LCOV version 1.16