/*
  Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This file is part of GekkoFS' POSIX interface.

  GekkoFS' POSIX interface is free software: you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public License as
  published by the Free Software Foundation, either version 3 of the License,
  or (at your option) any later version.

  GekkoFS' POSIX interface is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  GNU Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public License
  along with GekkoFS' POSIX interface. If not, see
  <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: LGPL-3.0-or-later
*/

#include <client/preload_util.hpp>
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>
#include <client/rpc/forward_metadata_proxy.hpp>
#include <client/cache.hpp>

#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/env_util.hpp>
#include <common/common_defs.hpp>

#include <hermes.hpp>

#include <algorithm>
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fstream>
#include <sstream>
#include <regex>
#include <csignal>
#include <map>
#include <numeric>
#include <optional>
#include <random>
#include <filesystem>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>

extern "C" {
#include <sys/sysmacros.h>
}

using namespace std;

namespace {

/**
 * Looks up a host endpoint via Hermes, retrying on failure.
 *
 * The lookup is always attempted at least once. After each failed attempt the
 * function sleeps for a random number of milliseconds in
 * [50, 50 * (attempts + 2)] (i.e. the upper bound starts at 100 ms and grows
 * by 50 ms per failure) before the next attempt.
 *
 * @param uri address to look up
 * @param use_proxy if true, the lookup goes through `ld_proxy_service`,
 *                  otherwise through `ld_network_service`
 * @param max_retries number of attempts before giving up
 * @return hermes endpoint, if successful
 * @throws std::runtime_error if every attempt failed; the message carries the
 *         text of the last caught exception
 */
hermes::endpoint
lookup_endpoint(const std::string& uri, bool use_proxy = false,
                std::size_t max_retries = 3) {

    LOG(DEBUG, "Looking up address \"{}\"", uri);

    std::random_device rd; // obtain a random number from hardware
    std::size_t attempts = 0;
    std::string error_msg;

    do {
        try {
            if(use_proxy)
                return ld_proxy_service->lookup(uri);
            else
                return ld_network_service->lookup(uri);
        } catch(const exception& ex) {
            error_msg = ex.what();

            LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]",
                uri, attempts + 1, max_retries);

            // Wait a random amount of time and try again
            std::mt19937 g(rd()); // seed the random generator
            std::uniform_int_distribution<> distr(
                    50, 50 * (attempts + 2)); // define the range
            std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
            // `continue` in a do-while jumps to the condition, so the attempt
            // counter is still incremented
            continue;
        }
    } while(++attempts < max_retries);

    throw std::runtime_error(
            fmt::format("Endpoint for address '{}' could not be found ({})",
                        uri, error_msg));
}

/**
 * Extracts the RPC protocol from a given URI generated by the RPC server of
 * the daemon and stores it in the preload context.
 *
 * The first entry of `gkfs::rpc::protocol::all_remote_protocols` that occurs
 * in the URI is selected. If the URI additionally contains the shared memory
 * protocol string, `auto_sm` is enabled in the context; if it contains only
 * the shared memory protocol string, that one becomes the protocol.
 *
 * @param uri
 * @throws std::runtime_error if the URI contains no "://" or no known protocol
 */
void
extract_protocol(const string& uri) {
    if(uri.rfind("://") == string::npos) {
        // invalid format. kill client
        throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri));
    }
    string protocol{};

    for(const auto& valid_protocol :
        gkfs::rpc::protocol::all_remote_protocols) {
        if(uri.find(valid_protocol) != string::npos) {
            protocol = valid_protocol;
            break;
        }
    }
    // check for shared memory protocol. Can be plain shared memory or real
    // ofi protocol + auto_sm
    if(uri.find(gkfs::rpc::protocol::na_sm) != string::npos) {
        if(protocol.empty())
            protocol = gkfs::rpc::protocol::na_sm;
        else
            CTX->auto_sm(true);
    }
    if(protocol.empty()) {
        // unsupported protocol. kill client
        throw runtime_error(fmt::format(
                "Unsupported RPC protocol found in hosts file with URI: '{}'",
                uri));
    }
    LOG(INFO,
        "RPC protocol '{}' extracted from hosts file. Using auto_sm is '{}'",
        protocol, CTX->auto_sm());
    CTX->rpc_protocol(protocol);
}

