Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS' POSIX interface.
12 :
13 : GekkoFS' POSIX interface is free software: you can redistribute it and/or
14 : modify it under the terms of the GNU Lesser General Public License as
15 : published by the Free Software Foundation, either version 3 of the License,
16 : or (at your option) any later version.
17 :
18 : GekkoFS' POSIX interface is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU Lesser General Public License for more details.
22 :
23 : You should have received a copy of the GNU Lesser General Public License
24 : along with GekkoFS' POSIX interface. If not, see
25 : <https://www.gnu.org/licenses/>.
26 :
27 : SPDX-License-Identifier: LGPL-3.0-or-later
28 : */
29 :
30 : #include <client/rpc/forward_metadata.hpp>
31 : #include <client/preload.hpp>
32 : #include <client/logging.hpp>
33 : #include <client/preload_util.hpp>
34 : #include <client/open_dir.hpp>
35 : #include <client/rpc/rpc_types.hpp>
36 :
37 : #include <common/rpc/rpc_util.hpp>
38 : #include <common/rpc/distributor.hpp>
39 : #include <common/rpc/rpc_types.hpp>
40 :
41 : using namespace std;
42 :
43 : namespace gkfs::rpc {
44 :
45 : /*
46 : * This file includes all metadata RPC calls.
47 : * NOTE: No errno is defined here!
48 : */
49 :
50 : /**
51 : * Send an RPC for a create request
52 : * @param path
53 : * @param mode
54 : * @param copy Number of replica to create
55 : * @return error code
56 : */
57 : int
58 1057 : forward_create(const std::string& path, const mode_t mode, const int copy) {
59 :
60 1057 : auto endp = CTX->hosts().at(
61 3171 : CTX->distributor()->locate_file_metadata(path, copy));
62 :
63 1057 : try {
64 1057 : LOG(DEBUG, "Sending RPC ...");
65 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
66 : // can retry for RPC_TRIES (see old commits with margo)
67 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
68 : // returning one result and a broadcast(endpoint_set) returning a
69 : // result_set. When that happens we can remove the .at(0) :/
70 1057 : auto out = ld_network_service->post<gkfs::rpc::create>(endp, path, mode)
71 2114 : .get()
72 1057 : .at(0);
73 1057 : LOG(DEBUG, "Got response success: {}", out.err());
74 :
75 1057 : return out.err() ? out.err() : 0;
76 0 : } catch(const std::exception& ex) {
77 0 : LOG(ERROR, "while getting rpc output");
78 0 : return EBUSY;
79 : }
80 : }
81 :
82 : /**
83 : * Send an RPC for a stat request
84 : * @param path
85 : * @param attr
86 : * @param copy metadata replica to read from
87 : * @return error code
88 : */
89 : int
90 1359 : forward_stat(const std::string& path, string& attr, const int copy) {
91 :
92 1359 : auto endp = CTX->hosts().at(
93 4077 : CTX->distributor()->locate_file_metadata(path, copy));
94 :
95 1359 : try {
96 1359 : LOG(DEBUG, "Sending RPC ...");
97 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
98 : // can retry for RPC_TRIES (see old commits with margo)
99 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
100 : // returning one result and a broadcast(endpoint_set) returning a
101 : // result_set. When that happens we can remove the .at(0) :/
102 1359 : auto out = ld_network_service->post<gkfs::rpc::stat>(endp, path)
103 2718 : .get()
104 1359 : .at(0);
105 1359 : LOG(DEBUG, "Got response success: {}", out.err());
106 :
107 1359 : if(out.err())
108 24 : return out.err();
109 :
110 1335 : attr = out.db_val();
111 0 : } catch(const std::exception& ex) {
112 0 : LOG(ERROR, "while getting rpc output");
113 0 : return EBUSY;
114 : }
115 1335 : return 0;
116 : }
117 :
118 : /**
119 : * Send an RPC for a remove request. This removes metadata and all data chunks
120 : * possible distributed across many daemons. Optimizations are in place for
121 : * small files (file_size / chunk_size) < number_of_daemons where no broadcast
122 : * to all daemons is used to remove all chunks. Otherwise, a broadcast to all
123 : * daemons is used.
124 : *
125 : * This function only attempts data removal if data exists (determined when
126 : * metadata is removed)
127 : * @param path
128 : * @param num_copies Replication scenarios with many replicas
129 : * @return error code
130 : */
131 : int
132 8 : forward_remove(const std::string& path, const int8_t num_copies) {
133 8 : int64_t size = 0;
134 8 : uint32_t mode = 0;
135 :
136 16 : for(auto copy = 0; copy < (num_copies + 1); copy++) {
137 8 : auto endp = CTX->hosts().at(
138 24 : CTX->distributor()->locate_file_metadata(path, copy));
139 :
140 : /*
141 : * Send one RPC to metadata destination and remove metadata while
142 : * retrieving size and mode to determine if data needs to removed too
143 : */
144 8 : try {
145 8 : LOG(DEBUG, "Sending RPC ...");
146 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
147 : // we can retry for RPC_TRIES (see old commits with margo)
148 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
149 : // returning one result and a broadcast(endpoint_set) returning a
150 : // result_set. When that happens we can remove the .at(0) :/
151 8 : auto out = ld_network_service
152 8 : ->post<gkfs::rpc::remove_metadata>(endp, path)
153 16 : .get()
154 8 : .at(0);
155 :
156 8 : LOG(DEBUG, "Got response success: {}", out.err());
157 :
158 8 : if(out.err())
159 0 : return out.err();
160 8 : size = out.size();
161 8 : mode = out.mode();
162 0 : } catch(const std::exception& ex) {
163 0 : LOG(ERROR, "while getting rpc output");
164 0 : return EBUSY;
165 : }
166 : }
167 : // if file is not a regular file and it's size is 0, data does not need to
168 : // be removed, thus, we exit
169 8 : if(!(S_ISREG(mode) && (size != 0)))
170 : return 0;
171 :
172 :
173 6 : std::vector<hermes::rpc_handle<gkfs::rpc::remove_data>> handles;
174 :
175 : // Small files
176 3 : if(static_cast<std::size_t>(size / gkfs::config::rpc::chunksize) <
177 3 : CTX->hosts().size()) {
178 4 : for(auto copymd = 0; copymd < (num_copies + 1); copymd++) {
179 2 : const auto metadata_host_id =
180 2 : CTX->distributor()->locate_file_metadata(path, copymd);
181 4 : const auto endp_metadata = CTX->hosts().at(metadata_host_id);
182 :
183 2 : try {
184 4 : LOG(DEBUG, "Sending RPC to host: {}",
185 2 : endp_metadata.to_string());
186 4 : gkfs::rpc::remove_data::input in(path);
187 2 : handles.emplace_back(
188 2 : ld_network_service->post<gkfs::rpc::remove_data>(
189 2 : endp_metadata, in));
190 :
191 2 : uint64_t chnk_start = 0;
192 2 : uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
193 :
194 4 : for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end;
195 : chnk_id++) {
196 4 : for(auto copy = 0; copy < (num_copies + 1); copy++) {
197 2 : const auto chnk_host_id =
198 4 : CTX->distributor()->locate_data(path, chnk_id,
199 2 : copy);
200 2 : if constexpr(gkfs::config::metadata::
201 : implicit_data_removal) {
202 : /*
203 : * If the chnk host matches the metadata host the
204 : * remove request as already been sent as part of
205 : * the metadata remove request.
206 : */
207 2 : if(chnk_host_id == metadata_host_id)
208 2 : continue;
209 : }
210 0 : const auto endp_chnk = CTX->hosts().at(chnk_host_id);
211 :
212 0 : LOG(DEBUG, "Sending RPC to host: {}",
213 0 : endp_chnk.to_string());
214 :
215 0 : handles.emplace_back(
216 : ld_network_service
217 0 : ->post<gkfs::rpc::remove_data>(
218 0 : endp_chnk, in));
219 : }
220 : }
221 0 : } catch(const std::exception& ex) {
222 0 : LOG(ERROR,
223 0 : "Failed to forward non-blocking rpc request reduced remove requests");
224 0 : return EBUSY;
225 : }
226 : }
227 : } else { // "Big" files
228 2 : for(const auto& endp : CTX->hosts()) {
229 1 : try {
230 2 : LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
231 :
232 2 : gkfs::rpc::remove_data::input in(path);
233 :
234 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
235 : // that we can retry for RPC_TRIES (see old commits with margo)
236 : // TODO(amiranda): hermes will eventually provide a
237 : // post(endpoint) returning one result and a
238 : // broadcast(endpoint_set) returning a result_set. When that
239 : // happens we can remove the .at(0) :/
240 :
241 1 : handles.emplace_back(
242 1 : ld_network_service->post<gkfs::rpc::remove_data>(endp,
243 1 : in));
244 :
245 0 : } catch(const std::exception& ex) {
246 : // TODO(amiranda): we should cancel all previously posted
247 : // requests here, unfortunately, Hermes does not support it yet
248 : // :/
249 0 : LOG(ERROR,
250 : "Failed to forward non-blocking rpc request to host: {}",
251 0 : endp.to_string());
252 0 : return EBUSY;
253 : }
254 : }
255 : }
256 : // wait for RPC responses
257 3 : auto err = 0;
258 6 : for(const auto& h : handles) {
259 3 : try {
260 : // XXX We might need a timeout here to not wait forever for an
261 : // output that never comes?
262 6 : auto out = h.get().at(0);
263 :
264 3 : if(out.err() != 0) {
265 0 : LOG(ERROR, "received error response: {}", out.err());
266 3 : err = out.err();
267 : }
268 0 : } catch(const std::exception& ex) {
269 0 : LOG(ERROR, "while getting rpc output");
270 0 : err = EBUSY;
271 : }
272 : }
273 3 : return err;
274 : }
275 :
276 : /**
277 : * Send an RPC for a decrement file size request. This is for example used
278 : * during a truncate() call.
279 : * @param path
280 : * @param length
281 : * @param copy Target replica (0 original)
282 : * @return error code
283 : */
284 : int
285 3 : forward_decr_size(const std::string& path, size_t length, const int copy) {
286 :
287 3 : auto endp = CTX->hosts().at(
288 9 : CTX->distributor()->locate_file_metadata(path, copy));
289 :
290 3 : try {
291 3 : LOG(DEBUG, "Sending RPC ...");
292 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
293 : // can retry for RPC_TRIES (see old commits with margo)
294 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
295 : // returning one result and a broadcast(endpoint_set) returning a
296 : // result_set. When that happens we can remove the .at(0) :/
297 3 : auto out = ld_network_service
298 3 : ->post<gkfs::rpc::decr_size>(endp, path, length)
299 6 : .get()
300 3 : .at(0);
301 :
302 3 : LOG(DEBUG, "Got response success: {}", out.err());
303 :
304 3 : return out.err() ? out.err() : 0;
305 0 : } catch(const std::exception& ex) {
306 0 : LOG(ERROR, "while getting rpc output");
307 0 : return EBUSY;
308 : }
309 : }
310 :
311 :
312 : /**
313 : * Send an RPC for an update metadentry request.
314 : * NOTE: Currently unused.
315 : * @param path
316 : * @param md
317 : * @param md_flags
318 : * @param copy Target replica (0 original)
319 : * @return error code
320 : */
321 : int
322 1 : forward_update_metadentry(const string& path,
323 : const gkfs::metadata::Metadata& md,
324 : const gkfs::metadata::MetadentryUpdateFlags& md_flags,
325 : const int copy) {
326 :
327 1 : auto endp = CTX->hosts().at(
328 3 : CTX->distributor()->locate_file_metadata(path, copy));
329 :
330 1 : try {
331 1 : LOG(DEBUG, "Sending RPC ...");
332 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
333 : // can retry for RPC_TRIES (see old commits with margo)
334 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
335 : // returning one result and a broadcast(endpoint_set) returning a
336 : // result_set. When that happens we can remove the .at(0) :/
337 1 : auto out = ld_network_service
338 2 : ->post<gkfs::rpc::update_metadentry>(
339 : endp, path,
340 0 : (md_flags.link_count ? md.link_count() : 0),
341 2 : /* mode */ 0,
342 2 : /* uid */ 0,
343 1 : /* gid */ 0, (md_flags.size ? md.size() : 0),
344 1 : (md_flags.blocks ? md.blocks() : 0),
345 1 : (md_flags.atime ? md.atime() : 0),
346 1 : (md_flags.mtime ? md.mtime() : 0),
347 1 : (md_flags.ctime ? md.ctime() : 0),
348 1 : bool_to_merc_bool(md_flags.link_count),
349 2 : /* mode_flag */ false,
350 1 : bool_to_merc_bool(md_flags.size),
351 1 : bool_to_merc_bool(md_flags.blocks),
352 1 : bool_to_merc_bool(md_flags.atime),
353 1 : bool_to_merc_bool(md_flags.mtime),
354 2 : bool_to_merc_bool(md_flags.ctime))
355 2 : .get()
356 1 : .at(0);
357 :
358 1 : LOG(DEBUG, "Got response success: {}", out.err());
359 :
360 1 : return out.err() ? out.err() : 0;
361 0 : } catch(const std::exception& ex) {
362 0 : LOG(ERROR, "while getting rpc output");
363 0 : return EBUSY;
364 : }
365 : }
366 :
367 : #ifdef HAS_RENAME
368 : /**
369 : * Send an RPC for a rename metadentry request.
370 : * Steps.. SetUp a blkcnt of -1
371 : * This marks that this file doesn't have to be accessed directly
372 : * Create a new md with the new name, which should have as value the old name
373 : * All operations should check blockcnt and extract a NOTEXISTS
374 : * The operations does not support replication
375 : * @param oldpath
376 : * @param newpath
377 : * @param md
378 : *
379 : * @return error code
380 : */
381 : int
382 10 : forward_rename(const string& oldpath, const string& newpath,
383 : const gkfs::metadata::Metadata& md) {
384 :
385 10 : auto endp = CTX->hosts().at(
386 30 : CTX->distributor()->locate_file_metadata(oldpath, 0));
387 :
388 10 : try {
389 10 : LOG(DEBUG, "Sending RPC ...");
390 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
391 : // can retry for RPC_TRIES (see old commits with margo)
392 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
393 : // returning one result and a broadcast(endpoint_set) returning a
394 : // result_set. When that happens we can remove the .at(0) :/
395 10 : auto out = ld_network_service
396 20 : ->post<gkfs::rpc::update_metadentry>(
397 30 : endp, oldpath, (md.link_count()),
398 20 : /* mode */ 0,
399 20 : /* uid */ 0,
400 10 : /* gid */ 0, md.size(),
401 10 : /* blockcnt */ -1, (md.atime()),
402 10 : (md.mtime()), (md.ctime()),
403 10 : bool_to_merc_bool(md.link_count()),
404 20 : /* mode_flag */ false,
405 10 : bool_to_merc_bool(md.size()), 1,
406 10 : bool_to_merc_bool(md.atime()),
407 10 : bool_to_merc_bool(md.mtime()),
408 20 : bool_to_merc_bool(md.ctime()))
409 20 : .get()
410 10 : .at(0);
411 :
412 10 : LOG(DEBUG, "Got response success: {}", out.err());
413 :
414 : // Now create the new file
415 :
416 0 : } catch(const std::exception& ex) {
417 0 : LOG(ERROR, "while getting rpc output");
418 0 : return EBUSY;
419 : }
420 :
421 20 : auto md2 = md;
422 :
423 10 : md2.target_path(oldpath);
424 : /*
425 : * Now create the new file
426 : */
427 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
428 : // can retry for RPC_TRIES (see old commits with margo)
429 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
430 : // returning one result and a broadcast(endpoint_set) returning a
431 : // result_set. When that happens we can remove the .at(0) :/
432 10 : auto endp2 = CTX->hosts().at(
433 30 : CTX->distributor()->locate_file_metadata(newpath, 0));
434 :
435 10 : try {
436 10 : LOG(DEBUG, "Sending RPC ...");
437 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
438 : // can retry for RPC_TRIES (see old commits with margo)
439 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
440 : // returning one result and a broadcast(endpoint_set) returning a
441 : // result_set. When that happens we can remove the .at(0) :/
442 :
443 10 : auto out = ld_network_service
444 10 : ->post<gkfs::rpc::create>(endp2, newpath, md2.mode())
445 20 : .get()
446 10 : .at(0);
447 10 : LOG(DEBUG, "Got response success: {}", out.err());
448 :
449 0 : } catch(const std::exception& ex) {
450 0 : LOG(ERROR, "while getting rpc output");
451 0 : return EBUSY;
452 : }
453 :
454 :
455 10 : try {
456 10 : LOG(DEBUG, "Sending RPC ...");
457 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
458 : // can retry for RPC_TRIES (see old commits with margo)
459 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
460 : // returning one result and a broadcast(endpoint_set) returning a
461 : // result_set. When that happens we can remove the .at(0) :/
462 : // Update new file with target link = oldpath
463 10 : auto out =
464 : ld_network_service
465 10 : ->post<gkfs::rpc::mk_symlink>(endp2, newpath, oldpath)
466 20 : .get()
467 10 : .at(0);
468 :
469 10 : LOG(DEBUG, "Got response success: {}", out.err());
470 :
471 : // return out.err() ? out.err() : 0;
472 :
473 0 : } catch(const std::exception& ex) {
474 0 : LOG(ERROR, "while getting rpc output");
475 0 : return EBUSY;
476 : }
477 :
478 : // Update the renamed path to solve the issue with fstat with fd)
479 10 : try {
480 10 : LOG(DEBUG, "Sending RPC ...");
481 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
482 : // can retry for RPC_TRIES (see old commits with margo)
483 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
484 : // returning one result and a broadcast(endpoint_set) returning a
485 : // result_set. When that happens we can remove the .at(0) :/
486 10 : auto out = ld_network_service
487 10 : ->post<gkfs::rpc::mk_symlink>(endp, oldpath, newpath)
488 20 : .get()
489 10 : .at(0);
490 :
491 10 : LOG(DEBUG, "Got response success: {}", out.err());
492 :
493 : // return out.err() ? out.err() : 0;
494 10 : return 0;
495 0 : } catch(const std::exception& ex) {
496 0 : LOG(ERROR, "while getting rpc output");
497 0 : return EBUSY;
498 : }
499 : }
500 :
501 : #endif
502 :
503 : /**
504 : * Send an RPC request for an update to the file size.
505 : * This is called during a write() call or similar
506 : * A single correct call is needed only to progress.
507 : * @param path
508 : * @param size
509 : * @param offset
510 : * @param append_flag
511 : * @param num_copies number of replicas
512 : * @return pair<error code, size after update>
513 : */
514 : pair<int, off64_t>
515 41 : forward_update_metadentry_size(const string& path, const size_t size,
516 : const off64_t offset, const bool append_flag,
517 : const int num_copies) {
518 :
519 82 : std::vector<hermes::rpc_handle<gkfs::rpc::update_metadentry_size>> handles;
520 :
521 82 : for(auto copy = 0; copy < num_copies + 1; copy++) {
522 41 : auto endp = CTX->hosts().at(
523 123 : CTX->distributor()->locate_file_metadata(path, copy));
524 41 : try {
525 41 : LOG(DEBUG, "Sending RPC ...");
526 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
527 : // we can retry for RPC_TRIES (see old commits with margo)
528 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
529 : // returning one result and a broadcast(endpoint_set) returning a
530 : // result_set. When that happens we can remove the .at(0) :/
531 41 : handles.emplace_back(
532 41 : ld_network_service->post<gkfs::rpc::update_metadentry_size>(
533 : endp, path, size, offset,
534 82 : bool_to_merc_bool(append_flag)));
535 0 : } catch(const std::exception& ex) {
536 0 : LOG(ERROR, "while getting rpc output");
537 0 : return make_pair(EBUSY, 0);
538 : }
539 : }
540 41 : auto err = 0;
541 41 : ssize_t out_size = 0;
542 41 : auto idx = 0;
543 41 : bool valid = false;
544 82 : for(const auto& h : handles) {
545 41 : try {
546 : // XXX We might need a timeout here to not wait forever for an
547 : // output that never comes?
548 82 : auto out = h.get().at(0);
549 :
550 41 : if(out.err() != 0) {
551 0 : LOG(ERROR, "Daemon {} reported error: {}", idx, out.err());
552 : } else {
553 41 : valid = true;
554 41 : out_size = out.ret_size();
555 : }
556 :
557 0 : } catch(const std::exception& ex) {
558 0 : LOG(ERROR, "Failed to get rpc output");
559 0 : if(!valid) {
560 0 : err = EIO;
561 : }
562 : }
563 41 : idx++;
564 : }
565 :
566 41 : if(!valid)
567 0 : return make_pair(err, 0);
568 : else
569 41 : return make_pair(0, out_size);
570 : }
571 :
572 :
573 : /**
574 : * Send an RPC request to get the current file size.
575 : * This is called during a lseek() call
576 : * @param path
577 : * @param copy Target replica (0 original)
578 : * @return pair<error code, file size>
579 : */
580 : pair<int, off64_t>
581 5 : forward_get_metadentry_size(const std::string& path, const int copy) {
582 :
583 5 : auto endp = CTX->hosts().at(
584 15 : CTX->distributor()->locate_file_metadata(path, copy));
585 :
586 5 : try {
587 5 : LOG(DEBUG, "Sending RPC ...");
588 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
589 : // can retry for RPC_TRIES (see old commits with margo)
590 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
591 : // returning one result and a broadcast(endpoint_set) returning a
592 : // result_set. When that happens we can remove the .at(0) :/
593 5 : auto out = ld_network_service
594 5 : ->post<gkfs::rpc::get_metadentry_size>(endp, path)
595 10 : .get()
596 5 : .at(0);
597 :
598 5 : LOG(DEBUG, "Got response success: {}", out.err());
599 :
600 5 : if(out.err())
601 0 : return make_pair(out.err(), 0);
602 : else
603 5 : return make_pair(0, out.ret_size());
604 0 : } catch(const std::exception& ex) {
605 0 : LOG(ERROR, "while getting rpc output");
606 0 : return make_pair(EBUSY, 0);
607 : }
608 : }
609 :
610 : /**
611 : * Send an RPC request to receive all entries of a directory.
612 : * @param open_dir
613 : * @return error code
614 : */
615 : pair<int, shared_ptr<gkfs::filemap::OpenDir>>
616 25 : forward_get_dirents(const string& path) {
617 :
618 25 : LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
619 :
620 50 : auto const targets = CTX->distributor()->locate_directory_metadata(path);
621 :
622 : /* preallocate receiving buffer. The actual size is not known yet.
623 : *
624 : * On C++14 make_unique function also zeroes the newly allocated buffer.
625 : * It turns out that this operation is increadibly slow for such a big
626 : * buffer. Moreover we don't need a zeroed buffer here.
627 : */
628 25 : auto large_buffer = std::unique_ptr<char[]>(
629 50 : new char[gkfs::config::rpc::dirents_buff_size]);
630 :
631 : // XXX there is a rounding error here depending on the number of targets...
632 25 : const std::size_t per_host_buff_size =
633 25 : gkfs::config::rpc::dirents_buff_size / targets.size();
634 :
635 : // expose local buffers for RMA from servers
636 50 : std::vector<hermes::exposed_memory> exposed_buffers;
637 25 : exposed_buffers.reserve(targets.size());
638 :
639 50 : for(std::size_t i = 0; i < targets.size(); ++i) {
640 25 : try {
641 50 : exposed_buffers.emplace_back(ld_network_service->expose(
642 25 : std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
643 25 : large_buffer.get() + (i * per_host_buff_size),
644 50 : per_host_buff_size}},
645 25 : hermes::access_mode::write_only));
646 0 : } catch(const std::exception& ex) {
647 0 : LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'",
648 0 : __func__, ex.what());
649 0 : return make_pair(EBUSY, nullptr);
650 : }
651 : }
652 :
653 25 : auto err = 0;
654 : // send RPCs
655 25 : std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents>> handles;
656 :
657 50 : for(std::size_t i = 0; i < targets.size(); ++i) {
658 :
659 : // Setup rpc input parameters for each host
660 50 : auto endp = CTX->hosts().at(targets[i]);
661 :
662 50 : gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]);
663 :
664 25 : try {
665 25 : LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
666 25 : handles.emplace_back(
667 25 : ld_network_service->post<gkfs::rpc::get_dirents>(endp, in));
668 0 : } catch(const std::exception& ex) {
669 0 : LOG(ERROR,
670 : "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
671 0 : __func__, path, targets[i], ex.what());
672 0 : err = EBUSY;
673 0 : break; // we need to gather responses from already sent RPCS
674 : }
675 : }
676 :
677 25 : LOG(DEBUG,
678 : "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
679 25 : __func__, path, targets.size(), per_host_buff_size);
680 :
681 25 : auto send_error = err != 0;
682 50 : auto open_dir = make_shared<gkfs::filemap::OpenDir>(path);
683 : // wait for RPC responses
684 50 : for(std::size_t i = 0; i < handles.size(); ++i) {
685 :
686 25 : gkfs::rpc::get_dirents::output out;
687 :
688 25 : try {
689 : // XXX We might need a timeout here to not wait forever for an
690 : // output that never comes?
691 50 : out = handles[i].get().at(0);
692 : // skip processing dirent data if there was an error during send
693 : // In this case all responses are gathered but their contents
694 : // skipped
695 25 : if(send_error)
696 0 : continue;
697 :
698 25 : if(out.err() != 0) {
699 0 : LOG(ERROR,
700 : "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
701 0 : __func__, targets[i], strerror(out.err()), path);
702 0 : err = out.err();
703 : // We need to gather all responses before exiting
704 0 : continue;
705 : }
706 0 : } catch(const std::exception& ex) {
707 0 : LOG(ERROR,
708 : "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
709 0 : __func__, path, targets[i], ex.what());
710 0 : err = EBUSY;
711 : // We need to gather all responses before exiting
712 0 : continue;
713 : }
714 :
715 : // each server wrote information to its pre-defined region in
716 : // large_buffer, recover it by computing the base_address for each
717 : // particular server and adding the appropriate offsets
718 25 : assert(exposed_buffers[i].count() == 1);
719 25 : void* base_ptr = exposed_buffers[i].begin()->data();
720 :
721 25 : bool* bool_ptr = reinterpret_cast<bool*>(base_ptr);
722 25 : char* names_ptr = reinterpret_cast<char*>(base_ptr) +
723 25 : (out.dirents_size() * sizeof(bool));
724 :
725 1054 : for(std::size_t j = 0; j < out.dirents_size(); j++) {
726 :
727 2058 : gkfs::filemap::FileType ftype =
728 1029 : (*bool_ptr) ? gkfs::filemap::FileType::directory
729 : : gkfs::filemap::FileType::regular;
730 1029 : bool_ptr++;
731 :
732 : // Check that we are not outside the recv_buff for this specific
733 : // host
734 1029 : assert((names_ptr - reinterpret_cast<char*>(base_ptr)) > 0);
735 1029 : assert(static_cast<unsigned long int>(
736 : names_ptr - reinterpret_cast<char*>(base_ptr)) <
737 : per_host_buff_size);
738 :
739 2058 : auto name = std::string(names_ptr);
740 : // number of characters in entry + \0 terminator
741 1029 : names_ptr += name.size() + 1;
742 :
743 1029 : open_dir->add(name, ftype);
744 : }
745 : }
746 50 : return make_pair(err, open_dir);
747 : }
748 :
749 : /**
750 : * Send an RPC request to receive all entries of a directory in a server.
751 : * @param path
752 : * @param server
753 : * @return error code
754 : * Returns a tuple with path-isdir-size and ctime
755 : * We still use dirents_buff_size, but we need to match in the client side, as
756 : * the buffer is provided by the "gfind" applications However, as we only ask
757 : * for a server the size should be enought for most of the scenarios. We are
758 : * reusing the forward_get_dirents code. As we only need a server, we could
759 : * simplify the code removing the asynchronous part.
760 : */
761 : pair<int, vector<tuple<const std::string, bool, size_t, time_t>>>
762 4 : forward_get_dirents_single(const string& path, int server) {
763 :
764 4 : LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
765 :
766 8 : auto const targets = CTX->distributor()->locate_directory_metadata(path);
767 :
768 : /* preallocate receiving buffer. The actual size is not known yet.
769 : *
770 : * On C++14 make_unique function also zeroes the newly allocated buffer.
771 : * It turns out that this operation is increadibly slow for such a big
772 : * buffer. Moreover we don't need a zeroed buffer here.
773 : */
774 4 : auto large_buffer = std::unique_ptr<char[]>(
775 8 : new char[gkfs::config::rpc::dirents_buff_size]);
776 :
777 : // We use the full size per server...
778 4 : const std::size_t per_host_buff_size = gkfs::config::rpc::dirents_buff_size;
779 8 : vector<tuple<const std::string, bool, size_t, time_t>> output;
780 :
781 : // expose local buffers for RMA from servers
782 4 : std::vector<hermes::exposed_memory> exposed_buffers;
783 4 : exposed_buffers.reserve(1);
784 4 : std::size_t i = server;
785 4 : try {
786 8 : exposed_buffers.emplace_back(ld_network_service->expose(
787 4 : std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
788 4 : large_buffer.get(), per_host_buff_size}},
789 4 : hermes::access_mode::write_only));
790 0 : } catch(const std::exception& ex) {
791 0 : LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__,
792 0 : ex.what());
793 0 : return make_pair(EBUSY, output);
794 : }
795 :
796 4 : auto err = 0;
797 : // send RPCs
798 4 : std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents_extended>> handles;
799 :
800 8 : auto endp = CTX->hosts().at(targets[i]);
801 :
802 8 : gkfs::rpc::get_dirents_extended::input in(path, exposed_buffers[0]);
803 :
804 4 : try {
805 4 : LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
806 4 : handles.emplace_back(
807 4 : ld_network_service->post<gkfs::rpc::get_dirents_extended>(endp,
808 4 : in));
809 0 : } catch(const std::exception& ex) {
810 0 : LOG(ERROR,
811 : "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
812 0 : __func__, path, targets[i], ex.what());
813 0 : err = EBUSY;
814 : }
815 :
816 4 : LOG(DEBUG,
817 : "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
818 4 : __func__, path, targets.size(), per_host_buff_size);
819 :
820 : // wait for RPC responses
821 :
822 4 : gkfs::rpc::get_dirents_extended::output out;
823 :
824 4 : try {
825 : // XXX We might need a timeout here to not wait forever for an
826 : // output that never comes?
827 8 : out = handles[0].get().at(0);
828 : // skip processing dirent data if there was an error during send
829 : // In this case all responses are gathered but their contents skipped
830 :
831 4 : if(out.err() != 0) {
832 0 : LOG(ERROR,
833 : "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
834 : __func__, targets[0], strerror(out.err()), path);
835 4 : err = out.err();
836 : // We need to gather all responses before exiting
837 : }
838 0 : } catch(const std::exception& ex) {
839 0 : LOG(ERROR,
840 : "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
841 0 : __func__, path, targets[0], ex.what());
842 0 : err = EBUSY;
843 : // We need to gather all responses before exiting
844 : }
845 :
846 : // The parenthesis is extremely important if not the cast will add as a
847 : // size_t or a time_t and not as a char
848 4 : auto out_buff_ptr = static_cast<char*>(exposed_buffers[0].begin()->data());
849 4 : auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
850 4 : auto size_ptr = reinterpret_cast<size_t*>(
851 4 : (out_buff_ptr) + (out.dirents_size() * sizeof(bool)));
852 4 : auto ctime_ptr = reinterpret_cast<time_t*>(
853 : (out_buff_ptr) +
854 4 : (out.dirents_size() * (sizeof(bool) + sizeof(size_t))));
855 4 : auto names_ptr =
856 4 : out_buff_ptr + (out.dirents_size() *
857 4 : (sizeof(bool) + sizeof(size_t) + sizeof(time_t)));
858 :
859 8 : for(std::size_t j = 0; j < out.dirents_size(); j++) {
860 :
861 4 : bool ftype = (*bool_ptr);
862 4 : bool_ptr++;
863 :
864 4 : size_t size = *size_ptr;
865 4 : size_ptr++;
866 :
867 4 : time_t ctime = *ctime_ptr;
868 4 : ctime_ptr++;
869 :
870 8 : auto name = std::string(names_ptr);
871 : // number of characters in entry + \0 terminator
872 4 : names_ptr += name.size() + 1;
873 4 : output.emplace_back(std::forward_as_tuple(name, ftype, size, ctime));
874 : }
875 8 : return make_pair(err, output);
876 : }
877 :
878 :
879 : #ifdef HAS_SYMLINKS
880 :
881 : /**
882 : * Send an RPC request to create a symlink.
883 : * @param path
884 : * @param target_path
885 : * @return error code
886 : */
887 : int
888 0 : forward_mk_symlink(const std::string& path, const std::string& target_path) {
889 :
890 0 : auto endp =
891 0 : CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0));
892 :
893 0 : try {
894 0 : LOG(DEBUG, "Sending RPC ...");
895 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
896 : // can retry for RPC_TRIES (see old commits with margo)
897 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
898 : // returning one result and a broadcast(endpoint_set) returning a
899 : // result_set. When that happens we can remove the .at(0) :/
900 0 : auto out =
901 : ld_network_service
902 0 : ->post<gkfs::rpc::mk_symlink>(endp, path, target_path)
903 0 : .get()
904 0 : .at(0);
905 :
906 0 : LOG(DEBUG, "Got response success: {}", out.err());
907 :
908 0 : return out.err() ? out.err() : 0;
909 0 : } catch(const std::exception& ex) {
910 0 : LOG(ERROR, "while getting rpc output");
911 0 : return EBUSY;
912 : }
913 : }
914 :
915 : #endif
916 :
917 : } // namespace gkfs::rpc
|