LCOV - code coverage report
Current view: top level - src/client/rpc - forward_data.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 188 240 78.3 %
Date: 2024-04-30 13:21:35 Functions: 4 4 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS' POSIX interface.
      12             : 
      13             :   GekkoFS' POSIX interface is free software: you can redistribute it and/or
      14             :   modify it under the terms of the GNU Lesser General Public License as
      15             :   published by the Free Software Foundation, either version 3 of the License,
      16             :   or (at your option) any later version.
      17             : 
      18             :   GekkoFS' POSIX interface is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU Lesser General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU Lesser General Public License
      24             :   along with GekkoFS' POSIX interface.  If not, see
      25             :   <https://www.gnu.org/licenses/>.
      26             : 
      27             :   SPDX-License-Identifier: LGPL-3.0-or-later
      28             : */
      29             : 
      30             : #include <client/preload_util.hpp>
      31             : #include <client/rpc/forward_data.hpp>
      32             : #include <client/rpc/rpc_types.hpp>
      33             : #include <client/logging.hpp>
      34             : 
      35             : #include <common/rpc/distributor.hpp>
      36             : #include <common/arithmetic/arithmetic.hpp>
      37             : #include <common/rpc/rpc_util.hpp>
      38             : 
      39             : #include <unordered_set>
      40             : 
      41             : using namespace std;
      42             : 
      43             : namespace gkfs::rpc {
      44             : 
      45             : /*
      46             :  * This file includes all data RPC calls.
      47             :  * NOTE: No errno is defined here!
      48             :  */
      49             : 
      50             : /**
      51             :  * Send an RPC request to write from a buffer.
      52             :  * There is a bitset of 1024 chunks to tell the server
      53             :  * which chunks to process. Exceeding this value will work without
      54             :  * replication. Another way is to leverage mercury segments.
      55             :  * TODO: Decide how to manage a write to a replica that doesn't exist
      56             :  * @param path
      57             :  * @param buf
      58             :  * @param append_flag
      59             :  * @param write_size
      60             :  * @param num_copies number of replicas
      61             :  * @return pair<error code, written size>
      62             :  */
      63             : pair<int, ssize_t>
      64          41 : forward_write(const string& path, const void* buf, const off64_t offset,
      65             :               const size_t write_size, const int8_t num_copies) {
      66             : 
      67             :     // import pow2-optimized arithmetic functions
      68          41 :     using namespace gkfs::utils::arithmetic;
      69             : 
      70          41 :     assert(write_size > 0);
      71             : 
      72             :     // Calculate chunkid boundaries and numbers so that daemons know in
      73             :     // which interval to look for chunks
      74          41 :     auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
      75          41 :     auto chnk_end = block_index((offset + write_size) - 1,
      76          41 :                                 gkfs::config::rpc::chunksize);
      77             : 
      78          41 :     auto chnk_total = (chnk_end - chnk_start) + 1;
      79             : 
      80             :     // Collect all chunk ids within count that have the same destination so
      81             :     // that those are send in one rpc bulk transfer
      82          82 :     std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
      83             : 
      84             :     // contains the target ids, used to access the target_chnks map.
      85             :     // First idx is chunk with potential offset
      86          82 :     std::vector<uint64_t> targets{};
      87             : 
      88             :     // targets for the first and last chunk as they need special treatment
      89             :     // We need a set to manage replicas.
      90          82 :     std::set<uint64_t> chnk_start_target{};
      91          41 :     std::set<uint64_t> chnk_end_target{};
      92             : 
      93          82 :     std::unordered_map<uint64_t, std::vector<uint8_t>> write_ops_vect;
      94             : 
      95             :     // If num_copies is 0, we do the normal write operation. Otherwise
      96             :     // we process all the replicas.
      97         141 :     for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
      98         300 :         for(auto copy = num_copies ? 1 : 0; copy < num_copies + 1; copy++) {
      99         100 :             auto target = CTX->distributor()->locate_data(path, chnk_id, copy);
     100             : 
     101         100 :             if(write_ops_vect.find(target) == write_ops_vect.end())
     102          41 :                 write_ops_vect[target] =
     103          82 :                         std::vector<uint8_t>(((chnk_total + 7) / 8));
     104         100 :             gkfs::rpc::set_bitset(write_ops_vect[target], chnk_id - chnk_start);
     105             : 
     106         100 :             if(target_chnks.count(target) == 0) {
     107          41 :                 target_chnks.insert(
     108          41 :                         std::make_pair(target, std::vector<uint64_t>{chnk_id}));
     109          41 :                 targets.push_back(target);
     110             :             } else {
     111          59 :                 target_chnks[target].push_back(chnk_id);
     112             :             }
     113             : 
     114             :             // set first and last chnk targets
     115         100 :             if(chnk_id == chnk_start) {
     116          41 :                 chnk_start_target.insert(target);
     117             :             }
     118             : 
     119         100 :             if(chnk_id == chnk_end) {
     120          41 :                 chnk_end_target.insert(target);
     121             :             }
     122             :         }
     123             :     }
     124             : 
     125             :     // some helper variables for async RPC
     126          41 :     std::vector<hermes::mutable_buffer> bufseq{
     127             :             hermes::mutable_buffer{const_cast<void*>(buf), write_size},
     128          82 :     };
     129             : 
     130             :     // expose user buffers so that they can serve as RDMA data sources
     131             :     // (these are automatically "unexposed" when the destructor is called)
     132          82 :     hermes::exposed_memory local_buffers;
     133             : 
     134          41 :     try {
     135          82 :         local_buffers = ld_network_service->expose(
     136          41 :                 bufseq, hermes::access_mode::read_only);
     137             : 
     138           0 :     } catch(const std::exception& ex) {
     139           0 :         LOG(ERROR, "Failed to expose buffers for RMA");
     140           0 :         return make_pair(EBUSY, 0);
     141             :     }
     142             : 
     143          82 :     std::vector<hermes::rpc_handle<gkfs::rpc::write_data>> handles;
     144             : 
     145             :     // Issue non-blocking RPC requests and wait for the result later
     146             :     //
     147             :     // TODO(amiranda): This could be simplified by adding a vector of inputs
     148             :     // to async_engine::broadcast(). This would allow us to avoid manually
     149             :     // looping over handles as we do below
     150          82 :     for(const auto& target : targets) {
     151             : 
     152             :         // total chunk_size for target
     153          41 :         auto total_chunk_size =
     154          41 :                 target_chnks[target].size() * gkfs::config::rpc::chunksize;
     155             : 
     156             :         // receiver of first chunk must subtract the offset from first chunk
     157          41 :         if(chnk_start_target.end() != chnk_start_target.find(target)) {
     158          41 :             total_chunk_size -=
     159          41 :                     block_overrun(offset, gkfs::config::rpc::chunksize);
     160             :         }
     161             : 
     162             :         // receiver of last chunk must subtract
     163          82 :         if(chnk_end_target.end() != chnk_end_target.find(target) &&
     164          41 :            !is_aligned(offset + write_size, gkfs::config::rpc::chunksize)) {
     165          39 :             total_chunk_size -= block_underrun(offset + write_size,
     166             :                                                gkfs::config::rpc::chunksize);
     167             :         }
     168             : 
     169          82 :         auto endp = CTX->hosts().at(target);
     170             : 
     171          41 :         try {
     172          41 :             LOG(DEBUG, "Sending RPC ...");
     173             : 
     174          41 :             gkfs::rpc::write_data::input in(
     175             :                     path,
     176             :                     // first offset in targets is the chunk with
     177             :                     // a potential offset
     178          41 :                     block_overrun(offset, gkfs::config::rpc::chunksize), target,
     179          41 :                     CTX->hosts().size(),
     180             :                     // number of chunks handled by that destination
     181          41 :                     gkfs::rpc::compress_bitset(write_ops_vect[target]),
     182          41 :                     target_chnks[target].size(),
     183             :                     // chunk start id of this write
     184             :                     chnk_start,
     185             :                     // chunk end id of this write
     186             :                     chnk_end,
     187             :                     // total size to write
     188          82 :                     total_chunk_size, local_buffers);
     189             : 
     190             :             // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
     191             :             // we can retry for RPC_TRIES (see old commits with margo)
     192             :             // TODO(amiranda): hermes will eventually provide a post(endpoint)
     193             :             // returning one result and a broadcast(endpoint_set) returning a
     194             :             // result_set. When that happens we can remove the .at(0) :/
     195          41 :             handles.emplace_back(
     196          41 :                     ld_network_service->post<gkfs::rpc::write_data>(endp, in));
     197             : 
     198          41 :             LOG(DEBUG,
     199             :                 "host: {}, path: \"{}\", chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
     200             :                 target, path, chnk_start, chnk_end, in.chunk_n(),
     201          41 :                 total_chunk_size, in.offset());
     202           0 :         } catch(const std::exception& ex) {
     203           0 :             LOG(ERROR,
     204             :                 "Unable to send non-blocking rpc for "
     205             :                 "path \"{}\" [peer: {}]",
     206           0 :                 path, target);
     207           0 :             if(num_copies == 0)
     208           0 :                 return make_pair(EBUSY, 0);
     209             :         }
     210             :     }
     211             : 
     212          41 :     auto err = 0;
     213          41 :     ssize_t out_size = 0;
     214          41 :     std::size_t idx = 0;
     215             : #ifdef REPLICA_CHECK
     216             :     std::vector<uint8_t> fill(chnk_total);
     217             :     auto write_ops = write_ops_vect.begin();
     218             : #endif
     219          82 :     for(const auto& h : handles) {
     220          41 :         try {
     221             :             // XXX We might need a timeout here to not wait forever for an
     222             :             // output that never comes?
     223          82 :             auto out = h.get().at(0);
     224             : 
     225          41 :             if(out.err() != 0) {
     226           0 :                 LOG(ERROR, "Daemon reported error: {}", out.err());
     227          41 :                 err = out.err();
     228             :             } else {
     229          41 :                 out_size += static_cast<size_t>(out.io_size());
     230             : #ifdef REPLICA_CHECK
     231             :                 if(num_copies) {
     232             :                     if(fill.size() == 0) {
     233             :                         fill = write_ops->second;
     234             :                     } else {
     235             :                         for(size_t i = 0; i < fill.size(); i++) {
     236             :                             fill[i] |= write_ops->second[i];
     237             :                         }
     238             :                     }
     239             :                 }
     240             :                 write_ops++;
     241             : #endif
     242             :             }
     243           0 :         } catch(const std::exception& ex) {
     244           0 :             LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
     245           0 :                 path, targets[idx]);
     246           0 :             err = EIO;
     247             :         }
     248          41 :         idx++;
     249             :     }
     250             :     // As servers can fail (and we cannot know if the total data is written), we
     251             :     // send the updated size but check that at least one copy of all chunks are
     252             :     // processed.
     253          41 :     if(num_copies) {
     254             :         // A bit-wise or should show that all the chunks are written (255)
     255           0 :         out_size = write_size;
     256             : #ifdef REPLICA_CHECK
     257             :         for(size_t i = 0; i < fill.size() - 1; i++) {
     258             :             if(fill[i] != 255) {
     259             :                 err = EIO;
     260             :                 break;
     261             :             }
     262             :         }
     263             :         // Process the leftover bytes
     264             :         for(uint64_t chnk_id = (chnk_start + (fill.size() - 1) * 8);
     265             :             chnk_id <= chnk_end; chnk_id++) {
     266             :             if(!(fill[(chnk_id - chnk_start) / 8] &
     267             :                  (1 << ((chnk_id - chnk_start) % 8)))) {
     268             :                 err = EIO;
     269             :                 break;
     270             :             }
     271             :         }
     272             : #endif
     273             :     }
     274             :     /*
     275             :      * Typically file systems return the size even if only a part of it was
     276             :      * written. In our case, we do not keep track which daemon fully wrote its
     277             :      * workload. Thus, we always return size 0 on error.
     278             :      */
     279          41 :     if(err)
     280           0 :         return make_pair(err, 0);
     281             :     else
     282          41 :         return make_pair(0, out_size);
     283             : }
     284             : 
     285             : /**
     286             :  * Send an RPC request to read to a buffer.
     287             :  * @param path
     288             :  * @param buf
     289             :  * @param offset
     290             :  * @param read_size
     291             :  * @param num_copies number of copies available (0 is no replication)
     292             :  * @param failed nodes failed that should not be used
     293             :  * @return pair<error code, read size>
     294             :  */
     295             : pair<int, ssize_t>
     296          28 : forward_read(const string& path, void* buf, const off64_t offset,
     297             :              const size_t read_size, const int8_t num_copies,
     298             :              std::set<int8_t>& failed) {
     299             : 
     300             :     // import pow2-optimized arithmetic functions
     301          28 :     using namespace gkfs::utils::arithmetic;
     302             : 
     303             :     // Calculate chunkid boundaries and numbers so that daemons know in which
     304             :     // interval to look for chunks
     305          28 :     auto chnk_start = block_index(offset, gkfs::config::rpc::chunksize);
     306          28 :     auto chnk_end =
     307          28 :             block_index((offset + read_size - 1), gkfs::config::rpc::chunksize);
     308          28 :     auto chnk_total = (chnk_end - chnk_start) + 1;
     309             :     // Collect all chunk ids within count that have the same destination so
     310             :     // that those are send in one rpc bulk transfer
     311          56 :     std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
     312             : 
     313             :     // contains the recipient ids, used to access the target_chnks map.
     314             :     // First idx is chunk with potential offset
     315          56 :     std::vector<uint64_t> targets{};
     316             :     // targets for the first and last chunk as they need special treatment
     317          28 :     uint64_t chnk_start_target = 0;
     318          28 :     uint64_t chnk_end_target = 0;
     319          56 :     std::unordered_map<uint64_t, std::vector<uint8_t>> read_bitset_vect;
     320             : 
     321         102 :     for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
     322          74 :         auto target = CTX->distributor()->locate_data(path, chnk_id, 0);
     323          74 :         if(num_copies > 0) {
     324             :             // If we have some failures we select another copy (randomly).
     325           0 :             while(failed.find(target) != failed.end()) {
     326           0 :                 LOG(DEBUG, "Selecting another node, target: {} down", target);
     327           0 :                 target = CTX->distributor()->locate_data(path, chnk_id,
     328           0 :                                                          rand() % num_copies);
     329             :             }
     330             :         }
     331             : 
     332          74 :         if(read_bitset_vect.find(target) == read_bitset_vect.end())
     333          28 :             read_bitset_vect[target] =
     334          56 :                     std::vector<uint8_t>(((chnk_total + 7) / 8));
     335          74 :         read_bitset_vect[target][(chnk_id - chnk_start) / 8] |=
     336          74 :                 1 << ((chnk_id - chnk_start) % 8); // set
     337             : 
     338          74 :         if(target_chnks.count(target) == 0) {
     339          28 :             target_chnks.insert(
     340          28 :                     std::make_pair(target, std::vector<uint64_t>{chnk_id}));
     341          28 :             targets.push_back(target);
     342             :         } else {
     343          46 :             target_chnks[target].push_back(chnk_id);
     344             :         }
     345             : 
     346             :         // set first and last chnk targets
     347          74 :         if(chnk_id == chnk_start) {
     348          28 :             chnk_start_target = target;
     349             :         }
     350             : 
     351          74 :         if(chnk_id == chnk_end) {
     352          28 :             chnk_end_target = target;
     353             :         }
     354             :     }
     355             : 
     356             :     // some helper variables for async RPCs
     357          28 :     std::vector<hermes::mutable_buffer> bufseq{
     358             :             hermes::mutable_buffer{buf, read_size},
     359          56 :     };
     360             : 
     361             :     // expose user buffers so that they can serve as RDMA data targets
     362             :     // (these are automatically "unexposed" when the destructor is called)
     363          56 :     hermes::exposed_memory local_buffers;
     364             : 
     365          28 :     try {
     366          56 :         local_buffers = ld_network_service->expose(
     367          28 :                 bufseq, hermes::access_mode::write_only);
     368             : 
     369           0 :     } catch(const std::exception& ex) {
     370           0 :         LOG(ERROR, "Failed to expose buffers for RMA");
     371           0 :         return make_pair(EBUSY, 0);
     372             :     }
     373             : 
     374          56 :     std::vector<hermes::rpc_handle<gkfs::rpc::read_data>> handles;
     375             : 
     376             :     // Issue non-blocking RPC requests and wait for the result later
     377             :     //
     378             :     // TODO(amiranda): This could be simplified by adding a vector of inputs
     379             :     // to async_engine::broadcast(). This would allow us to avoid manually
     380             :     // looping over handles as we do below
     381             : 
     382          56 :     for(const auto& target : targets) {
     383             : 
     384             :         // total chunk_size for target
     385          28 :         auto total_chunk_size =
     386          28 :                 target_chnks[target].size() * gkfs::config::rpc::chunksize;
     387             : 
     388             :         // receiver of first chunk must subtract the offset from first chunk
     389          28 :         if(target == chnk_start_target) {
     390          28 :             total_chunk_size -=
     391          28 :                     block_overrun(offset, gkfs::config::rpc::chunksize);
     392             :         }
     393             : 
     394             :         // receiver of last chunk must subtract
     395          56 :         if(target == chnk_end_target &&
     396          28 :            !is_aligned(offset + read_size, gkfs::config::rpc::chunksize)) {
     397          26 :             total_chunk_size -= block_underrun(offset + read_size,
     398             :                                                gkfs::config::rpc::chunksize);
     399             :         }
     400             : 
     401          56 :         auto endp = CTX->hosts().at(target);
     402             : 
     403          28 :         try {
     404             : 
     405          28 :             LOG(DEBUG, "Sending RPC ...");
     406             : 
     407          28 :             gkfs::rpc::read_data::input in(
     408             :                     path,
     409             :                     // first offset in targets is the chunk with
     410             :                     // a potential offset
     411          28 :                     block_overrun(offset, gkfs::config::rpc::chunksize), target,
     412          28 :                     CTX->hosts().size(),
     413          28 :                     gkfs::rpc::compress_bitset(read_bitset_vect[target]),
     414             :                     // number of chunks handled by that destination
     415          28 :                     target_chnks[target].size(),
     416             :                     // chunk start id of this write
     417             :                     chnk_start,
     418             :                     // chunk end id of this write
     419             :                     chnk_end,
     420             :                     // total size to write
     421          56 :                     total_chunk_size, local_buffers);
     422             : 
     423             :             // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
     424             :             // that we can retry for RPC_TRIES (see old commits with margo)
     425             :             // TODO(amiranda): hermes will eventually provide a
     426             :             // post(endpoint) returning one result and a
     427             :             // broadcast(endpoint_set) returning a result_set. When that
     428             :             // happens we can remove the .at(0) :/
     429          28 :             handles.emplace_back(
     430          28 :                     ld_network_service->post<gkfs::rpc::read_data>(endp, in));
     431             : 
     432          28 :             LOG(DEBUG,
     433             :                 "host: {}, path: {}, chunk_start: {}, chunk_end: {}, chunks: {}, size: {}, offset: {}",
     434             :                 target, path, chnk_start, chnk_end, in.chunk_n(),
     435          28 :                 total_chunk_size, in.offset());
     436             : 
     437          56 :             LOG(TRACE_READS,
     438             :                 "read {} host: {}, path: {}, chunk_start: {}, chunk_end: {}",
     439          28 :                 CTX->get_hostname(), target, path, chnk_start, chnk_end);
     440             : 
     441             : 
     442           0 :         } catch(const std::exception& ex) {
     443           0 :             LOG(ERROR,
     444             :                 "Unable to send non-blocking rpc for path \"{}\" "
     445             :                 "[peer: {}]",
     446           0 :                 path, target);
     447           0 :             return make_pair(EBUSY, 0);
     448             :         }
     449             :     }
     450             : 
     451             :     // Wait for RPC responses and then get response and add it to out_size
     452             :     // which is the read size. All potential outputs are served to free
     453             :     // resources regardless of errors, although an errorcode is set.
     454          28 :     auto err = 0;
     455          28 :     ssize_t out_size = 0;
     456          28 :     std::size_t idx = 0;
     457             : 
     458          56 :     for(const auto& h : handles) {
     459          28 :         try {
     460             :             // XXX We might need a timeout here to not wait forever for an
     461             :             // output that never comes?
     462          56 :             auto out = h.get().at(0);
     463             : 
     464          28 :             if(out.err() != 0) {
     465           0 :                 LOG(ERROR, "Daemon reported error: {}", out.err());
     466          28 :                 err = out.err();
     467             :             }
     468             : 
     469          28 :             out_size += static_cast<size_t>(out.io_size());
     470             : 
     471           0 :         } catch(const std::exception& ex) {
     472           0 :             LOG(ERROR, "Failed to get rpc output for path \"{}\" [peer: {}]",
     473           0 :                 path, targets[idx]);
     474           0 :             err = EIO;
     475             :             // We should get targets[idx] and remove from the list of peers
     476           0 :             failed.insert(targets[idx]);
     477             :             // Then repeat the read with another peer (We repear the full
     478             :             // read, this can be optimised but it is a cornercase)
     479             :         }
     480          28 :         idx++;
     481             :     }
     482             : 
     483             : 
     484             :     /*
     485             :      * Typically file systems return the size even if only a part of it was
     486             :      * read. In our case, we do not keep track which daemon fully read its
     487             :      * workload. Thus, we always return size 0 on error.
     488             :      */
     489          28 :     if(err)
     490           0 :         return make_pair(err, 0);
     491             :     else
     492          28 :         return make_pair(0, out_size);
     493             : }
     494             : 
     495             : /**
     496             :  * Send truncate RPCs to all hosts storing chunks in [new_size, current_size)
     497             :  * @param path file path; used to locate chunks and sent in each request
     498             :  * @param current_size current file size, must be larger than new_size
     499             :  * @param new_size size the file is truncated to; sent in each request
     500             :  * @param num_copies Number of replicas; copies 0..num_copies are located
     501             :  * @return 0 on success, EIO if posting a request or any response failed
     502             :  */
     503             : int
     504           3 : forward_truncate(const std::string& path, size_t current_size, size_t new_size,
     505             :                  const int8_t num_copies) {
     506             : 
     507             :     // import pow2-optimized arithmetic functions
     508           3 :     using namespace gkfs::utils::arithmetic;
     509             : 
     510           3 :     assert(current_size > new_size);
     511             : 
     512             :     // Find out which data servers hold chunks of the removed byte range
     513             :     // [new_size, current_size) in order to contact only them
     514           3 :     const unsigned int chunk_start =
     515           3 :             block_index(new_size, gkfs::config::rpc::chunksize);
     516           3 :     const unsigned int chunk_end =
     517           3 :             block_index(current_size - 1, gkfs::config::rpc::chunksize);
     518             : 
     519           3 :     std::unordered_set<unsigned int> hosts;
     520          18 :     for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end;
     521             :         ++chunk_id) {
     522          30 :         for(auto copy = 0; copy < (num_copies + 1); ++copy) {
     523          30 :             hosts.insert(CTX->distributor()->locate_data(path, chunk_id, copy));
     524             :         }
     525             :     }
     526             : 
     527           6 :     std::vector<hermes::rpc_handle<gkfs::rpc::trunc_data>> handles;
     528             : 
     529           3 :     auto err = 0;
     530             : 
     531           5 :     for(const auto& host : hosts) {
     532             : 
     533           4 :         auto endp = CTX->hosts().at(host);
     534             : 
     535           2 :         try {
     536           2 :             LOG(DEBUG, "Sending RPC ...");
     537             : 
     538           4 :             gkfs::rpc::trunc_data::input in(path, new_size);
     539             : 
     540             :             // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
     541             :             // that we can retry for RPC_TRIES (see old commits with margo)
     542             :             // TODO(amiranda): hermes will eventually provide a
     543             :             // post(endpoint) returning one result and a
     544             :             // broadcast(endpoint_set) returning a result_set. When that
     545             :             // happens we can remove the .at(0) :/
     546           2 :             handles.emplace_back(
     547           2 :                     ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));
     548             : 
     549           0 :         } catch(const std::exception& ex) {
     550             :             // TODO(amiranda): we should cancel all previously posted
     551             :             // requests here, unfortunately, Hermes does not support it yet
     552             :             // :/
     553           0 :             LOG(ERROR, "Failed to send request to host: {}", host);
     554           0 :             err = EIO;
     555           0 :             break; // We need to gather all responses so we can't return
     556             :                    // here
     557             :         }
     558             :     }
     559             : 
     560             :     // Wait for every posted RPC; any failed response makes the result EIO
     561           5 :     for(const auto& h : handles) {
     562           2 :         try {
     563             :             // XXX We might need a timeout here to not wait forever for an
     564             :             // output that never comes?
     565           4 :             auto out = h.get().at(0);
     566             : 
     567           2 :             if(out.err()) {
     568           0 :                 LOG(ERROR, "received error response: {}", out.err());
     569             :                 err = EIO;
     570             :             }
     571           0 :         } catch(const std::exception& ex) {
     572           0 :             LOG(ERROR, "while getting rpc output");
     573           0 :             err = EIO;
     574             :         }
     575             :     }
     576           6 :     return err;
     577             : }
     578             : 
     579             : /**
     580             :  * Send a chunk stat RPC to every host and add up the reported chunk totals
     581             :  * @return pair<error code, rpc::ChunkStat>; ChunkStat{} if the error code is set
     582             :  */
     583             : pair<int, ChunkStat>
     584           2 : forward_get_chunk_stat() {
     585             : 
     586           4 :     std::vector<hermes::rpc_handle<gkfs::rpc::chunk_stat>> handles;
     587             : 
     588           2 :     auto err = 0;
     589             : 
     590           4 :     for(const auto& endp : CTX->hosts()) {
     591           2 :         try {
     592           4 :             LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
     593             : 
     594           2 :             gkfs::rpc::chunk_stat::input in(0);
     595             : 
     596             :             // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
     597             :             // that we can retry for RPC_TRIES (see old commits with margo)
     598             :             // TODO(amiranda): hermes will eventually provide a
     599             :             // post(endpoint) returning one result and a
     600             :             // broadcast(endpoint_set) returning a result_set. When that
     601             :             // happens we can remove the .at(0) :/
     602           2 :             handles.emplace_back(
     603           2 :                     ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));
     604             : 
     605           0 :         } catch(const std::exception& ex) {
     606             :             // TODO(amiranda): we should cancel all previously posted
     607             :             // requests here, unfortunately, Hermes does not support it yet
     608             :             // :/
     609           0 :             LOG(ERROR, "Failed to send request to host: {}", endp.to_string());
     610           0 :             err = EBUSY;
     611           0 :             break; // We need to gather all responses so we can't return
     612             :                    // here
     613             :         }
     614             :     }
     615             : 
     616           2 :     unsigned long chunk_size = gkfs::config::rpc::chunksize;
     617           2 :     unsigned long chunk_total = 0;
     618           2 :     unsigned long chunk_free = 0;
     619             : 
     620             :     // wait for RPC responses and sum the totals of every successful reply
     621           4 :     for(std::size_t i = 0; i < handles.size(); ++i) {
     622             : 
     623           2 :         gkfs::rpc::chunk_stat::output out{};
     624             : 
     625           2 :         try {
     626             :             // XXX We might need a timeout here to not wait forever for an
     627             :             // output that never comes?
     628           4 :             out = handles[i].get().at(0);
     629             : 
     630           2 :             if(out.err()) {
     631           0 :                 err = out.err();
     632           0 :                 LOG(ERROR,
     633             :                     "Host '{}' reported err code '{}' during stat chunk.",
     634           0 :                     CTX->hosts().at(i).to_string(), err); // handles[i] was posted to hosts()[i]
     635             :                 // we don't break here to ensure all responses are processed
     636           0 :                 continue;
     637             :             }
     638           2 :             assert(out.chunk_size() == chunk_size); // host must report the client chunk size
     639           2 :             chunk_total += out.chunk_total();
     640           2 :             chunk_free += out.chunk_free();
     641           0 :         } catch(const std::exception& ex) {
     642           0 :             LOG(ERROR, "Failed to get RPC output from host: {}", i);
     643             :             // Only log: an unreadable reply from one host does not set err.
     644             :             // err = EBUSY;
     645             :         }
     646             :     }
     647             : 
     648           2 :     if(err)
     649           0 :         return make_pair(err, ChunkStat{});
     650             :     else
     651           2 :         return make_pair(0, ChunkStat{chunk_size, chunk_total, chunk_free});
     652             : }
     653             : 
     654             : } // namespace gkfs::rpc

Generated by: LCOV version 1.16