Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS' POSIX interface.
12 :
13 : GekkoFS' POSIX interface is free software: you can redistribute it and/or
14 : modify it under the terms of the GNU Lesser General Public License as
15 : published by the Free Software Foundation, either version 3 of the License,
16 : or (at your option) any later version.
17 :
18 : GekkoFS' POSIX interface is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU Lesser General Public License for more details.
22 :
23 : You should have received a copy of the GNU Lesser General Public License
24 : along with GekkoFS' POSIX interface. If not, see
25 : <https://www.gnu.org/licenses/>.
26 :
27 : SPDX-License-Identifier: LGPL-3.0-or-later
28 : */
29 :
30 : #include <client/preload_util.hpp>
31 : #include <client/rpc/forward_data.hpp>
32 : #include <client/rpc/rpc_types.hpp>
33 : #include <client/logging.hpp>
34 :
35 : #include <common/rpc/distributor.hpp>
36 : #include <common/arithmetic/arithmetic.hpp>
37 : #include <common/rpc/rpc_util.hpp>
38 :
39 : #include <unordered_set>
40 :
41 : using namespace std;
42 :
43 : namespace gkfs::rpc {
44 :
45 : /*
46 : * This file includes all data RPC calls.
47 : * NOTE: No errno is defined here!
48 : */
49 :
50 : /**
51 : * Send an RPC request to write from a buffer.
52 : * There is a bitset of 1024 chunks to tell the server
53 : * which chunks to process. Exceeding this value will work without
54 : * replication. Another way is to leverage mercury segments.
55 : * TODO: Decide how to manage a write to a replica that doesn't exist
56 : * @param path
57 : * @param buf
58 : * @param append_flag
59 : * @param write_size
60 : * @param num_copies number of replicas
61 : * @return pair<error code, written size>
62 : */
63 : pair<int, ssize_t>
64 46 : forward_write(const string& path, const void* buf, const off64_t offset,
65 : const size_t write_size, const int8_t num_copies) {
66 :
67 : // import pow2-optimized arithmetic functions
68 46 : using namespace gkfs::utils::arithmetic;
69 :
70 46 : assert(write_size > 0);
71 :
72 : // Calculate chunkid boundaries and numbers so that daemons know in
73 : // which interval to look for chunks
74 46 : auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
75 46 : auto chnk_end = block_index((offset + write_size) - 1,
76 46 : gkfs::config::rpc::chunksize);
77 :
78 46 : auto chnk_total = (chnk_end - chnk_start) + 1;
79 :
80 : // Collect all chunk ids within count that have the same destination so
81 : // that those are send in one rpc bulk transfer
82 92 : std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
83 :
84 : // contains the target ids, used to access the target_chnks map.
85 : // First idx is chunk with potential offset
86 92 : std::vector<uint64_t> targets{};
87 :
88 : // targets for the first and last chunk as they need special treatment
89 : // We need a set to manage replicas.
90 92 : std::set<uint64_t> chnk_start_target{};
91 46 : std::set<uint64_t> chnk_end_target{};
92 :
93 92 : std::unordered_map<uint64_t, std::vector<uint8_t>> write_ops_vect;
94 :
95 : // If num_copies is 0, we do the normal write operation. Otherwise
96 : // we process all the replicas.
97 151 : for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
98 315 : for(auto copy = num_copies ? 1 : 0; copy < num_copies + 1; copy++) {
99 105 : auto target = CTX->distributor()->locate_data(path, chnk_id, copy);
100 :
101 105 : if(write_ops_vect.find(target) == write_ops_vect.end())
102 46 : write_ops_vect[target] =
103 92 : std::vector<uint8_t>(((chnk_total + 7) / 8));
104 105 : gkfs::rpc::set_bitset(write_ops_vect[target], chnk_id - chnk_start);
105 :
106 105 : if(target_chnks.count(target) == 0) {
107 46 : target_chnks.insert(
108 46 : std::make_pair(target, std::vector<uint64_t>{chnk_id}));
109 46 : targets.push_back(target);
110 : } else {
111 59 : target_chnks[target].push_back(chnk_id);
112 : }
113 :
114 : // set first and last chnk targets
115 105 : if(chnk_id == chnk_start) {
116 46 : chnk_start_target.insert(target);
117 : }
118 :
119 105 : if(chnk_id == chnk_end) {
120 46 : chnk_end_target.insert(target);
121 : }
122 : }
123 : }
124 :
125 : // some helper variables for async RPC
126 46 : std::vector<hermes::mutable_buffer> bufseq{
127 : hermes::mutable_buffer{const_cast<void*>(buf), write_size},
128 92 : };
129 :
130 : // expose user buffers so that they can serve as RDMA data sources
131 : // (these are automatically "unexposed" when the destructor is called)
132 92 : hermes::exposed_memory local_buffers;
133 :
134 46 : try {
135 92 : local_buffers = ld_network_service->expose(
136 46 : bufseq, hermes::access_mode::read_only);
137 :
138 0 : } catch(const std::exception& ex) {
139 0 : LOG(ERROR, "Failed to expose buffers for RMA");
140 0 : return make_pair(EBUSY, 0);
141 : }
142 :
143 92 : std::vector<hermes::rpc_handle<gkfs::rpc::write_data>> handles;
144 :
145 : // Issue non-blocking RPC requests and wait for the result later
146 : //
147 : // TODO(amiranda): This could be simplified by adding a vector of inputs
148 : // to async_engine::broadcast(). This would allow us to avoid manually
149 : // looping over handles as we do below
150 92 : for(const auto& target : targets) {
151 :
152 : // total chunk_size for target
153 46 : auto total_chunk_size =
154 46 : target_chnks[target].size() * gkfs::config::rpc::chunksize;
155 :
156 : // receiver of first chunk must subtract the offset from first chunk
157 46 : if(chnk_start_target.end() != chnk_start_target.find(target)) {
158 46 : total_chunk_size -=
159 46 : block_overrun(offset, gkfs::config::rpc::chunksize);
160 : }
161 :
162 : // receiver of last chunk must subtract
163 92 : if(chnk_end_target.end() != chnk_end_target.find(target) &&
164 46 : !is_aligned(offset + write_size, gkfs::config::rpc::chunksize)) {
165 44 : total_chunk_size -= block_underrun(offset + write_size,
166 : gkfs::config::rpc::chunksize);
167 : }
168 :
169 92 : auto endp = CTX->hosts().at(target);
170 :
171 46 : try {
172 46 : LOG(DEBUG, "Sending RPC ...");
173 :
174 46 : gkfs::rpc::write_data::input in(
175 : path,
176 : // first offset in targets is the chunk with
177 : // a potential offset
178 46 : block_overrun(offset, gkfs::config::rpc::chunksize), target,
179 46 : CTX->hosts().size(),
180 : // number of chunks handled by that destination
181 46 : gkfs::rpc::compress_bitset(write_ops_vect[target]),
182 46 : target_chnks[target].size(),
183 : // chunk start id of this write
184 : chnk_start,
185 : // chunk end id of this write
186 : chnk_end,
187 : // total size to write
188 92 : total_chunk_size, local_buffers);
189 :
190 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
191 : // we can retry for RPC_TRIES (see old commits with margo)
192 : // TODO(amiranda): hermes will eventually provide a post(endpoint)
193 : // returning one result and a broadcast(endpoint_set) returning a
194 : // result_set. When that happens we can remove the .at(0) :/
195 46 : handles.emplace_back(
196 46 : ld_network_service->post<gkfs::rpc::write_data>(endp, in));
197 :
198 46 : LOG(DEBUG,
199 : "host: {}, path: \"{}\", chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
200 : target, path, chnk_start, chnk_end, in.chunk_n(),
201 46 : total_chunk_size, in.offset());
202 0 : } catch(const std::exception& ex) {
203 0 : LOG(ERROR,
204 : "Unable to send non-blocking rpc for "
205 : "path \"{}\" [peer: {}]",
206 0 : path, target);
207 0 : if(num_copies == 0)
208 0 : return make_pair(EBUSY, 0);
209 : }
210 : }
211 :
212 46 : auto err = 0;
213 46 : ssize_t out_size = 0;
214 46 : std::size_t idx = 0;
215 : #ifdef REPLICA_CHECK
216 : std::vector<uint8_t> fill(chnk_total);
217 : auto write_ops = write_ops_vect.begin();
218 : #endif
219 92 : for(const auto& h : handles) {
220 46 : try {
221 : // XXX We might need a timeout here to not wait forever for an
222 : // output that never comes?
223 92 : auto out = h.get().at(0);
224 :
225 46 : if(out.err() != 0) {
226 0 : LOG(ERROR, "Daemon reported error: {}", out.err());
227 46 : err = out.err();
228 : } else {
229 46 : out_size += static_cast<size_t>(out.io_size());
230 : #ifdef REPLICA_CHECK
231 : if(num_copies) {
232 : if(fill.size() == 0) {
233 : fill = write_ops->second;
234 : } else {
235 : for(size_t i = 0; i < fill.size(); i++) {
236 : fill[i] |= write_ops->second[i];
237 : }
238 : }
239 : }
240 : write_ops++;
241 : #endif
242 : }
243 0 : } catch(const std::exception& ex) {
244 0 : LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
245 0 : path, targets[idx]);
246 0 : err = EIO;
247 : }
248 46 : idx++;
249 : }
250 : // As servers can fail (and we cannot know if the total data is written), we
251 : // send the updated size but check that at least one copy of all chunks are
252 : // processed.
253 46 : if(num_copies) {
254 : // A bit-wise or should show that all the chunks are written (255)
255 0 : out_size = write_size;
256 : #ifdef REPLICA_CHECK
257 : for(size_t i = 0; i < fill.size() - 1; i++) {
258 : if(fill[i] != 255) {
259 : err = EIO;
260 : break;
261 : }
262 : }
263 : // Process the leftover bytes
264 : for(uint64_t chnk_id = (chnk_start + (fill.size() - 1) * 8);
265 : chnk_id <= chnk_end; chnk_id++) {
266 : if(!(fill[(chnk_id - chnk_start) / 8] &
267 : (1 << ((chnk_id - chnk_start) % 8)))) {
268 : err = EIO;
269 : break;
270 : }
271 : }
272 : #endif
273 : }
274 : /*
275 : * Typically file systems return the size even if only a part of it was
276 : * written. In our case, we do not keep track which daemon fully wrote its
277 : * workload. Thus, we always return size 0 on error.
278 : */
279 46 : if(err)
280 0 : return make_pair(err, 0);
281 : else
282 46 : return make_pair(0, out_size);
283 : }
284 :
/**
 * Send an RPC read request to all daemons holding affected chunks and gather
 * the data into a user buffer via RDMA.
 * @param path       gekkofs path of the file being read
 * @param buf        user buffer serving as the RDMA data target
 * @param offset     file offset where the read starts
 * @param read_size  number of bytes to read
 * @param num_copies number of copies available (0 is no replication)
 * @param failed     nodes failed that should not be used
 * @return pair<error code, read size>
 */
