Newer
Older
/*
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#include <client/env.hpp>
#include <client/logging.hpp>
#include <client/rpc/forward_metadata.hpp>
#include <global/rpc/distributor.hpp>
#include <global/rpc/rpc_util.hpp>
#include <fstream>
#include <csignal>
extern "C" {
using namespace std;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
namespace {
/**
 * Resolve a URI into a hermes endpoint, retrying when the lookup throws.
 *
 * The lookup is delegated to ld_network_service->lookup(). Every failure is
 * logged as a warning. Between two attempts the caller sleeps for a random
 * time in [50, 50 * (attempt + 2)] milliseconds, so the window grows with
 * each failed attempt.
 *
 * @param uri address to look up
 * @param max_retries maximum number of attempts; at least one attempt is
 *                    always made, even when this is 0
 * @return the endpoint returned by the network service
 * @throws std::runtime_error carrying the last error message when every
 *         attempt failed
 */
hermes::endpoint lookup_endpoint(const std::string& uri,
                                 std::size_t max_retries = 3) {
    LOG(DEBUG, "Looking up address \"{}\"", uri);

    std::random_device rd; // obtain a random number from hardware
    std::mt19937 g(rd());  // seed the generator once, not once per retry
    std::size_t attempts = 0;
    std::string error_msg;

    do {
        try {
            return ld_network_service->lookup(uri);
        } catch (const std::exception& ex) {
            error_msg = ex.what();
            LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]",
                uri, attempts + 1, max_retries);

            // Back off only when another attempt follows; sleeping after the
            // last failure would merely delay the error report.
            if (attempts + 1 < max_retries) {
                std::uniform_int_distribution<> distr(
                        50, static_cast<int>(50 * (attempts + 2)));
                std::this_thread::sleep_for(
                        std::chrono::milliseconds(distr(g)));
            }
        }
    } while (++attempts < max_retries);

    throw std::runtime_error(
            fmt::format("Endpoint for address '{}' could not be found ({})",
                        uri, error_msg));
}
} // namespace
namespace gkfs {
namespace util {
/**
* Retrieve metadata from daemon
* errno may be set
* @param path
* @param follow_links
* @return shared_ptr for metadata, nullptr else
*/
std::shared_ptr<gkfs::metadata::Metadata> get_metadata(const string& path, bool follow_links) {
std::string attr;
auto err = gkfs::rpc::forward_stat(path, attr);
if (err) {
return nullptr;
}
#ifdef HAS_SYMLINKS
if (follow_links) {
gkfs::metadata::Metadata md{attr};
while (md.is_link()) {
err = gkfs::rpc::forward_stat(md.target_path(), attr);
if (err) {
return nullptr;
}
md = gkfs::metadata::Metadata{attr};
}
}
#endif
return make_shared<gkfs::metadata::Metadata>(attr);
}
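/**
 * Converts the Metadata object into a stat struct, which is needed by Linux
 * @param path path of the entry; its hash is used as the inode number
 * @param md metadata object to convert
 * @param attr [out] stat struct that is filled in
 * @return 0
 */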
int metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md, struct stat& attr) {
// the inode number is the hash of the path
attr.st_ino = std::hash<std::string>{}(path);
// default link count; overwritten further down when link_cnt_state is set
attr.st_nlink = 1;
// owner and group come from the file system configuration held by the client context
attr.st_uid = CTX->fs_conf()->uid;
attr.st_gid = CTX->fs_conf()->gid;
attr.st_rdev = 0;
// the reported block size is the configured chunk size
attr.st_blksize = gkfs::config::rpc::chunksize;
attr.st_blocks = 0;
// zero the complete timespec structs first; only tv_sec is assigned below,
// so tv_nsec stays 0
memset(&attr.st_atim, 0, sizeof(timespec));
memset(&attr.st_mtim, 0, sizeof(timespec));
memset(&attr.st_ctim, 0, sizeof(timespec));
#ifdef HAS_SYMLINKS
// for a link the size is the length of the target path plus the length of the mount directory
if (md.is_link())
attr.st_size = md.target_path().size() + CTX->mountdir().size();
else
#endif
attr.st_size = md.size();
// timestamps are taken from the metadata with second granularity
attr.st_atim.tv_sec = md.atime();
attr.st_mtim.tv_sec = md.mtime();
attr.st_ctim.tv_sec = md.ctime();
// use the link count stored in the metadata only when the configuration enables it
if (CTX->fs_conf()->link_cnt_state) {
attr.st_nlink = md.link_count();
// NOTE(review): nothing is visible inside the blocks_state branch here — confirm what it is meant to set
if (CTX->fs_conf()->blocks_state) { // last one will not encounter a delimiter anymore
}
return 0;
// Loads the hosts file found at `lfpath`.
// Lines are checked against the pattern "<non-space token> <whitespace> <non-space token>".
// The result type is a vector of string pairs; load_hosts() reads .first as the
// hostname and .second as the URI of each entry.
// Throws runtime_error when the file cannot be opened or a line does not match.
// NOTE(review): the stream handling and the return statement are not visible in
// this part of the file — confirm against the full function.
vector<pair<string, string>> load_hostfile(const std::string& lfpath) {
LOG(DEBUG, "Loading hosts file: \"{}\"", lfpath);
// the error text includes strerror(errno) as the reason for the failed open
throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
lfpath, strerror(errno)));
// two whitespace-separated tokens, anchored at both ends of the line
const regex line_re("^(\\S+)\\s+(\\S+)$",
regex::ECMAScript | regex::optimize);
string host;
string uri;
std::smatch match;
// a line that does not match is logged and then reported as a runtime_error
if (!regex_match(line, match, line_re)) {
LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
lfpath, line);
throw runtime_error(
fmt::format("unrecognized line format: '{}'", line));
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#ifdef GKFS_ENABLE_FORWARDING
/**
 * Parse a forwarding map file into a host -> forwarder id map.
 *
 * Every line has to consist of two whitespace-separated tokens: the host
 * name followed by the numeric id of its forwarder. When a host occurs more
 * than once, the last line wins.
 *
 * @param lfpath path of the forwarding map file
 * @return map from host name to forwarder id (empty if the file is empty)
 * @throws std::runtime_error if the file cannot be opened, a line does not
 *         have the expected format, or the id is negative
 * @throws std::invalid_argument, std::out_of_range (from std::stoull) if the
 *         id token is not a number or does not fit into 64 bits
 */
std::map<std::string, uint64_t> load_forwarding_map_file(const std::string& lfpath) {
    LOG(DEBUG, "Loading forwarding map file file: \"{}\"", lfpath);

    std::ifstream lf(lfpath);
    if (!lf) {
        throw std::runtime_error(fmt::format("Failed to open forwarding map file '{}': {}",
                                             lfpath, strerror(errno)));
    }

    std::map<std::string, uint64_t> forwarding_map;
    const std::regex line_re("^(\\S+)\\s+(\\S+)$",
                             std::regex::ECMAScript | std::regex::optimize);
    std::string line;
    std::smatch match;

    while (std::getline(lf, line)) {
        if (!std::regex_match(line, match, line_re)) {
            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                lfpath, line);
            throw std::runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }

        // The regex guarantees a non-empty token. std::stoull would accept a
        // leading '-' and wrap around, so negative ids are rejected by hand.
        const std::string id_token = match[2].str();
        if (id_token.front() == '-') {
            throw std::runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }

        // The id is stored as uint64_t, so parse it with the full 64-bit
        // range instead of going through int.
        const uint64_t forwarder = std::stoull(id_token);
        forwarding_map[match[1].str()] = forwarder;
    }
    return forwarding_map;
}
#endif
#ifdef GKFS_ENABLE_FORWARDING
/**
 * Load the forwarding map and register the forwarder of the local host.
 *
 * The file path is obtained through gkfs::env::get_var() with
 * gkfs::env::FORWARDING_MAP_FILE and gkfs::config::forwarding_file_path.
 * The file is re-read until it yields at least one entry. The forwarder id
 * of the local host name is then stored in the client context through
 * CTX->fwd_host_id().
 *
 * @throws std::runtime_error if the file cannot be loaded or holds no entry
 *         for the local host name
 */
void load_forwarding_map() {
    std::string forwarding_map_file;
    forwarding_map_file = gkfs::env::get_var(gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);

    std::map<std::string, uint64_t> forwarding_map;

    // NOTE(review): an existing but empty file makes this loop re-read the
    // file without any pause until entries show up — presumably intended as
    // a wait for the map to be written; confirm.
    while (forwarding_map.empty()) {
        try {
            forwarding_map = load_forwarding_map_file(forwarding_map_file);
        } catch (const std::exception& e) {
            auto emsg = fmt::format("Failed to load forwarding map file: {}", e.what());
            throw std::runtime_error(emsg);
        }
    }

    // same qualified helper that load_hosts() uses
    auto local_hostname = gkfs::rpc::get_my_hostname(true);

    // look the local host up once and reuse the iterator
    const auto entry = forwarding_map.find(local_hostname);
    if (entry == forwarding_map.end()) {
        throw std::runtime_error(fmt::format("Unable to determine the forwarder for host: '{}'", local_hostname));
    }

    LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname, entry->second);
    CTX->fwd_host_id(entry->second);
}
#endif
// Loads the hosts file, looks up an endpoint for every listed host and records
// which entry corresponds to the local host.
// Throws runtime_error when the hosts file cannot be loaded or contains no entry.
// NOTE(review): several lines of this function (closing braces, the loop header,
// the declarations of `addrs`, `id` and `local_host_found`) are not visible in
// this part of the file — confirm against the full function.
void load_hosts() {
string hostfile;
// path of the hosts file; presumably gkfs::config::hostfile_path is the fallback
// used when the HOSTS_FILE variable is not set — verify in gkfs::env::get_var
hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE, gkfs::config::hostfile_path);
vector<pair<string, string>> hosts;
try {
hosts = load_hostfile(hostfile);
} catch (const exception& e) {
// rethrow as runtime_error with a prefix naming the failed step
auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
throw runtime_error(emsg);
// a hosts file without entries is reported as an error
if (hosts.empty()) {
throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
LOG(INFO, "Hosts pool size: {}", hosts.size());
auto local_hostname = gkfs::rpc::get_my_hostname(true);
// one address slot per host, indexed by the host's position in the hosts file
addrs.resize(hosts.size());
vector<uint64_t> host_ids(hosts.size());
// populate vector with [0, ..., host_size - 1]
::iota(::begin(host_ids), ::end(host_ids), 0);
* Shuffle hosts to balance addr lookups to all hosts
* Too many concurrent lookups send to same host
* could overwhelm the server,
* returning error when addr lookup
::random_device rd; // obtain a random number from hardware
::mt19937 g(rd()); // seed the random generator
::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
// lookup addresses and put abstract server addresses into rpc_addresses
// hosts entries are (hostname, uri) pairs
const auto& hostname = hosts.at(id).first;
const auto& uri = hosts.at(id).second;
// the endpoint is stored under the host's own id, not under its shuffled position
addrs[id] = lookup_endpoint(uri);
// only the first entry whose hostname equals the local hostname is registered
if (!local_host_found && hostname == local_hostname) {
LOG(DEBUG, "Found local host: {}", hostname);
CTX->local_host_id(id);
local_host_found = true;
LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
LOG(WARNING, "Failed to find local host. Using host '0' as local host");
} // namespace util