Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS' POSIX interface.
12 :
13 : GekkoFS' POSIX interface is free software: you can redistribute it and/or
14 : modify it under the terms of the GNU Lesser General Public License as
15 : published by the Free Software Foundation, either version 3 of the License,
16 : or (at your option) any later version.
17 :
18 : GekkoFS' POSIX interface is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU Lesser General Public License for more details.
22 :
23 : You should have received a copy of the GNU Lesser General Public License
24 : along with GekkoFS' POSIX interface. If not, see
25 : <https://www.gnu.org/licenses/>.
26 :
27 : SPDX-License-Identifier: LGPL-3.0-or-later
28 : */
29 :
30 : #include <client/preload_util.hpp>
31 : #include <client/env.hpp>
32 : #include <client/logging.hpp>
33 : #include <client/rpc/forward_metadata.hpp>
34 :
35 : #include <common/rpc/distributor.hpp>
36 : #include <common/rpc/rpc_util.hpp>
37 : #include <common/env_util.hpp>
38 : #include <common/common_defs.hpp>
39 :
40 : #include <hermes.hpp>
41 :
42 : #include <fstream>
43 : #include <sstream>
44 : #include <regex>
45 : #include <csignal>
46 : #include <random>
47 :
48 : extern "C" {
49 : #include <sys/sysmacros.h>
50 : }
51 :
52 : using namespace std;
53 :
54 : namespace {
55 :
56 : /**
57 : * Looks up a host endpoint via Hermes
58 : * @param uri
59 : * @param max_retries
60 : * @return hermes endpoint, if successful
61 : * @throws std::runtime_error
62 : */
63 : hermes::endpoint
64 248 : lookup_endpoint(const std::string& uri, std::size_t max_retries = 3) {
65 :
66 248 : LOG(DEBUG, "Looking up address \"{}\"", uri);
67 :
68 248 : std::random_device rd; // obtain a random number from hardware
69 248 : std::size_t attempts = 0;
70 496 : std::string error_msg;
71 :
72 248 : do {
73 248 : try {
74 496 : return ld_network_service->lookup(uri);
75 0 : } catch(const exception& ex) {
76 0 : error_msg = ex.what();
77 :
78 0 : LOG(WARNING, "Failed to lookup address '{}'. Attempts [{}/{}]", uri,
79 0 : attempts + 1, max_retries);
80 :
81 : // Wait a random amount of time and try again
82 0 : std::mt19937 g(rd()); // seed the random generator
83 0 : std::uniform_int_distribution<> distr(
84 0 : 50, 50 * (attempts + 2)); // define the range
85 0 : std::this_thread::sleep_for(std::chrono::milliseconds(distr(g)));
86 0 : continue;
87 : }
88 0 : } while(++attempts < max_retries);
89 :
90 0 : throw std::runtime_error(
91 0 : fmt::format("Endpoint for address '{}' could not be found ({})",
92 0 : uri, error_msg));
93 : }
94 :
95 : /**
96 : * extracts protocol from a given URI generated by the RPC server of the daemon
97 : * @param uri
98 : * @throws std::runtime_error
99 : */
100 : void
101 248 : extract_protocol(const string& uri) {
102 248 : if(uri.rfind("://") == string::npos) {
103 : // invalid format. kill client
104 0 : throw runtime_error(fmt::format("Invalid format for URI: '{}'", uri));
105 : }
106 248 : string protocol{};
107 :
108 248 : if(uri.find(gkfs::rpc::protocol::ofi_sockets) != string::npos) {
109 248 : protocol = gkfs::rpc::protocol::ofi_sockets;
110 0 : } else if(uri.find(gkfs::rpc::protocol::ofi_psm2) != string::npos) {
111 0 : protocol = gkfs::rpc::protocol::ofi_psm2;
112 0 : } else if(uri.find(gkfs::rpc::protocol::ofi_verbs) != string::npos) {
113 0 : protocol = gkfs::rpc::protocol::ofi_verbs;
114 : }
115 : // check for shared memory protocol. Can be plain shared memory or real ofi
116 : // protocol + auto_sm
117 248 : if(uri.find(gkfs::rpc::protocol::na_sm) != string::npos) {
118 0 : if(protocol.empty())
119 0 : protocol = gkfs::rpc::protocol::na_sm;
120 : else
121 0 : CTX->auto_sm(true);
122 : }
123 248 : if(protocol.empty()) {
124 : // unsupported protocol. kill client
125 0 : throw runtime_error(fmt::format(
126 : "Unsupported RPC protocol found in hosts file with URI: '{}'",
127 0 : uri));
128 : }
129 248 : LOG(INFO,
130 : "RPC protocol '{}' extracted from hosts file. Using auto_sm is '{}'",
131 248 : protocol, CTX->auto_sm());
132 248 : CTX->rpc_protocol(protocol);
133 248 : }
134 :
135 : /**
136 : * Reads the daemon generator hosts file by a given path, returning hosts and
137 : * URI addresses
138 : * @param path to hosts file
139 : * @return vector<pair<hosts, URI>>
140 : * @throws std::runtime_error
141 : */
142 : vector<pair<string, string>>
143 248 : load_hostfile(const std::string& path) {
144 :
145 248 : LOG(DEBUG, "Loading hosts file: \"{}\"", path);
146 :
147 496 : ifstream lf(path);
148 248 : if(!lf) {
149 0 : throw runtime_error(fmt::format("Failed to open hosts file '{}': {}",
150 0 : path, strerror(errno)));
151 : }
152 248 : vector<pair<string, string>> hosts;
153 248 : const regex line_re("^(\\S+)\\s+(\\S+)$",
154 248 : regex::ECMAScript | regex::optimize);
155 496 : string line;
156 248 : string host;
157 248 : string uri;
158 496 : std::smatch match;
159 496 : while(getline(lf, line)) {
160 248 : if(!regex_match(line, match, line_re)) {
161 :
162 0 : LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
163 0 : path, line);
164 :
165 0 : throw runtime_error(
166 0 : fmt::format("unrecognized line format: '{}'", line));
167 : }
168 248 : host = match[1];
169 248 : uri = match[2];
170 248 : hosts.emplace_back(host, uri);
171 : }
172 248 : if(hosts.empty()) {
173 0 : throw runtime_error(
174 0 : "Hosts file found but no suitable addresses could be extracted");
175 : }
176 248 : extract_protocol(hosts[0].second);
177 : // sort hosts so that data always hashes to the same place during restart
178 248 : std::sort(hosts.begin(), hosts.end());
179 : // remove rootdir suffix from host after sorting as no longer required
180 496 : for(auto& h : hosts) {
181 248 : auto idx = h.first.rfind("#");
182 248 : if(idx != string::npos)
183 0 : h.first.erase(idx, h.first.length());
184 : }
185 496 : return hosts;
186 : }
187 :
188 : } // namespace
189 :
190 : namespace gkfs::utils {
191 :
192 :
193 : /**
194 : * Retrieve metadata from daemon and return Metadata object
195 : * errno may be set
196 : * @param path
197 : * @param follow_links
198 : * @return Metadata
199 : */
200 : optional<gkfs::metadata::Metadata>
201 1359 : get_metadata(const string& path, bool follow_links) {
202 2718 : std::string attr;
203 1359 : auto err = gkfs::rpc::forward_stat(path, attr, 0);
204 : // TODO: retry on failure
205 :
206 1359 : if(err) {
207 24 : auto copy = 1;
208 24 : while(copy < CTX->get_replicas() + 1 && err) {
209 0 : LOG(ERROR, "Retrying Stat on replica {} {}", copy, follow_links);
210 0 : err = gkfs::rpc::forward_stat(path, attr, copy);
211 0 : copy++;
212 : }
213 24 : if(err) {
214 24 : errno = err;
215 24 : return {};
216 : }
217 : }
218 : #ifdef HAS_SYMLINKS
219 1335 : if(follow_links) {
220 144 : gkfs::metadata::Metadata md{attr};
221 72 : while(md.is_link()) {
222 0 : err = gkfs::rpc::forward_stat(md.target_path(), attr, 0);
223 0 : if(err) {
224 0 : errno = err;
225 0 : return {};
226 : }
227 0 : md = gkfs::metadata::Metadata{attr};
228 : }
229 : }
230 : #endif
231 2670 : return gkfs::metadata::Metadata{attr};
232 : }
233 :
234 :
235 : /**
236 : * Converts the Metadata object into a stat struct, which is needed by Linux
237 : * @param path
238 : * @param md
239 : * @param attr
240 : * @return
241 : */
242 : int
243 53 : metadata_to_stat(const std::string& path, const gkfs::metadata::Metadata& md,
244 : struct stat& attr) {
245 :
246 : /* Populate default values */
247 53 : attr.st_dev = makedev(0, 0);
248 53 : attr.st_ino = std::hash<std::string>{}(path);
249 53 : attr.st_nlink = 1;
250 53 : attr.st_uid = CTX->fs_conf()->uid;
251 53 : attr.st_gid = CTX->fs_conf()->gid;
252 53 : attr.st_rdev = 0;
253 53 : attr.st_blksize = gkfs::config::rpc::chunksize;
254 53 : attr.st_blocks = 0;
255 :
256 53 : memset(&attr.st_atim, 0, sizeof(timespec));
257 53 : memset(&attr.st_mtim, 0, sizeof(timespec));
258 53 : memset(&attr.st_ctim, 0, sizeof(timespec));
259 :
260 53 : attr.st_mode = md.mode();
261 :
262 : #ifdef HAS_SYMLINKS
263 53 : if(md.is_link())
264 0 : attr.st_size = md.target_path().size() + CTX->mountdir().size();
265 : else
266 : #endif
267 53 : attr.st_size = md.size();
268 :
269 53 : if(CTX->fs_conf()->atime_state) {
270 53 : attr.st_atim.tv_sec = md.atime();
271 : }
272 53 : if(CTX->fs_conf()->mtime_state) {
273 53 : attr.st_mtim.tv_sec = md.mtime();
274 : }
275 53 : if(CTX->fs_conf()->ctime_state) {
276 53 : attr.st_ctim.tv_sec = md.ctime();
277 : }
278 53 : if(CTX->fs_conf()->link_cnt_state) {
279 53 : attr.st_nlink = md.link_count();
280 : }
281 53 : if(CTX->fs_conf()->blocks_state) {
282 53 : attr.st_blocks = md.blocks();
283 : } else {
284 0 : attr.st_blocks = md.size() / gkfs::config::syscall::stat::st_nblocksize;
285 : }
286 53 : return 0;
287 : }
288 :
#ifdef GKFS_ENABLE_FORWARDING
/**
 * Parses a forwarding map file into a hostname -> forwarder id map.
 * Each line must have the form "<hostname> <forwarder id>", where the id is a
 * non-negative decimal integer. If a hostname appears more than once, the
 * last entry wins.
 * @param lfpath path to the forwarding map file
 * @return map from hostname to forwarder id (empty if the file has no lines)
 * @throws std::runtime_error if the file cannot be opened or a line is
 *         malformed; std::out_of_range if an id does not fit in 64 bits
 */
map<string, uint64_t>
load_forwarding_map_file(const std::string& lfpath) {

    LOG(DEBUG, "Loading forwarding map file: \"{}\"", lfpath);

    ifstream lf(lfpath);
    if(!lf) {
        throw runtime_error(
                fmt::format("Failed to open forwarding map file '{}': {}",
                            lfpath, strerror(errno)));
    }
    map<string, uint64_t> forwarding_map;
    // The id must be all digits: this rejects negative values (which would
    // wrap around in uint64_t) and trailing junk such as "3abc".
    const regex line_re("^(\\S+)\\s+(\\d+)$",
                        regex::ECMAScript | regex::optimize);
    string line;
    std::smatch match;
    while(getline(lf, line)) {
        if(!regex_match(line, match, line_re)) {

            LOG(ERROR, "Unrecognized line format: [path: '{}', line: '{}']",
                lfpath, line);

            throw runtime_error(
                    fmt::format("unrecognized line format: '{}'", line));
        }
        // stoull covers the full uint64_t range (stoi stopped at INT_MAX).
        forwarding_map[match[1].str()] =
                static_cast<uint64_t>(std::stoull(match[2].str()));
    }
    return forwarding_map;
}
#endif
324 :
#ifdef GKFS_ENABLE_FORWARDING
/**
 * Loads the forwarding map and records the forwarder id of the local host in
 * the preload context (CTX->fwd_host_id()).
 * The file path comes from gkfs::env::get_var(FORWARDING_MAP_FILE), falling
 * back to gkfs::config::forwarding_file_path. While the file parses to an
 * empty map it is re-read periodically. NOTE(review): presumably it may still
 * be in the process of being written by another component; confirm.
 * @throws std::runtime_error if the file cannot be loaded or contains no
 *         entry for the local hostname
 */
void
load_forwarding_map() {
    const string forwarding_map_file = gkfs::env::get_var(
            gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path);

    map<string, uint64_t> forwarding_map;

    while(true) {
        try {
            forwarding_map = load_forwarding_map_file(forwarding_map_file);
        } catch(const exception& e) {
            auto emsg = fmt::format("Failed to load forwarding map file: {}",
                                    e.what());
            throw runtime_error(emsg);
        }
        if(!forwarding_map.empty()) {
            break;
        }
        // Back off between polls instead of re-reading the file in a tight
        // busy-loop while it is still empty.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    auto local_hostname = gkfs::rpc::get_my_hostname(true);

    const auto entry = forwarding_map.find(local_hostname);
    if(entry == forwarding_map.end()) {
        throw runtime_error(
                fmt::format("Unable to determine the forwarder for host: '{}'",
                            local_hostname));
    }
    LOG(INFO, "Forwarding map loaded for '{}' as '{}'", local_hostname,
        entry->second);

    CTX->fwd_host_id(entry->second);
}
#endif
363 :
364 : vector<pair<string, string>>
365 248 : read_hosts_file() {
366 248 : string hostfile;
367 :
368 248 : hostfile = gkfs::env::get_var(gkfs::env::HOSTS_FILE,
369 496 : gkfs::config::hostfile_path);
370 :
371 248 : vector<pair<string, string>> hosts;
372 248 : try {
373 496 : hosts = load_hostfile(hostfile);
374 0 : } catch(const exception& e) {
375 0 : auto emsg = fmt::format("Failed to load hosts file: {}", e.what());
376 0 : throw runtime_error(emsg);
377 : }
378 :
379 248 : if(hosts.empty()) {
380 0 : throw runtime_error(fmt::format("Hostfile empty: '{}'", hostfile));
381 : }
382 :
383 248 : LOG(INFO, "Hosts pool size: {}", hosts.size());
384 248 : return hosts;
385 : }
386 :
387 : /**
388 : * Connects to daemons and lookup Mercury URI addresses via Hermes
389 : * @param hosts vector<pair<hostname, Mercury URI address>>
390 : * @throws std::runtime_error through lookup_endpoint()
391 : */
392 : void
393 248 : connect_to_hosts(const vector<pair<string, string>>& hosts) {
394 248 : auto local_hostname = gkfs::rpc::get_my_hostname(true);
395 248 : bool local_host_found = false;
396 :
397 496 : std::vector<hermes::endpoint> addrs;
398 248 : addrs.resize(hosts.size());
399 :
400 496 : vector<uint64_t> host_ids(hosts.size());
401 : // populate vector with [0, ..., host_size - 1]
402 248 : ::iota(::begin(host_ids), ::end(host_ids), 0);
403 : /*
404 : * Shuffle hosts to balance addr lookups to all hosts
405 : * Too many concurrent lookups send to same host
406 : * could overwhelm the server,
407 : * returning error when addr lookup
408 : */
409 496 : ::random_device rd; // obtain a random number from hardware
410 248 : ::mt19937 g(rd()); // seed the random generator
411 248 : ::shuffle(host_ids.begin(), host_ids.end(), g); // Shuffle hosts vector
412 : // lookup addresses and put abstract server addresses into rpc_addresses
413 :
414 496 : for(const auto& id : host_ids) {
415 248 : const auto& hostname = hosts.at(id).first;
416 248 : const auto& uri = hosts.at(id).second;
417 :
418 248 : addrs[id] = lookup_endpoint(uri);
419 :
420 248 : if(!local_host_found && hostname == local_hostname) {
421 248 : LOG(DEBUG, "Found local host: {}", hostname);
422 248 : CTX->local_host_id(id);
423 : local_host_found = true;
424 : }
425 :
426 496 : LOG(DEBUG, "Found peer: {}", addrs[id].to_string());
427 : }
428 :
429 248 : if(!local_host_found) {
430 0 : LOG(WARNING, "Failed to find local host. Using host '0' as local host");
431 0 : CTX->local_host_id(0);
432 : }
433 :
434 248 : CTX->hosts(addrs);
435 248 : }
436 :
437 : } // namespace gkfs::utils
|