LCOV - code coverage report
Current view: top level - src/daemon/handler - srv_data.cpp (source / functions)
Test: coverage.info
Date: 2024-04-30 13:21:35
Coverage: Lines: 201 of 266 (75.6 %) | Functions: 12 of 12 (100.0 %)
Legend: Lines: hit / not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief Provides all Margo RPC handler definitions called by Mercury on client
      30             :  * request for all file system data operations.
      31             :  * @internal
       32             :  * The end of the file associates the Margo RPC handler functions with their
       33             :  * corresponding GekkoFS handler functions.
      34             :  * @endinternal
      35             :  */
      36             : #include <daemon/daemon.hpp>
      37             : #include <daemon/handler/rpc_defs.hpp>
      38             : #include <daemon/handler/rpc_util.hpp>
      39             : #include <daemon/backend/data/chunk_storage.hpp>
      40             : #include <daemon/ops/data.hpp>
      41             : 
      42             : #include <common/rpc/rpc_types.hpp>
      43             : #include <common/rpc/rpc_util.hpp>
      44             : #include <common/rpc/distributor.hpp>
      45             : #include <common/arithmetic/arithmetic.hpp>
      46             : #include <common/statistics/stats.hpp>
      47             : 
      48             : #ifdef GKFS_ENABLE_AGIOS
      49             : #include <daemon/scheduler/agios.hpp>
      50             : 
      51             : #define AGIOS_READ             0
      52             : #define AGIOS_WRITE            1
      53             : #define AGIOS_SERVER_ID_IGNORE 0
      54             : #endif
      55             : using namespace std;
      56             : 
      57             : 
      58             : namespace {
      59             : 
      60             : /**
      61             :  * @brief Serves a write request transferring the chunks associated with this
       62             :  * daemon and storing them on the node-local FS.
      63             :  * @internal
      64             :  * The write operation has multiple steps:
      65             :  * 1. Setting up all RPC related information
      66             :  * 2. Allocating space for bulk transfer buffers
       67             :  * 3. By processing the RPC input, the chunk IDs that hash to this daemon
       68             :  * are computed based on a client-defined interval (start and end chunk ID for
      69             :  * this write operation). The client does _not_ provide the daemons with a list
      70             :  * of chunk IDs because it is dynamic data that cannot be part of an RPC input
      71             :  * struct. Therefore, this information would need to be pulled with a bulk
      72             :  * transfer as well, adding unnecessary latency to the overall write operation.
      73             :  *
      74             :  * For each relevant chunk, a PULL bulk transfer is issued. Once finished, a
      75             :  * non-blocking Argobots tasklet is launched to write the data chunk to the
      76             :  * backend storage. Therefore, bulk transfer and the backend I/O operation are
      77             :  * overlapping for efficiency.
       78             :  * 4. Wait for all tasklets to complete, adding up the total written data
       79             :  * size as reported by each task.
      80             :  * 5. Respond to client (when all backend write operations are finished) and
       81             :  * clean up RPC resources. Any error is reported in the RPC output struct. Note
       82             :  * that backend write operations are not canceled while in-flight when a task
      83             :  * encounters an error.
      84             :  *
       85             :  * Note: refer to the data backend documentation for how Argobots tasklets
      86             :  * work and why they are used.
      87             :  *
      88             :  * All exceptions must be caught here and dealt with accordingly.
       89             :  * @endinternal
      90             :  * @param handle Mercury RPC handle
      91             :  * @return Mercury error code to Mercury
      92             :  */
      93             : hg_return_t
      94          41 : rpc_srv_write(hg_handle_t handle) {
      95             :     /*
      96             :      * 1. Setup
      97             :      */
      98          41 :     rpc_write_data_in_t in{};
      99          41 :     rpc_data_out_t out{};
     100          41 :     hg_bulk_t bulk_handle = nullptr;
     101             :     // default out for error
     102          41 :     out.err = EIO;
     103          41 :     out.io_size = 0;
     104             :     // Getting some information from margo
     105          41 :     auto ret = margo_get_input(handle, &in);
     106          41 :     if(ret != HG_SUCCESS) {
     107           0 :         GKFS_DATA->spdlogger()->error(
     108           0 :                 "{}() Could not get RPC input data with err {}", __func__, ret);
     109           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     110             :     }
     111          41 :     auto hgi = margo_get_info(handle);
     112          41 :     auto mid = margo_hg_info_get_instance(hgi);
     113          41 :     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
     114          41 :     GKFS_DATA->spdlogger()->debug(
     115             :             "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
     116             :             __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
     117          41 :             in.total_chunk_size, bulk_size, in.offset);
     118             : 
     119          41 :     std::vector<uint8_t> write_ops_vect =
     120         123 :             gkfs::rpc::decompress_bitset(in.wbitset);
     121             : 
     122             : #ifdef GKFS_ENABLE_AGIOS
     123             :     int* data;
     124             :     ABT_eventual eventual = ABT_EVENTUAL_NULL;
     125             : 
     126             :     /* creating eventual */
     127             :     ABT_eventual_create(sizeof(int64_t), &eventual);
     128             : 
     129             :     unsigned long long int request_id = generate_unique_id();
     130             :     char* agios_path = (char*) in.path;
     131             : 
     132             :     // We should call AGIOS before chunking (as that is an internal way to
     133             :     // handle the requests)
     134             :     if(!agios_add_request(agios_path, AGIOS_WRITE, in.offset,
     135             :                           in.total_chunk_size, request_id,
     136             :                           AGIOS_SERVER_ID_IGNORE, agios_eventual_callback,
     137             :                           eventual)) {
     138             :         GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS",
     139             :                                       __func__);
     140             :     } else {
     141             :         GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS",
     142             :                                       __func__, request_id);
     143             :     }
     144             : 
     145             :     /* Block until the eventual is signaled */
     146             :     ABT_eventual_wait(eventual, (void**) &data);
     147             : 
     148             :     unsigned long long int result = *data;
     149             :     GKFS_DATA->spdlogger()->debug(
     150             :             "{}() request {} was unblocked (offset = {})!", __func__, result,
     151             :             in.offset);
     152             : 
     153             :     ABT_eventual_free(&eventual);
     154             : 
      155             :     // Let AGIOS know it can release the request, as it is completed
     156             :     if(!agios_release_request(agios_path, AGIOS_WRITE, in.total_chunk_size,
     157             :                               in.offset)) {
     158             :         GKFS_DATA->spdlogger()->error(
     159             :                 "{}() Failed to release request from AGIOS", __func__);
     160             :     }
     161             : #endif
     162             : 
     163             :     /*
     164             :      * 2. Set up buffers for pull bulk transfers
     165             :      */
     166          41 :     void* bulk_buf;                          // buffer for bulk transfer
     167          82 :     vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
      168             :     // create bulk handle and allocate memory for the buffer with buf_sizes
     169             :     // information
     170          41 :     ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size,
     171             :                             HG_BULK_READWRITE, &bulk_handle);
     172          41 :     if(ret != HG_SUCCESS) {
     173           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
     174           0 :                                       __func__);
     175           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     176             :                                           static_cast<hg_bulk_t*>(nullptr));
     177             :     }
     178             :     // access the internally allocated memory buffer and put it into buf_ptrs
     179          41 :     uint32_t actual_count;
     180          41 :     ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size,
     181             :                             HG_BULK_READWRITE, 1, &bulk_buf,
     182             :                             &in.total_chunk_size, &actual_count);
     183          41 :     if(ret != HG_SUCCESS || actual_count != 1) {
     184           0 :         GKFS_DATA->spdlogger()->error(
     185             :                 "{}() Failed to access allocated buffer from bulk handle",
     186           0 :                 __func__);
     187           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     188             :     }
     189          41 :     auto const host_id = in.host_id;
     190          41 :     [[maybe_unused]] auto const host_size = in.host_size;
     191             : 
     192          82 :     auto path = make_shared<string>(in.path);
     193             :     // chnk_ids used by this host
     194          82 :     vector<uint64_t> chnk_ids_host(in.chunk_n);
     195             :     // counter to track how many chunks have been assigned
     196          41 :     auto chnk_id_curr = static_cast<uint64_t>(0);
     197             :     // chnk sizes per chunk for this host
     198          82 :     vector<uint64_t> chnk_sizes(in.chunk_n);
     199             :     // how much size is left to assign chunks for writing
     200          41 :     auto chnk_size_left_host = in.total_chunk_size;
     201             :     // temporary traveling pointer
     202          41 :     auto chnk_ptr = static_cast<char*>(bulk_buf);
     203             :     /*
     204             :      * consider the following cases:
     205             :      * 1. Very first chunk has offset or not and is serviced by this node
     206             :      * 2. If offset, will still be only 1 chunk written (small IO): (offset +
     207             :      * bulk_size <= CHUNKSIZE) ? bulk_size
     208             :      * 3. If no offset, will only be 1 chunk written (small IO): (bulk_size <=
     209             :      * CHUNKSIZE) ? bulk_size
     210             :      * 4. Chunks between start and end chunk have size of the CHUNKSIZE
     211             :      * 5. Last chunk (if multiple chunks are written): Don't write CHUNKSIZE but
     212             :      * chnk_size_left for this destination Last chunk can also happen if only
     213             :      * one chunk is written. This is covered by 2 and 3.
     214             :      */
     215             :     // temporary variables
     216          41 :     auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize)
     217          41 :                                  ? bulk_size
     218             :                                  : gkfs::config::rpc::chunksize;
     219          41 :     uint64_t origin_offset;
     220          41 :     uint64_t local_offset;
     221             :     // object for asynchronous disk IO
     222         123 :     gkfs::data::ChunkWriteOperation chunk_op{in.path, in.chunk_n};
     223             : 
     224             :     /*
     225             :      * 3. Calculate chunk sizes that correspond to this host, transfer data, and
     226             :      * start tasks to write to disk
     227             :      */
     228             :     // Start to look for a chunk that hashes to this host with the first chunk
     229             :     // in the buffer
     230         141 :     for(auto chnk_id_file = in.chunk_start;
     231         141 :         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
     232             :         chnk_id_file++) {
     233             :         // Continue if chunk does not hash to this host
     234             : 
     235         100 :         if(!(gkfs::rpc::get_bitset(write_ops_vect,
     236         100 :                                    chnk_id_file - in.chunk_start))) {
     237           0 :             GKFS_DATA->spdlogger()->trace(
     238             :                     "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
     239           0 :                     __func__, chnk_id_file, host_id, chnk_id_curr);
     240           0 :             continue;
     241             :         }
     242             : 
     243         100 :         if(GKFS_DATA->enable_chunkstats()) {
     244         200 :             GKFS_DATA->stats()->add_write(in.path, chnk_id_file);
     245             :         }
     246             : 
     247         100 :         chnk_ids_host[chnk_id_curr] =
     248             :                 chnk_id_file; // save this id to host chunk list
     249             :         // offset case. Only relevant in the first iteration of the loop and if
     250             :         // the chunk hashes to this host
     251         100 :         if(chnk_id_file == in.chunk_start && in.offset > 0) {
     252             :             // if only 1 destination and 1 chunk (small write) the transfer_size
     253             :             // == bulk_size
     254          11 :             size_t offset_transfer_size = 0;
     255          11 :             if(in.offset + bulk_size <= gkfs::config::rpc::chunksize)
     256             :                 offset_transfer_size = bulk_size;
     257             :             else
     258           0 :                 offset_transfer_size = static_cast<size_t>(
     259           0 :                         gkfs::config::rpc::chunksize - in.offset);
     260          11 :             ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
     261             :                                       in.bulk_handle, 0, bulk_handle, 0,
     262             :                                       offset_transfer_size);
     263          11 :             if(ret != HG_SUCCESS) {
     264           0 :                 GKFS_DATA->spdlogger()->error(
      265             :                         "{}() Failed to pull data from client for chunk {} (startchunk {}; endchunk {})",
     266             :                         __func__, chnk_id_file, in.chunk_start,
     267           0 :                         in.chunk_end - 1);
     268           0 :                 out.err = EBUSY;
     269           0 :                 return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     270           0 :                                                   &bulk_handle);
     271             :             }
     272          11 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     273          11 :             chnk_sizes[chnk_id_curr] = offset_transfer_size;
     274          11 :             chnk_ptr += offset_transfer_size;
     275          11 :             chnk_size_left_host -= offset_transfer_size;
     276             :         } else {
     277          89 :             local_offset = in.total_chunk_size - chnk_size_left_host;
     278             :             // origin offset of a chunk is dependent on a given offset in a
     279             :             // write operation
     280          89 :             if(in.offset > 0)
     281           0 :                 origin_offset = (gkfs::config::rpc::chunksize - in.offset) +
     282           0 :                                 ((chnk_id_file - in.chunk_start) - 1) *
     283             :                                         gkfs::config::rpc::chunksize;
     284             :             else
     285          89 :                 origin_offset = (chnk_id_file - in.chunk_start) *
     286             :                                 gkfs::config::rpc::chunksize;
     287             :             // last chunk might have different transfer_size
     288          89 :             if(chnk_id_curr == in.chunk_n - 1)
     289          30 :                 transfer_size = chnk_size_left_host;
     290          89 :             GKFS_DATA->spdlogger()->trace(
     291             :                     "{}() BULK_TRANSFER_PULL hostid {} file {} chnkid {} total_Csize {} Csize_left {} origin offset {} local offset {} transfersize {}",
     292             :                     __func__, host_id, in.path, chnk_id_file,
     293             :                     in.total_chunk_size, chnk_size_left_host, origin_offset,
     294          89 :                     local_offset, transfer_size);
     295             :             // RDMA the data to here
     296          89 :             ret = margo_bulk_transfer(mid, HG_BULK_PULL, hgi->addr,
     297             :                                       in.bulk_handle, origin_offset,
     298             :                                       bulk_handle, local_offset, transfer_size);
     299          89 :             if(ret != HG_SUCCESS) {
     300           0 :                 GKFS_DATA->spdlogger()->error(
     301             :                         "{}() Failed to pull data from client. file {} chunk {} (startchunk {}; endchunk {})",
     302             :                         __func__, in.path, chnk_id_file, in.chunk_start,
     303           0 :                         (in.chunk_end - 1));
     304           0 :                 out.err = EBUSY;
     305           0 :                 return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     306             :                                                   &bulk_handle);
     307             :             }
     308          89 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     309          89 :             chnk_sizes[chnk_id_curr] = transfer_size;
     310          89 :             chnk_ptr += transfer_size;
     311          89 :             chnk_size_left_host -= transfer_size;
     312             :         }
     313         100 :         try {
     314             :             // start tasklet for writing chunk
     315          41 :             chunk_op.write_nonblock(
     316         100 :                     chnk_id_curr, chnk_ids_host[chnk_id_curr],
     317         100 :                     bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr],
     318         100 :                     (chnk_id_file == in.chunk_start) ? in.offset : 0);
     319           0 :         } catch(const gkfs::data::ChunkWriteOpException& e) {
     320             :             // This exception is caused by setup of Argobots variables. If this
     321             :             // fails, something is really wrong
     322           0 :             GKFS_DATA->spdlogger()->error("{}() while write_nonblock err '{}'",
     323           0 :                                           __func__, e.what());
     324           0 :             return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     325             :         }
     326             :         // next chunk
     327         100 :         chnk_id_curr++;
     328             :     }
      329             :     // Sanity check that all chunks were detected in the previous loop
     330             :     // TODO don't proceed if that happens.
     331          41 :     if(chnk_size_left_host != 0)
     332           0 :         GKFS_DATA->spdlogger()->warn(
     333             :                 "{}() Not all chunks were detected!!! Size left {}", __func__,
     334           0 :                 chnk_size_left_host);
     335             :     /*
     336             :      * 4. Read task results and accumulate in out.io_size
     337             :      */
     338          41 :     auto write_result = chunk_op.wait_for_tasks();
     339          41 :     out.err = write_result.first;
     340          41 :     out.io_size = write_result.second;
     341             : 
     342             :     // Sanity check to see if all data has been written
     343          41 :     if(in.total_chunk_size != out.io_size) {
     344           0 :         GKFS_DATA->spdlogger()->warn(
     345             :                 "{}() total chunk size {} and out.io_size {} mismatch!",
     346           0 :                 __func__, in.total_chunk_size, out.io_size);
     347             :     }
     348             : 
     349             :     /*
     350             :      * 5. Respond and cleanup
     351             :      */
     352          41 :     GKFS_DATA->spdlogger()->debug("{}() Sending output response {}", __func__,
     353          41 :                                   out.err);
     354          41 :     auto handler_ret =
     355          41 :             gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     356          41 :     if(GKFS_DATA->enable_stats()) {
     357          41 :         GKFS_DATA->stats()->add_value_size(
     358             :                 gkfs::utils::Stats::SizeOp::write_size, bulk_size);
     359             :     }
     360             :     return handler_ret;
     361             : }
     362             : 
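/*
 * Illustrative sketch, not part of srv_data.cpp: the buffer-offset arithmetic
 * from step 3 of rpc_srv_write() isolated into a standalone helper. The chunk
 * size value below is an assumption for the example; the daemon uses
 * gkfs::config::rpc::chunksize.
 */
#include <cstdint>

namespace example_write_offsets {

constexpr uint64_t chunksize = 524288; // assumed 512 KiB chunk size

// Offset of chunk (chunk_start + chnk_idx) inside the client's bulk buffer.
// `offset` is the in-chunk offset at which the write request starts; only the
// first chunk of the request is affected by it.
inline uint64_t
origin_offset(uint64_t chnk_idx, uint64_t offset) {
    if(chnk_idx == 0)
        return 0;
    if(offset > 0)
        // the first chunk contributes only (chunksize - offset) bytes
        return (chunksize - offset) + (chnk_idx - 1) * chunksize;
    return chnk_idx * chunksize;
}

// Example: a write starting 1000 bytes into its first chunk places the third
// chunk of the request at (524288 - 1000) + 1 * 524288 = 1047576 bytes into
// the client buffer, i.e. origin_offset(2, 1000) == 1047576.

} // namespace example_write_offsets
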
     363             : /**
     364             :  * @brief Serves a read request reading the chunks associated with this
     365             :  * daemon from the node-local FS and transferring them back to the client.
     366             :  * @internal
     367             :  * The read operation has multiple steps:
     368             :  * 1. Setting up all RPC related information
     369             :  * 2. Allocating space for bulk transfer buffers
      370             :  * 3. By processing the RPC input, the chunk IDs that hash to this daemon
      371             :  * are computed based on a client-defined interval (start and end chunk ID for
     372             :  * this read operation). The client does _not_ provide the daemons with a list
     373             :  * of chunk IDs because it is dynamic data that cannot be part of an RPC input
     374             :  * struct. Therefore, this information would need to be pulled with a bulk
      375             :  * transfer as well, adding unnecessary latency to the overall read operation.
     376             :  *
      377             :  * For each relevant chunk, a non-blocking Argobots tasklet is launched to read
     378             :  * the data chunk from the backend storage to the allocated buffers.
      379             :  * 4. Wait for all tasklets to finish the read operation, PUSH bulk
      380             :  * transferring each chunk back to the client as its tasklet finishes.
     381             :  * Therefore, bulk transfer and the backend I/O operation are overlapping for
     382             :  * efficiency. The read size is added up for all tasklets.
      383             :  * 5. Respond to client (when all bulk transfers are finished) and clean up RPC
      384             :  * resources. Any error is reported in the RPC output struct. Note that backend
     385             :  * read operations are not canceled while in-flight when a task encounters an
     386             :  * error.
     387             :  *
      388             :  * Note: refer to the data backend documentation for how Argobots tasklets
     389             :  * work and why they are used.
     390             :  *
     391             :  * All exceptions must be caught here and dealt with accordingly.
      392             :  * @endinternal
     393             :  * @param handle Mercury RPC handle
     394             :  * @return Mercury error code to Mercury
     395             :  */
     396             : hg_return_t
     397          28 : rpc_srv_read(hg_handle_t handle) {
     398             :     /*
     399             :      * 1. Setup
     400             :      */
     401          28 :     rpc_read_data_in_t in{};
     402          28 :     rpc_data_out_t out{};
     403          28 :     hg_bulk_t bulk_handle = nullptr;
     404             :     // Set default out for error
     405          28 :     out.err = EIO;
     406          28 :     out.io_size = 0;
     407             :     // Getting some information from margo
     408          28 :     auto ret = margo_get_input(handle, &in);
     409          28 :     if(ret != HG_SUCCESS) {
     410           0 :         GKFS_DATA->spdlogger()->error(
     411           0 :                 "{}() Could not get RPC input data with err {}", __func__, ret);
     412           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     413             :     }
     414          28 :     auto hgi = margo_get_info(handle);
     415          28 :     auto mid = margo_hg_info_get_instance(hgi);
     416          28 :     auto bulk_size = margo_bulk_get_size(in.bulk_handle);
     417             : 
     418          28 :     GKFS_DATA->spdlogger()->debug(
     419             :             "{}() path: '{}' chunk_start '{}' chunk_end '{}' chunk_n '{}' total_chunk_size '{}' bulk_size: '{}' offset: '{}'",
     420             :             __func__, in.path, in.chunk_start, in.chunk_end, in.chunk_n,
     421          28 :             in.total_chunk_size, bulk_size, in.offset);
     422          28 :     std::vector<uint8_t> read_bitset_vect =
     423          84 :             gkfs::rpc::decompress_bitset(in.wbitset);
     424             : #ifdef GKFS_ENABLE_AGIOS
     425             :     int* data;
     426             :     ABT_eventual eventual = ABT_EVENTUAL_NULL;
     427             : 
     428             :     /* creating eventual */
     429             :     ABT_eventual_create(sizeof(int64_t), &eventual);
     430             : 
     431             :     unsigned long long int request_id = generate_unique_id();
     432             :     char* agios_path = (char*) in.path;
     433             : 
     434             :     // We should call AGIOS before chunking (as that is an internal way to
     435             :     // handle the requests)
     436             :     if(!agios_add_request(agios_path, AGIOS_READ, in.offset,
     437             :                           in.total_chunk_size, request_id,
     438             :                           AGIOS_SERVER_ID_IGNORE, agios_eventual_callback,
     439             :                           eventual)) {
     440             :         GKFS_DATA->spdlogger()->error("{}() Failed to send request to AGIOS",
     441             :                                       __func__);
     442             :     } else {
     443             :         GKFS_DATA->spdlogger()->debug("{}() request {} was sent to AGIOS",
     444             :                                       __func__, request_id);
     445             :     }
     446             : 
     447             :     /* block until the eventual is signaled */
     448             :     ABT_eventual_wait(eventual, (void**) &data);
     449             : 
     450             :     unsigned long long int result = *data;
     451             :     GKFS_DATA->spdlogger()->debug(
     452             :             "{}() request {} was unblocked (offset = {})!", __func__, result,
     453             :             in.offset);
     454             : 
     455             :     ABT_eventual_free(&eventual);
     456             : 
      457             :     // let AGIOS know it can release the request, as it is completed
     458             :     if(!agios_release_request(agios_path, AGIOS_READ, in.total_chunk_size,
     459             :                               in.offset)) {
     460             :         GKFS_DATA->spdlogger()->error(
     461             :                 "{}() Failed to release request from AGIOS", __func__);
     462             :     }
     463             : #endif
     464             : 
     465             :     /*
     466             :      * 2. Set up buffers for push bulk transfers
     467             :      */
     468          28 :     void* bulk_buf;                          // buffer for bulk transfer
     469          56 :     vector<char*> bulk_buf_ptrs(in.chunk_n); // buffer-chunk offsets
      470             :     // create bulk handle and allocate memory for the buffer with buf_sizes
     471             :     // information
     472          28 :     ret = margo_bulk_create(mid, 1, nullptr, &in.total_chunk_size,
     473             :                             HG_BULK_READWRITE, &bulk_handle);
     474          28 :     if(ret != HG_SUCCESS) {
     475           0 :         GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
     476           0 :                                       __func__);
     477           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out,
     478             :                                           static_cast<hg_bulk_t*>(nullptr));
     479             :     }
     480             :     // access the internally allocated memory buffer and put it into buf_ptrs
     481          28 :     uint32_t actual_count;
     482          28 :     ret = margo_bulk_access(bulk_handle, 0, in.total_chunk_size,
     483             :                             HG_BULK_READWRITE, 1, &bulk_buf,
     484             :                             &in.total_chunk_size, &actual_count);
     485          28 :     if(ret != HG_SUCCESS || actual_count != 1) {
     486           0 :         GKFS_DATA->spdlogger()->error(
     487             :                 "{}() Failed to access allocated buffer from bulk handle",
     488           0 :                 __func__);
     489           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     490             :     }
     491             : 
     492          28 :     auto const host_id = in.host_id;
     493             : 
     494          56 :     auto path = make_shared<string>(in.path);
     495             :     // chnk_ids used by this host
     496          56 :     vector<uint64_t> chnk_ids_host(in.chunk_n);
     497             :     // counter to track how many chunks have been assigned
     498          28 :     auto chnk_id_curr = static_cast<uint64_t>(0);
     499             :     // chnk sizes per chunk for this host
     500          56 :     vector<uint64_t> chnk_sizes(in.chunk_n);
     501             :     // local and origin offsets for bulk operations
     502          56 :     vector<uint64_t> local_offsets(in.chunk_n);
     503          56 :     vector<uint64_t> origin_offsets(in.chunk_n);
     504             :     // how much size is left to assign chunks for reading
     505          28 :     auto chnk_size_left_host = in.total_chunk_size;
     506             :     // temporary traveling pointer
     507          28 :     auto chnk_ptr = static_cast<char*>(bulk_buf);
     508             :     // temporary variables
     509          28 :     auto transfer_size = (bulk_size <= gkfs::config::rpc::chunksize)
     510          28 :                                  ? bulk_size
     511             :                                  : gkfs::config::rpc::chunksize;
     512             :     // object for asynchronous disk IO
     513          84 :     gkfs::data::ChunkReadOperation chunk_read_op{in.path, in.chunk_n};
     514             :     /*
     515             :      * 3. Calculate chunk sizes that correspond to this host and start tasks to
     516             :      * read from disk
     517             :      */
     518             :     // Start to look for a chunk that hashes to this host with the first chunk
     519             :     // in the buffer
     520         102 :     for(auto chnk_id_file = in.chunk_start;
     521         102 :         chnk_id_file <= in.chunk_end && chnk_id_curr < in.chunk_n;
     522             :         chnk_id_file++) {
     523             :         // Continue if chunk does not hash to this host
     524             : 
     525             :         // We only check if we are not using replicas
     526             : 
     527          74 :         if(!(gkfs::rpc::get_bitset(read_bitset_vect,
     528          74 :                                    chnk_id_file - in.chunk_start))) {
     529           0 :             GKFS_DATA->spdlogger()->trace(
     530             :                     "{}() chunkid '{}' ignored as it does not match to this host with id '{}'. chnk_id_curr '{}'",
     531           0 :                     __func__, chnk_id_file, host_id, chnk_id_curr);
     532           0 :             continue;
     533             :         }
     534          74 :         if(GKFS_DATA->enable_chunkstats()) {
     535         148 :             GKFS_DATA->stats()->add_read(in.path, chnk_id_file);
     536             :         }
     537             : 
     538             : 
     539          74 :         chnk_ids_host[chnk_id_curr] =
     540             :                 chnk_id_file; // save this id to host chunk list
     541             :         // Only relevant in the first iteration of the loop and if the chunk
     542             :         // hashes to this host
     543          74 :         if(chnk_id_file == in.chunk_start && in.offset > 0) {
     544             :             // if only 1 destination and 1 chunk (small read) the transfer_size
     545             :             // == bulk_size
     546          14 :             size_t offset_transfer_size = 0;
     547          14 :             if(in.offset + bulk_size <= gkfs::config::rpc::chunksize)
     548             :                 offset_transfer_size = bulk_size;
     549             :             else
     550           6 :                 offset_transfer_size = static_cast<size_t>(
     551           6 :                         gkfs::config::rpc::chunksize - in.offset);
     552             :             // Setting later transfer offsets
     553          14 :             local_offsets[chnk_id_curr] = 0;
     554          14 :             origin_offsets[chnk_id_curr] = 0;
     555          14 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     556          14 :             chnk_sizes[chnk_id_curr] = offset_transfer_size;
     557             :             // utils variables
     558          14 :             chnk_ptr += offset_transfer_size;
     559          14 :             chnk_size_left_host -= offset_transfer_size;
     560             :         } else {
     561          60 :             local_offsets[chnk_id_curr] =
     562          60 :                     in.total_chunk_size - chnk_size_left_host;
     563             :             // origin offset of a chunk is dependent on a given offset in a
     564             :             // write operation
     565          60 :             if(in.offset > 0)
     566          14 :                 origin_offsets[chnk_id_curr] =
     567          14 :                         (gkfs::config::rpc::chunksize - in.offset) +
     568          14 :                         ((chnk_id_file - in.chunk_start) - 1) *
     569             :                                 gkfs::config::rpc::chunksize;
     570             :             else
     571          46 :                 origin_offsets[chnk_id_curr] = (chnk_id_file - in.chunk_start) *
     572             :                                                gkfs::config::rpc::chunksize;
     573             :             // last chunk might have different transfer_size
     574          60 :             if(chnk_id_curr == in.chunk_n - 1)
     575          20 :                 transfer_size = chnk_size_left_host;
     576          60 :             bulk_buf_ptrs[chnk_id_curr] = chnk_ptr;
     577          60 :             chnk_sizes[chnk_id_curr] = transfer_size;
     578             :             // utils variables
     579          60 :             chnk_ptr += transfer_size;
     580          60 :             chnk_size_left_host -= transfer_size;
     581             :         }
     582          74 :         try {
     583             :             // start tasklet for read operation
     584          28 :             chunk_read_op.read_nonblock(
     585          74 :                     chnk_id_curr, chnk_ids_host[chnk_id_curr],
     586          74 :                     bulk_buf_ptrs[chnk_id_curr], chnk_sizes[chnk_id_curr],
     587          74 :                     (chnk_id_file == in.chunk_start) ? in.offset : 0);
     588           0 :         } catch(const gkfs::data::ChunkReadOpException& e) {
     589             :             // This exception is caused by setup of Argobots variables. If this
     590             :             // fails, something is really wrong
     591           0 :             GKFS_DATA->spdlogger()->error("{}() while read_nonblock err '{}'",
     592           0 :                                           __func__, e.what());
     593           0 :             return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     594             :         }
     595          74 :         chnk_id_curr++;
     596             :     }
      597             :     // Sanity check that all chunks were detected in the previous loop
     598             :     // TODO error out. If we continue this will crash the server when sending
     599             :     // results back that don't exist.
     600          28 :     if(chnk_size_left_host != 0)
     601           0 :         GKFS_DATA->spdlogger()->warn(
     602             :                 "{}() Not all chunks were detected!!! Size left {}", __func__,
     603           0 :                 chnk_size_left_host);
     604             : 
     605          28 :     if(chnk_size_left_host == in.total_chunk_size)
     606             :         return HG_CANCELED;
     607             : 
     608             :     /*
     609             :      * 4. Read task results and accumulate in out.io_size
     610             :      */
     611          28 :     gkfs::data::ChunkReadOperation::bulk_args bulk_args{};
     612          28 :     bulk_args.mid = mid;
     613          28 :     bulk_args.origin_addr = hgi->addr;
     614          28 :     bulk_args.origin_bulk_handle = in.bulk_handle;
     615          28 :     bulk_args.origin_offsets = &origin_offsets;
     616          28 :     bulk_args.local_bulk_handle = bulk_handle;
     617          28 :     bulk_args.local_offsets = &local_offsets;
     618          28 :     bulk_args.chunk_ids = &chnk_ids_host;
     619             :     // wait for all tasklets and push read data back to client
     620          28 :     auto read_result = chunk_read_op.wait_for_tasks_and_push_back(bulk_args);
     621          28 :     out.err = read_result.first;
     622          28 :     out.io_size = read_result.second;
     623             : 
     624             :     /*
     625             :      * 5. Respond and cleanup
     626             :      */
     627          28 :     GKFS_DATA->spdlogger()->debug("{}() Sending output response, err: {}",
     628          28 :                                   __func__, out.err);
     629          28 :     auto handler_ret =
     630          28 :             gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
     631          28 :     if(GKFS_DATA->enable_stats()) {
     632          28 :         GKFS_DATA->stats()->add_value_size(
     633             :                 gkfs::utils::Stats::SizeOp::read_size, bulk_size);
     634             :     }
     635             :     return handler_ret;
     636             : }
     637             : 
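/*
 * Illustrative sketch, not part of srv_data.cpp: how the per-request bitset is
 * consumed. The client sets bit i when chunk (chunk_start + i) must be served
 * by this daemon; the handlers above query it through gkfs::rpc::get_bitset()
 * after gkfs::rpc::decompress_bitset(). The helper below is a simplified
 * stand-in with an assumed LSB-first bit layout, not the actual implementation
 * from common/rpc/rpc_util.hpp.
 */
#include <cstdint>
#include <vector>

namespace example_bitset {

// Test the bit for `chunk_id` in a byte-packed bitset covering the interval
// starting at `chunk_start`.
inline bool
chunk_is_mine(const std::vector<uint8_t>& bitset, uint64_t chunk_id,
              uint64_t chunk_start) {
    const auto pos = chunk_id - chunk_start;
    return (bitset[pos / 8] >> (pos % 8)) & 1u;
}

} // namespace example_bitset
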
     638             : 
     639             : /**
      640             :  * @brief Serves a file truncate request and removes all corresponding chunk
     641             :  * files on this daemon.
     642             :  * @internal
     643             :  * A truncate operation includes decreasing the file size of the metadata entry
     644             :  * (if hashing to this daemon) and removing all corresponding chunks exceeding
     645             :  * the new file size.
     646             :  *
     647             :  * All exceptions must be caught here and dealt with accordingly.
      648             :  * @endinternal
     649             :  * @param handle Mercury RPC handle
     650             :  * @return Mercury error code to Mercury
     651             :  */
     652             : hg_return_t
     653           2 : rpc_srv_truncate(hg_handle_t handle) {
     654           2 :     rpc_trunc_in_t in{};
     655           2 :     rpc_err_out_t out{};
     656           2 :     out.err = EIO;
     657             :     // Getting some information from margo
     658           2 :     auto ret = margo_get_input(handle, &in);
     659           2 :     if(ret != HG_SUCCESS) {
     660           0 :         GKFS_DATA->spdlogger()->error(
     661           0 :                 "{}() Could not get RPC input data with err {}", __func__, ret);
     662           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     663             :     }
     664           2 :     GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__,
     665           2 :                                   in.path, in.length);
     666             : 
     667           6 :     gkfs::data::ChunkTruncateOperation chunk_op{in.path};
     668           2 :     try {
     669             :         // start tasklet for truncate operation
     670           2 :         chunk_op.truncate(in.length);
     671           0 :     } catch(const gkfs::data::ChunkMetaOpException& e) {
     672             :         // This exception is caused by setup of Argobots variables. If this
     673             :         // fails, something is really wrong
     674           0 :         GKFS_DATA->spdlogger()->error("{}() while truncate err '{}'", __func__,
     675           0 :                                       e.what());
     676           0 :         return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     677             :     }
     678             : 
     679             :     // wait and get output
     680           2 :     out.err = chunk_op.wait_for_task();
     681             : 
     682           2 :     GKFS_DATA->spdlogger()->debug("{}() Sending output response '{}'", __func__,
     683           2 :                                   out.err);
     684           2 :     return gkfs::rpc::cleanup_respond(&handle, &in, &out);
     685             : }
     686             : 
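/*
 * Illustrative sketch, not part of srv_data.cpp: the chunk arithmetic behind a
 * truncate. Chunks located entirely past the new length can be removed, while
 * the chunk containing the new end of file is shrunk to the remaining bytes.
 * The chunk size value below is an assumption for the example; the daemon uses
 * gkfs::config::rpc::chunksize.
 */
#include <cstdint>

namespace example_truncate {

constexpr uint64_t chunksize = 524288; // assumed 512 KiB chunk size

// id of the first chunk that holds no data after truncating a file to
// `new_length` bytes (all chunks from this id onwards can be removed)
inline uint64_t
first_removable_chunk(uint64_t new_length) {
    return (new_length + chunksize - 1) / chunksize;
}

// bytes that remain in the last surviving chunk (0 means that chunk stays
// full, or that the file was truncated to length 0)
inline uint64_t
bytes_in_last_chunk(uint64_t new_length) {
    return new_length % chunksize;
}

} // namespace example_truncate
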
     687             : 
     688             : /**
     689             :  * @brief Serves a chunk stat request, responding with space information of the
      690             :  * node-local file system.
     691             :  * @internal
     692             :  * All exceptions must be caught here and dealt with accordingly.
      693             :  * @endinternal
     694             :  * @param handle Mercury RPC handle
     695             :  * @return Mercury error code to Mercury
     696             :  */
     697             : hg_return_t
     698           2 : rpc_srv_get_chunk_stat(hg_handle_t handle) {
     699           2 :     GKFS_DATA->spdlogger()->debug("{}() enter", __func__);
     700           2 :     rpc_chunk_stat_out_t out{};
     701           2 :     out.err = EIO;
     702           2 :     try {
     703           2 :         auto chk_stat = GKFS_DATA->storage()->chunk_stat();
     704           2 :         out.chunk_size = chk_stat.chunk_size;
     705           2 :         out.chunk_total = chk_stat.chunk_total;
     706           2 :         out.chunk_free = chk_stat.chunk_free;
     707           2 :         out.err = 0;
     708           0 :     } catch(const gkfs::data::ChunkStorageException& err) {
     709           0 :         GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
     710           0 :         out.err = err.code().value();
      711           0 :     } catch(const std::exception& err) {
     712           0 :         GKFS_DATA->spdlogger()->error(
     713             :                 "{}() Unexpected error when chunk stat '{}'", __func__,
     714           0 :                 err.what());
     715           0 :         out.err = EAGAIN;
     716             :     }
     717             : 
     718             :     // Create output and send it back
     719           2 :     return gkfs::rpc::cleanup_respond(&handle, &out);
     720             : }
     721             : 
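/*
 * Illustrative sketch, not part of srv_data.cpp: turning the chunk statistics
 * returned above into byte counts. A client would typically aggregate these
 * per-daemon values to obtain statfs-like totals for the whole file system.
 */
#include <cstdint>

namespace example_chunk_stat {

struct space_info {
    uint64_t total_bytes;
    uint64_t free_bytes;
};

// Convert a daemon's chunk_stat triple into byte counts.
inline space_info
to_bytes(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) {
    return {chunk_size * chunk_total, chunk_size * chunk_free};
}

} // namespace example_chunk_stat
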
     722             : } // namespace
     723             : 
     724          82 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_write)
     725             : 
     726          56 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_read)
     727             : 
     728           4 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_truncate)
     729             : 
     730           4 : DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_chunk_stat)
     731             : 
     732             : #ifdef GKFS_ENABLE_AGIOS
     733             : void*
     734             : agios_eventual_callback(int64_t request_id, void* info) {
     735             :     GKFS_DATA->spdlogger()->debug("{}() custom callback request {} is ready",
     736             :                                   __func__, request_id);
     737             : 
     738             :     ABT_eventual_set((ABT_eventual) info, &request_id, sizeof(int64_t));
     739             : 
     740             :     return 0;
     741             : }
     742             : #endif

Generated by: LCOV version 1.16