/**
 * Reads the daemon-generated hosts file at the given path, returning hosts
 * and URI addresses.
 *
 * Each line must consist of two whitespace-separated tokens (host and URI),
 * optionally followed by a third token which is ignored. The RPC protocol is
 * extracted from the URI of the first entry read. The returned vector is
 * sorted, and everything from the last '#' onwards is stripped from each
 * host name.
 *
 * @param path to hosts file
 * @return vector<pair<hosts, URI>>
 * @throws std::runtime_error if the file cannot be opened, a line does not
 *         match the expected format, or no host could be extracted
 */
vector<pair<string, string>>
load_hostfile(const std::string& path) {

    LOG(DEBUG, "Loading hosts file: \"{}\"", path);

    ifstream lf(path);
    if(!lf) {
        throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
                                        path, strerror(errno)));
    }
    vector<pair<string, string>> hosts;
    const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    string host;
    string uri;
    std::smatch match;
    while(getline(lf, line)) {
        // A line containing the end marker indicates the end of the current
        // FS instance. Further hosts are not part of the file system instance
        // yet and are therefore skipped. The hostfile is ordered, so nothing
        // below this line can contain valid hosts
        if(line.find(gkfs::client::hostsfile_end_str) != string::npos)
            break;
        if(!regex_match(line, match, line_re)) {

            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                path, line);

            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        host = match[1];
        uri = match[2];
        hosts.emplace_back(host, uri);
    }
    if(hosts.empty()) {
        throw runtime_error(
                "Hosts file found but no suitable addresses could be extracted");
    }
    extract_protocol(hosts[0].second);
    // sort hosts so that data always hashes to the same place during restart
    std::sort(hosts.begin(), hosts.end());
    // remove rootdir suffix from host after sorting as no longer required
    for(auto& h : hosts) {
        auto idx = h.first.rfind("#");
        if(idx != string::npos)
            h.first.erase(idx, h.first.length());
    }
    return hosts;
}

} // namespace

namespace gkfs::utils {

/**
 * Retrieve metadata from daemon and return Metadata object.
 *
 * If the dentry cache is enabled and holds an entry for the path, a Metadata
 * object is synthesized from the cache entry (default file/dir mode, cached
 * ctime and size) without issuing an RPC. Otherwise a stat RPC is forwarded,
 * either through the proxy or directly to the daemon; in the direct case the
 * request is retried on the replicas while it keeps failing.
 *
 * On failure errno is set to the RPC error code.
 *
 * @param path
 * @param follow_links only honoured when built with HAS_SYMLINKS: links are
 *                     resolved until a non-link entry is reached
 * @return Metadata, or an empty optional on error
 */
optional<gkfs::metadata::Metadata>
get_metadata(const string& path, bool follow_links) {
    std::string attr;
    int err{};
    // Use file metadata from dentry cache if available
    if(CTX->use_dentry_cache()) {
        // get parent and filename path to retrieve the cache entry
        std::filesystem::path p(path);
        auto parent = p.parent_path().string();
        auto filename = p.filename().string();
        auto cache_entry = CTX->dentry_cache()->get(parent, filename);
        if(cache_entry) {
            LOG(DEBUG, "{}(): Dentry cache hit for file '{}'", __func__,
                path);
            // if cache_entry exists, generate a Metadata object from it.
            mode_t mode = gkfs::config::syscall::stat::file_mode_default;
            if(cache_entry->file_type == gkfs::filemap::FileType::directory) {
                mode = gkfs::config::syscall::stat::dir_mode_default;
            }
            gkfs::metadata::Metadata md{};
            md.mode(mode);
            md.ctime(cache_entry->ctime);
            md.size(cache_entry->size);
            return md;
        }
    }

    if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
        err = gkfs::rpc::forward_stat_proxy(path, attr);
    } else {
        err = gkfs::rpc::forward_stat(path, attr, 0);
        // TODO: retry on failure
        if(err) {
            // walk through copies 1..replicas until one of them succeeds
            auto copy = 1;
            while(copy < CTX->get_replicas() + 1 && err) {
                LOG(ERROR, "Retrying Stat on replica {} {}", copy,
                    follow_links);
                err = gkfs::rpc::forward_stat(path, attr, copy);
                copy++;
            }
        }
    }
    if(err) {
        errno = err;
        return {};
    }
#ifdef HAS_SYMLINKS
    if(follow_links) {
        gkfs::metadata::Metadata md{attr};
        // NOTE(review): this loop has no hop limit, so a cyclic chain of
        // links would never terminate - confirm whether the daemon prevents
        // such cycles.
        while(md.is_link()) {
            if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
                err = gkfs::rpc::forward_stat_proxy(md.target_path(), attr);
            } else {
                err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
            }
            if(err) {
                errno = err;
                return {};
            }
            md = gkfs::metadata::Metadata{attr};
        }
    }
