/*
Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS' POSIX interface.
GekkoFS' POSIX interface is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GekkoFS' POSIX interface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GekkoFS' POSIX interface. If not, see
<https://www.gnu.org/licenses/>.
SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>
#include <client/cache.hpp>
#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/env_util.hpp>

#include <algorithm>
#include <chrono>
#include <csignal>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <numeric>
#include <random>
#include <regex>
#include <thread>

extern "C" {
#include <sys/sysmacros.h>
}

using namespace std;
/**
 * Looks up a host endpoint via Hermes, retrying with a randomized back-off.
 *
 * The lookup goes through ld_proxy_service when use_proxy is true, otherwise
 * through ld_network_service. Each failed attempt is logged as a warning and
 * followed by a random sleep of 50 to 50 * (attempts + 2) milliseconds, where
 * attempts is the zero-based attempt counter. That sleep also happens after
 * the final failed attempt, before the exception is thrown.
 *
 * NOTE(review): no return type is visible on this definition in this view;
 * the value returned is whatever the services' lookup() yields (documented
 * below as a hermes endpoint) -- confirm against the declaration.
 *
 * @param uri address to resolve
 * @param use_proxy resolve through the proxy service instead of the network
 *        service
 * @param max_retries total number of attempts (at least one attempt is
 *        always made, since the loop is a do/while)
 * @return hermes endpoint, if successful
 * @throws std::runtime_error if every attempt fails; the message carries the
 *         what() of the last lookup failure
 */
lookup_endpoint(const std::string& uri, bool use_proxy = false,
std::size_t max_retries = 3) {
LOG(DEBUG, "Looking up address \"{}\"", uri);
std::random_device rd; // obtain a random number from hardware
std::size_t attempts = 0;
// remembers the last failure so it can be reported after the final attempt
std::string error_msg;
do {
try {
if(use_proxy)
return ld_proxy_service->lookup(uri);
else
return ld_network_service->lookup(uri);
} catch(const exception& ex) {
error_msg = ex.what();
LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]", uri,
attempts + 1, max_retries);
// Wait a random amount of time and try again
// (the generator is re-seeded from rd on every failed attempt; the upper
// bound of the wait grows by 50 ms per attempt)
std::mt19937 g(rd()); // seed the random generator
std::uniform_int_distribution<> distr(
50, 50 * (attempts + 2)); // define the range
std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
continue;
}
} while(++attempts < max_retries);
throw std::runtime_error(
fmt::format("Endpoint for address '{}' could not be found ({})",
uri, error_msg));
}
/**
 * Extracts the RPC protocol from a URI written to the hosts file by the
 * daemon's RPC server and stores it in the preload context.
 *
 * The first entry of gkfs::rpc::protocol::all_remote_protocols contained in
 * the URI is selected. If the URI also contains the shared-memory protocol
 * (na_sm), it is either used on its own (no remote protocol matched) or
 * auto_sm is enabled on top of the matched remote protocol.
 *
 * @param uri address string as read from the hosts file
 * @throws std::runtime_error if the URI lacks a "://" separator or contains
 *         no supported protocol
 */
void
extract_protocol(const string& uri) {
    if(uri.rfind("://") == string::npos) {
        // invalid format. kill client
        throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri));
    }
    string protocol{};
    for(const auto& valid_protocol :
        gkfs::rpc::protocol::all_remote_protocols) {
        if(uri.find(valid_protocol) != string::npos) {
            protocol = valid_protocol;
            break;
        }
    }
    // Checked once, after the scan: shared memory can be used on its own or
    // as a real ofi protocol + auto_sm.
    if(uri.find(gkfs::rpc::protocol::na_sm) != string::npos) {
        if(protocol.empty())
            protocol = gkfs::rpc::protocol::na_sm;
        else
            CTX->auto_sm(true);
    }
    if(protocol.empty()) {
        // unsupported protocol. kill client
        throw runtime_error(fmt::format(
                "Unsupported RPC protocol found in hosts file with URI: '{}'",
                uri));
    }
    LOG(INFO,
        "RPC protocol '{}' extracted from hosts file. Using auto_sm is '{}'",
        protocol, CTX->auto_sm());
    CTX->rpc_protocol(protocol);
}
/**
 * Reads the hosts file generated by the daemons and returns (host, URI)
 * pairs.
 *
 * Every line before the first one containing gkfs::client::hostsfile_end_str
 * must consist of exactly two whitespace-separated fields: the host name
 * (optionally carrying a "#<suffix>") and its RPC URI. The RPC protocol is
 * derived from the first entry read, before sorting. Entries are then sorted
 * so that data always hashes to the same host across restarts, and the
 * "#<suffix>" is stripped from each host name afterwards.
 *
 * @param path path to the hosts file
 * @return vector<pair<host, URI>>, sorted
 * @throws std::runtime_error if the file cannot be opened, a line is
 *         malformed, no host entry is found, or the protocol is unsupported
 */
