Newer
Older
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>
#include <global/rpc/distributor.hpp>
#include <global/rpc/rpc_util.hpp>
#include <fstream>
#include <csignal>
extern "C" {
using namespace std;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
namespace {
hermes::endpoint lookup_endpoint(const std::string& uri,
std::size_t max_retries = 3) {
LOG(DEBUG, "Looking up address \"{}\"", uri);
std::random_device rd; // obtain a random number from hardware
std::size_t attempts = 0;
std::string error_msg;
do {
try {
return ld_network_service->lookup(uri);
} catch (const exception& ex) {
error_msg = ex.what();
LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]",
uri, attempts + 1, max_retries);
// Wait a random amount of time and try again
std::mt19937 g(rd()); // seed the random generator
std::uniform_int_distribution<> distr(50, 50 * (attempts + 2)); // define the range
std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
continue;
}
} while (++attempts < max_retries);
throw std::runtime_error(
fmt::format("Endpoint for address '{}' could not be found ({})",
uri, error_msg));
}
} // namespace
namespace gkfs {
namespace util {
std::shared_ptr<gkfs::metadata::Metadata> get_metadata(const string& path, bool follow_links) {
std::string attr;
auto err = gkfs::rpc::forward_stat(path, attr);
if (err) {
return nullptr;
}
#ifdef HAS_SYMLINKS
if (follow_links) {
gkfs::metadata::Metadata md{attr};
while (md.is_link()) {
err = gkfs::rpc::forward_stat(md.target_path(), attr);
if (err) {
return nullptr;
}
md = gkfs::metadata::Metadata{attr};
}
}
#endif
return make_shared<gkfs::metadata::Metadata>(attr);
}
* Converts the Metadata object into a stat struct, which is needed by Linux
* @param attr
* @return
*/
int metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md, struct stat& attr) {
attr.st_ino = std::hash<std::string>{}(path);
attr.st_nlink = 1;
attr.st_uid = CTX->fs_conf()->uid;
attr.st_gid = CTX->fs_conf()->gid;
attr.st_rdev = 0;
attr.st_blksize = gkfs::config::rpc::chunksize;
attr.st_blocks = 0;
memset(&attr.st_atim, 0, sizeof(timespec));
memset(&attr.st_mtim, 0, sizeof(timespec));
memset(&attr.st_ctim, 0, sizeof(timespec));
#ifdef HAS_SYMLINKS
if (md.is_link())
attr.st_size = md.target_path().size() + CTX->mountdir().size();
else
#endif
attr.st_size = md.size();
attr.st_atim.tv_sec = md.atime();
attr.st_mtim.tv_sec = md.mtime();
attr.st_ctim.tv_sec = md.ctime();
if (CTX->fs_conf()->link_cnt_state) {
attr.st_nlink = md.link_count();
if (CTX->fs_conf()->blocks_state) { // last one will not encounter a delimiter anymore
}
return 0;
vector<pair<string, string>> load_hostfile(const std::string& lfpath) {
LOG(DEBUG, "Loading hosts file: \"{}\"", lfpath);
throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
lfpath, strerror(errno)));
const regex line_re("^(\\S+)\\s+(\\S+)$",
regex::ECMAScript | regex::optimize);
string host;
string uri;
std::smatch match;
if (!regex_match(line, match, line_re)) {
LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
lfpath, line);
throw runtime_error(
fmt::format("unrecognized line format: '{}'", line));
void load_hosts() {
string hostfile;
hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path);
vector<pair<string, string>> hosts;
try {
hosts = load_hostfile(hostfile);
} catch (const exception& e) {
auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
throw runtime_error(emsg);
if (hosts.empty()) {
throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
LOG(INFO, "Hosts pool size: {}", hosts.size());
auto local_hostname = get_my_hostname(true);
bool local_host_found = false;
addrs.resize(hosts.size());
vector<uint64_t> host_ids(hosts.size());
// populate vector with [0, ..., host_size - 1]
::iota(::begin(host_ids), ::end(host_ids), 0);
* Shuffle hosts to balance addr lookups to all hosts
* Too many concurrent lookups send to same host
* could overwhelm the server,
* returning error when addr lookup
::random_device rd; // obtain a random number from hardware
::mt19937 g(rd()); // seed the random generator
::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
// lookup addresses and put abstract server addresses into rpc_addresses
const auto& hostname = hosts.at(id).first;
const auto& uri = hosts.at(id).second;
addrs[id] = lookup_endpoint(uri);
if (!local_host_found && hostname == local_hostname) {
LOG(DEBUG, "Found local host: {}", hostname);
CTX->local_host_id(id);
local_host_found = true;
LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
LOG(WARNING, "Failed to find local host. Using host '0' as local host");
} // namespace util
} // namespace gkfs