#endif
    return gkfs::metadata::Metadata{attr};
}

/**
 * Converts the Metadata object into a stat struct, which is needed by Linux.
 *
 * Fields that GekkoFS does not track are filled with defaults: device 0,
 * inode = hash of the path, link count 1, uid/gid from the file system
 * configuration, block size = RPC chunk size, zeroed timestamps. Timestamps,
 * link count and block count are then overwritten from the metadata only if
 * the corresponding feature is enabled in the file system configuration.
 * With `blocks_state` disabled, st_blocks is derived from the file size.
 *
 * @param path used to derive the inode number
 * @param md metadata to convert
 * @param attr stat struct to populate
 * @return always 0
 */
int
metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
                 struct stat& attr) {

    /* Populate default values */
    attr.st_dev = makedev(0, 0);
    attr.st_ino = std::hash<std::string>{}(path);
    attr.st_nlink = 1;
    attr.st_uid = CTX->fs_conf()->uid;
    attr.st_gid = CTX->fs_conf()->gid;
    attr.st_rdev = 0;
    attr.st_blksize = gkfs::config::rpc::chunksize;
    attr.st_blocks = 0;

    memset(&attr.st_atim, 0, sizeof(timespec));
    memset(&attr.st_mtim, 0, sizeof(timespec));
    memset(&attr.st_ctim, 0, sizeof(timespec));

    attr.st_mode = md.mode();

#ifdef HAS_SYMLINKS
    if(md.is_link())
        attr.st_size = md.target_path().size() + CTX->mountdir().size();
    else
#endif
        attr.st_size = md.size();

    if(CTX->fs_conf()->atime_state) {
        attr.st_atim.tv_sec = md.atime();
    }
    if(CTX->fs_conf()->mtime_state) {
        attr.st_mtim.tv_sec = md.mtime();
    }
    if(CTX->fs_conf()->ctime_state) {
        attr.st_ctim.tv_sec = md.ctime();
    }
    if(CTX->fs_conf()->link_cnt_state) {
        attr.st_nlink = md.link_count();
    }
    if(CTX->fs_conf()->blocks_state) {
        attr.st_blocks = md.blocks();
    } else {
        attr.st_blocks = md.size() / gkfs::config::syscall::stat::st_nblocksize;
    }
    return 0;
}

/**
 * Forwards a size update for the metadata entry of a file, either through the
 * proxy (if enabled for size updates and in use) or directly to the daemons,
 * in which case the configured number of replicas is passed along.
 *
 * @param path
 * @param count
 * @param offset
 * @param is_append
 * @return the pair returned by the forwarding RPC
 */
pair<int, off64_t>
update_file_size(const std::string& path, size_t count, off64_t offset,
                 bool is_append) {
    LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'",
        __func__, path, count, offset, is_append);
    pair<int, long> ret_offset;
    auto num_replicas = CTX->get_replicas();
    if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
        ret_offset = gkfs::rpc::forward_update_metadentry_size_proxy(
                path, count, offset, is_append);
    } else {
        ret_offset = gkfs::rpc::forward_update_metadentry_size(
                path, count, offset, is_append, num_replicas);
    }
    return ret_offset;
}

