Program Listing for File preload_util.cpp
↰ Return to documentation for file (src/client/preload_util.cpp)
/*
Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This software was partially supported by the
the European Union’s Horizon 2020 JTI-EuroHPC research and
innovation programme, by the project ADMIRE (Project ID: 956748,
admire-eurohpc.eu)
This project was partially promoted by the Ministry for Digital Transformation
and the Civil Service, within the framework of the Recovery,
Transformation and Resilience Plan - Funded by the European Union
-NextGenerationEU.
This file is part of GekkoFS' POSIX interface.
GekkoFS' POSIX interface is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GekkoFS' POSIX interface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GekkoFS' POSIX interface. If not, see
<https://www.gnu.org/licenses/>.
SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include <client/preload_util.hpp>
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>
#include <client/rpc/forward_metadata_proxy.hpp>
#include <client/cache.hpp>
#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/env_util.hpp>
#include <common/common_defs.hpp>
#include <hermes.hpp>
#include <algorithm>
#include <cerrno>
#include <chrono>
#include <csignal>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <numeric>
#include <random>
#include <regex>
#include <sstream>
#include <thread>
extern "C" {
#include <sys/sysmacros.h>
}
using namespace std;
namespace {
/**
 * Resolve the hermes endpoint for `uri`, retrying on failure.
 *
 * Every failed attempt is logged together with the underlying error. Between
 * attempts the function sleeps for a random delay in
 * [50, 50 * (attempt + 2)] milliseconds so that many clients do not retry in
 * lock-step. No delay is inserted after the final attempt.
 *
 * @param uri         address string to resolve
 * @param use_proxy   resolve through `ld_proxy_service` if true, otherwise
 *                    through `ld_network_service`
 * @param max_retries total number of attempts; at least one attempt is always
 *                    made, even when 0 is passed
 * @return the resolved endpoint
 * @throws std::runtime_error if all attempts failed (message includes the
 *         last lookup error)
 */
hermes::endpoint
lookup_endpoint(const std::string& uri, bool use_proxy = false,
                std::size_t max_retries = 3) {
    LOG(DEBUG, "Looking up address \"{}\"", uri);
    // The previous do/while always tried at least once; keep that guarantee.
    const std::size_t total_attempts = std::max<std::size_t>(max_retries, 1);
    // Seed the backoff generator once per call rather than once per failure.
    std::random_device rd; // obtain a random number from hardware
    std::mt19937 g(rd());
    std::string error_msg;
    for(std::size_t attempt = 0; attempt < total_attempts; ++attempt) {
        try {
            if(use_proxy)
                return ld_proxy_service->lookup(uri);
            return ld_network_service->lookup(uri);
        } catch(const std::exception& ex) {
            error_msg = ex.what();
            LOG(WARNING,
                "Failed to lookup address '{}'. Attempts [{}/{}]: {}", uri,
                attempt + 1, total_attempts, error_msg);
        }
        // Only back off if another attempt follows; sleeping before giving
        // up would just delay the error.
        if(attempt + 1 < total_attempts) {
            std::uniform_int_distribution<std::size_t> distr(
                    50, 50 * (attempt + 2));
            std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
        }
    }
    throw std::runtime_error(
            fmt::format("Endpoint for address '{}' could not be found ({})",
                        uri, error_msg));
}
void
extract_protocol(const string& uri) {
if(uri.rfind("://") == string::npos) {
// invalid format. kill client
throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri));
}
string protocol{};
for(const auto& valid_protocol :
gkfs::rpc::protocol::all_remote_protocols) {
if(uri.find(valid_protocol) != string::npos) {
protocol = valid_protocol;
break;
}
}
// check for shared memory protocol. Can be plain shared memory or real
// ofi protocol + auto_sm
if(uri.find(gkfs::rpc::protocol::na_sm) != string::npos) {
if(protocol.empty())
protocol = gkfs::rpc::protocol::na_sm;
else
CTX->auto_sm(true);
}
if(protocol.empty()) {
// unsupported protocol. kill client
throw runtime_error(fmt::format(
"Unsupported RPC protocol found in hosts file with URI: '{}'",
uri));
}
LOG(INFO,
"RPC protocol '{}' extracted from hosts file. Using auto_sm is '{}'",
protocol, CTX->auto_sm());
CTX->rpc_protocol(protocol);
}
/**
 * Parse the GekkoFS hosts file.
 *
 * Each non-blank line must hold at least two whitespace-separated fields,
 * `<host> <uri>`, optionally followed by a third field that is accepted but
 * ignored. Blank or whitespace-only lines are skipped. Parsing stops at the
 * first line containing `gkfs::client::hostsfile_end_str`.
 *
 * After parsing, the RPC protocol is derived from the first entry's URI. The
 * entries are then sorted so the host order is stable across restarts.
 * Finally, the suffix starting at the last '#' is removed from each host name.
 *
 * @param path hosts file to read
 * @return sorted list of (hostname, uri) pairs
 * @throws std::runtime_error if the file cannot be opened, a line is
 *         malformed, no entries are found, or protocol extraction fails
 */
