LCOV - code coverage report
Current view: top level - src/daemon/handler - srv_data.cpp (source / functions)
Test: coverage.info                             Hit    Total    Coverage
Date: 2024-04-23 00:09:24         Lines:        203      268      75.7 %
                                  Functions:     12       12     100.0 %
Legend: Lines: hit | not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief Provides all Margo RPC handler definitions called by Mercury on client
      30             :  * request for all file system data operations.
      31             :  * @internal
      32             :  * The end of the file associates the Margo RPC handler functions with
      33             :  * their corresponding GekkoFS handler functions.
      34             :  * @endinternal
      35             :  */
      36             : #include <daemon/daemon.hpp>
      37             : #include <daemon/handler/rpc_defs.hpp>
      38             : #include <daemon/handler/rpc_util.hpp>
      39             : #include <daemon/backend/data/chunk_storage.hpp>
      40             : #include <daemon/ops/data.hpp>
      41             : 
      42             : #include <common/rpc/rpc_types.hpp>
      43             : #include <common/rpc/rpc_util.hpp>
      44             : #include <common/rpc/distributor.hpp>
      45             : #include <common/arithmetic/arithmetic.hpp>
      46             : #include <common/statistics/stats.hpp>
      47             : 
      48             : #ifdef GKFS_ENABLE_AGIOS
      49             : #include <daemon/scheduler/agios.hpp>
      50             : 
      51             : #define AGIOS_READ             0
      52             : #define AGIOS_WRITE            1
      53             : #define AGIOS_SERVER_ID_IGNORE 0
      54             : #endif
      55             : using namespace std;
      56             : 
      57             : 
      58             : namespace {
      59             : 
      60             : /**
      61             :  * @brief Serves a write request transferring the chunks associated with this
      62             :  * daemon and storing them on the node-local FS.
      63             :  * @internal
      64             :  * The write operation has multiple steps:
      65             :  * 1. Setting up all RPC related information
      66             :  * 2. Allocating space for bulk transfer buffers
      67             :  * 3. By processing the RPC input, the chunk IDs that hash to this daemon
      68             :  * are computed based on a client-defined interval (start and end chunk ID for
      69             :  * this write operation). The client does _not_ provide the daemons with a list
      70             :  * of chunk IDs because it is dynamic data that cannot be part of an RPC input
      71             :  * struct. Therefore, this information would need to be pulled with a bulk
      72             :  * transfer as well, adding unnecessary latency to the overall write operation.
      73             :  *
      74             :  * For each relevant chunk, a PULL bulk transfer is issued. Once finished, a
      75             :  * non-blocking Argobots tasklet is launched to write the data chunk to the
      76             :  * backend storage. Therefore, the bulk transfer and the backend I/O operation
      77             :  * overlap for efficiency.
      78             :  * 4. Wait for all tasklets to complete, adding up the written data size
      79             :  * reported by each task.
      80             :  * 5. Respond to client (when all backend write operations are finished) and
      81             :  * clean up RPC resources. Any error is reported in the RPC output struct. Note
      82             :  * that backend write operations are not canceled while in-flight when a task
      83             :  * encounters an error.
      84             :  *
      85             :  * Note, refer to the data backend documentation w.r.t. how Argobots tasklets
      86             :  * work and why they are used.
      87             :  *
      88             :  * All exceptions must be caught here and dealt with accordingly.
      89             :  * @endinternal
      90             :  * @param handle Mercury RPC handle
      91             :  * @return Mercury error code to Mercury
      92             :  */
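                      : // Illustrative sketch (assumed, derived from how the input fields are used
                      : // below): for a client write of `count` bytes at file offset `off`, the RPC
                      : // input is expected to carry
                      : //   in.chunk_start      = off / CHUNKSIZE
                      : //   in.chunk_end        = (off + count - 1) / CHUNKSIZE
                      : //   in.offset           = off % CHUNKSIZE   (offset within the first chunk)
                      : //   in.total_chunk_size = number of bytes destined for this daemon
                      : //   in.wbitset          = compressed bitset marking which chunks in
                      : //                         [chunk_start, chunk_end] this daemon serves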
      93             : hg_return_t
      94          41 : rpc_srv_write(hg_handle_t handle) {
      95             :     /*
      96             :      * 1. Setup
      97             :      */
      98          41 :     rpc_write_data_in_t in{};
      99          41 :     rpc_data_out_t out{};
     100          41 :     hg_bulk_t bulk_handle = nullptr;
     101             :     // default out for error
     102          41 :     out.err = EIO;
     103          41 :     out.io_size = 0;
     104             :     // Getting some information from margo
     105          41 :     auto ret = margo_get_input(handle, &in);
     106          41 :     if(ret != HG_SUCCESS) {
     107           0 :         GKFS_DATA->spdlogger()->error(
     108           0 :                 "{}() Could not get RPC input data with err {}", __func__, ret);
     109           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     110             :     }
     111          41 :     auto hgi = margo_get_info(handle);
     112          41 :     auto mid = margo_hg_info_get_instance(hgi);
     113          41 :     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
     114          41 :     GKFS_DATA->spdlogger()->debug(
     115             :             "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
     116             :             __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
     117          41 :             in.total_chunk_size, bulk_size, in.offset);
     118             : 
     119          41 :     std::vector<uint8_t> write_ops_vect =
     120         123 :             gkfs::rpc::decompress_bitset(in.wbitset);
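                      :     // Illustrative example (assumed): for a write spanning chunks 10..14,
                      :     // bit i of the decompressed bitset corresponds to chunk (10 + i); only
                      :     // chunks whose bit is set are transferred to and written by this daemon.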
     121             : 
     122             : #ifdef GKFS_ENABLE_AGIOS
     123             :     int* data;
     124             :     ABT_eventual eventual = ABT_EVENTUAL_NULL;
     125             : 
     126             :     /* creating eventual */
     127             :     ABT_eventual_create(sizeof(int64_t), &eventual);
     128             : 
     129             :     unsigned long long int request_id = generate_unique_id();
     130             :     char* agios_path = (char*) in.path;
     131             : 
     132             :     // We should call AGIOS before chunking (as that is an internal way to
     133             :     // handle the requests)
     134             :     if(!agios_add_request(agios_path, AGIOS_WRITE, in.offset,
     135             :                           in.total_chunk_size, request_id,
     136             :                           AGIOS_SERVER_ID_IGNORE, agios_eventual_callback,
     137             :                           eventual)) {
     138             :         GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS",
     139             :                                       __func__);
     140             :     } else {
     141             :         GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS",
     142             :                                       __func__, request_id);
     143             :     }
     144             : 
     145             :     /* Block until the eventual is signaled */
     146             :     ABT_eventual_wait(eventual, (void**) &data);
     147             : 
     148             :     unsigned long long int result = *data;
     149             :     GKFS_DATA->spdlogger()->debug(
     150             :             "{}() request {} was unblocked (offset = {})!", __func__, result,
     151             :             in.offset);
     152             : 
     153             :     ABT_eventual_free(&eventual);
     154             : 
      155             :     // Let AGIOS know it can release the request, as it is completed
     156             :     if(!agios_release_request(agios_path, AGIOS_WRITE, in.total_chunk_size,
     157             :                               in.offset)) {
     158             :         GKFS_DATA->spdlogger()->error(
     159             :                 "{}() Failed to release request from AGIOS", __func__);
     160             :     }
     161             : #endif
     162             : 
     163             :     /*
     164             :      * 2. Set up buffers for pull bulk transfers
     165             :      */
     166          41 :     void* bulk_buf;                          // buffer for bulk transfer
     167          82 :     vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
      168             :     // create bulk handle and allocate memory for buffer with buf_sizes
     169             :     // information
     170          41 :     ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size,
     171             :                             HG_BULK_READWRITE, &bulk_handle);
     172          41 :     if(ret != HG_SUCCESS) {
     173           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
     174           0 :                                       __func__);
     175           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     176             :                                           static_cast<hg_bulk_t*>(nullptr));
     177             :     }
      178             :     // access the internally allocated memory buffer and store it in bulk_buf
     179          41 :     uint32_t actual_count;
     180          41 :     ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size,
     181             :                             HG_BULK_READWRITE, 1, &bulk_buf,
     182             :                             &in.total_chunk_size, &actual_count);
     183          41 :     if(ret != HG_SUCCESS || actual_count != 1) {
     184           0 :         GKFS_DATA->spdlogger()->error(
     185             :                 "{}() Failed to access allocated buffer from bulk handle",
     186           0 :                 __func__);
     187           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     188             :     }
     189          41 :     auto const host_id = in.host_id;
     190          41 :     [[maybe_unused]] auto const host_size = in.host_size;
     191             : 
     192          82 :     auto path = make_shared<string>(in.path);
     193             :     // chnk_ids used by this host
     194          82 :     vector<uint64_t> chnk_ids_host(in.chunk_n);
     195             :     // counter to track how many chunks have been assigned
     196          41 :     auto chnk_id_curr = static_cast<uint64_t>(0);
     197             :     // chnk sizes per chunk for this host
     198          82 :     vector<uint64_t> chnk_sizes(in.chunk_n);
     199             :     // how much size is left to assign chunks for writing
     200          41 :     auto chnk_size_left_host = in.total_chunk_size;
     201             :     // temporary traveling pointer
     202          41 :     auto chnk_ptr = static_cast<char*>(bulk_buf);
     203             :     /*
     204             :      * consider the following cases:
     205             :      * 1. Very first chunk has offset or not and is serviced by this node
     206             :      * 2. If offset, will still be only 1 chunk written (small IO): (offset +
     207             :      * bulk_size <= CHUNKSIZE) ? bulk_size
     208             :      * 3. If no offset, will only be 1 chunk written (small IO): (bulk_size <=
     209             :      * CHUNKSIZE) ? bulk_size
     210             :      * 4. Chunks between start and end chunk have size of the CHUNKSIZE
     211             :      * 5. Last chunk (if multiple chunks are written): Don't write CHUNKSIZE but
     212             :      * chnk_size_left for this destination Last chunk can also happen if only
     213             :      * one chunk is written. This is covered by 2 and 3.
     214             :      */
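                      :     // Worked example for the cases above (illustrative, assuming the default
                      :     // CHUNKSIZE of 524288 bytes and that all chunks hash to this daemon):
                      :     // a write of 1000000 bytes at file offset 600000 spans chunks 1..3 with
                      :     // in.offset = 600000 - 524288 = 75712. The first chunk then receives
                      :     // 524288 - 75712 = 448576 bytes (case 1), chunk 2 receives a full
                      :     // 524288 bytes (case 4), and chunk 3 receives the remaining
                      :     // 1000000 - 448576 - 524288 = 27136 bytes (case 5).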
     215             :     // temporary variables
     216          41 :     auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize)
     217          41 :                                  ? bulk_size
     218             :                                  : gkfs::config::rpc::chunksize;
     219          41 :     uint64_t origin_offset;
     220          41 :     uint64_t local_offset;
     221             :     // object for asynchronous disk IO
     222         123 :     gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
     223             : 
     224             :     /*
     225             :      * 3. Calculate chunk sizes that correspond to this host, transfer data, and
     226             :      * start tasks to write to disk
     227             :      */
     228             :     // Start to look for a chunk that hashes to this host with the first chunk
     229             :     // in the buffer
     230         141 :     for(auto chnk_id_file = in.chunk_start;
     231         141 :         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
     232             :         chnk_id_file++) {
     233             :         // Continue if chunk does not hash to this host
     234             : 
     235         100 :         if(!(gkfs::rpc::get_bitset(write_ops_vect,
     236         100 :                                    chnk_id_file - in.chunk_start))) {
     237           0 :             GKFS_DATA->spdlogger()->trace(
     238             :                     "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
     239           0 :                     __func__, chnk_id_file, host_id, chnk_id_curr);
     240           0 :             continue;
     241             :         }
     242             : 
     243         100 :         if(GKFS_DATA->enable_chunkstats()) {
     244         200 :             GKFS_DATA->stats()->add_write(in.path, chnk_id_file);
     245             :         }
     246             : 
      247         100 :         GKFS_DATA->spdlogger()->trace("{}() Processing at host {} -> {}",
     248         100 :                                       __func__, host_id, chnk_id_file);
     249         100 :         chnk_ids_host[chnk_id_curr] =
     250             :                 chnk_id_file; // save this id to host chunk list
     251             :         // offset case. Only relevant in the first iteration of the loop and if
     252             :         // the chunk hashes to this host
     253         100 :         if(chnk_id_file == in.chunk_start && in.offset > 0) {
     254             :             // if only 1 destination and 1 chunk (small write) the transfer_size
     255             :             // == bulk_size
     256          11 :             size_t offset_transfer_size = 0;
     257          11 :             if(in.offset + bulk_size <= gkfs::config::rpc::chunksize)
     258             :                 offset_transfer_size = bulk_size;
     259             :             else
     260           0 :                 offset_transfer_size = static_cast<size_t>(
     261           0 :                         gkfs::config::rpc::chunksize - in.offset);
     262          11 :             ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
     263             :                                       in.bulk_handle, 0, bulk_handle, 0,
     264             :                                       offset_transfer_size);
     265          11 :             if(ret != HG_SUCCESS) {
     266           0 :                 GKFS_DATA->spdlogger()->error(
      267             :                         "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {})",
     268             :                         __func__, chnk_id_file, in.chunk_start,
     269           0 :                         in.chunk_end - 1);
     270           0 :                 out.err = EBUSY;
     271           0 :                 return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     272           0 :                                                   &bulk_handle);
     273             :             }
     274          11 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     275          11 :             chnk_sizes[chnk_id_curr] = offset_transfer_size;
     276          11 :             chnk_ptr += offset_transfer_size;
     277          11 :             chnk_size_left_host -= offset_transfer_size;
     278             :         } else {
     279          89 :             local_offset = in.total_chunk_size - chnk_size_left_host;
     280             :             // origin offset of a chunk is dependent on a given offset in a
     281             :             // write operation
     282          89 :             if(in.offset > 0)
     283           0 :                 origin_offset = (gkfs::config::rpc::chunksize - in.offset) +
     284           0 :                                 ((chnk_id_file - in.chunk_start) - 1) *
     285             :                                         gkfs::config::rpc::chunksize;
     286             :             else
     287          89 :                 origin_offset = (chnk_id_file - in.chunk_start) *
     288             :                                 gkfs::config::rpc::chunksize;
     289             :             // last chunk might have different transfer_size
     290          89 :             if(chnk_id_curr == in.chunk_n - 1)
     291          30 :                 transfer_size = chnk_size_left_host;
     292          89 :             GKFS_DATA->spdlogger()->trace(
     293             :                     "{}() BULK_TRANSFER_PULL hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}",
     294             :                     __func__, host_id, in.path, chnk_id_file,
     295             :                     in.total_chunk_size, chnk_size_left_host, origin_offset,
     296          89 :                     local_offset, transfer_size);
     297             :             // RDMA the data to here
     298          89 :             ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
     299             :                                       in.bulk_handle, origin_offset,
     300             :                                       bulk_handle, local_offset, transfer_size);
     301          89 :             if(ret != HG_SUCCESS) {
     302           0 :                 GKFS_DATA->spdlogger()->error(
     303             :                         "{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})",
     304             :                         __func__, in.path, chnk_id_file, in.chunk_start,
     305           0 :                         (in.chunk_end - 1));
     306           0 :                 out.err = EBUSY;
     307           0 :                 return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     308             :                                                   &bulk_handle);
     309             :             }
     310          89 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     311          89 :             chnk_sizes[chnk_id_curr] = transfer_size;
     312          89 :             chnk_ptr += transfer_size;
     313          89 :             chnk_size_left_host -= transfer_size;
     314             :         }
     315         100 :         try {
     316             :             // start tasklet for writing chunk
     317          41 :             chunk_op.write_nonblock(
     318         100 :                     chnk_id_curr, chnk_ids_host[chnk_id_curr],
     319         100 :                     bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr],
     320         100 :                     (chnk_id_file == in.chunk_start) ? in.offset : 0);
     321           0 :         } catch(const gkfs::data::ChunkWriteOpException& e) {
     322             :             // This exception is caused by setup of Argobots variables. If this
     323             :             // fails, something is really wrong
     324           0 :             GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'",
     325           0 :                                           __func__, e.what());
     326           0 :             return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     327             :         }
     328             :         // next chunk
     329         100 :         chnk_id_curr++;
     330             :     }
      331             :     // Sanity check that all chunks were detected in the previous loop
     332             :     // TODO don't proceed if that happens.
     333          41 :     if(chnk_size_left_host != 0)
     334           0 :         GKFS_DATA->spdlogger()->warn(
     335             :                 "{}() Not all chunks were detected!!! Size left {}", __func__,
     336           0 :                 chnk_size_left_host);
     337             :     /*
     338             :      * 4. Read task results and accumulate in out.io_size
     339             :      */
     340          41 :     auto write_result = chunk_op.wait_for_tasks();
     341          41 :     out.err = write_result.first;
     342          41 :     out.io_size = write_result.second;
     343             : 
     344             :     // Sanity check to see if all data has been written
     345          41 :     if(in.total_chunk_size != out.io_size) {
     346           0 :         GKFS_DATA->spdlogger()->warn(
     347             :                 "{}() total chunk size {} and out.io_size {} mismatch!",
     348           0 :                 __func__, in.total_chunk_size, out.io_size);
     349             :     }
     350             : 
     351             :     /*
     352             :      * 5. Respond and cleanup
     353             :      */
     354          41 :     GKFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__,
     355          41 :                                   out.err);
     356          41 :     auto handler_ret =
     357          41 :             gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     358          41 :     if(GKFS_DATA->enable_stats()) {
     359          41 :         GKFS_DATA->stats()->add_value_size(
     360             :                 gkfs::utils::Stats::SizeOp::write_size, bulk_size);
     361             :     }
     362             :     return handler_ret;
     363             : }
     364             : 
     365             : /**
     366             :  * @brief Serves a read request reading the chunks associated with this
     367             :  * daemon from the node-local FS and transferring them back to the client.
     368             :  * @internal
     369             :  * The read operation has multiple steps:
     370             :  * 1. Setting up all RPC related information
     371             :  * 2. Allocating space for bulk transfer buffers
      372             :  * 3. By processing the RPC input, the chunk IDs that hash to this daemon
      373             :  * are computed based on a client-defined interval (start and end chunk ID for
     374             :  * this read operation). The client does _not_ provide the daemons with a list
     375             :  * of chunk IDs because it is dynamic data that cannot be part of an RPC input
     376             :  * struct. Therefore, this information would need to be pulled with a bulk
      377             :  * transfer as well, adding unnecessary latency to the overall read operation.
     378             :  *
      379             :  * For each relevant chunk, a non-blocking Argobots tasklet is launched to read
     380             :  * the data chunk from the backend storage to the allocated buffers.
     381             :  * 4. Wait for all tasklets to finish the read operation while PUSH bulk
     382             :  * transferring each chunk back to the client when a tasklet finishes.
      383             :  * Therefore, the bulk transfer and the backend I/O operation overlap for
      384             :  * efficiency. The read sizes of all tasklets are added up.
      385             :  * 5. Respond to client (when all bulk transfers are finished) and clean up RPC
      386             :  * resources. Any error is reported in the RPC output struct. Note that backend
     387             :  * read operations are not canceled while in-flight when a task encounters an
     388             :  * error.
     389             :  *
     390             :  * Note, refer to the data backend documentation w.r.t. how Argobots tasklets
     391             :  * work and why they are used.
     392             :  *
     393             :  * All exceptions must be caught here and dealt with accordingly.
      394             :  * @endinternal
     395             :  * @param handle Mercury RPC handle
     396             :  * @return Mercury error code to Mercury
     397             :  */
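                      : // The RPC input layout mirrors the write path sketched above; the difference
                      : // is the data direction: chunks are read from the backend into local buffers
                      : // and PUSHed back into the client's bulk region.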
     398             : hg_return_t
     399          28 : rpc_srv_read(hg_handle_t handle) {
     400             :     /*
     401             :      * 1. Setup
     402             :      */
     403          28 :     rpc_read_data_in_t in{};
     404          28 :     rpc_data_out_t out{};
     405          28 :     hg_bulk_t bulk_handle = nullptr;
     406             :     // Set default out for error
     407          28 :     out.err = EIO;
     408          28 :     out.io_size = 0;
     409             :     // Getting some information from margo
     410          28 :     auto ret = margo_get_input(handle, &in);
     411          28 :     if(ret != HG_SUCCESS) {
     412           0 :         GKFS_DATA->spdlogger()->error(
     413           0 :                 "{}() Could not get RPC input data with err {}", __func__, ret);
     414           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     415             :     }
     416          28 :     auto hgi = margo_get_info(handle);
     417          28 :     auto mid = margo_hg_info_get_instance(hgi);
     418          28 :     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
     419             : 
     420          28 :     GKFS_DATA->spdlogger()->debug(
     421             :             "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
     422             :             __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
     423          28 :             in.total_chunk_size, bulk_size, in.offset);
     424          28 :     std::vector<uint8_t> read_bitset_vect =
     425          84 :             gkfs::rpc::decompress_bitset(in.wbitset);
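                      :     // As in the write path, bit i of the decompressed bitset corresponds to
                      :     // chunk (chunk_start + i); this daemon only reads chunks whose bit is set.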
     426             : #ifdef GKFS_ENABLE_AGIOS
     427             :     int* data;
     428             :     ABT_eventual eventual = ABT_EVENTUAL_NULL;
     429             : 
     430             :     /* creating eventual */
     431             :     ABT_eventual_create(sizeof(int64_t), &eventual);
     432             : 
     433             :     unsigned long long int request_id = generate_unique_id();
     434             :     char* agios_path = (char*) in.path;
     435             : 
     436             :     // We should call AGIOS before chunking (as that is an internal way to
     437             :     // handle the requests)
     438             :     if(!agios_add_request(agios_path, AGIOS_READ, in.offset,
     439             :                           in.total_chunk_size, request_id,
     440             :                           AGIOS_SERVER_ID_IGNORE, agios_eventual_callback,
     441             :                           eventual)) {
     442             :         GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS",
     443             :                                       __func__);
     444             :     } else {
     445             :         GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS",
     446             :                                       __func__, request_id);
     447             :     }
     448             : 
     449             :     /* block until the eventual is signaled */
     450             :     ABT_eventual_wait(eventual, (void**) &data);
     451             : 
     452             :     unsigned long long int result = *data;
     453             :     GKFS_DATA->spdlogger()->debug(
     454             :             "{}() request {} was unblocked (offset = {})!", __func__, result,
     455             :             in.offset);
     456             : 
     457             :     ABT_eventual_free(&eventual);
     458             : 
      459             :     // let AGIOS know it can release the request, as it is completed
     460             :     if(!agios_release_request(agios_path, AGIOS_READ, in.total_chunk_size,
     461             :                               in.offset)) {
     462             :         GKFS_DATA->spdlogger()->error(
     463             :                 "{}() Failed to release request from AGIOS", __func__);
     464             :     }
     465             : #endif
     466             : 
     467             :     /*
     468             :      * 2. Set up buffers for push bulk transfers
     469             :      */
     470          28 :     void* bulk_buf;                          // buffer for bulk transfer
     471          56 :     vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
      472             :     // create bulk handle and allocate memory for buffer with buf_sizes
     473             :     // information
     474          28 :     ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size,
     475             :                             HG_BULK_READWRITE, &bulk_handle);
     476          28 :     if(ret != HG_SUCCESS) {
     477           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
     478           0 :                                       __func__);
     479           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     480             :                                           static_cast<hg_bulk_t*>(nullptr));
     481             :     }
      482             :     // access the internally allocated memory buffer and store it in bulk_buf
     483          28 :     uint32_t actual_count;
     484          28 :     ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size,
     485             :                             HG_BULK_READWRITE, 1, &bulk_buf,
     486             :                             &in.total_chunk_size, &actual_count);
     487          28 :     if(ret != HG_SUCCESS || actual_count != 1) {
     488           0 :         GKFS_DATA->spdlogger()->error(
     489             :                 "{}() Failed to access allocated buffer from bulk handle",
     490           0 :                 __func__);
     491           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     492             :     }
     493             : 
     494          28 :     auto const host_id = in.host_id;
     495             : 
     496          56 :     auto path = make_shared<string>(in.path);
     497             :     // chnk_ids used by this host
     498          56 :     vector<uint64_t> chnk_ids_host(in.chunk_n);
     499             :     // counter to track how many chunks have been assigned
     500          28 :     auto chnk_id_curr = static_cast<uint64_t>(0);
     501             :     // chnk sizes per chunk for this host
     502          56 :     vector<uint64_t> chnk_sizes(in.chunk_n);
     503             :     // local and origin offsets for bulk operations
     504          56 :     vector<uint64_t> local_offsets(in.chunk_n);
     505          56 :     vector<uint64_t> origin_offsets(in.chunk_n);
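                      :     // Unlike the write path, the origin/local offsets are recorded per chunk
                      :     // and used later, so each chunk can be pushed back to the client as soon
                      :     // as its read tasklet completes (see wait_for_tasks_and_push_back below).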
     506             :     // how much size is left to assign chunks for reading
     507          28 :     auto chnk_size_left_host = in.total_chunk_size;
     508             :     // temporary traveling pointer
     509          28 :     auto chnk_ptr = static_cast<char*>(bulk_buf);
     510             :     // temporary variables
     511          28 :     auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize)
     512          28 :                                  ? bulk_size
     513             :                                  : gkfs::config::rpc::chunksize;
     514             :     // object for asynchronous disk IO
     515          84 :     gkfs::data::ChunkReadOperation chunk_read_op{in.path, in.chunk_n};
     516             :     /*
     517             :      * 3. Calculate chunk sizes that correspond to this host and start tasks to
     518             :      * read from disk
     519             :      */
     520             :     // Start to look for a chunk that hashes to this host with the first chunk
     521             :     // in the buffer
     522         102 :     for(auto chnk_id_file = in.chunk_start;
     523         102 :         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
     524             :         chnk_id_file++) {
     525             :         // Continue if chunk does not hash to this host
     526             : 
     527             :         // We only check if we are not using replicas
     528             : 
     529          74 :         if(!(gkfs::rpc::get_bitset(read_bitset_vect,
     530          74 :                                    chnk_id_file - in.chunk_start))) {
     531           0 :             GKFS_DATA->spdlogger()->trace(
     532             :                     "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
     533           0 :                     __func__, chnk_id_file, host_id, chnk_id_curr);
     534           0 :             continue;
     535             :         }
     536          74 :         if(GKFS_DATA->enable_chunkstats()) {
     537         148 :             GKFS_DATA->stats()->add_read(in.path, chnk_id_file);
     538             :         }
     539             : 
     540             : 
     541          74 :         chnk_ids_host[chnk_id_curr] =
     542             :                 chnk_id_file; // save this id to host chunk list
     543             :         // Only relevant in the first iteration of the loop and if the chunk
     544             :         // hashes to this host
     545          74 :         if(chnk_id_file == in.chunk_start && in.offset > 0) {
     546             :             // if only 1 destination and 1 chunk (small read) the transfer_size
     547             :             // == bulk_size
     548          14 :             size_t offset_transfer_size = 0;
     549          14 :             if(in.offset + bulk_size <= gkfs::config::rpc::chunksize)
     550             :                 offset_transfer_size = bulk_size;
     551             :             else
     552           6 :                 offset_transfer_size = static_cast<size_t>(
     553           6 :                         gkfs::config::rpc::chunksize - in.offset);
     554             :             // Setting later transfer offsets
     555          14 :             local_offsets[chnk_id_curr] = 0;
     556          14 :             origin_offsets[chnk_id_curr] = 0;
     557          14 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     558          14 :             chnk_sizes[chnk_id_curr] = offset_transfer_size;
     559             :             // utils variables
     560          14 :             chnk_ptr += offset_transfer_size;
     561          14 :             chnk_size_left_host -= offset_transfer_size;
     562             :         } else {
     563          60 :             local_offsets[chnk_id_curr] =
     564          60 :                     in.total_chunk_size - chnk_size_left_host;
     565             :             // origin offset of a chunk is dependent on a given offset in a
      566             :             // read operation
     567          60 :             if(in.offset > 0)
     568          14 :                 origin_offsets[chnk_id_curr] =
     569          14 :                         (gkfs::config::rpc::chunksize - in.offset) +
     570          14 :                         ((chnk_id_file - in.chunk_start) - 1) *
     571             :                                 gkfs::config::rpc::chunksize;
     572             :             else
     573          46 :                 origin_offsets[chnk_id_curr] = (chnk_id_file - in.chunk_start) *
     574             :                                                gkfs::config::rpc::chunksize;
     575             :             // last chunk might have different transfer_size
     576          60 :             if(chnk_id_curr == in.chunk_n - 1)
     577          20 :                 transfer_size = chnk_size_left_host;
     578          60 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     579          60 :             chnk_sizes[chnk_id_curr] = transfer_size;
     580             :             // utils variables
     581          60 :             chnk_ptr += transfer_size;
     582          60 :             chnk_size_left_host -= transfer_size;
     583             :         }
     584          74 :         try {
     585             :             // start tasklet for read operation
     586          28 :             chunk_read_op.read_nonblock(
     587          74 :                     chnk_id_curr, chnk_ids_host[chnk_id_curr],
     588          74 :                     bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr],
     589          74 :                     (chnk_id_file == in.chunk_start) ? in.offset : 0);
     590           0 :         } catch(const gkfs::data::ChunkReadOpException& e) {
     591             :             // This exception is caused by setup of Argobots variables. If this
     592             :             // fails, something is really wrong
     593           0 :             GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'",
     594           0 :                                           __func__, e.what());
     595           0 :             return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     596             :         }
     597          74 :         chnk_id_curr++;
     598             :     }
      599             :     // Sanity check that all chunks were detected in the previous loop
     600             :     // TODO error out. If we continue this will crash the server when sending
     601             :     // results back that don't exist.
     602          28 :     if(chnk_size_left_host != 0)
     603           0 :         GKFS_DATA->spdlogger()->warn(
     604             :                 "{}() Not all chunks were detected!!! Size left {}", __func__,
     605           0 :                 chnk_size_left_host);
     606             : 
     607          28 :     if(chnk_size_left_host == in.total_chunk_size)
     608             :         return HG_CANCELED;
     609             : 
     610             :     /*
     611             :      * 4. Read task results and accumulate in out.io_size
     612             :      */
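                      :     // bulk_args bundles everything the push-back step needs: the client's
                      :     // bulk handle and address plus, for every chunk read locally, its offset
                      :     // in the client's bulk region (origin), in the local buffer (local), and
                      :     // its chunk ID.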
     613          28 :     gkfs::data::ChunkReadOperation::bulk_args bulk_args{};
     614          28 :     bulk_args.mid = mid;
     615          28 :     bulk_args.origin_addr = hgi->addr;
     616          28 :     bulk_args.origin_bulk_handle = in.bulk_handle;
     617          28 :     bulk_args.origin_offsets = &origin_offsets;
     618          28 :     bulk_args.local_bulk_handle = bulk_handle;
     619          28 :     bulk_args.local_offsets = &local_offsets;
     620          28 :     bulk_args.chunk_ids = &chnk_ids_host;
     621             :     // wait for all tasklets and push read data back to client
     622          28 :     auto read_result = chunk_read_op.wait_for_tasks_and_push_back(bulk_args);
     623          28 :     out.err = read_result.first;
     624          28 :     out.io_size = read_result.second;
     625             : 
     626             :     /*
     627             :      * 5. Respond and cleanup
     628             :      */
     629          28 :     GKFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}",
     630          28 :                                   __func__, out.err);
     631          28 :     auto handler_ret =
     632          28 :             gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     633          28 :     if(GKFS_DATA->enable_stats()) {
     634          28 :         GKFS_DATA->stats()->add_value_size(
     635             :                 gkfs::utils::Stats::SizeOp::read_size, bulk_size);
     636             :     }
     637             :     return handler_ret;
     638             : }
     639             : 
     640             : 
     641             : /**
     642             :  * @brief Serves a file truncate request and remove all corresponding chunk
     643             :  * files on this daemon.
     644             :  * @internal
     645             :  * A truncate operation includes decreasing the file size of the metadata entry
     646             :  * (if hashing to this daemon) and removing all corresponding chunks exceeding
     647             :  * the new file size.
     648             :  *
     649             :  * All exceptions must be caught here and dealt with accordingly.
      650             :  * @endinternal
     651             :  * @param handle Mercury RPC handle
     652             :  * @return Mercury error code to Mercury
     653             :  */
     654             : hg_return_t
     655           2 : rpc_srv_truncate(hg_handle_t handle) {
     656           2 :     rpc_trunc_in_t in{};
     657           2 :     rpc_err_out_t out{};
     658           2 :     out.err = EIO;
     659             :     // Getting some information from margo
     660           2 :     auto ret = margo_get_input(handle, &in);
     661           2 :     if(ret != HG_SUCCESS) {
     662           0 :         GKFS_DATA->spdlogger()->error(
     663           0 :                 "{}() Could not get RPC input data with err {}", __func__, ret);
     664           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     665             :     }
     666           2 :     GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__,
     667           2 :                                   in.path, in.length);
     668             : 
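                      :     // Illustrative example (assuming the default 524288-byte chunksize):
                      :     // truncating to a length of 600000 bytes keeps chunk 0 untouched, shrinks
                      :     // chunk 1 to 600000 - 524288 = 75712 bytes, and removes all chunk files
                      :     // from chunk 2 onwards, which exceed the new file size.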
     669           6 :     gkfs::data::ChunkTruncateOperation chunk_op{in.path};
     670           2 :     try {
     671             :         // start tasklet for truncate operation
     672           2 :         chunk_op.truncate(in.length);
     673           0 :     } catch(const gkfs::data::ChunkMetaOpException& e) {
     674             :         // This exception is caused by setup of Argobots variables. If this
     675             :         // fails, something is really wrong
     676           0 :         GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__,
     677           0 :                                       e.what());
     678           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     679             :     }
     680             : 
     681             :     // wait and get output
     682           2 :     out.err = chunk_op.wait_for_task();
     683             : 
     684           2 :     GKFS_DATA->spdlogger()->debug("{}() Sending output response '{}'", __func__,
     685           2 :                                   out.err);
     686           2 :     return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     687             : }
     688             : 
     689             : 
     690             : /**
     691             :  * @brief Serves a chunk stat request, responding with space information of the
     692             :  * node local file system.
     693             :  * @internal
     694             :  * All exceptions must be caught here and dealt with accordingly.
      695             :  * @endinternal
     696             :  * @param handle Mercury RPC handle
     697             :  * @return Mercury error code to Mercury
     698             :  */
     699             : hg_return_t
     700           2 : rpc_srv_get_chunk_stat(hg_handle_t handle) {
     701           2 :     GKFS_DATA->spdlogger()->debug("{}() enter", __func__);
     702           2 :     rpc_chunk_stat_out_t out{};
     703           2 :     out.err = EIO;
     704           2 :     try {
     705           2 :         auto chk_stat = GKFS_DATA->storage()->chunk_stat();
     706           2 :         out.chunk_size = chk_stat.chunk_size;
     707           2 :         out.chunk_total = chk_stat.chunk_total;
     708           2 :         out.chunk_free = chk_stat.chunk_free;
     709           2 :         out.err = 0;
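                      :         // chunk_stat() exposes the chunk size and the total/free chunk
                      :         // counts reported by the backend; e.g. clients can estimate free
                      :         // capacity as roughly chunk_free * chunk_size.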
     710           0 :     } catch(const gkfs::data::ChunkStorageException& err) {
     711           0 :         GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
     712           0 :         out.err = err.code().value();
      713           0 :     } catch(const std::exception& err) {
     714           0 :         GKFS_DATA->spdlogger()->error(
     715             :                 "{}() Unexpected error when chunk stat '{}'", __func__,
     716           0 :                 err.what());
     717           0 :         out.err = EAGAIN;
     718             :     }
     719             : 
     720             :     // Create output and send it back
     721           2 :     return gkfs::rpc::cleanup_respond(&handle, &out);
     722             : }
     723             : 
     724             : } // namespace
     725             : 
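                      : // DEFINE_MARGO_RPC_HANDLER generates the boilerplate wrapper around each
                      : // static handler above; these wrappers are what the daemon registers with
                      : // Margo so that incoming Mercury requests are dispatched to the handlers.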
     726          82 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_write)
     727             : 
     728          56 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_read)
     729             : 
     730           4 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate)
     731             : 
     732           4 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat)
     733             : 
     734             : #ifdef GKFS_ENABLE_AGIOS
     735             : void*
     736             : agios_eventual_callback(int64_t request_id, void* info) {
     737             :     GKFS_DATA->spdlogger()->debug("{}() custom callback request {} is ready",
     738             :                                   __func__, request_id);
     739             : 
     740             :     ABT_eventual_set((ABT_eventual) info, &request_id, sizeof(int64_t));
     741             : 
     742             :     return 0;
     743             : }
     744             : #endif

Generated by: LCOV version 1.16