LCOV - code coverage report
Current view: top level - include/daemon/ops - data.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 22 29 75.9 %
Date: 2024-04-30 13:21:35 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :   Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
       3             :   Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
       4             : 
       5             :   This software was partially supported by the
       6             :   EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
       7             : 
       8             :   This software was partially supported by the
       9             :   ADA-FS project under the SPPEXA project funded by the DFG.
      10             : 
      11             :   This file is part of GekkoFS.
      12             : 
      13             :   GekkoFS is free software: you can redistribute it and/or modify
      14             :   it under the terms of the GNU General Public License as published by
      15             :   the Free Software Foundation, either version 3 of the License, or
      16             :   (at your option) any later version.
      17             : 
      18             :   GekkoFS is distributed in the hope that it will be useful,
      19             :   but WITHOUT ANY WARRANTY; without even the implied warranty of
      20             :   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      21             :   GNU General Public License for more details.
      22             : 
      23             :   You should have received a copy of the GNU General Public License
      24             :   along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.
      25             : 
      26             :   SPDX-License-Identifier: GPL-3.0-or-later
      27             : */
      28             : /**
      29             :  * @brief Classes to encapsulate asynchronous chunk operations.
      30             :  * All operations on chunk files must go through the Argobots' task queues.
      31             :  * Otherwise operations may overtake operations in the I/O queues.
      32             :  * This applies to write, read, and truncate which may modify the middle of a
      33             :  * chunk, essentially a write operation.
      34             :  *
      35             :  * In the future, this class may be used to provide failure tolerance for IO
      36             :  * tasks
      37             :  *
      38             :  * Base class using the CRTP idiom
      39             :  */
      40             : 
      41             : #ifndef GEKKOFS_DAEMON_DATA_HPP
      42             : #define GEKKOFS_DAEMON_DATA_HPP
      43             : 
      44             : #include <daemon/daemon.hpp>
      45             : #include <common/common_defs.hpp>
      46             : 
      47             : #include <string>
      48             : #include <vector>
      49             : 
      50             : extern "C" {
      51             : #include <abt.h>
      52             : #include <margo.h>
      53             : }
      54             : 
      55             : namespace gkfs::data {
      56             : 
      57             : /**
      58             :  * @brief Internal Exception for all general chunk operations.
      59             :  */
      60             : class ChunkOpException : public std::runtime_error {
      61             : public:
      62           0 :     explicit ChunkOpException(const std::string& s) : std::runtime_error(s){};
      63             : };
      64             : /**
      65             :  * @brief Internal Exception for all chunk write operations.
      66             :  */
      67             : class ChunkWriteOpException : public ChunkOpException {
      68             : public:
      69           0 :     explicit ChunkWriteOpException(const std::string& s)
      70           0 :         : ChunkOpException(s){};
      71             : };
      72             : /**
      73             :  * @brief Internal Exception for all chunk read operations.
      74             :  */
      75             : class ChunkReadOpException : public ChunkOpException {
      76             : public:
      77           0 :     explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s){};
      78             : };
      79             : /**
      80             :  * @brief Internal Exception for all chunk metadata operations.
      81             :  */
      82             : class ChunkMetaOpException : public ChunkOpException {
      83             : public:
      84           0 :     explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s){};
      85             : };
      86             : /**
      87             :  * @brief Base class (using CRTP idiom) for all chunk operations.
      88             :  *
      89             :  * This class is not thread-safe.
      90             :  * @internal
      91             :  * Each I/O operation, i.e., an write or read RPC request, operating on one or
      92             :  * multiple chunks is represented by a corresponding ChunkOperation object. To
      93             :  * keep conflicting operations of other I/O requests on the same chunk in order,
      94             :  * Argobots tasklets are used. Tasklets are lightweight threads compared to
      95             :  * User-Level Threads (ULTs). When ULTs run in an ES, their execution may be
      96             :  * interleaved inside an ES because they can yield control to the scheduler or
      97             :  * another ULT. If this happens during a write, for example, and data is written
      98             :  * after one another while sitting in the queue, data might get written in the
      99             :  * wrong order. Tasklets are an efficient way to prevent this.
     100             :  *
     101             :  * Each ChunkOperation includes the path to the directory where all chunks are
     102             :  * located, a number of tasks (one for each chunk), and their corresponding
     103             :  * eventuals (one for each task). ABT_eventuals offer a similar concept as
     104             :  * std::future to provide call-back functionality.
     105             :  *
     106             :  * Truncate requests also create a ChunkOperation since it requires removing a
     107             :  * number of chunks and must honor the same order of operations to chunks.
     108             :  *
     109             :  * In the future, additional optimizations can be made since atomicity of the
     110             :  * tasklets might be too long if they depend on the results of a, e.g., pread().
     111             :  * Therefore, a queue per chunk could be beneficial (this has not been tested
     112             :  * yet).
     113             :  *
     114             :  * Note, at this time, CRTP is only required for `cancel_all_tasks()`.
     115             :  *
     116             :  * @endinternal
     117             :  * @tparam OperationType for write, read, and truncate.
     118             :  */
     119             : template <class OperationType>
     120             : class ChunkOperation {
     121             : 
     122             : protected:
     123             :     const std::string path_; //!< Path to the chunk directory of the file
     124             : 
     125             :     std::vector<ABT_task> abt_tasks_; //!< Tasklets operating on the file
     126             :     std::vector<ABT_eventual>
     127             :             task_eventuals_; //!< Eventuals for tasklet callbacks
     128             : 
     129             : public:
     130             :     /**
     131             :      * @brief Constructor for a single chunk operation.
     132             :      * @param path Path to chunk directory
     133             :      */
     134             :     explicit ChunkOperation(const std::string& path)
     135             :         : ChunkOperation(path, 1){};
     136             : 
     137             :     /**
     138             :      * @brief Constructor to initialize tasklet and eventual lists.
     139             :      * @param path Path to chunk directory
     140             :      * @param n Number of chunk operations by I/O request
     141             :      */
     142         142 :     ChunkOperation(std::string path, size_t n) : path_(std::move(path)) {
     143             :         // Knowing n beforehand is important and cannot be dynamic. Otherwise
     144             :         // eventuals cause seg faults
     145          71 :         abt_tasks_.resize(n);
     146          71 :         task_eventuals_.resize(n);
     147          71 :     };
     148             :     /**
     149             :      * Destructor calls cancel_all_tasks to clean up all used resources.
     150             :      */
     151          71 :     ~ChunkOperation() {
     152          71 :         cancel_all_tasks();
     153          71 :     }
     154             : 
     155             :     /**
     156             :      * @brief Cancels all tasks in-flight and free resources.
     157             :      */
     158             :     void
     159          71 :     cancel_all_tasks() {
     160          71 :         GKFS_DATA->spdlogger()->trace("{}() enter", __func__);
     161         247 :         for(auto& task : abt_tasks_) {
     162         176 :             if(task) {
     163         176 :                 ABT_task_cancel(task);
     164         176 :                 ABT_task_free(&task);
     165             :             }
     166             :         }
     167         247 :         for(auto& eventual : task_eventuals_) {
     168         176 :             if(eventual) {
     169           0 :                 ABT_eventual_reset(eventual);
     170           0 :                 ABT_eventual_free(&eventual);
     171             :             }
     172             :         }
     173          71 :         abt_tasks_.clear();
     174          71 :         task_eventuals_.clear();
     175          71 :         static_cast<OperationType*>(this)->clear_task_args();
     176          71 :     }
     177             : };
     178             : 
     179             : /**
     180             :  * @brief Chunk operation class for truncate operations.
     181             :  *
     182             :  * Note, a truncate operation is a special case and forced to only use a single
     183             :  * task.
     184             :  */
     185             : class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> {
     186             :     friend class ChunkOperation<ChunkTruncateOperation>;
     187             : 
     188             : private:
     189             :     struct chunk_truncate_args {
     190             :         const std::string* path; //!< Path to affected chunk directory
     191             :         size_t size; //!< GekkoFS file offset (_NOT_ chunk file) to truncate to
     192             :         ABT_eventual eventual; //!< Attached eventual
     193             :     };                         //!< Struct for a truncate operation
     194             : 
     195             :     struct chunk_truncate_args task_arg_ {}; //!< tasklet input struct
     196             :     /**
     197             :      * @brief Exclusively used by the Argobots tasklet.
     198             :      * @param _arg Pointer to input struct of type <chunk_truncate_args>. Error
     199             :      * code<int> is placed into eventual to signal its failure or success.
     200             :      */
     201             :     static void
     202             :     truncate_abt(void* _arg);
     203             :     /**
     204             :      * @brief Resets the task_arg_ struct.
     205             :      */
     206             :     void
     207             :     clear_task_args();
     208             : 
     209             : public:
     210             :     explicit ChunkTruncateOperation(const std::string& path);
     211             : 
     212           2 :     ~ChunkTruncateOperation() = default;
     213             : 
     214             :     /**
     215             :      * @brief Truncate request called by RPC handler function and launches a
     216             :      * non-blocking tasklet.
     217             :      * @param size GekkoFS file offset (_NOT_ chunk file) to truncate to
     218             :      * @throws ChunkMetaOpException
     219             :      */
     220             :     void
     221             :     truncate(size_t size);
     222             :     /**
     223             :      * @brief Wait for the truncate tasklet to finish.
     224             :      * @return Error code for success (0) or failure
     225             :      */
     226             :     int
     227             :     wait_for_task();
     228             : };
     229             : 
     230             : /**
     231             :  * @brief Chunk operation class for write operations with one object per write
     232             :  * RPC request. May involve multiple I/O task depending on the number of chunks
     233             :  * involved.
     234             :  */
     235             : class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> {
     236             :     friend class ChunkOperation<ChunkWriteOperation>;
     237             : 
     238             : private:
     239             :     struct chunk_write_args {
     240             :         const std::string* path;      //!< Path to affected chunk directory
     241             :         const char* buf;              //!< Buffer for chunk
     242             :         gkfs::rpc::chnk_id_t chnk_id; //!< chunk id that is affected
     243             :         size_t size;                  //!< size to write for chunk
     244             :         off64_t off;                  //!< offset for individual chunk
     245             :         ABT_eventual eventual;        //!< Attached eventual
     246             :     };                                //!< Struct for an chunk write operation
     247             : 
     248             :     std::vector<struct chunk_write_args> task_args_; //!< tasklet input structs
     249             :     /**
     250             :      * @brief Exclusively used by the Argobots tasklet.
     251             :      * @param _arg Pointer to input struct of type <chunk_write_args>. Error
     252             :      * code<int> is placed into eventual to signal its failure or success.
     253             :      */
     254             :     static void
     255             :     write_file_abt(void* _arg);
     256             :     /**
     257             :      * @brief Resets the task_arg_ struct.
     258             :      */
     259             :     void
     260             :     clear_task_args();
     261             : 
     262             : public:
     263             :     ChunkWriteOperation(const std::string& path, size_t n);
     264             : 
     265          41 :     ~ChunkWriteOperation() = default;
     266             : 
     267             :     /**
     268             :      * @brief Write request called by RPC handler function and launches a
     269             :      * non-blocking tasklet.
     270             :      * @param idx Number of non-blocking write for write RPC request
     271             :      * @param chunk_id The affected chunk id
     272             :      * @param bulk_buf_ptr The buffer to write for the chunk
     273             :      * @param size Size to write for chunk
     274             :      * @param offset Offset for individual chunk
     275             :      * @throws ChunkWriteOpException
     276             :      */
     277             :     void
     278             :     write_nonblock(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr,
     279             :                    size_t size, off64_t offset);
     280             : 
     281             :     /**
     282             :      * @brief Wait for all write tasklets to finish.
     283             :      * @return Pair for error code for success (0) or failure and written size
     284             :      */
     285             :     std::pair<int, size_t>
     286             :     wait_for_tasks();
     287             : };
     288             : 
     289             : /**
     290             :  * @brief Chunk operation class for read operations with one object per read
     291             :  * RPC request. May involve multiple I/O task depending on the number of chunks
     292             :  * involved.
     293             :  */
     294             : class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> {
     295             :     friend class ChunkOperation<ChunkReadOperation>;
     296             : 
     297             : private:
     298             :     struct chunk_read_args {
     299             :         const std::string* path;      //!< Path to affected chunk directory
     300             :         char* buf;                    //!< Buffer for chunk
     301             :         gkfs::rpc::chnk_id_t chnk_id; //!< chunk id that is affected
     302             :         size_t size;                  //!< size to read from chunk
     303             :         off64_t off;                  //!< offset for individual chunk
     304             :         ABT_eventual eventual;        //!< Attached eventual
     305             :     };                                //!< Struct for an chunk read operation
     306             : 
     307             :     std::vector<struct chunk_read_args> task_args_; //!< tasklet input structs
     308             :     /**
     309             :      * @brief Exclusively used by the Argobots tasklet.
     310             :      * @param _arg Pointer to input struct of type <chunk_read_args>. Error
     311             :      * code<int> is placed into eventual to signal its failure or success.
     312             :      */
     313             :     static void
     314             :     read_file_abt(void* _arg);
     315             :     /**
     316             :      * @brief Resets the task_arg_ struct.
     317             :      */
     318             :     void
     319             :     clear_task_args();
     320             : 
     321             : public:
     322             :     struct bulk_args {
     323             :         margo_instance_id mid;               //!< Margo instance ID of server
     324             :         hg_addr_t origin_addr;               //!< abstract address of client
     325             :         hg_bulk_t origin_bulk_handle;        //!< bulk handle from client
     326             :         std::vector<size_t>* origin_offsets; //!< offsets in origin buffer
     327             :         hg_bulk_t local_bulk_handle;         //!< local bulk handle for PUSH
     328             :         std::vector<size_t>* local_offsets;  //!< offsets in local buffer
     329             :         std::vector<uint64_t>* chunk_ids;    //!< all chunk ids in this read
     330             :     }; //!< Struct to push read data to the client
     331             : 
     332             :     ChunkReadOperation(const std::string& path, size_t n);
     333             : 
     334          28 :     ~ChunkReadOperation() = default;
     335             : 
     336             :     /**
     337             :      * @brief Read request called by RPC handler function and launches a
     338             :      * non-blocking tasklet.
     339             :      * @param idx Number of non-blocking write for write RPC request
     340             :      * @param chunk_id The affected chunk id
     341             :      * @param bulk_buf_ptr The buffer for reading chunk
     342             :      * @param size Size to read from chunk
     343             :      * @param offset Offset for individual chunk
     344             :      * @throws ChunkReadOpException
     345             :      */
     346             :     void
     347             :     read_nonblock(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr,
     348             :                   size_t size, off64_t offset);
     349             : 
     350             :     /**
     351             :      * @brief Waits for all local I/O operations to finish and push buffers back
     352             :      * to the daemon.
     353             :      * @param args Bulk_args for push transfer
     354             :      * @return Pair for error code for success (0) or failure and read size
     355             :      */
     356             :     std::pair<int, size_t>
     357             :     wait_for_tasks_and_push_back(const bulk_args& args);
     358             : };
     359             : 
     360             : } // namespace gkfs::data
     361             : 
     362             : #endif // GEKKOFS_DAEMON_DATA_HPP

Generated by: LCOV version 1.16