vector<pair<string, string>>
load_hostfile(const std::string& path) {
    LOG(DEBUG, "Loading hosts file: \"{}\"", path);
    ifstream lf(path);
    if(!lf) {
        throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
                                        path, strerror(errno)));
    }
    vector<pair<string, string>> hosts;
    const regex line_re("^(\\S+)\\s+(\\S+)\\s*(\\S*)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    std::smatch match;
    while(getline(lf, line)) {
        // A line containing the end marker terminates the current FS
        // instance. Further hosts are not part of the file system instance
        // yet. The hosts file is ordered, so nothing below this line can
        // contain valid hosts.
        if(line.find(gkfs::client::hostsfile_end_str) != string::npos)
            break;
        // Tolerate blank/whitespace-only lines (e.g. a trailing empty line)
        // instead of rejecting them as malformed entries.
        if(line.find_first_not_of(" \t\r\n\v\f") == string::npos)
            continue;
        if(!regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                path, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        hosts.emplace_back(match[1].str(), match[2].str());
    }
    if(hosts.empty()) {
        throw runtime_error(
                "Hosts file found but no suitable addresses could be extracted");
    }
    extract_protocol(hosts[0].second);
    // sort hosts so that data always hashes to the same place during restart
    std::sort(hosts.begin(), hosts.end());
    // remove rootdir suffix from host after sorting as no longer required
    for(auto& h : hosts) {
        const auto idx = h.first.rfind('#');
        if(idx != string::npos)
            h.first.erase(idx);
    }
    return hosts;
}
} // namespace
namespace gkfs::utils {
/**
 * Retrieve the metadata of `path`.
 *
 * Lookup order:
 *  1. If the dentry cache is enabled and has an entry for `path`, a Metadata
 *     object is synthesized from it. It gets the default file or directory
 *     mode, plus the cached ctime and size. `follow_links` is not considered
 *     on this path.
 *  2. Otherwise a stat RPC is forwarded, through the proxy if proxy stat
 *     forwarding is enabled and a proxy is in use. Without a proxy, a failed
 *     stat on copy 0 is retried on replicas 1 .. get_replicas() in order.
 *
 * With HAS_SYMLINKS and `follow_links`, symbolic links are resolved by
 * stat'ing the link target repeatedly. Resolution is bounded, and a chain
 * that is too long (e.g. a cycle) fails with ELOOP instead of hanging.
 *
 * @param path         path inside the GekkoFS namespace
 * @param follow_links whether symbolic links should be resolved
 * @return the metadata, or an empty optional with errno set on failure
 */
optional<gkfs::metadata::Metadata>
get_metadata(const string& path, bool follow_links) {
    std::string attr;
    int err{};
    // Use file metadata from dentry cache if available
    if(CTX->use_dentry_cache()) {
        // get parent and filename path to retrieve the cache entry
        std::filesystem::path p(path);
        auto parent = p.parent_path().string();
        auto filename = p.filename().string();
        auto cache_entry = CTX->dentry_cache()->get(parent, filename);
        if(cache_entry) {
            LOG(DEBUG, "{}(): Dentry cache hit for file '{}'", __func__, path);
            // if cache_entry exists, generate a Metadata object from it.
            mode_t mode = gkfs::config::syscall::stat::file_mode_default;
            if(cache_entry->file_type == gkfs::filemap::FileType::directory) {
                mode = gkfs::config::syscall::stat::dir_mode_default;
            }
            gkfs::metadata::Metadata md{};
            md.mode(mode);
            md.ctime(cache_entry->ctime);
            md.size(cache_entry->size);
            return md;
        }
    }
    if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
        err = gkfs::rpc::forward_stat_proxy(path, attr);
    } else {
        err = gkfs::rpc::forward_stat(path, attr, 0);
        // On failure fall back to the replicas one after another until one
        // answers or all of them have been tried.
        auto copy = 1;
        while(err && copy < CTX->get_replicas() + 1) {
            LOG(ERROR, "Retrying Stat on replica {} {}", copy, follow_links);
            err = gkfs::rpc::forward_stat(path, attr, copy);
            copy++;
        }
    }
    if(err) {
        errno = err;
        return {};
    }
#ifdef HAS_SYMLINKS
    if(follow_links) {
        // Bound the number of hops like the Linux kernel does (40) so that a
        // symlink cycle yields ELOOP instead of an endless loop.
        constexpr int max_symlink_follows = 40;
        int follows = 0;
        gkfs::metadata::Metadata md{attr};
        while(md.is_link()) {
            if(++follows > max_symlink_follows) {
                LOG(ERROR, "{}(): Too many levels of symbolic links for '{}'",
                    __func__, path);
                errno = ELOOP;
                return {};
            }
            if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
                err = gkfs::rpc::forward_stat_proxy(md.target_path(), attr);
            } else {
                err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
            }
            if(err) {
                errno = err;
                return {};
            }
            md = gkfs::metadata::Metadata{attr};
        }
    }