vector<pair<string, string>>
load_hostfile(const std::string& path) {
    LOG(DEBUG, "Loading hosts file: \"{}\"", path);
    ifstream lf(path);
    if(!lf) {
        throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
                                        path, strerror(errno)));
    }
    vector<pair<string, string>> hosts;
    const regex line_re("^(\\S+)\\s+(\\S+)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    std::smatch match;
    while(getline(lf, line)) {
        // The end marker closes the current FS instance. Hosts listed after
        // it are not part of this instance yet; the file is ordered, so
        // nothing below the marker can be a valid host.
        if(line.find(gkfs::client::hostsfile_end_str) != string::npos)
            break;
        if(!regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                path, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        hosts.emplace_back(match[1].str(), match[2].str());
    }
    if(hosts.empty()) {
        throw runtime_error(
                "Hosts file found but no suitable addresses could be extracted");
    }
    extract_protocol(hosts[0].second);
    // sort hosts so that data always hashes to the same place during restart
    std::sort(hosts.begin(), hosts.end());
    // remove rootdir suffix from host after sorting as no longer required
    for(auto& h : hosts) {
        const auto idx = h.first.rfind('#');
        if(idx != string::npos)
            h.first.erase(idx);
    }
    return hosts;
}
* Retrieve metadata from daemon and return Metadata object
* errno may be set
* @param path
* @param follow_links
* @return Metadata
optional<gkfs::metadata::Metadata>
get_metadata(const string& path, bool follow_links) {
// Use file metadata from dentry cache if available
if(CTX->use_dentry_cache()) {
// get parent and filename path to retrieve the cache entry
std::filesystem::path p(path);
auto parent = p.parent_path().string();
auto filename = p.filename().string();
auto cache_entry = CTX->dentry_cache()->get(parent, filename);
if(cache_entry) {
LOG(DEBUG, "{}(): Dentry cache hit for file '{}'", __func__, path);
// if cache_entry exists, generate a Metadata object from it.
mode_t mode = gkfs::config::syscall::stat::file_mode_default;
if(cache_entry->file_type == gkfs::filemap::FileType::directory) {
mode = gkfs::config::syscall::stat::dir_mode_default;
}
gkfs::metadata::Metadata md{};
md.mode(mode);
md.ctime(cache_entry->ctime);
md.size(cache_entry->size);
return md;
}
}
if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
err = gkfs::rpc::forward_stat_proxy(path, attr);
} else {
err = gkfs::rpc::forward_stat(path, attr, 0);
// TODO: retry on failure
auto copy = 1;
while(copy < CTX->get_replicas() + 1 && err) {
LOG(ERROR, "Retrying Stat on replica {} {}", copy,
follow_links);
err = gkfs::rpc::forward_stat(path, attr, copy);
copy++;
}
#ifdef HAS_SYMLINKS
gkfs::metadata::Metadata md{attr};
if(gkfs::config::proxy::fwd_stat && CTX->use_proxy()) {
err = gkfs::rpc::forward_stat_proxy(md.target_path(), attr);
} else {
err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
}
}
md = gkfs::metadata::Metadata{attr};
}
}
#endif
return gkfs::metadata::Metadata{attr};
* Converts the Metadata object into a stat struct, which is needed by Linux
* @param attr
* @return
*/
int
metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
struct stat& attr) {
attr.st_ino = std::hash<std::string>{}(path);
attr.st_nlink = 1;
attr.st_uid = CTX->fs_conf()->uid;
attr.st_gid = CTX->fs_conf()->gid;
attr.st_rdev = 0;
attr.st_blksize = gkfs::config::rpc::chunksize;
attr.st_blocks = 0;
memset(&attr.st_atim, 0, sizeof(timespec));
memset(&attr.st_mtim, 0, sizeof(timespec));
memset(&attr.st_ctim, 0, sizeof(timespec));
attr.st_size = md.target_path().size() + CTX->mountdir().size();
else
#endif
attr.st_size = md.size();
if(CTX->fs_conf()->atime_state) {
attr.st_atim.tv_sec = md.atime();
if(CTX->fs_conf()->mtime_state) {
attr.st_mtim.tv_sec = md.mtime();
if(CTX->fs_conf()->ctime_state) {
attr.st_ctim.tv_sec = md.ctime();
if(CTX->fs_conf()->link_cnt_state) {
attr.st_nlink = md.link_count();

Julius Athenstaedt
committed
} else {
attr.st_blocks = md.size() / gkfs::config::syscall::stat::st_nblocksize;
}
return 0;
/**
 * Updates the size of a file on the daemons after writing count bytes at
 * offset.
 *
 * The update is routed through the proxy when
 * gkfs::config::proxy::fwd_update_size is set and a proxy is in use;
 * otherwise it is sent directly, passing the configured number of replicas.
 *
 * @param path file whose size is updated
 * @param count number of bytes written
 * @param offset offset the write started at
 * @param is_append whether the write was an append
 * @return the forwarding RPC's result pair unchanged -- presumably
 *         (error code, resulting offset); confirm against the RPC helpers
 */
pair<int, off64_t>
update_file_size(const std::string& path, size_t count, off64_t offset,
                 bool is_append) {
    LOG(DEBUG, "{}() path: '{}', count: '{}', offset: '{}', is_append: '{}'",
        __func__, path, count, offset, is_append);
    // Return the RPC result directly: the former pair<int, long> temporary
    // could truncate off64_t offsets where long is 32 bits wide.
    if(gkfs::config::proxy::fwd_update_size && CTX->use_proxy()) {
        return gkfs::rpc::forward_update_metadentry_size_proxy(
                path, count, offset, is_append);
    }
    const auto num_replicas = CTX->get_replicas();
    return gkfs::rpc::forward_update_metadentry_size(path, count, offset,
                                                     is_append, num_replicas);
}
/**
 * Parses a forwarding map file into a map from host name to forwarder id.
 *
 * Every line must consist of exactly two whitespace-separated fields: a host
 * name and the numeric id of its forwarder. A later line for the same host
 * overwrites an earlier one.
 *
 * @param lfpath path to the forwarding map file
 * @return host -> forwarder id map (empty if the file has no lines)
 * @throws std::runtime_error if the file cannot be opened or a line does not
 *         match the two-field format
 * @throws std::invalid_argument / std::out_of_range (from std::stoull) if a
 *         forwarder id is not a valid unsigned number
 */
map<string, uint64_t>
load_forwarding_map_file(const std::string& lfpath) {
    LOG(DEBUG, "Loading forwarding map file file: \"{}\"", lfpath);
    ifstream lf(lfpath);
    if(!lf) {
        throw runtime_error(
                fmt::format("Failed to open forwarding map file '{}': {}",
                            lfpath, strerror(errno)));
    }
    map<string, uint64_t> forwarding_map;
    const regex line_re("^(\\S+)\\s+(\\S+)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    std::smatch match;
    while(getline(lf, line)) {
        if(!regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                lfpath, line);
            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        // stoull rather than stoi: the id is stored as uint64_t, and stoi
        // would reject every value above INT_MAX.
        forwarding_map[match[1].str()] = std::stoull(match[2].str());
    }
    return forwarding_map;
}
/**
 * Loads the forwarding map and registers this host's forwarder id in the
 * preload context.
 *
 * The map file path is taken from gkfs::env::FORWARDING_MAP_FILE, falling
 * back to gkfs::config::forwarding_file_path. As long as the file parses to
 * an empty map it is re-read after a short pause, so the call blocks until
 * the map has been populated.
 *
 * @throws std::runtime_error if the file cannot be loaded or has no entry
 *         for the local host name
 */
void
load_forwarding_map() {
    const auto forwarding_map_file = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);
    // Pause between re-reads of a not-yet-populated map file instead of
    // spinning on the file system.
    constexpr std::chrono::milliseconds poll_interval{50};
    map<string, uint64_t> forwarding_map;
    while(true) {
        try {
            forwarding_map = load_forwarding_map_file(forwarding_map_file);
        } catch(const exception& e) {
            auto emsg = fmt::format("Failed to load forwarding map file: {}",
                                    e.what());
            throw runtime_error(emsg);
        }
        if(!forwarding_map.empty())
            break;
        std::this_thread::sleep_for(poll_interval);
    }
    auto local_hostname = gkfs::rpc::get_my_hostname(true);
    const auto fwd = forwarding_map.find(local_hostname);
    if(fwd == forwarding_map.end()) {
        throw runtime_error(
                fmt::format("Unable to determine the forwarder for host: '{}'",
                            local_hostname));
    }
    LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname,
        fwd->second);
    CTX->fwd_host_id(fwd->second);
}
/**
 * Loads the hosts file named by gkfs::env::HOSTS_FILE, falling back to
 * gkfs::config::hostfile_path.
 *
 * @return vector<pair<host, URI>> as returned by load_hostfile()
 * @throws std::runtime_error if the file cannot be loaded or yields no hosts
 */
vector<pair<string, string>>
read_hosts_file() {
    const string hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
                                               gkfs::config::hostfile_path);
    vector<pair<string, string>> hosts;
    try {
        hosts = load_hostfile(hostfile);
    } catch(const exception& e) {
        auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
        throw runtime_error(emsg);
    }
    // load_hostfile() already throws for an empty list; kept as a safeguard.
    if(hosts.empty()) {
        throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
    }
    LOG(INFO, "Hosts pool size: {}", hosts.size());
    return hosts;
}
/**
 * Resolves the Mercury URI of every daemon via Hermes (lookup_endpoint) and
 * records the id of the entry whose host name equals the local host name as
 * the local host id in the preload context.
 *
 * Host ids are visited in a random order (see the shuffle below).
 *
 * NOTE(review): addrs and local_host_found are not declared in the visible
 * part of this function, and what follows the "Failed to find local host"
 * warning is not visible here -- confirm against the full source.
 *
 * @param hosts vector<pair<hostname, Mercury URI address>>; addrs[i] is
 *        filled for hosts[i]
 * @throws std::runtime_error through lookup_endpoint()
 */
void
connect_to_hosts(const vector<pair<string, string>>& hosts) {
auto local_hostname = gkfs::rpc::get_my_hostname(true);
addrs.resize(hosts.size());
vector<uint64_t> host_ids(hosts.size());
// populate vector with [0, ..., host_size - 1]
::iota(::begin(host_ids), ::end(host_ids), 0);
/*
* Shuffle hosts to balance addr lookups to all hosts
* Too many concurrent lookups send to same host
* could overwhelm the server,
* returning error when addr lookup
*/
::random_device rd; // obtain a random number from hardware
::mt19937 g(rd()); // seed the random generator
::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
// lookup addresses and put abstract server addresses into rpc_addresses
for(const auto& id : host_ids) {
const auto& hostname = hosts.at(id).first;
const auto& uri = hosts.at(id).second;
addrs[id] = lookup_endpoint(uri);
// the first host entry matching the local host name becomes the local host
if(!local_host_found && hostname == local_hostname) {
LOG(DEBUG, "Found local host: {}", hostname);
CTX->local_host_id(id);
local_host_found = true;
LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
LOG(WARNING, "Failed to find local host. Using host '0' as local host");
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
/**
 * Looks for the proxy pid file. If it describes a running proxy, stores the
 * proxy address in the preload context and enables proxy use.
 *
 * The pid file path is taken from gkfs::env::PROXY_PID_FILE, falling back to
 * gkfs::config::proxy::pid_path. The file is expected to hold two lines:
 *  1. the proxy's process id, used to check that the process exists
 *  2. the na_sm address to connect to
 * A missing file, an empty or non-numeric pid, a pid with no live process,
 * or a missing address all leave the proxy disabled; malformed content is
 * logged instead of raising an exception.
 */
void
check_for_proxy() {
    auto pid_path = gkfs::env::get_var(gkfs::env::PROXY_PID_FILE,
                                       gkfs::config::proxy::pid_path);
    ifstream ifs(pid_path, ::ifstream::in);
    if(!ifs) {
        LOG(INFO, "Proxy pid file NOT FOUND. Proxy will NOT be used!");
        return;
    }
    // get PID
    string running_pid;
    if(!getline(ifs, running_pid) || running_pid.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' first line is empty. Will NOT use proxy",
            pid_path);
        return;
    }
    pid_t pid = 0;
    try {
        pid = static_cast<pid_t>(std::stoi(running_pid));
    } catch(const std::exception&) {
        // non-numeric or out-of-range pid: treated as "process not found"
        pid = 0;
    }
    // Signal 0 only checks for existence and is never delivered. pid <= 0
    // would address a process group or every process and succeed
    // spuriously, so such values count as "not running".
    if(pid <= 0 || 0 != ::kill(pid, 0)) {
        LOG(WARNING,
            "Proxy pid file '{}' found but process with pid '{}' was not found. Will NOT use proxy",
            pid_path, running_pid);
        return;
    }
    // get proxy address
    string proxy_address{};
    if(!getline(ifs, proxy_address) || proxy_address.empty()) {
        LOG(WARNING,
            "Proxy pid file '{}' second line is empty. Will NOT use proxy",
            pid_path);
        return;
    }
    CTX->proxy_address_str(proxy_address);
    LOG(INFO, "Proxy is enabled and will be used!");
    CTX->use_proxy(true);
}
/**
* Lookup proxy address via hermes RPC client
* @throws runtime_error
*/
void
lookup_proxy_addr() {
auto addr = lookup_endpoint(CTX->proxy_address_str(), true);
LOG(DEBUG, "Found proxy peer: {}", addr.to_string());
CTX->proxy_host(addr);
}