LCOV - code coverage report
Current view: top level - src/daemon/ops - data.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 167 229 72.9 %
Date: 2024-04-23 00:09:24 Functions: 15 15 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief Member definitions for ChunkOperation classes.
      30             :  */
      31             : 
      32             : #include <daemon/ops/data.hpp>
      33             : #include <daemon/backend/data/chunk_storage.hpp>
      34             : #include <common/arithmetic/arithmetic.hpp>
      35             : #include <utility>
      36             : 
      37             : extern "C" {
      38             : #include <mercury_types.h>
      39             : }
      40             : 
      41             : using namespace std;
      42             : 
      43             : namespace gkfs::data {
      44             : 
      45             : /* ------------------------------------------------------------------------
      46             :  * -------------------------- TRUNCATE ------------------------------------
      47             :  * ------------------------------------------------------------------------*/
      48             : 
      49             : 
      50             : /**
      51             :  * @internal
      52             :  * Exclusively used by the Argobots tasklet. Argument args has the following
      53             :  fields:
      54             :  * const string* path;
      55             :    size_t size;
      56             :    ABT_eventual* eventual;
      57             :  * This function is driven by the IO pool. So, there is a maximum allowed number
      58             :  of concurrent operations allowed per daemon.
      59             :  * @endinternal
      60             :  */
      61             : void
      62           2 : ChunkTruncateOperation::truncate_abt(void* _arg) {
      63             : 
      64             :     // import pow2-optimized arithmetic functions
      65           2 :     using namespace gkfs::utils::arithmetic;
      66             : 
      67           2 :     assert(_arg);
      68             :     // Unpack args
      69           2 :     auto* arg = static_cast<struct chunk_truncate_args*>(_arg);
      70           2 :     const string& path = *(arg->path);
      71           2 :     const size_t size = arg->size;
      72           2 :     int err_response = 0;
      73           2 :     try {
      74             :         // get chunk from where to cut off
      75           2 :         auto chunk_id_start = block_index(size, gkfs::config::rpc::chunksize);
      76             :         // do not last delete chunk if it is in the middle of a chunk
      77           2 :         auto left_pad = block_overrun(size, gkfs::config::rpc::chunksize);
      78           2 :         if(left_pad != 0) {
      79           2 :             GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start,
      80             :                                                       left_pad);
      81           2 :             chunk_id_start++;
      82             :         }
      83           2 :         GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start);
      84           0 :     } catch(const ChunkStorageException& err) {
      85           0 :         GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
      86           0 :         err_response = err.code().value();
      87           0 :     } catch(const ::exception& err) {
      88           0 :         GKFS_DATA->spdlogger()->error(
      89             :                 "{}() Unexpected error truncating file '{}' to length '{}'",
      90           0 :                 __func__, path, size);
      91           0 :         err_response = EIO;
      92             :     }
      93           2 :     ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response));
      94           2 : }
      95             : 
      96             : void
      97           2 : ChunkTruncateOperation::clear_task_args() {
      98           2 :     task_arg_ = {};
      99           2 : }
     100             : 
     101           2 : ChunkTruncateOperation::ChunkTruncateOperation(const string& path)
     102           4 :     : ChunkOperation{path, 1} {}
     103             : 
     104             : /**
     105             :  * @internal
     106             :  * Starts a tasklet for requested truncate. In essence all chunk files after the
     107             :  * given offset is removed Only one truncate call is allowed at a time
     108             :  * @endinternal
     109             :  */
     110             : void
     111           2 : ChunkTruncateOperation::truncate(size_t size) {
     112           2 :     assert(!task_eventuals_[0]);
     113           2 :     GKFS_DATA->spdlogger()->trace(
     114             :             "ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__,
     115           2 :             path_, size);
     116             :     // sizeof(int) comes from truncate's return type
     117           2 :     auto abt_err = ABT_eventual_create(
     118           2 :             sizeof(int), &task_eventuals_[0]); // truncate file return value
     119           2 :     if(abt_err != ABT_SUCCESS) {
     120           0 :         auto err_str = fmt::format(
     121             :                 "ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'",
     122           0 :                 __func__, abt_err);
     123           0 :         throw ChunkMetaOpException(err_str);
     124             :     }
     125             : 
     126           2 :     auto& task_arg = task_arg_;
     127           2 :     task_arg.path = &path_;
     128           2 :     task_arg.size = size;
     129           2 :     task_arg.eventual = task_eventuals_[0];
     130             : 
     131           2 :     abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_,
     132           2 :                               &abt_tasks_[0]);
     133           2 :     if(abt_err != ABT_SUCCESS) {
     134           0 :         auto err_str = fmt::format(
     135             :                 "ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'",
     136           0 :                 __func__, abt_err);
     137           0 :         throw ChunkMetaOpException(err_str);
     138             :     }
     139           2 : }
     140             : 
     141             : 
     142             : int
     143           2 : ChunkTruncateOperation::wait_for_task() {
     144           2 :     GKFS_DATA->spdlogger()->trace(
     145           2 :             "ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_);
     146           2 :     int trunc_err = 0;
     147             : 
     148           2 :     int* task_err = nullptr;
     149           2 :     auto abt_err = ABT_eventual_wait(task_eventuals_[0], (void**) &task_err);
     150           2 :     if(abt_err != ABT_SUCCESS) {
     151           0 :         GKFS_DATA->spdlogger()->error(
     152             :                 "ChunkTruncateOperation::{}() Error when waiting on ABT eventual",
     153           0 :                 __func__);
     154           0 :         ABT_eventual_free(&task_eventuals_[0]);
     155           0 :         return EIO;
     156             :     }
     157           2 :     assert(task_err != nullptr);
     158           2 :     if(*task_err != 0) {
     159           0 :         trunc_err = *task_err;
     160             :     }
     161           2 :     ABT_eventual_free(&task_eventuals_[0]);
     162             :     return trunc_err;
     163             : }
     164             : 
     165             : /* ------------------------------------------------------------------------
     166             :  * ----------------------------- WRITE ------------------------------------
     167             :  * ------------------------------------------------------------------------*/
     168             : 
     169             : /**
     170             :  * @internal
     171             :  * Used by an argobots tasklet. Argument args has the following fields:
     172             :  * const string* path;
     173             :    const char* buf;
     174             :    const gkfs::rpc::chnk_id_t* chnk_id;
     175             :    size_t size;
     176             :    off64_t off;
     177             :    ABT_eventual* eventual;
     178             :  * This function is driven by the IO pool. So, there is a maximum allowed number
     179             :  of concurrent IO operations per daemon.
     180             :  * This function is called by tasklets as this function cannot be allowed to
     181             :  block.
     182             :  * @endinternal
     183             :  */
     184             : void
     185         100 : ChunkWriteOperation::write_file_abt(void* _arg) {
     186         100 :     assert(_arg);
     187             :     // Unpack args
     188         100 :     auto* arg = static_cast<struct chunk_write_args*>(_arg);
     189         100 :     const string& path = *(arg->path);
     190         100 :     ssize_t wrote{0};
     191         100 :     try {
     192         100 :         wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf,
     193             :                                                   arg->size, arg->off);
     194           0 :     } catch(const ChunkStorageException& err) {
     195           0 :         GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
     196           0 :         wrote = -(err.code().value());
     197           0 :     } catch(const ::exception& err) {
     198           0 :         GKFS_DATA->spdlogger()->error(
     199             :                 "{}() Unexpected error writing chunk {} of file {}", __func__,
     200           0 :                 arg->chnk_id, path);
     201           0 :         wrote = -EIO;
     202             :     }
     203         100 :     ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote));
     204         100 : }
     205             : 
     206             : void
     207          41 : ChunkWriteOperation::clear_task_args() {
     208          41 :     task_args_.clear();
     209          41 : }
     210             : 
     211          41 : ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n)
     212          82 :     : ChunkOperation{path, n} {
     213          41 :     task_args_.resize(n);
     214          41 : }
     215             : 
     216             : /**
     217             :  * @internal
     218             :  * Write buffer from a single chunk referenced by its ID. Put task into IO
     219             :  * queue. On failure the write operations is aborted, throwing an error, and
     220             :  * cleaned up. The caller may repeat a failed call.
     221             :  * @endinternal
     222             :  */
     223             : void
     224         100 : ChunkWriteOperation::write_nonblock(size_t idx, const uint64_t chunk_id,
     225             :                                     const char* bulk_buf_ptr, const size_t size,
     226             :                                     const off64_t offset) {
     227         100 :     assert(idx < task_args_.size());
     228         100 :     GKFS_DATA->spdlogger()->trace(
     229             :             "ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
     230         100 :             __func__, idx, path_, size, offset);
     231             :     // sizeof(ssize_t) comes from pwrite's return type
     232         100 :     auto abt_err = ABT_eventual_create(
     233             :             sizeof(ssize_t),
     234         100 :             &task_eventuals_[idx]); // written file return value
     235         100 :     if(abt_err != ABT_SUCCESS) {
     236           0 :         auto err_str = fmt::format(
     237             :                 "ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'",
     238           0 :                 __func__, abt_err);
     239           0 :         throw ChunkWriteOpException(err_str);
     240             :     }
     241             : 
     242         100 :     auto& task_arg = task_args_[idx];
     243         100 :     task_arg.path = &path_;
     244         100 :     task_arg.buf = bulk_buf_ptr;
     245         100 :     task_arg.chnk_id = chunk_id;
     246         100 :     task_arg.size = size;
     247         100 :     task_arg.off = offset;
     248         100 :     task_arg.eventual = task_eventuals_[idx];
     249             : 
     250         100 :     abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt,
     251         100 :                               &task_args_[idx], &abt_tasks_[idx]);
     252         100 :     if(abt_err != ABT_SUCCESS) {
     253           0 :         auto err_str = fmt::format(
     254             :                 "ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'",
     255           0 :                 __func__, abt_err);
     256           0 :         throw ChunkWriteOpException(err_str);
     257             :     }
     258         100 : }
     259             : 
     260             : pair<int, size_t>
     261          41 : ChunkWriteOperation::wait_for_tasks() {
     262          41 :     GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'",
     263          41 :                                   __func__, path_);
     264          41 :     size_t total_written = 0;
     265          41 :     int io_err = 0;
     266             :     /*
     267             :      * gather all Eventual's information. do not throw here to properly cleanup
     268             :      * all eventuals On error, cleanup eventuals and set written data to 0 as
     269             :      * written data is corrupted
     270             :      */
     271         141 :     for(auto& e : task_eventuals_) {
     272         100 :         ssize_t* task_size = nullptr;
     273         100 :         auto abt_err = ABT_eventual_wait(e, (void**) &task_size);
     274         100 :         if(abt_err != ABT_SUCCESS) {
     275           0 :             GKFS_DATA->spdlogger()->error(
     276             :                     "ChunkWriteOperation::{}() Error when waiting on ABT eventual",
     277           0 :                     __func__);
     278           0 :             io_err = EIO;
     279           0 :             ABT_eventual_free(&e);
     280           0 :             continue;
     281             :         }
     282         100 :         if(io_err != 0) {
     283           0 :             ABT_eventual_free(&e);
     284           0 :             continue;
     285             :         }
     286         100 :         assert(task_size != nullptr);
     287         100 :         if(*task_size < 0) {
     288           0 :             io_err = -(*task_size);
     289             :         } else {
     290         100 :             total_written += *task_size;
     291             :         }
     292         100 :         ABT_eventual_free(&e);
     293             :     }
     294             :     // in case of error set written size to zero as data would be corrupted
     295          41 :     if(io_err != 0)
     296           0 :         total_written = 0;
     297          41 :     return make_pair(io_err, total_written);
     298             : }
     299             : 
     300             : /* ------------------------------------------------------------------------
     301             :  * -------------------------- READ ----------------------------------------
     302             :  * ------------------------------------------------------------------------*/
     303             : 
     304             : /**
     305             :  * @internal
     306             :  * Used by an argobots tasklet. Argument args has the following fields:
     307             :  * const string* path;
     308             :    char* buf;
     309             :    const gkfs::rpc::chnk_id_t* chnk_id;
     310             :    size_t size;
     311             :    off64_t off;
     312             :    ABT_eventual* eventual;
     313             :  * This function is driven by the IO pool. so there is a maximum allowed number
     314             :  of concurrent IO operations per daemon.
     315             :  * This function is called by tasklets, as this function cannot be allowed to
     316             :  block.
     317             :  * @endinternal
     318             :  */
     319             : void
     320          74 : ChunkReadOperation::read_file_abt(void* _arg) {
     321          74 :     assert(_arg);
     322             :     // unpack args
     323          74 :     auto* arg = static_cast<struct chunk_read_args*>(_arg);
     324          74 :     const string& path = *(arg->path);
     325          74 :     ssize_t read = 0;
     326          74 :     try {
     327             :         // Under expected circumstances (error or no error) read_chunk will
     328             :         // signal the eventual
     329          74 :         read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf,
     330             :                                                 arg->size, arg->off);
     331          28 :     } catch(const ChunkStorageException& err) {
     332          14 :         GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
     333          14 :         read = -(err.code().value());
     334           0 :     } catch(const ::exception& err) {
     335           0 :         GKFS_DATA->spdlogger()->error(
     336             :                 "{}() Unexpected error reading chunk {} of file {}", __func__,
     337           0 :                 arg->chnk_id, path);
     338           0 :         read = -EIO;
     339             :     }
     340          74 :     ABT_eventual_set(arg->eventual, &read, sizeof(read));
     341          74 : }
     342             : 
     343             : void
     344          28 : ChunkReadOperation::clear_task_args() {
     345          28 :     task_args_.clear();
     346          28 : }
     347             : 
     348          28 : ChunkReadOperation::ChunkReadOperation(const string& path, size_t n)
     349          56 :     : ChunkOperation{path, n} {
     350          28 :     task_args_.resize(n);
     351          28 : }
     352             : 
     353             : /**
     354             :  * @internal
     355             :  * Read buffer to a single chunk referenced by its ID. Put task into IO queue.
     356             :  * On failure the read operations is aborted, throwing an error, and cleaned up.
     357             :  * The caller may repeat a failed call.
     358             :  * @endinternal
     359             :  */
     360             : void
     361          74 : ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id,
     362             :                                   char* bulk_buf_ptr, const size_t size,
     363             :                                   const off64_t offset) {
     364          74 :     assert(idx < task_args_.size());
     365          74 :     GKFS_DATA->spdlogger()->trace(
     366             :             "ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
     367          74 :             __func__, idx, path_, size, offset);
     368             :     // sizeof(ssize_t) comes from pread's return type
     369          74 :     auto abt_err = ABT_eventual_create(
     370          74 :             sizeof(ssize_t), &task_eventuals_[idx]); // read file return value
     371          74 :     if(abt_err != ABT_SUCCESS) {
     372           0 :         auto err_str = fmt::format(
     373             :                 "ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'",
     374           0 :                 __func__, abt_err);
     375           0 :         throw ChunkReadOpException(err_str);
     376             :     }
     377             : 
     378          74 :     auto& task_arg = task_args_[idx];
     379          74 :     task_arg.path = &path_;
     380          74 :     task_arg.buf = bulk_buf_ptr;
     381          74 :     task_arg.chnk_id = chunk_id;
     382          74 :     task_arg.size = size;
     383          74 :     task_arg.off = offset;
     384          74 :     task_arg.eventual = task_eventuals_[idx];
     385             : 
     386          74 :     abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt,
     387          74 :                               &task_args_[idx], &abt_tasks_[idx]);
     388          74 :     if(abt_err != ABT_SUCCESS) {
     389           0 :         auto err_str = fmt::format(
     390             :                 "ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'",
     391           0 :                 __func__, abt_err);
     392           0 :         throw ChunkReadOpException(err_str);
     393             :     }
     394          74 : }
     395             : 
     396             : pair<int, size_t>
     397          28 : ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) {
     398          28 :     GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'",
     399          28 :                                   __func__, path_);
     400          28 :     assert(args.chunk_ids->size() == task_args_.size());
     401             :     size_t total_read = 0;
     402             :     int io_err = 0;
     403             : 
     404             :     /*
     405             :      * gather all Eventual's information. do not throw here to properly cleanup
     406             :      * all eventuals As soon as an error is encountered, bulk_transfers will no
     407             :      * longer be executed as the data would be corrupted The loop continues
     408             :      * until all eventuals have been cleaned and freed.
     409             :      */
     410         102 :     for(uint64_t idx = 0; idx < task_args_.size(); idx++) {
     411          74 :         ssize_t* task_size = nullptr;
     412          74 :         auto abt_err =
     413          74 :                 ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size);
     414          74 :         if(abt_err != ABT_SUCCESS) {
     415           0 :             GKFS_DATA->spdlogger()->error(
     416             :                     "ChunkReadOperation::{}() Error when waiting on ABT eventual",
     417           0 :                     __func__);
     418           0 :             io_err = EIO;
     419           0 :             ABT_eventual_free(&task_eventuals_[idx]);
     420          24 :             continue;
     421             :         }
     422             :         // error occured. stop processing but clean up
     423          74 :         if(io_err != 0) {
     424           0 :             ABT_eventual_free(&task_eventuals_[idx]);
     425           0 :             continue;
     426             :         }
     427          74 :         assert(task_size != nullptr);
     428          74 :         if(*task_size < 0) {
     429             :             // sparse regions do not have chunk files and are therefore skipped
     430          14 :             if(-(*task_size) == ENOENT) {
     431          14 :                 ABT_eventual_free(&task_eventuals_[idx]);
     432          14 :                 continue;
     433             :             }
     434           0 :             io_err = -(*task_size); // make error code > 0
     435          60 :         } else if(*task_size == 0) {
     436             :             // read size of 0 is not an error and can happen because reading the
     437             :             // end-of-file
     438          10 :             ABT_eventual_free(&task_eventuals_[idx]);
     439          10 :             continue;
     440             :         } else {
     441             :             // successful case, push read data back to client
     442          50 :             GKFS_DATA->spdlogger()->trace(
     443             :                     "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'",
     444          50 :                     __func__, path_, args.chunk_ids->at(idx),
     445          50 :                     args.origin_offsets->at(idx), args.local_offsets->at(idx),
     446          50 :                     *task_size);
     447          50 :             assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx));
     448         150 :             auto margo_err = margo_bulk_transfer(
     449          50 :                     args.mid, HG_BULK_PUSH, args.origin_addr,
     450          50 :                     args.origin_bulk_handle, args.origin_offsets->at(idx),
     451          50 :                     args.local_bulk_handle, args.local_offsets->at(idx),
     452          50 :                     *task_size);
     453          50 :             if(margo_err != HG_SUCCESS) {
     454           0 :                 GKFS_DATA->spdlogger()->error(
     455             :                         "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'",
     456           0 :                         __func__, margo_err);
     457           0 :                 io_err = EBUSY;
     458           0 :                 continue;
     459             :             }
     460          50 :             total_read += *task_size;
     461             :         }
     462          50 :         ABT_eventual_free(&task_eventuals_[idx]);
     463             :     }
     464             :     // in case of error set read size to zero as data would be corrupted
     465          28 :     if(io_err != 0)
     466           0 :         total_read = 0;
     467          28 :     return make_pair(io_err, total_read);
     468             : }
     469             : 
     470             : } // namespace gkfs::data

Generated by: LCOV version 1.16