#endif
    return gkfs::metadata::Metadata{attr};
}
/**
 * Fill a `struct stat` from GekkoFS metadata.
 *
 * Fields GekkoFS does not track per file get fixed values: st_dev is 0:0,
 * st_ino is a hash of `path`, st_rdev is 0, st_blksize is the RPC chunk size,
 * and uid/gid come from the file system configuration. Access, modification
 * and change times are zeroed, and only their seconds are filled from `md`
 * when the matching tracking flag is enabled. The link count and block count
 * come from `md` only when tracked. Otherwise st_nlink is 1 and st_blocks is
 * md.size() divided by the configured block size.
 *
 * @param path path of the file (source of the synthetic inode number)
 * @param md   metadata to convert
 * @param attr output stat structure
 * @return always 0
 */
int
metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
                 struct stat& attr) {
    const auto& conf = CTX->fs_conf();

    // Identity and ownership: fixed values, GekkoFS does not track these.
    attr.st_dev = makedev(0, 0);
    attr.st_ino = std::hash<std::string>{}(path);
    attr.st_uid = conf->uid;
    attr.st_gid = conf->gid;
    attr.st_rdev = 0;
    attr.st_blksize = gkfs::config::rpc::chunksize;
    attr.st_mode = md.mode();

    // Size: a symlink reports its target path length plus the mountdir
    // length; everything else reports the stored size.
#ifdef HAS_SYMLINKS
    if(md.is_link()) {
        attr.st_size = md.target_path().size() + CTX->mountdir().size();
    } else {
        attr.st_size = md.size();
    }
#else
    attr.st_size = md.size();
#endif

    // Timestamps start at zero; only tracked ones receive their seconds.
    memset(&attr.st_atim, 0, sizeof(timespec));
    memset(&attr.st_mtim, 0, sizeof(timespec));
    memset(&attr.st_ctim, 0, sizeof(timespec));
    if(conf->atime_state)
        attr.st_atim.tv_sec = md.atime();
    if(conf->mtime_state)
        attr.st_mtim.tv_sec = md.mtime();
    if(conf->ctime_state)
        attr.st_ctim.tv_sec = md.ctime();

    // Link and block counts: tracked values, or derived defaults.
    if(conf->link_cnt_state)
        attr.st_nlink = md.link_count();
    else
        attr.st_nlink = 1;
    if(conf->blocks_state)
        attr.st_blocks = md.blocks();
    else
        attr.st_blocks =
                md.size() / gkfs::config::syscall::stat::st_nblocksize;

    return 0;
}
/**
 * Propagate a size change of `path` after a write to the metadata backend.
 *
 * The update goes through the proxy when proxy forwarding of size updates is
 * enabled and a proxy is in use. Otherwise it is sent directly to the daemon
 * together with the configured replica count.
 *
 * @param path      file whose size changes
 * @param count     number of bytes written
 * @param offset    offset of the write
 * @param is_append whether the write was an append
 * @return the result pair of the forwarding RPC (presumably error code and
 *         resulting offset — see forward_update_metadentry_size*)
 */
