LCOV - code coverage report
Current view: top level - src/client/rpc - forward_metadata.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 298 404 73.8 %
Date: 2024-04-30 13:21:35 Functions: 10 11 90.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS' POSIX interface.
      12             : 
      13             :   GekkoFS' POSIX interface is free software: you can redistribute it and/or
      14             :   modify it under the terms of the GNU Lesser General Public License as
      15             :   published by the Free Software Foundation, either version 3 of the License,
      16             :   or (at your option) any later version.
      17             : 
      18             :   GekkoFS' POSIX interface is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU Lesser General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU Lesser General Public License
      24             :   along with GekkoFS' POSIX interface.  If not, see
      25             :   <https://www.gnu.org/licenses/>.
      26             : 
      27             :   SPDX-License-Identifier: LGPL-3.0-or-later
      28             : */
      29             : 
      30             : #include <client/rpc/forward_metadata.hpp>
      31             : #include <client/preload.hpp>
      32             : #include <client/logging.hpp>
      33             : #include <client/preload_util.hpp>
      34             : #include <client/open_dir.hpp>
      35             : #include <client/rpc/rpc_types.hpp>
      36             : 
      37             : #include <common/rpc/rpc_util.hpp>
      38             : #include <common/rpc/distributor.hpp>
      39             : #include <common/rpc/rpc_types.hpp>
      40             : 
      41             : using namespace std;
      42             : 
      43             : namespace gkfs::rpc {
      44             : 
      45             : /*
      46             :  * This file includes all metadata RPC calls.
      47             :  * NOTE: No errno is defined here!
      48             :  */
      49             : 
      50             : /**
      51             :  * Send an RPC for a create request
      52             :  * @param path
      53             :  * @param mode
      54             :  * @param copy Number of replica to create
      55             :  * @return error code
      56             :  */
      57             : int
      58        1057 : forward_create(const std::string& path, const mode_t mode, const int copy) {
      59             : 
      60        1057 :     auto endp = CTX->hosts().at(
      61        3171 :             CTX->distributor()->locate_file_metadata(path, copy));
      62             : 
      63        1057 :     try {
      64        1057 :         LOG(DEBUG, "Sending RPC ...");
      65             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
      66             :         // can retry for RPC_TRIES (see old commits with margo)
      67             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
      68             :         // returning one result and a broadcast(endpoint_set) returning a
      69             :         // result_set. When that happens we can remove the .at(0) :/
      70        1057 :         auto out = ld_network_service->post<gkfs::rpc::create>(endp, path, mode)
      71        2114 :                            .get()
      72        1057 :                            .at(0);
      73        1057 :         LOG(DEBUG, "Got response success: {}", out.err());
      74             : 
      75        1057 :         return out.err() ? out.err() : 0;
      76           0 :     } catch(const std::exception& ex) {
      77           0 :         LOG(ERROR, "while getting rpc output");
      78           0 :         return EBUSY;
      79             :     }
      80             : }
      81             : 
      82             : /**
      83             :  * Send an RPC for a stat request
      84             :  * @param path
      85             :  * @param attr
      86             :  * @param copy metadata replica to read from
      87             :  * @return error code
      88             :  */
      89             : int
      90        1359 : forward_stat(const std::string& path, string& attr, const int copy) {
      91             : 
      92        1359 :     auto endp = CTX->hosts().at(
      93        4077 :             CTX->distributor()->locate_file_metadata(path, copy));
      94             : 
      95        1359 :     try {
      96        1359 :         LOG(DEBUG, "Sending RPC ...");
      97             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
      98             :         // can retry for RPC_TRIES (see old commits with margo)
      99             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     100             :         // returning one result and a broadcast(endpoint_set) returning a
     101             :         // result_set. When that happens we can remove the .at(0) :/
     102        1359 :         auto out = ld_network_service->post<gkfs::rpc::stat>(endp, path)
     103        2718 :                            .get()
     104        1359 :                            .at(0);
     105        1359 :         LOG(DEBUG, "Got response success: {}", out.err());
     106             : 
     107        1359 :         if(out.err())
     108          24 :             return out.err();
     109             : 
     110        1335 :         attr = out.db_val();
     111           0 :     } catch(const std::exception& ex) {
     112           0 :         LOG(ERROR, "while getting rpc output");
     113           0 :         return EBUSY;
     114             :     }
     115        1335 :     return 0;
     116             : }
     117             : 
     118             : /**
     119             :  * Send an RPC for a remove request. This removes metadata and all data chunks
     120             :  * possible distributed across many daemons. Optimizations are in place for
     121             :  * small files (file_size / chunk_size) < number_of_daemons where no broadcast
     122             :  * to all daemons is used to remove all chunks. Otherwise, a broadcast to all
     123             :  * daemons is used.
     124             :  *
     125             :  * This function only attempts data removal if data exists (determined when
     126             :  * metadata is removed)
     127             :  * @param path
     128             :  * @param num_copies Replication scenarios with many replicas
     129             :  * @return error code
     130             :  */
     131             : int
     132           8 : forward_remove(const std::string& path, const int8_t num_copies) {
     133           8 :     int64_t size = 0;
     134           8 :     uint32_t mode = 0;
     135             : 
     136          16 :     for(auto copy = 0; copy < (num_copies + 1); copy++) {
     137           8 :         auto endp = CTX->hosts().at(
     138          24 :                 CTX->distributor()->locate_file_metadata(path, copy));
     139             : 
     140             :         /*
     141             :          * Send one RPC to metadata destination and remove metadata while
     142             :          * retrieving size and mode to determine if data needs to removed too
     143             :          */
     144           8 :         try {
     145           8 :             LOG(DEBUG, "Sending RPC ...");
     146             :             // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
     147             :             // we can retry for RPC_TRIES (see old commits with margo)
     148             :             // TODO(amiranda): hermes will eventually provide a post(endpoint)
     149             :             // returning one result and a broadcast(endpoint_set) returning a
     150             :             // result_set. When that happens we can remove the .at(0) :/
     151           8 :             auto out = ld_network_service
     152           8 :                                ->post<gkfs::rpc::remove_metadata>(endp, path)
     153          16 :                                .get()
     154           8 :                                .at(0);
     155             : 
     156           8 :             LOG(DEBUG, "Got response success: {}", out.err());
     157             : 
     158           8 :             if(out.err())
     159           0 :                 return out.err();
     160           8 :             size = out.size();
     161           8 :             mode = out.mode();
     162           0 :         } catch(const std::exception& ex) {
     163           0 :             LOG(ERROR, "while getting rpc output");
     164           0 :             return EBUSY;
     165             :         }
     166             :     }
     167             :     // if file is not a regular file and it's size is 0, data does not need to
     168             :     // be removed, thus, we exit
     169           8 :     if(!(S_ISREG(mode) && (size != 0)))
     170             :         return 0;
     171             : 
     172             : 
     173           6 :     std::vector<hermes::rpc_handle<gkfs::rpc::remove_data>> handles;
     174             : 
     175             :     // Small files
     176           3 :     if(static_cast<std::size_t>(size / gkfs::config::rpc::chunksize) <
     177           3 :        CTX->hosts().size()) {
     178           4 :         for(auto copymd = 0; copymd < (num_copies + 1); copymd++) {
     179           2 :             const auto metadata_host_id =
     180           2 :                     CTX->distributor()->locate_file_metadata(path, copymd);
     181           4 :             const auto endp_metadata = CTX->hosts().at(metadata_host_id);
     182             : 
     183           2 :             try {
     184           4 :                 LOG(DEBUG, "Sending RPC to host: {}",
     185           2 :                     endp_metadata.to_string());
     186           4 :                 gkfs::rpc::remove_data::input in(path);
     187           2 :                 handles.emplace_back(
     188           2 :                         ld_network_service->post<gkfs::rpc::remove_data>(
     189           2 :                                 endp_metadata, in));
     190             : 
     191           2 :                 uint64_t chnk_start = 0;
     192           2 :                 uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
     193             : 
     194           4 :                 for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end;
     195             :                     chnk_id++) {
     196           4 :                     for(auto copy = 0; copy < (num_copies + 1); copy++) {
     197           2 :                         const auto chnk_host_id =
     198           4 :                                 CTX->distributor()->locate_data(path, chnk_id,
     199           2 :                                                                 copy);
     200           2 :                         if constexpr(gkfs::config::metadata::
     201             :                                              implicit_data_removal) {
     202             :                             /*
     203             :                              * If the chnk host matches the metadata host the
     204             :                              * remove request as already been sent as part of
     205             :                              * the metadata remove request.
     206             :                              */
     207           2 :                             if(chnk_host_id == metadata_host_id)
     208           2 :                                 continue;
     209             :                         }
     210           0 :                         const auto endp_chnk = CTX->hosts().at(chnk_host_id);
     211             : 
     212           0 :                         LOG(DEBUG, "Sending RPC to host: {}",
     213           0 :                             endp_chnk.to_string());
     214             : 
     215           0 :                         handles.emplace_back(
     216             :                                 ld_network_service
     217           0 :                                         ->post<gkfs::rpc::remove_data>(
     218           0 :                                                 endp_chnk, in));
     219             :                     }
     220             :                 }
     221           0 :             } catch(const std::exception& ex) {
     222           0 :                 LOG(ERROR,
     223           0 :                     "Failed to forward non-blocking rpc request reduced remove requests");
     224           0 :                 return EBUSY;
     225             :             }
     226             :         }
     227             :     } else { // "Big" files
     228           2 :         for(const auto& endp : CTX->hosts()) {
     229           1 :             try {
     230           2 :                 LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
     231             : 
     232           2 :                 gkfs::rpc::remove_data::input in(path);
     233             : 
     234             :                 // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
     235             :                 // that we can retry for RPC_TRIES (see old commits with margo)
     236             :                 // TODO(amiranda): hermes will eventually provide a
     237             :                 // post(endpoint) returning one result and a
     238             :                 // broadcast(endpoint_set) returning a result_set. When that
     239             :                 // happens we can remove the .at(0) :/
     240             : 
     241           1 :                 handles.emplace_back(
     242           1 :                         ld_network_service->post<gkfs::rpc::remove_data>(endp,
     243           1 :                                                                          in));
     244             : 
     245           0 :             } catch(const std::exception& ex) {
     246             :                 // TODO(amiranda): we should cancel all previously posted
     247             :                 // requests here, unfortunately, Hermes does not support it yet
     248             :                 // :/
     249           0 :                 LOG(ERROR,
     250             :                     "Failed to forward non-blocking rpc request to host: {}",
     251           0 :                     endp.to_string());
     252           0 :                 return EBUSY;
     253             :             }
     254             :         }
     255             :     }
     256             :     // wait for RPC responses
     257           3 :     auto err = 0;
     258           6 :     for(const auto& h : handles) {
     259           3 :         try {
     260             :             // XXX We might need a timeout here to not wait forever for an
     261             :             // output that never comes?
     262           6 :             auto out = h.get().at(0);
     263             : 
     264           3 :             if(out.err() != 0) {
     265           0 :                 LOG(ERROR, "received error response: {}", out.err());
     266           3 :                 err = out.err();
     267             :             }
     268           0 :         } catch(const std::exception& ex) {
     269           0 :             LOG(ERROR, "while getting rpc output");
     270           0 :             err = EBUSY;
     271             :         }
     272             :     }
     273           3 :     return err;
     274             : }
     275             : 
     276             : /**
     277             :  * Send an RPC for a decrement file size request. This is for example used
     278             :  * during a truncate() call.
     279             :  * @param path
     280             :  * @param length
     281             :  * @param copy Target replica (0 original)
     282             :  * @return error code
     283             :  */
     284             : int
     285           3 : forward_decr_size(const std::string& path, size_t length, const int copy) {
     286             : 
     287           3 :     auto endp = CTX->hosts().at(
     288           9 :             CTX->distributor()->locate_file_metadata(path, copy));
     289             : 
     290           3 :     try {
     291           3 :         LOG(DEBUG, "Sending RPC ...");
     292             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     293             :         // can retry for RPC_TRIES (see old commits with margo)
     294             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     295             :         // returning one result and a broadcast(endpoint_set) returning a
     296             :         // result_set. When that happens we can remove the .at(0) :/
     297           3 :         auto out = ld_network_service
     298           3 :                            ->post<gkfs::rpc::decr_size>(endp, path, length)
     299           6 :                            .get()
     300           3 :                            .at(0);
     301             : 
     302           3 :         LOG(DEBUG, "Got response success: {}", out.err());
     303             : 
     304           3 :         return out.err() ? out.err() : 0;
     305           0 :     } catch(const std::exception& ex) {
     306           0 :         LOG(ERROR, "while getting rpc output");
     307           0 :         return EBUSY;
     308             :     }
     309             : }
     310             : 
     311             : 
     312             : /**
     313             :  * Send an RPC for an update metadentry request.
     314             :  * NOTE: Currently unused.
     315             :  * @param path
     316             :  * @param md
     317             :  * @param md_flags
     318             :  * @param copy Target replica (0 original)
     319             :  * @return error code
     320             :  */
     321             : int
     322           1 : forward_update_metadentry(const string& path,
     323             :                           const gkfs::metadata::Metadata& md,
     324             :                           const gkfs::metadata::MetadentryUpdateFlags& md_flags,
     325             :                           const int copy) {
     326             : 
     327           1 :     auto endp = CTX->hosts().at(
     328           3 :             CTX->distributor()->locate_file_metadata(path, copy));
     329             : 
     330           1 :     try {
     331           1 :         LOG(DEBUG, "Sending RPC ...");
     332             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     333             :         // can retry for RPC_TRIES (see old commits with margo)
     334             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     335             :         // returning one result and a broadcast(endpoint_set) returning a
     336             :         // result_set. When that happens we can remove the .at(0) :/
     337           1 :         auto out = ld_network_service
     338           2 :                            ->post<gkfs::rpc::update_metadentry>(
     339             :                                    endp, path,
     340           0 :                                    (md_flags.link_count ? md.link_count() : 0),
     341           2 :                                    /* mode */ 0,
     342           2 :                                    /* uid */ 0,
     343           1 :                                    /* gid */ 0, (md_flags.size ? md.size() : 0),
     344           1 :                                    (md_flags.blocks ? md.blocks() : 0),
     345           1 :                                    (md_flags.atime ? md.atime() : 0),
     346           1 :                                    (md_flags.mtime ? md.mtime() : 0),
     347           1 :                                    (md_flags.ctime ? md.ctime() : 0),
     348           1 :                                    bool_to_merc_bool(md_flags.link_count),
     349           2 :                                    /* mode_flag */ false,
     350           1 :                                    bool_to_merc_bool(md_flags.size),
     351           1 :                                    bool_to_merc_bool(md_flags.blocks),
     352           1 :                                    bool_to_merc_bool(md_flags.atime),
     353           1 :                                    bool_to_merc_bool(md_flags.mtime),
     354           2 :                                    bool_to_merc_bool(md_flags.ctime))
     355           2 :                            .get()
     356           1 :                            .at(0);
     357             : 
     358           1 :         LOG(DEBUG, "Got response success: {}", out.err());
     359             : 
     360           1 :         return out.err() ? out.err() : 0;
     361           0 :     } catch(const std::exception& ex) {
     362           0 :         LOG(ERROR, "while getting rpc output");
     363           0 :         return EBUSY;
     364             :     }
     365             : }
     366             : 
     367             : #ifdef HAS_RENAME
     368             : /**
     369             :  * Send an RPC for a rename metadentry request.
     370             :  * Steps.. SetUp a blkcnt of -1
     371             :  * This marks that this file doesn't have to be accessed directly
     372             :  * Create a new md with the new name, which should have as value the old name
     373             :  * All operations should check blockcnt and extract a NOTEXISTS
     374             :  * The operations does not support replication
     375             :  * @param oldpath
     376             :  * @param newpath
     377             :  * @param md
     378             :  *
     379             :  * @return error code
     380             :  */
     381             : int
     382          10 : forward_rename(const string& oldpath, const string& newpath,
     383             :                const gkfs::metadata::Metadata& md) {
     384             : 
     385          10 :     auto endp = CTX->hosts().at(
     386          30 :             CTX->distributor()->locate_file_metadata(oldpath, 0));
     387             : 
     388          10 :     try {
     389          10 :         LOG(DEBUG, "Sending RPC ...");
     390             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     391             :         // can retry for RPC_TRIES (see old commits with margo)
     392             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     393             :         // returning one result and a broadcast(endpoint_set) returning a
     394             :         // result_set. When that happens we can remove the .at(0) :/
     395          10 :         auto out = ld_network_service
     396          20 :                            ->post<gkfs::rpc::update_metadentry>(
     397          30 :                                    endp, oldpath, (md.link_count()),
     398          20 :                                    /* mode */ 0,
     399          20 :                                    /* uid */ 0,
     400          10 :                                    /* gid */ 0, md.size(),
     401          10 :                                    /*  blockcnt  */ -1, (md.atime()),
     402          10 :                                    (md.mtime()), (md.ctime()),
     403          10 :                                    bool_to_merc_bool(md.link_count()),
     404          20 :                                    /* mode_flag */ false,
     405          10 :                                    bool_to_merc_bool(md.size()), 1,
     406          10 :                                    bool_to_merc_bool(md.atime()),
     407          10 :                                    bool_to_merc_bool(md.mtime()),
     408          20 :                                    bool_to_merc_bool(md.ctime()))
     409          20 :                            .get()
     410          10 :                            .at(0);
     411             : 
     412          10 :         LOG(DEBUG, "Got response success: {}", out.err());
     413             : 
     414             :         // Now create the new file
     415             : 
     416           0 :     } catch(const std::exception& ex) {
     417           0 :         LOG(ERROR, "while getting rpc output");
     418           0 :         return EBUSY;
     419             :     }
     420             : 
     421          20 :     auto md2 = md;
     422             : 
     423          10 :     md2.target_path(oldpath);
     424             :     /*
     425             :      * Now create the new file
     426             :      */
     427             :     // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     428             :     // can retry for RPC_TRIES (see old commits with margo)
     429             :     // TODO(amiranda): hermes will eventually provide a post(endpoint)
     430             :     // returning one result and a broadcast(endpoint_set) returning a
     431             :     // result_set. When that happens we can remove the .at(0) :/
     432          10 :     auto endp2 = CTX->hosts().at(
     433          30 :             CTX->distributor()->locate_file_metadata(newpath, 0));
     434             : 
     435          10 :     try {
     436          10 :         LOG(DEBUG, "Sending RPC ...");
     437             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     438             :         // can retry for RPC_TRIES (see old commits with margo)
     439             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     440             :         // returning one result and a broadcast(endpoint_set) returning a
     441             :         // result_set. When that happens we can remove the .at(0) :/
     442             : 
     443          10 :         auto out = ld_network_service
     444          10 :                            ->post<gkfs::rpc::create>(endp2, newpath, md2.mode())
     445          20 :                            .get()
     446          10 :                            .at(0);
     447          10 :         LOG(DEBUG, "Got response success: {}", out.err());
     448             : 
     449           0 :     } catch(const std::exception& ex) {
     450           0 :         LOG(ERROR, "while getting rpc output");
     451           0 :         return EBUSY;
     452             :     }
     453             : 
     454             : 
     455          10 :     try {
     456          10 :         LOG(DEBUG, "Sending RPC ...");
     457             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     458             :         // can retry for RPC_TRIES (see old commits with margo)
     459             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     460             :         // returning one result and a broadcast(endpoint_set) returning a
     461             :         // result_set. When that happens we can remove the .at(0) :/
     462             :         // Update new file with target link = oldpath
     463          10 :         auto out =
     464             :                 ld_network_service
     465          10 :                         ->post<gkfs::rpc::mk_symlink>(endp2, newpath, oldpath)
     466          20 :                         .get()
     467          10 :                         .at(0);
     468             : 
     469          10 :         LOG(DEBUG, "Got response success: {}", out.err());
     470             : 
     471             :         // return out.err() ? out.err() : 0;
     472             : 
     473           0 :     } catch(const std::exception& ex) {
     474           0 :         LOG(ERROR, "while getting rpc output");
     475           0 :         return EBUSY;
     476             :     }
     477             : 
     478             :     // Update the renamed path to solve the issue with fstat with fd)
     479          10 :     try {
     480          10 :         LOG(DEBUG, "Sending RPC ...");
     481             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     482             :         // can retry for RPC_TRIES (see old commits with margo)
     483             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     484             :         // returning one result and a broadcast(endpoint_set) returning a
     485             :         // result_set. When that happens we can remove the .at(0) :/
     486          10 :         auto out = ld_network_service
     487          10 :                            ->post<gkfs::rpc::mk_symlink>(endp, oldpath, newpath)
     488          20 :                            .get()
     489          10 :                            .at(0);
     490             : 
     491          10 :         LOG(DEBUG, "Got response success: {}", out.err());
     492             : 
     493             :         // return out.err() ? out.err() : 0;
     494          10 :         return 0;
     495           0 :     } catch(const std::exception& ex) {
     496           0 :         LOG(ERROR, "while getting rpc output");
     497           0 :         return EBUSY;
     498             :     }
     499             : }
     500             : 
     501             : #endif
     502             : 
     503             : /**
     504             :  * Send an RPC request for an update to the file size.
     505             :  * This is called during a write() call or similar
     506             :  * A single correct call is needed only to progress.
     507             :  * @param path
     508             :  * @param size
     509             :  * @param offset
     510             :  * @param append_flag
     511             :  * @param num_copies number of replicas
     512             :  * @return pair<error code, size after update>
     513             :  */
     514             : pair<int, off64_t>
     515          41 : forward_update_metadentry_size(const string& path, const size_t size,
     516             :                                const off64_t offset, const bool append_flag,
     517             :                                const int num_copies) {
     518             : 
     519          82 :     std::vector<hermes::rpc_handle<gkfs::rpc::update_metadentry_size>> handles;
     520             : 
     521          82 :     for(auto copy = 0; copy < num_copies + 1; copy++) {
     522          41 :         auto endp = CTX->hosts().at(
     523         123 :                 CTX->distributor()->locate_file_metadata(path, copy));
     524          41 :         try {
     525          41 :             LOG(DEBUG, "Sending RPC ...");
     526             :             // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
     527             :             // we can retry for RPC_TRIES (see old commits with margo)
     528             :             // TODO(amiranda): hermes will eventually provide a post(endpoint)
     529             :             // returning one result and a broadcast(endpoint_set) returning a
     530             :             // result_set. When that happens we can remove the .at(0) :/
     531          41 :             handles.emplace_back(
     532          41 :                     ld_network_service->post<gkfs::rpc::update_metadentry_size>(
     533             :                             endp, path, size, offset,
     534          82 :                             bool_to_merc_bool(append_flag)));
     535           0 :         } catch(const std::exception& ex) {
     536           0 :             LOG(ERROR, "while getting rpc output");
     537           0 :             return make_pair(EBUSY, 0);
     538             :         }
     539             :     }
     540          41 :     auto err = 0;
     541          41 :     ssize_t out_size = 0;
     542          41 :     auto idx = 0;
     543          41 :     bool valid = false;
     544          82 :     for(const auto& h : handles) {
     545          41 :         try {
     546             :             // XXX We might need a timeout here to not wait forever for an
     547             :             // output that never comes?
     548          82 :             auto out = h.get().at(0);
     549             : 
     550          41 :             if(out.err() != 0) {
     551           0 :                 LOG(ERROR, "Daemon {} reported error: {}", idx, out.err());
     552             :             } else {
     553          41 :                 valid = true;
     554          41 :                 out_size = out.ret_size();
     555             :             }
     556             : 
     557           0 :         } catch(const std::exception& ex) {
     558           0 :             LOG(ERROR, "Failed to get rpc output");
     559           0 :             if(!valid) {
     560           0 :                 err = EIO;
     561             :             }
     562             :         }
     563          41 :         idx++;
     564             :     }
     565             : 
     566          41 :     if(!valid)
     567           0 :         return make_pair(err, 0);
     568             :     else
     569          41 :         return make_pair(0, out_size);
     570             : }
     571             : 
     572             : 
     573             : /**
     574             :  * Send an RPC request to get the current file size.
     575             :  * This is called during a lseek() call
     576             :  * @param path
     577             :  * @param copy Target replica (0 original)
     578             :  * @return pair<error code, file size>
     579             :  */
     580             : pair<int, off64_t>
     581           5 : forward_get_metadentry_size(const std::string& path, const int copy) {
     582             : 
     583           5 :     auto endp = CTX->hosts().at(
     584          15 :             CTX->distributor()->locate_file_metadata(path, copy));
     585             : 
     586           5 :     try {
     587           5 :         LOG(DEBUG, "Sending RPC ...");
     588             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     589             :         // can retry for RPC_TRIES (see old commits with margo)
     590             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     591             :         // returning one result and a broadcast(endpoint_set) returning a
     592             :         // result_set. When that happens we can remove the .at(0) :/
     593           5 :         auto out = ld_network_service
     594           5 :                            ->post<gkfs::rpc::get_metadentry_size>(endp, path)
     595          10 :                            .get()
     596           5 :                            .at(0);
     597             : 
     598           5 :         LOG(DEBUG, "Got response success: {}", out.err());
     599             : 
     600           5 :         if(out.err())
     601           0 :             return make_pair(out.err(), 0);
     602             :         else
     603           5 :             return make_pair(0, out.ret_size());
     604           0 :     } catch(const std::exception& ex) {
     605           0 :         LOG(ERROR, "while getting rpc output");
     606           0 :         return make_pair(EBUSY, 0);
     607             :     }
     608             : }
     609             : 
     610             : /**
     611             :  * Send an RPC request to receive all entries of a directory.
     612             :  * @param open_dir
     613             :  * @return error code
     614             :  */
     615             : pair<int, shared_ptr<gkfs::filemap::OpenDir>>
     616          25 : forward_get_dirents(const string& path) {
     617             : 
     618          25 :     LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
     619             : 
     620          50 :     auto const targets = CTX->distributor()->locate_directory_metadata(path);
     621             : 
     622             :     /* preallocate receiving buffer. The actual size is not known yet.
     623             :      *
     624             :      * On C++14 make_unique function also zeroes the newly allocated buffer.
     625             :      * It turns out that this operation is increadibly slow for such a big
     626             :      * buffer. Moreover we don't need a zeroed buffer here.
     627             :      */
     628          25 :     auto large_buffer = std::unique_ptr<char[]>(
     629          50 :             new char[gkfs::config::rpc::dirents_buff_size]);
     630             : 
     631             :     // XXX there is a rounding error here depending on the number of targets...
     632          25 :     const std::size_t per_host_buff_size =
     633          25 :             gkfs::config::rpc::dirents_buff_size / targets.size();
     634             : 
     635             :     // expose local buffers for RMA from servers
     636          50 :     std::vector<hermes::exposed_memory> exposed_buffers;
     637          25 :     exposed_buffers.reserve(targets.size());
     638             : 
     639          50 :     for(std::size_t i = 0; i < targets.size(); ++i) {
     640          25 :         try {
     641          50 :             exposed_buffers.emplace_back(ld_network_service->expose(
     642          25 :                     std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
     643          25 :                             large_buffer.get() + (i * per_host_buff_size),
     644          50 :                             per_host_buff_size}},
     645          25 :                     hermes::access_mode::write_only));
     646           0 :         } catch(const std::exception& ex) {
     647           0 :             LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'",
     648           0 :                 __func__, ex.what());
     649           0 :             return make_pair(EBUSY, nullptr);
     650             :         }
     651             :     }
     652             : 
     653          25 :     auto err = 0;
     654             :     // send RPCs
     655          25 :     std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents>> handles;
     656             : 
     657          50 :     for(std::size_t i = 0; i < targets.size(); ++i) {
     658             : 
     659             :         // Setup rpc input parameters for each host
     660          50 :         auto endp = CTX->hosts().at(targets[i]);
     661             : 
     662          50 :         gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]);
     663             : 
     664          25 :         try {
     665          25 :             LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
     666          25 :             handles.emplace_back(
     667          25 :                     ld_network_service->post<gkfs::rpc::get_dirents>(endp, in));
     668           0 :         } catch(const std::exception& ex) {
     669           0 :             LOG(ERROR,
     670             :                 "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
     671           0 :                 __func__, path, targets[i], ex.what());
     672           0 :             err = EBUSY;
     673           0 :             break; // we need to gather responses from already sent RPCS
     674             :         }
     675             :     }
     676             : 
     677          25 :     LOG(DEBUG,
     678             :         "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
     679          25 :         __func__, path, targets.size(), per_host_buff_size);
     680             : 
     681          25 :     auto send_error = err != 0;
     682          50 :     auto open_dir = make_shared<gkfs::filemap::OpenDir>(path);
     683             :     // wait for RPC responses
     684          50 :     for(std::size_t i = 0; i < handles.size(); ++i) {
     685             : 
     686          25 :         gkfs::rpc::get_dirents::output out;
     687             : 
     688          25 :         try {
     689             :             // XXX We might need a timeout here to not wait forever for an
     690             :             // output that never comes?
     691          50 :             out = handles[i].get().at(0);
     692             :             // skip processing dirent data if there was an error during send
     693             :             // In this case all responses are gathered but their contents
     694             :             // skipped
     695          25 :             if(send_error)
     696           0 :                 continue;
     697             : 
     698          25 :             if(out.err() != 0) {
     699           0 :                 LOG(ERROR,
     700             :                     "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
     701           0 :                     __func__, targets[i], strerror(out.err()), path);
     702           0 :                 err = out.err();
     703             :                 // We need to gather all responses before exiting
     704           0 :                 continue;
     705             :             }
     706           0 :         } catch(const std::exception& ex) {
     707           0 :             LOG(ERROR,
     708             :                 "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
     709           0 :                 __func__, path, targets[i], ex.what());
     710           0 :             err = EBUSY;
     711             :             // We need to gather all responses before exiting
     712           0 :             continue;
     713             :         }
     714             : 
     715             :         // each server wrote information to its pre-defined region in
     716             :         // large_buffer, recover it by computing the base_address for each
     717             :         // particular server and adding the appropriate offsets
     718          25 :         assert(exposed_buffers[i].count() == 1);
     719          25 :         void* base_ptr = exposed_buffers[i].begin()->data();
     720             : 
     721          25 :         bool* bool_ptr = reinterpret_cast<bool*>(base_ptr);
     722          25 :         char* names_ptr = reinterpret_cast<char*>(base_ptr) +
     723          25 :                           (out.dirents_size() * sizeof(bool));
     724             : 
     725        1054 :         for(std::size_t j = 0; j < out.dirents_size(); j++) {
     726             : 
     727        2058 :             gkfs::filemap::FileType ftype =
     728        1029 :                     (*bool_ptr) ? gkfs::filemap::FileType::directory
     729             :                                 : gkfs::filemap::FileType::regular;
     730        1029 :             bool_ptr++;
     731             : 
     732             :             // Check that we are not outside the recv_buff for this specific
     733             :             // host
     734        1029 :             assert((names_ptr - reinterpret_cast<char*>(base_ptr)) > 0);
     735        1029 :             assert(static_cast<unsigned long int>(
     736             :                            names_ptr - reinterpret_cast<char*>(base_ptr)) <
     737             :                    per_host_buff_size);
     738             : 
     739        2058 :             auto name = std::string(names_ptr);
     740             :             // number of characters in entry + \0 terminator
     741        1029 :             names_ptr += name.size() + 1;
     742             : 
     743        1029 :             open_dir->add(name, ftype);
     744             :         }
     745             :     }
     746          50 :     return make_pair(err, open_dir);
     747             : }
     748             : 
     749             : /**
     750             :  * Send an RPC request to receive all entries of a directory in a server.
     751             :  * @param path
     752             :  * @param server
     753             :  * @return error code
     754             :  * Returns a tuple with path-isdir-size and ctime
     755             :  * We still use dirents_buff_size, but we need to match in the client side, as
     756             :  * the buffer is provided by the "gfind" applications However, as we only ask
     757             :  * for a server the size should be enought for most of the scenarios. We are
     758             :  * reusing the forward_get_dirents code. As we only need a server, we could
     759             :  * simplify the code removing the asynchronous part.
     760             :  */
     761             : pair<int, vector<tuple<const std::string, bool, size_t, time_t>>>
     762           4 : forward_get_dirents_single(const string& path, int server) {
     763             : 
     764           4 :     LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
     765             : 
     766           8 :     auto const targets = CTX->distributor()->locate_directory_metadata(path);
     767             : 
     768             :     /* preallocate receiving buffer. The actual size is not known yet.
     769             :      *
     770             :      * On C++14 make_unique function also zeroes the newly allocated buffer.
     771             :      * It turns out that this operation is increadibly slow for such a big
     772             :      * buffer. Moreover we don't need a zeroed buffer here.
     773             :      */
     774           4 :     auto large_buffer = std::unique_ptr<char[]>(
     775           8 :             new char[gkfs::config::rpc::dirents_buff_size]);
     776             : 
     777             :     // We use the full size per server...
     778           4 :     const std::size_t per_host_buff_size = gkfs::config::rpc::dirents_buff_size;
     779           8 :     vector<tuple<const std::string, bool, size_t, time_t>> output;
     780             : 
     781             :     // expose local buffers for RMA from servers
     782           4 :     std::vector<hermes::exposed_memory> exposed_buffers;
     783           4 :     exposed_buffers.reserve(1);
     784           4 :     std::size_t i = server;
     785           4 :     try {
     786           8 :         exposed_buffers.emplace_back(ld_network_service->expose(
     787           4 :                 std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
     788           4 :                         large_buffer.get(), per_host_buff_size}},
     789           4 :                 hermes::access_mode::write_only));
     790           0 :     } catch(const std::exception& ex) {
     791           0 :         LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__,
     792           0 :             ex.what());
     793           0 :         return make_pair(EBUSY, output);
     794             :     }
     795             : 
     796           4 :     auto err = 0;
     797             :     // send RPCs
     798           4 :     std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents_extended>> handles;
     799             : 
     800           8 :     auto endp = CTX->hosts().at(targets[i]);
     801             : 
     802           8 :     gkfs::rpc::get_dirents_extended::input in(path, exposed_buffers[0]);
     803             : 
     804           4 :     try {
     805           4 :         LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
     806           4 :         handles.emplace_back(
     807           4 :                 ld_network_service->post<gkfs::rpc::get_dirents_extended>(endp,
     808           4 :                                                                           in));
     809           0 :     } catch(const std::exception& ex) {
     810           0 :         LOG(ERROR,
     811             :             "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
     812           0 :             __func__, path, targets[i], ex.what());
     813           0 :         err = EBUSY;
     814             :     }
     815             : 
     816           4 :     LOG(DEBUG,
     817             :         "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
     818           4 :         __func__, path, targets.size(), per_host_buff_size);
     819             : 
     820             :     // wait for RPC responses
     821             : 
     822           4 :     gkfs::rpc::get_dirents_extended::output out;
     823             : 
     824           4 :     try {
     825             :         // XXX We might need a timeout here to not wait forever for an
     826             :         // output that never comes?
     827           8 :         out = handles[0].get().at(0);
     828             :         // skip processing dirent data if there was an error during send
     829             :         // In this case all responses are gathered but their contents skipped
     830             : 
     831           4 :         if(out.err() != 0) {
     832           0 :             LOG(ERROR,
     833             :                 "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
     834             :                 __func__, targets[0], strerror(out.err()), path);
     835           4 :             err = out.err();
     836             :             // We need to gather all responses before exiting
     837             :         }
     838           0 :     } catch(const std::exception& ex) {
     839           0 :         LOG(ERROR,
     840             :             "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
     841           0 :             __func__, path, targets[0], ex.what());
     842           0 :         err = EBUSY;
     843             :         // We need to gather all responses before exiting
     844             :     }
     845             : 
     846             :     // The parenthesis is extremely important if not the cast will add as a
     847             :     // size_t or a time_t and not as a char
     848           4 :     auto out_buff_ptr = static_cast<char*>(exposed_buffers[0].begin()->data());
     849           4 :     auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
     850           4 :     auto size_ptr = reinterpret_cast<size_t*>(
     851           4 :             (out_buff_ptr) + (out.dirents_size() * sizeof(bool)));
     852           4 :     auto ctime_ptr = reinterpret_cast<time_t*>(
     853             :             (out_buff_ptr) +
     854           4 :             (out.dirents_size() * (sizeof(bool) + sizeof(size_t))));
     855           4 :     auto names_ptr =
     856           4 :             out_buff_ptr + (out.dirents_size() *
     857           4 :                             (sizeof(bool) + sizeof(size_t) + sizeof(time_t)));
     858             : 
     859           8 :     for(std::size_t j = 0; j < out.dirents_size(); j++) {
     860             : 
     861           4 :         bool ftype = (*bool_ptr);
     862           4 :         bool_ptr++;
     863             : 
     864           4 :         size_t size = *size_ptr;
     865           4 :         size_ptr++;
     866             : 
     867           4 :         time_t ctime = *ctime_ptr;
     868           4 :         ctime_ptr++;
     869             : 
     870           8 :         auto name = std::string(names_ptr);
     871             :         // number of characters in entry + \0 terminator
     872           4 :         names_ptr += name.size() + 1;
     873           4 :         output.emplace_back(std::forward_as_tuple(name, ftype, size, ctime));
     874             :     }
     875           8 :     return make_pair(err, output);
     876             : }
     877             : 
     878             : 
     879             : #ifdef HAS_SYMLINKS
     880             : 
     881             : /**
     882             :  * Send an RPC request to create a symlink.
     883             :  * @param path
     884             :  * @param target_path
     885             :  * @return error code
     886             :  */
     887             : int
     888           0 : forward_mk_symlink(const std::string& path, const std::string& target_path) {
     889             : 
     890           0 :     auto endp =
     891           0 :             CTX->hosts().at(CTX->distributor()->locate_file_metadata(path, 0));
     892             : 
     893           0 :     try {
     894           0 :         LOG(DEBUG, "Sending RPC ...");
     895             :         // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
     896             :         // can retry for RPC_TRIES (see old commits with margo)
     897             :         // TODO(amiranda): hermes will eventually provide a post(endpoint)
     898             :         // returning one result and a broadcast(endpoint_set) returning a
     899             :         // result_set. When that happens we can remove the .at(0) :/
     900           0 :         auto out =
     901             :                 ld_network_service
     902           0 :                         ->post<gkfs::rpc::mk_symlink>(endp, path, target_path)
     903           0 :                         .get()
     904           0 :                         .at(0);
     905             : 
     906           0 :         LOG(DEBUG, "Got response success: {}", out.err());
     907             : 
     908           0 :         return out.err() ? out.err() : 0;
     909           0 :     } catch(const std::exception& ex) {
     910           0 :         LOG(ERROR, "while getting rpc output");
     911           0 :         return EBUSY;
     912             :     }
     913             : }
     914             : 
     915             : #endif
     916             : 
     917             : } // namespace gkfs::rpc

Generated by: LCOV version 1.16