pair<int, ssize_t>
forward_read(const string& path, void* buf, const off64_t offset,
             const size_t read_size, const int8_t num_copies,
             std::set<int8_t>& failed) {

    // import pow2-optimized arithmetic functions
    using namespace gkfs::utils::arithmetic;

    // Calculate chunkid boundaries and numbers so that daemons know in which
    // interval to look for chunks
    auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
    auto chnk_end =
            block_index((offset + read_size - 1), gkfs::config::rpc::chunksize);
    auto chnk_total = (chnk_end - chnk_start) + 1;
    // Collect all chunk ids within count that have the same destination so
    // that those are send in one rpc bulk transfer
    std::map<uint64_t, std::vector<uint64_t>> target_chnks{};

    // contains the recipient ids, used to access the target_chnks map.
    // First idx is chunk with potential offset
    std::vector<uint64_t> targets{};
    // targets for the first and last chunk as they need special treatment
    // (unlike the write path a plain id suffices: each chunk is read from
    // exactly one host)
    uint64_t chnk_start_target = 0;
    uint64_t chnk_end_target = 0;
    // per-target chunk bitset: bit (chnk_id - chnk_start) is set when that
    // target must serve the chunk
    std::unordered_map<uint64_t, std::vector<uint8_t>> read_bitset_vect;

    for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
        auto target = CTX->distributor()->locate_data(path, chnk_id, 0);
        if(num_copies > 0) {
            // If we have some failures we select another copy (randomly).
            // NOTE(review): rand() % num_copies only yields copy indices
            // 0..num_copies-1 (the last replica index is never chosen), and
            // this loop never terminates if every replica of the chunk is in
            // `failed` — confirm against the distributor's replica numbering.
            while(failed.find(target) != failed.end()) {
                LOG(DEBUG, "Selecting another node, target: {} down", target);
                target = CTX->distributor()->locate_data(path, chnk_id,
                                                         rand() % num_copies);
            }
        }

        // lazily allocate the bitset (rounded up to whole bytes) on the first
        // chunk routed to this target
        if(read_bitset_vect.find(target) == read_bitset_vect.end())
            read_bitset_vect[target] =
                    std::vector<uint8_t>(((chnk_total + 7) / 8));
        read_bitset_vect[target][(chnk_id - chnk_start) / 8] |=
                1 << ((chnk_id - chnk_start) % 8); // set

        if(target_chnks.count(target) == 0) {
            target_chnks.insert(
                    std::make_pair(target, std::vector<uint64_t>{chnk_id}));
            targets.push_back(target);
        } else {
            target_chnks[target].push_back(chnk_id);
        }

        // set first and last chnk targets
        if(chnk_id == chnk_start) {
            chnk_start_target = target;
        }

        if(chnk_id == chnk_end) {
            chnk_end_target = target;
        }
    }

    // some helper variables for async RPCs
    std::vector<hermes::mutable_buffer> bufseq{
            hermes::mutable_buffer{buf, read_size},
    };

    // expose user buffers so that they can serve as RDMA data targets
    // (these are automatically "unexposed" when the destructor is called)
    hermes::exposed_memory local_buffers;

    try {
        local_buffers = ld_network_service->expose(
                bufseq, hermes::access_mode::write_only);

    } catch(const std::exception& ex) {
        LOG(ERROR, "Failed to expose buffers for RMA");
        return make_pair(EBUSY, 0);
    }

    std::vector<hermes::rpc_handle<gkfs::rpc::read_data>> handles;

    // Issue non-blocking RPC requests and wait for the result later
    //
    // TODO(amiranda): This could be simplified by adding a vector of inputs
    // to async_engine::broadcast(). This would allow us to avoid manually
    // looping over handles as we do below

    for(const auto& target : targets) {

        // total chunk_size for target, assuming full chunks first
        auto total_chunk_size =
                target_chnks[target].size() * gkfs::config::rpc::chunksize;

        // receiver of first chunk must subtract the offset from first chunk
        if(target == chnk_start_target) {
            total_chunk_size -=
                    block_overrun(offset, gkfs::config::rpc::chunksize);
        }

        // receiver of last chunk must subtract the unused tail of the last
        // chunk (unless the read ends exactly on a chunk boundary)
        if(target == chnk_end_target &&
           !is_aligned(offset + read_size, gkfs::config::rpc::chunksize)) {
            total_chunk_size -= block_underrun(offset + read_size,
                                               gkfs::config::rpc::chunksize);
        }

        auto endp = CTX->hosts().at(target);

        try {

            LOG(DEBUG, "Sending RPC ...");

            gkfs::rpc::read_data::input in(
                    path,
                    // first offset in targets is the chunk with
                    // a potential offset
                    block_overrun(offset, gkfs::config::rpc::chunksize), target,
                    CTX->hosts().size(),
                    gkfs::rpc::compress_bitset(read_bitset_vect[target]),
                    // number of chunks handled by that destination
                    target_chnks[target].size(),
                    // chunk start id of this write
                    chnk_start,
                    // chunk end id of this write
                    chnk_end,
                    // total size to write
                    total_chunk_size, local_buffers);

            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
            // that we can retry for RPC_TRIES (see old commits with margo)
            // TODO(amiranda): hermes will eventually provide a
            // post(endpoint) returning one result and a
            // broadcast(endpoint_set) returning a result_set. When that
            // happens we can remove the .at(0) :/
            handles.emplace_back(
                    ld_network_service->post<gkfs::rpc::read_data>(endp, in));

            LOG(DEBUG,
                "host: {}, path: {}, chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
                target, path, chnk_start, chnk_end, in.chunk_n(),
                total_chunk_size, in.offset());

            LOG(TRACE_READS,
                "read {} host: {}, path: {}, chunk_start: {}, chunk_end: {}",
                CTX->get_hostname(), target, path, chnk_start, chnk_end);


        } catch(const std::exception& ex) {
            LOG(ERROR,
                "Unable to send non-blocking rpc for path \"{}\" "
                "[peer: {}]",
                path, target);
            return make_pair(EBUSY, 0);
        }
    }

    // Wait for RPC responses and then get response and add it to out_size
    // which is the read size. All potential outputs are served to free
    // resources regardless of errors, although an errorcode is set.
    auto err = 0;
    ssize_t out_size = 0;
    std::size_t idx = 0;

    for(const auto& h : handles) {
        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            auto out = h.get().at(0);

            if(out.err() != 0) {
                LOG(ERROR, "Daemon reported error: {}", out.err());
                err = out.err();
            }

            out_size += static_cast<size_t>(out.io_size());

        } catch(const std::exception& ex) {
            LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
                path, targets[idx]);
            err = EIO;
            // We should get targets[idx] and remove from the list of peers
            failed.insert(targets[idx]);
            // Then repeat the read with another peer (We repear the full
            // read, this can be optimised but it is a cornercase)
        }
        idx++;
    }


    /*
     * Typically file systems return the size even if only a part of it was
     * read. In our case, we do not keep track which daemon fully read its
     * workload. Thus, we always return size 0 on error.
     */
    if(err)
        return make_pair(err, 0);
    else
        return make_pair(0, out_size);
}
494 :
495 : /**
496 : * Send an RPC request to truncate a file to given new size
497 : * @param path
498 : * @param current_size
499 : * @param new_size
500 : * @param num_copies Number of replicas
501 : * @return error code
502 : */
503 : int
504 3 : forward_truncate(const std::string& path, size_t current_size, size_t new_size,
505 : const int8_t num_copies) {
506 :
507 : // import pow2-optimized arithmetic functions
508 3 : using namespace gkfs::utils::arithmetic;
509 :
510 3 : assert(current_size > new_size);
511 :
512 : // Find out which data servers need to delete data chunks in order to
513 : // contact only them
514 3 : const unsigned int chunk_start =
515 3 : block_index(new_size, gkfs::config::rpc::chunksize);
516 3 : const unsigned int chunk_end = block_index(current_size - new_size - 1,
517 3 : gkfs::config::rpc::chunksize);
518 :
519 3 : std::unordered_set<unsigned int> hosts;
520 18 : for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end;
521 : ++chunk_id) {
522 30 : for(auto copy = 0; copy < (num_copies + 1); ++copy) {
523 30 : hosts.insert(CTX->distributor()->locate_data(path, chunk_id, copy));
524 : }
525 : }
526 :
527 6 : std::vector<hermes::rpc_handle<gkfs::rpc::trunc_data>> handles;
528 :
529 3 : auto err = 0;
530 :
531 5 : for(const auto& host : hosts) {
532 :
533 4 : auto endp = CTX->hosts().at(host);
534 :
535 2 : try {
536 2 : LOG(DEBUG, "Sending RPC ...");
537 :
538 4 : gkfs::rpc::trunc_data::input in(path, new_size);
539 :
540 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
541 : // that we can retry for RPC_TRIES (see old commits with margo)
542 : // TODO(amiranda): hermes will eventually provide a
543 : // post(endpoint) returning one result and a
544 : // broadcast(endpoint_set) returning a result_set. When that
545 : // happens we can remove the .at(0) :/
546 2 : handles.emplace_back(
547 2 : ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));
548 :
549 0 : } catch(const std::exception& ex) {
550 : // TODO(amiranda): we should cancel all previously posted
551 : // requests here, unfortunately, Hermes does not support it yet
552 : // :/
553 0 : LOG(ERROR, "Failed to send request to host: {}", host);
554 0 : err = EIO;
555 0 : break; // We need to gather all responses so we can't return
556 : // here
557 : }
558 : }
559 :
560 : // Wait for RPC responses and then get response
561 5 : for(const auto& h : handles) {
562 2 : try {
563 : // XXX We might need a timeout here to not wait forever for an
564 : // output that never comes?
565 4 : auto out = h.get().at(0);
566 :
567 2 : if(out.err()) {
568 0 : LOG(ERROR, "received error response: {}", out.err());
569 : err = EIO;
570 : }
571 0 : } catch(const std::exception& ex) {
572 0 : LOG(ERROR, "while getting rpc output");
573 0 : err = EIO;
574 : }
575 : }
576 6 : return err ? err : 0;
577 : }
578 :
579 : /**
580 : * Send an RPC request to chunk stat all hosts
581 : * @return pair<error code, rpc::ChunkStat>
582 : */
583 : pair<int, ChunkStat>
584 2 : forward_get_chunk_stat() {
585 :
586 4 : std::vector<hermes::rpc_handle<gkfs::rpc::chunk_stat>> handles;
587 :
588 2 : auto err = 0;
589 :
590 4 : for(const auto& endp : CTX->hosts()) {
591 2 : try {
592 4 : LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
593 :
594 2 : gkfs::rpc::chunk_stat::input in(0);
595 :
596 : // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
597 : // that we can retry for RPC_TRIES (see old commits with margo)
598 : // TODO(amiranda): hermes will eventually provide a
599 : // post(endpoint) returning one result and a
600 : // broadcast(endpoint_set) returning a result_set. When that
601 : // happens we can remove the .at(0) :/
602 2 : handles.emplace_back(
603 2 : ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));
604 :
605 0 : } catch(const std::exception& ex) {
606 : // TODO(amiranda): we should cancel all previously posted
607 : // requests here, unfortunately, Hermes does not support it yet
608 : // :/
609 0 : LOG(ERROR, "Failed to send request to host: {}", endp.to_string());
610 0 : err = EBUSY;
611 0 : break; // We need to gather all responses so we can't return
612 : // here
613 : }
614 : }
615 :
616 2 : unsigned long chunk_size = gkfs::config::rpc::chunksize;
617 2 : unsigned long chunk_total = 0;
618 2 : unsigned long chunk_free = 0;
619 :
620 : // wait for RPC responses
621 4 : for(std::size_t i = 0; i < handles.size(); ++i) {
622 :
623 2 : gkfs::rpc::chunk_stat::output out{};
624 :
625 2 : try {
626 : // XXX We might need a timeout here to not wait forever for an
627 : // output that never comes?
628 4 : out = handles[i].get().at(0);
629 :
630 2 : if(out.err()) {
631 0 : err = out.err();
632 0 : LOG(ERROR,
633 : "Host '{}' reported err code '{}' during stat chunk.",
634 0 : CTX->hosts().at(i).to_string(), err);
635 : // we don't break here to ensure all responses are processed
636 0 : continue;
637 : }
638 2 : assert(out.chunk_size() == chunk_size);
639 2 : chunk_total += out.chunk_total();
640 2 : chunk_free += out.chunk_free();
641 0 : } catch(const std::exception& ex) {
642 0 : LOG(ERROR, "Failed to get RPC output from host: {}", i);
643 : // Avoid setting err if a server fails.
644 : // err = EBUSY;
645 : }
646 : }
647 :
648 2 : if(err)
649 0 : return make_pair(err, ChunkStat{});
650 : else
651 2 : return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free});
652 : }
653 :
654 : } // namespace gkfs::rpc
|