pair<int, off64_t>
update_file_size(const std::string& path, size_t count, off64_t offset,
                 bool is_append) {
    LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'",
        __func__, path, count, offset, is_append);
    // Return the RPC result directly as pair<int, off64_t>. The previous
    // intermediate pair<int, long> truncated offsets on targets where `long`
    // is narrower than off64_t (e.g. 32-bit).
    if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
        return gkfs::rpc::forward_update_metadentry_size_proxy(
                path, count, offset, is_append);
    }
    const auto num_replicas = CTX->get_replicas();
    return gkfs::rpc::forward_update_metadentry_size(path, count, offset,
                                                     is_append, num_replicas);
}
/**
 * Parse a forwarding map file.
 *
 * Each non-blank line has the form `<hostname> <forwarder id>`, where the id
 * is a non-negative decimal integer. Trailing whitespace is allowed.
 * Blank or whitespace-only lines are skipped. A later line for the same host
 * overwrites an earlier one.
 *
 * @param lfpath path of the forwarding map file
 * @return mapping hostname -> forwarder id (empty if the file has no entries)
 * @throws std::runtime_error if the file cannot be opened or a line is
 *         malformed
 * @throws std::out_of_range if an id does not fit into unsigned long long
 */
map<string, uint64_t>
load_forwarding_map_file(const std::string& lfpath) {
    LOG(DEBUG, "Loading forwarding map file: \"{}\"", lfpath);
    ifstream lf(lfpath);
    if(!lf) {
        throw runtime_error(
                fmt::format("Failed to open forwarding map file '{}': {}",
                            lfpath, strerror(errno)));
    }
    map<string, uint64_t> forwarding_map;
    // Only digits are accepted as id. This rejects "-1", which would
    // otherwise wrap to a huge uint64_t, and trailing garbage like "5abc".
    const regex line_re("^(\\S+)\\s+(\\d+)\\s*$",
                        regex::ECMAScript | regex::optimize);
    string line;
    std::smatch match;
    while(getline(lf, line)) {
        // Tolerate blank/whitespace-only lines.
        if(line.find_first_not_of(" \t\r\n\v\f") == string::npos)
            continue;
        if(!regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                lfpath, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        // stoull instead of stoi: the id is stored as uint64_t, and ids above
        // INT_MAX must not be rejected.
        forwarding_map[match[1].str()] =
                static_cast<uint64_t>(std::stoull(match[2].str()));
    }
    return forwarding_map;
}
void
load_forwarding_map() {
string forwarding_map_file;
forwarding_map_file = gkfs::env::get_var(
gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
map<string, uint64_t> forwarding_map;
while(forwarding_map.size() == 0) {
try {
forwarding_map = load_forwarding_map_file(forwarding_map_file);
} catch(const exception& e) {
auto emsg = fmt::format("Failed to load forwarding map file: {}",
e.what());
throw runtime_error(emsg);
}
}
// if (forwarding_map.size() == 0) {
// throw runtime_error(fmt::format("Forwarding map file is empty: '{}'",
// forwarding_map_file));
//}
auto local_hostname = gkfs::rpc::get_my_hostname(true);
if(forwarding_map.find(local_hostname) == forwarding_map.end()) {
throw runtime_error(
fmt::format("Unable to determine the forwarder for host: '{}'",
local_hostname));
}
LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname,
forwarding_map[local_hostname]);
CTX->fwd_host_id(forwarding_map[local_hostname]);
}
/**
 * Read the hosts file configured for this client.
 *
 * The path comes from `gkfs::env::HOSTS_FILE`, falling back to
 * `gkfs::config::hostfile_path`.
 *
 * @return (hostname, uri) pairs as produced by load_hostfile()
 * @throws std::runtime_error wrapping any loading error, or if the resulting
 *         list is empty
 */
vector<pair<string, string>>
read_hosts_file() {
    const string hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
                                               gkfs::config::hostfile_path);
    vector<pair<string, string>> entries;
    try {
        entries = load_hostfile(hostfile);
    } catch(const exception& e) {
        throw runtime_error(
                fmt::format("Failed to load hosts file: {}", e.what()));
    }
    if(entries.empty())
        throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));

    LOG(INFO, "Hosts pool size: {}", entries.size());
    return entries;
}
void
connect_to_hosts(const vector<pair<string, string>>& hosts) {
auto local_hostname = gkfs::rpc::get_my_hostname(true);
bool local_host_found = false;
std::vector<hermes::endpoint> addrs;
addrs.resize(hosts.size());
vector<uint64_t> host_ids(hosts.size());
// populate vector with [0, ..., host_size - 1]
::iota(::begin(host_ids), ::end(host_ids), 0);
/*
* Shuffle hosts to balance addr lookups to all hosts
* Too many concurrent lookups send to same host
* could overwhelm the server,
* returning error when addr lookup
*/
::random_device rd; // obtain a random number from hardware
::mt19937 g(rd()); // seed the random generator
::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
// lookup addresses and put abstract server addresses into rpc_addresses
for(const auto& id : host_ids) {
const auto& hostname = hosts.at(id).first;
const auto& uri = hosts.at(id).second;
addrs[id] = lookup_endpoint(uri);
if(!local_host_found && hostname == local_hostname) {
LOG(DEBUG, "Found local host: {}", hostname);
CTX->local_host_id(id);
local_host_found = true;
}
LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
}
if(!local_host_found) {
LOG(WARNING, "Failed to find local host. Using host '0' as local host");
CTX->local_host_id(0);
}
CTX->hosts(addrs);
}
/**
 * Detect a running GekkoFS proxy and enable its use if one is found.
 *
 * The pid file path comes from `gkfs::env::PROXY_PID_FILE`, falling back to
 * `gkfs::config::proxy::pid_path`. The file must contain two lines:
 *  1. the proxy's process id, used to check that the process is alive;
 *  2. the address the client should connect to.
 * If the file is missing or malformed, or the process cannot be signalled,
 * the proxy is not used and the function simply returns. On success the
 * address is stored in the context and `use_proxy` is enabled.
 */