/**
 * Parses a forwarding map file. Every line must consist of exactly two
 * whitespace-separated tokens: a host name and a numeric forwarder id.
 *
 * @param lfpath path to the forwarding map file
 * @return map from host name to forwarder id (empty if the file has no lines)
 * @throws std::runtime_error if the file cannot be opened or a line does not
 *         match the expected format
 * @throws std::invalid_argument / std::out_of_range (from std::stoull) if the
 *         forwarder id is not a number or does not fit
 */
map<string, uint64_t>
load_forwarding_map_file(const std::string& lfpath) {

    LOG(DEBUG, "Loading forwarding map file file: \"{}\"", lfpath);

    ifstream lf(lfpath);
    if(!lf) {
        throw runtime_error(
                fmt::format("Failed to open forwarding map file '{}': {}",
                            lfpath, strerror(errno)));
    }
    map<string, uint64_t> forwarding_map;
    const regex line_re("^(\\S+)\\s+(\\S+)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    string host;
    uint64_t forwarder;
    std::smatch match;
    while(getline(lf, line)) {
        if(!regex_match(line, match, line_re)) {

            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                lfpath, line);

            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        host = match[1];
        // The id is stored as uint64_t, so parse it with the matching
        // unsigned 64-bit conversion instead of std::stoi, which rejects
        // values above INT_MAX.
        forwarder = std::stoull(match[2].str());
        forwarding_map[host] = forwarder;
    }
    return forwarding_map;
}

/**
 * Loads the forwarding map (path taken from the FORWARDING_MAP_FILE
 * environment variable, falling back to the configured default) and stores
 * the forwarder id of the local host in the preload context.
 *
 * An empty map is not treated as an error: the file is re-read until it
 * yields at least one entry.
 *
 * @throws std::runtime_error if the file cannot be loaded or contains no
 *         entry for the local host
 */
void
load_forwarding_map() {
    string forwarding_map_file;

    forwarding_map_file = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);

    map<string, uint64_t> forwarding_map;

    while(forwarding_map.empty()) {
        try {
            forwarding_map = load_forwarding_map_file(forwarding_map_file);
        } catch(const exception& e) {
            auto emsg = fmt::format("Failed to load forwarding map file: {}",
                                    e.what());
            throw runtime_error(emsg);
        }
        if(forwarding_map.empty()) {
            // Back off briefly instead of re-opening and re-parsing the file
            // in a tight loop while it is still empty.
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
    }

    auto local_hostname = gkfs::rpc::get_my_hostname(true);

    const auto entry = forwarding_map.find(local_hostname);
    if(entry == forwarding_map.end()) {
        throw runtime_error(
                fmt::format("Unable to determine the forwarder for host: '{}'",
                            local_hostname));
    }
    LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname,
        entry->second);

    CTX->fwd_host_id(entry->second);
}

/**
 * Reads the hosts file (path taken from the HOSTS_FILE environment variable,
 * falling back to the configured default).
 *
 * @return vector<pair<hostname, URI>>
 * @throws std::runtime_error if the hosts file cannot be loaded or is empty
 */
vector<pair<string, string>>
read_hosts_file() {
    string hostfile;

    hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
                                  gkfs::config::hostfile_path);

    vector<pair<string, string>> hosts;
    try {
        hosts = load_hostfile(hostfile);
    } catch(const exception& e) {
        auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
        throw runtime_error(emsg);
    }

    if(hosts.empty()) {
        throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
    }

    LOG(INFO, "Hosts pool size: {}", hosts.size());
    return hosts;
}

/**
 * Connects to daemons and lookup Mercury URI addresses via Hermes.
 *
 * The endpoints are stored in the preload context in the order of `hosts`.
 * The first host whose name equals the local host name becomes the local
 * host id; if none matches, host 0 is used.
 *
 * @param hosts vector<pair<hostname, Mercury URI address>>
 * @throws std::runtime_error through lookup_endpoint()
 */
