LCOV - code coverage report
Current view: top level - src/client - preload_util.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 112 205 54.6 %
Date: 2024-04-30 13:21:35 Functions: 7 9 77.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS' POSIX interface.
      12             : 
      13             :   GekkoFS' POSIX interface is free software: you can redistribute it and/or
      14             :   modify it under the terms of the GNU Lesser General Public License as
      15             :   published by the Free Software Foundation, either version 3 of the License,
      16             :   or (at your option) any later version.
      17             : 
      18             :   GekkoFS' POSIX interface is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU Lesser General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU Lesser General Public License
      24             :   along with GekkoFS' POSIX interface.  If not, see
      25             :   <https://www.gnu.org/licenses/>.
      26             : 
      27             :   SPDX-License-Identifier: LGPL-3.0-or-later
      28             : */
      29             : 
#include <client/preload_util.hpp>
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>

#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/env_util.hpp>
#include <common/common_defs.hpp>

#include <hermes.hpp>

#include <algorithm>
#include <cerrno>
#include <charconv>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstring>
#include <fstream>
#include <functional>
#include <map>
#include <numeric>
#include <optional>
#include <random>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>

extern "C" {
#include <sys/sysmacros.h>
}
      51             : 
      52             : using namespace std;
      53             : 
      54             : namespace {
      55             : 
      56             : /**
      57             :  * Looks up a host endpoint via Hermes
      58             :  * @param uri
      59             :  * @param max_retries
      60             :  * @return hermes endpoint, if successful
      61             :  * @throws std::runtime_error
      62             :  */
      63             : hermes::endpoint
      64         248 : lookup_endpoint(const std::string& uri, std::size_t max_retries = 3) {
      65             : 
      66         248 :     LOG(DEBUG, "Looking up address \"{}\"", uri);
      67             : 
      68         248 :     std::random_device rd; // obtain a random number from hardware
      69         248 :     std::size_t attempts = 0;
      70         496 :     std::string error_msg;
      71             : 
      72         248 :     do {
      73         248 :         try {
      74         496 :             return ld_network_service->lookup(uri);
      75           0 :         } catch(const exception& ex) {
      76           0 :             error_msg = ex.what();
      77             : 
      78           0 :             LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]", uri,
      79           0 :                 attempts + 1, max_retries);
      80             : 
      81             :             // Wait a random amount of time and try again
      82           0 :             std::mt19937 g(rd()); // seed the random generator
      83           0 :             std::uniform_int_distribution<> distr(
      84           0 :                     50, 50 * (attempts + 2)); // define the range
      85           0 :             std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
      86           0 :             continue;
      87             :         }
      88           0 :     } while(++attempts < max_retries);
      89             : 
      90           0 :     throw std::runtime_error(
      91           0 :             fmt::format("Endpoint for address '{}' could not be found ({})",
      92           0 :                         uri, error_msg));
      93             : }
      94             : 
      95             : /**
      96             :  * extracts protocol from a given URI generated by the RPC server of the daemon
      97             :  * @param uri
      98             :  * @throws std::runtime_error
      99             :  */
     100             : void
     101         248 : extract_protocol(const string& uri) {
     102         248 :     if(uri.rfind("://") == string::npos) {
     103             :         // invalid format. kill client
     104           0 :         throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri));
     105             :     }
     106         248 :     string protocol{};
     107             : 
     108         248 :     if(uri.find(gkfs::rpc::protocol::ofi_sockets) != string::npos) {
     109         248 :         protocol = gkfs::rpc::protocol::ofi_sockets;
     110           0 :     } else if(uri.find(gkfs::rpc::protocol::ofi_psm2) != string::npos) {
     111           0 :         protocol = gkfs::rpc::protocol::ofi_psm2;
     112           0 :     } else if(uri.find(gkfs::rpc::protocol::ofi_verbs) != string::npos) {
     113           0 :         protocol = gkfs::rpc::protocol::ofi_verbs;
     114             :     }
     115             :     // check for shared memory protocol. Can be plain shared memory or real ofi
     116             :     // protocol + auto_sm
     117         248 :     if(uri.find(gkfs::rpc::protocol::na_sm) != string::npos) {
     118           0 :         if(protocol.empty())
     119           0 :             protocol = gkfs::rpc::protocol::na_sm;
     120             :         else
     121           0 :             CTX->auto_sm(true);
     122             :     }
     123         248 :     if(protocol.empty()) {
     124             :         // unsupported protocol. kill client
     125           0 :         throw runtime_error(fmt::format(
     126             :                 "Unsupported RPC protocol found in hosts file with URI: '{}'",
     127           0 :                 uri));
     128             :     }
     129         248 :     LOG(INFO,
     130             :         "RPC protocol '{}' extracted from hosts file. Using auto_sm is '{}'",
     131         248 :         protocol, CTX->auto_sm());
     132         248 :     CTX->rpc_protocol(protocol);
     133         248 : }
     134             : 
     135             : /**
     136             :  * Reads the daemon generator hosts file by a given path, returning hosts and
     137             :  * URI addresses
     138             :  * @param path to hosts file
     139             :  * @return vector<pair<hosts, URI>>
     140             :  * @throws std::runtime_error
     141             :  */
     142             : vector<pair<string, string>>
     143         248 : load_hostfile(const std::string& path) {
     144             : 
     145         248 :     LOG(DEBUG, "Loading hosts file: \"{}\"", path);
     146             : 
     147         496 :     ifstream lf(path);
     148         248 :     if(!lf) {
     149           0 :         throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
     150           0 :                                         path, strerror(errno)));
     151             :     }
     152         248 :     vector<pair<string, string>> hosts;
     153         248 :     const regex line_re("^(\\S+)\\s+(\\S+)$",
     154         248 :                         regex::ECMAScript | regex::optimize);
     155         496 :     string line;
     156         248 :     string host;
     157         248 :     string uri;
     158         496 :     std::smatch match;
     159         496 :     while(getline(lf, line)) {
     160         248 :         if(!regex_match(line, match, line_re)) {
     161             : 
     162           0 :             LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
     163           0 :                 path, line);
     164             : 
     165           0 :             throw runtime_error(
     166           0 :                     fmt::format("unrecognized line format: '{}'", line));
     167             :         }
     168         248 :         host = match[1];
     169         248 :         uri = match[2];
     170         248 :         hosts.emplace_back(host, uri);
     171             :     }
     172         248 :     if(hosts.empty()) {
     173           0 :         throw runtime_error(
     174           0 :                 "Hosts file found but no suitable addresses could be extracted");
     175             :     }
     176         248 :     extract_protocol(hosts[0].second);
     177             :     // sort hosts so that data always hashes to the same place during restart
     178         248 :     std::sort(hosts.begin(), hosts.end());
     179             :     // remove rootdir suffix from host after sorting as no longer required
     180         496 :     for(auto& h : hosts) {
     181         248 :         auto idx = h.first.rfind("#");
     182         248 :         if(idx != string::npos)
     183           0 :             h.first.erase(idx, h.first.length());
     184             :     }
     185         496 :     return hosts;
     186             : }
     187             : 
     188             : } // namespace
     189             : 
     190             : namespace gkfs::utils {
     191             : 
     192             : 
     193             : /**
     194             :  * Retrieve metadata from daemon and return Metadata object
     195             :  * errno may be set
     196             :  * @param path
     197             :  * @param follow_links
     198             :  * @return Metadata
     199             :  */
     200             : optional<gkfs::metadata::Metadata>
     201        1359 : get_metadata(const string& path, bool follow_links) {
     202        2718 :     std::string attr;
     203        1359 :     auto err = gkfs::rpc::forward_stat(path, attr, 0);
     204             :     // TODO: retry on failure
     205             : 
     206        1359 :     if(err) {
     207          24 :         auto copy = 1;
     208          24 :         while(copy < CTX->get_replicas() + 1 && err) {
     209           0 :             LOG(ERROR, "Retrying Stat on replica {} {}", copy, follow_links);
     210           0 :             err = gkfs::rpc::forward_stat(path, attr, copy);
     211           0 :             copy++;
     212             :         }
     213          24 :         if(err) {
     214          24 :             errno = err;
     215          24 :             return {};
     216             :         }
     217             :     }
     218             : #ifdef HAS_SYMLINKS
     219        1335 :     if(follow_links) {
     220         144 :         gkfs::metadata::Metadata md{attr};
     221          72 :         while(md.is_link()) {
     222           0 :             err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
     223           0 :             if(err) {
     224           0 :                 errno = err;
     225           0 :                 return {};
     226             :             }
     227           0 :             md = gkfs::metadata::Metadata{attr};
     228             :         }
     229             :     }
     230             : #endif
     231        2670 :     return gkfs::metadata::Metadata{attr};
     232             : }
     233             : 
     234             : 
     235             : /**
     236             :  * Converts the Metadata object into a stat struct, which is needed by Linux
     237             :  * @param path
     238             :  * @param md
     239             :  * @param attr
     240             :  * @return
     241             :  */
     242             : int
     243          53 : metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
     244             :                  struct stat& attr) {
     245             : 
     246             :     /* Populate default values */
     247          53 :     attr.st_dev = makedev(0, 0);
     248          53 :     attr.st_ino = std::hash<std::string>{}(path);
     249          53 :     attr.st_nlink = 1;
     250          53 :     attr.st_uid = CTX->fs_conf()->uid;
     251          53 :     attr.st_gid = CTX->fs_conf()->gid;
     252          53 :     attr.st_rdev = 0;
     253          53 :     attr.st_blksize = gkfs::config::rpc::chunksize;
     254          53 :     attr.st_blocks = 0;
     255             : 
     256          53 :     memset(&attr.st_atim, 0, sizeof(timespec));
     257          53 :     memset(&attr.st_mtim, 0, sizeof(timespec));
     258          53 :     memset(&attr.st_ctim, 0, sizeof(timespec));
     259             : 
     260          53 :     attr.st_mode = md.mode();
     261             : 
     262             : #ifdef HAS_SYMLINKS
     263          53 :     if(md.is_link())
     264           0 :         attr.st_size = md.target_path().size() + CTX->mountdir().size();
     265             :     else
     266             : #endif
     267          53 :         attr.st_size = md.size();
     268             : 
     269          53 :     if(CTX->fs_conf()->atime_state) {
     270          53 :         attr.st_atim.tv_sec = md.atime();
     271             :     }
     272          53 :     if(CTX->fs_conf()->mtime_state) {
     273          53 :         attr.st_mtim.tv_sec = md.mtime();
     274             :     }
     275          53 :     if(CTX->fs_conf()->ctime_state) {
     276          53 :         attr.st_ctim.tv_sec = md.ctime();
     277             :     }
     278          53 :     if(CTX->fs_conf()->link_cnt_state) {
     279          53 :         attr.st_nlink = md.link_count();
     280             :     }
     281          53 :     if(CTX->fs_conf()->blocks_state) {
     282          53 :         attr.st_blocks = md.blocks();
     283             :     } else {
     284           0 :         attr.st_blocks = md.size() / gkfs::config::syscall::stat::st_nblocksize;
     285             :     }
     286          53 :     return 0;
     287             : }
     288             : 
     289             : #ifdef GKFS_ENABLE_FORWARDING
     290             : map<string, uint64_t>
     291           0 : load_forwarding_map_file(const std::string& lfpath) {
     292             : 
     293           0 :     LOG(DEBUG, "Loading forwarding map file file: \"{}\"", lfpath);
     294             : 
     295           0 :     ifstream lf(lfpath);
     296           0 :     if(!lf) {
     297           0 :         throw runtime_error(
     298           0 :                 fmt::format("Failed to open forwarding map file '{}': {}",
     299           0 :                             lfpath, strerror(errno)));
     300             :     }
     301           0 :     map<string, uint64_t> forwarding_map;
     302           0 :     const regex line_re("^(\\S+)\\s+(\\S+)$",
     303           0 :                         regex::ECMAScript | regex::optimize);
     304           0 :     string line;
     305           0 :     string host;
     306           0 :     uint64_t forwarder;
     307           0 :     std::smatch match;
     308           0 :     while(getline(lf, line)) {
     309           0 :         if(!regex_match(line, match, line_re)) {
     310             : 
     311           0 :             LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
     312           0 :                 lfpath, line);
     313             : 
     314           0 :             throw runtime_error(
     315           0 :                     fmt::format("unrecognized line format: '{}'", line));
     316             :         }
     317           0 :         host = match[1];
     318           0 :         forwarder = std::stoi(match[2].str());
     319           0 :         forwarding_map[host] = forwarder;
     320             :     }
     321           0 :     return forwarding_map;
     322             : }
     323             : #endif
     324             : 
     325             : #ifdef GKFS_ENABLE_FORWARDING
     326             : void
     327           0 : load_forwarding_map() {
     328           0 :     string forwarding_map_file;
     329             : 
     330           0 :     forwarding_map_file = gkfs::env::get_var(
     331           0 :             gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
     332             : 
     333           0 :     map<string, uint64_t> forwarding_map;
     334             : 
     335           0 :     while(forwarding_map.size() == 0) {
     336           0 :         try {
     337           0 :             forwarding_map = load_forwarding_map_file(forwarding_map_file);
     338           0 :         } catch(const exception& e) {
     339           0 :             auto emsg = fmt::format("Failed to load forwarding map file: {}",
     340           0 :                                     e.what());
     341           0 :             throw runtime_error(emsg);
     342             :         }
     343             :     }
     344             : 
     345             :     // if (forwarding_map.size() == 0) {
     346             :     //    throw runtime_error(fmt::format("Forwarding map file is empty: '{}'",
     347             :     //    forwarding_map_file));
     348             :     //}
     349             : 
     350           0 :     auto local_hostname = gkfs::rpc::get_my_hostname(true);
     351             : 
     352           0 :     if(forwarding_map.find(local_hostname) == forwarding_map.end()) {
     353           0 :         throw runtime_error(
     354           0 :                 fmt::format("Unable to determine the forwarder for host: '{}'",
     355           0 :                             local_hostname));
     356             :     }
     357           0 :     LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname,
     358           0 :         forwarding_map[local_hostname]);
     359             : 
     360           0 :     CTX->fwd_host_id(forwarding_map[local_hostname]);
     361           0 : }
     362             : #endif
     363             : 
     364             : vector<pair<string, string>>
     365         248 : read_hosts_file() {
     366         248 :     string hostfile;
     367             : 
     368         248 :     hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
     369         496 :                                   gkfs::config::hostfile_path);
     370             : 
     371         248 :     vector<pair<string, string>> hosts;
     372         248 :     try {
     373         496 :         hosts = load_hostfile(hostfile);
     374           0 :     } catch(const exception& e) {
     375           0 :         auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
     376           0 :         throw runtime_error(emsg);
     377             :     }
     378             : 
     379         248 :     if(hosts.empty()) {
     380           0 :         throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
     381             :     }
     382             : 
     383         248 :     LOG(INFO, "Hosts pool size: {}", hosts.size());
     384         248 :     return hosts;
     385             : }
     386             : 
     387             : /**
     388             :  * Connects to daemons and lookup Mercury URI addresses via Hermes
     389             :  * @param hosts vector<pair<hostname, Mercury URI address>>
     390             :  * @throws std::runtime_error through lookup_endpoint()
     391             :  */
     392             : void
     393         248 : connect_to_hosts(const vector<pair<string, string>>& hosts) {
     394         248 :     auto local_hostname = gkfs::rpc::get_my_hostname(true);
     395         248 :     bool local_host_found = false;
     396             : 
     397         496 :     std::vector<hermes::endpoint> addrs;
     398         248 :     addrs.resize(hosts.size());
     399             : 
     400         496 :     vector<uint64_t> host_ids(hosts.size());
     401             :     // populate vector with [0, ..., host_size - 1]
     402         248 :     ::iota(::begin(host_ids), ::end(host_ids), 0);
     403             :     /*
     404             :      * Shuffle hosts to balance addr lookups to all hosts
     405             :      * Too many concurrent lookups send to same host
     406             :      * could overwhelm the server,
     407             :      * returning error when addr lookup
     408             :      */
     409         496 :     ::random_device rd; // obtain a random number from hardware
     410         248 :     ::mt19937 g(rd());  // seed the random generator
     411         248 :     ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
     412             :     // lookup addresses and put abstract server addresses into rpc_addresses
     413             : 
     414         496 :     for(const auto& id : host_ids) {
     415         248 :         const auto& hostname = hosts.at(id).first;
     416         248 :         const auto& uri = hosts.at(id).second;
     417             : 
     418         248 :         addrs[id] = lookup_endpoint(uri);
     419             : 
     420         248 :         if(!local_host_found && hostname == local_hostname) {
     421         248 :             LOG(DEBUG, "Found local host: {}", hostname);
     422         248 :             CTX->local_host_id(id);
     423             :             local_host_found = true;
     424             :         }
     425             : 
     426         496 :         LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
     427             :     }
     428             : 
     429         248 :     if(!local_host_found) {
     430           0 :         LOG(WARNING, "Failed to find local host. Using host '0' as local host");
     431           0 :         CTX->local_host_id(0);
     432             :     }
     433             : 
     434         248 :     CTX->hosts(addrs);
     435         248 : }
     436             : 
     437             : } // namespace gkfs::utils

Generated by: LCOV version 1.16