void
check_for_proxy() {
    auto pid_path = gkfs::env::get_var(gkfs::env::PROXY_PID_FILE,
                                       gkfs::config::proxy::pid_path);
    ifstream ifs(pid_path, std::ifstream::in);
    if(!ifs) {
        LOG(INFO, "Proxy pid file NOT FOUND. Proxy will NOT be used!");
        return;
    }

    // 1. line: process id (used to check for process existence)
    string running_pid;
    if(!getline(ifs, running_pid) || running_pid.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' first line is empty. Will NOT use proxy",
            pid_path);
        return;
    }
    int pid = 0;
    try {
        pid = std::stoi(running_pid);
    } catch(const std::exception&) {
        // Not a number / out of range: handled as an invalid pid below
        // instead of aborting client initialization with an exception.
    }
    // pid <= 0 must be rejected: kill(0, 0) and kill(-1, 0) address process
    // groups / all processes and would "succeed" without any proxy running.
    if(pid <= 0) {
        LOG(WARNING,
            "Proxy pid file '{}' contains invalid pid '{}'. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }
    // Signal 0 only checks whether the process can be signalled; nothing is
    // delivered. Note that this also fails (EPERM) for processes owned by
    // another user, which are treated as not running.
    if(0 != ::kill(pid, 0)) {
        LOG(WARNING,
            "Proxy pid file '{}' found but process with pid '{}' was not found. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }

    // 2. line: address to connect to
    string proxy_address{};
    if(!getline(ifs, proxy_address) || proxy_address.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' second line is empty. Will NOT use proxy",
            pid_path);
        return;
    }
    CTX->proxy_address_str(proxy_address);

    LOG(INFO, "Proxy is enabled and will be used!");
    CTX->use_proxy(true);
}
void
lookup_proxy_addr() {
auto addr = lookup_endpoint(CTX->proxy_address_str(), true);
LOG(DEBUG, "Found proxy peer: {}", addr.to_string());
CTX->proxy_host(addr);
}
} // namespace gkfs::utils