void
connect_to_hosts(const vector<pair<string, string>>& hosts) {
    auto local_hostname = gkfs::rpc::get_my_hostname(true);
    bool local_host_found = false;

    std::vector<hermes::endpoint> addrs;
    addrs.resize(hosts.size());

    vector<uint64_t> host_ids(hosts.size());
    // populate vector with [0, ..., host_size - 1]
    ::iota(::begin(host_ids), ::end(host_ids), 0);
    /*
     * Shuffle hosts to balance addr lookups to all hosts
     * Too many concurrent lookups send to same host
     * could overwhelm the server,
     * returning error when addr lookup
     */
    ::random_device rd; // obtain a random number from hardware
    ::mt19937 g(rd());  // seed the random generator
    ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
    // lookup addresses and put abstract server addresses into rpc_addresses

    for(const auto& id : host_ids) {
        const auto& hostname = hosts.at(id).first;
        const auto& uri = hosts.at(id).second;

        addrs[id] = lookup_endpoint(uri);

        if(!local_host_found && hostname == local_hostname) {
            LOG(DEBUG, "Found local host: {}", hostname);
            CTX->local_host_id(id);
            local_host_found = true;
        }

        LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
    }

    if(!local_host_found) {
        LOG(WARNING, "Failed to find local host. Using host '0' as local host");
        CTX->local_host_id(0);
    }

    CTX->hosts(addrs);
}

/**
 * Looks for a proxy pid file. If it exists, names a running process and
 * carries a proxy address, the address string is stored in the preload
 * context and proxy usage is enabled.
 *
 * Expected file layout:
 *  1. line: process id (used to check for process existence)
 *  2. line: na_sm address to connect to
 *
 * Every failure (missing file, empty or malformed line, process not found)
 * is logged and leaves the proxy disabled; no exception is raised for a
 * malformed pid.
 */
void
check_for_proxy() {
    auto pid_path = gkfs::env::get_var(gkfs::env::PROXY_PID_FILE,
                                       gkfs::config::proxy::pid_path);
    ifstream ifs(pid_path, ::ifstream::in);
    if(!ifs) {
        LOG(INFO, "Proxy pid file NOT FOUND. Proxy will NOT be used!");
        return;
    }

    // get PID
    string running_pid;
    if(!getline(ifs, running_pid) || running_pid.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' first line is empty. Will NOT use proxy",
            pid_path);
        return;
    }

    // A malformed first line must not abort the client with an uncaught
    // std::invalid_argument / std::out_of_range: treat it like any other
    // unusable pid file.
    int pid = -1;
    try {
        pid = ::stoi(running_pid);
    } catch(const exception&) {
        // pid stays -1 and is rejected right below
    }
    // kill() with a pid <= 0 addresses process groups or all processes rather
    // than a single process, so such values cannot identify the proxy.
    if(pid <= 0) {
        LOG(WARNING,
            "Proxy pid file '{}' contains invalid pid '{}'. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }

    // check if process exists without killing it. Signal 0 doesn't kill
    if(0 != ::kill(pid, 0)) {
        LOG(WARNING,
            "Proxy pid file '{}' found but process with pid '{}' was not found. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }

    // get proxy address
    string proxy_address{};
    if(!getline(ifs, proxy_address) || proxy_address.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' second line is empty. Will NOT use proxy",
            pid_path);
        return;
    }
    CTX->proxy_address_str(proxy_address);

    LOG(INFO, "Proxy is enabled and will be used!");
    CTX->use_proxy(true);
}

/**
 * Lookup proxy address via hermes RPC client and store the resulting
 * endpoint in the preload context.
 *
 * @throws runtime_error through lookup_endpoint()
 */
void
lookup_proxy_addr() {
    auto addr = lookup_endpoint(CTX->proxy_address_str(), true);
    LOG(DEBUG, "Found proxy peer: {}", addr.to_string());
    CTX->proxy_host(addr);
}

} // namespace gkfs::utils