Line data Source code
1 : /* 2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain 3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany 4 : 5 : This software was partially supported by the 6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). 7 : 8 : This software was partially supported by the 9 : ADA-FS project under the SPPEXA project funded by the DFG. 10 : 11 : This file is part of GekkoFS. 12 : 13 : GekkoFS is free software: you can redistribute it and/or modify 14 : it under the terms of the GNU General Public License as published by 15 : the Free Software Foundation, either version 3 of the License, or 16 : (at your option) any later version. 17 : 18 : GekkoFS is distributed in the hope that it will be useful, 19 : but WITHOUT ANY WARRANTY; without even the implied warranty of 20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 21 : GNU General Public License for more details. 22 : 23 : You should have received a copy of the GNU General Public License 24 : along with GekkoFS. If not, see <https://www.gnu.org/licenses/>. 25 : 26 : SPDX-License-Identifier: GPL-3.0-or-later 27 : */ 28 : /** 29 : * @brief Classes to encapsulate asynchronous chunk operations. 30 : * All operations on chunk files must go through the Argobots' task queues. 31 : * Otherwise operations may overtake operations in the I/O queues. 32 : * This applies to write, read, and truncate which may modify the middle of a 33 : * chunk, essentially a write operation. 34 : * 35 : * In the future, this class may be used to provide failure tolerance for IO 36 : * tasks 37 : * 38 : * Base class using the CRTP idiom 39 : */ 40 : 41 : #ifndef GEKKOFS_DAEMON_DATA_HPP 42 : #define GEKKOFS_DAEMON_DATA_HPP 43 : 44 : #include <daemon/daemon.hpp> 45 : #include <common/common_defs.hpp> 46 : 47 : #include <string> 48 : #include <vector> 49 : 50 : extern "C" { 51 : #include <abt.h> 52 : #include <margo.h> 53 : } 54 : 55 : namespace gkfs::data { 56 : 57 : /** 58 : * @brief Internal Exception for all general chunk operations. 59 : */ 60 : class ChunkOpException : public std::runtime_error { 61 : public: 62 0 : explicit ChunkOpException(const std::string& s) : std::runtime_error(s){}; 63 : }; 64 : /** 65 : * @brief Internal Exception for all chunk write operations. 66 : */ 67 : class ChunkWriteOpException : public ChunkOpException { 68 : public: 69 0 : explicit ChunkWriteOpException(const std::string& s) 70 0 : : ChunkOpException(s){}; 71 : }; 72 : /** 73 : * @brief Internal Exception for all chunk read operations. 74 : */ 75 : class ChunkReadOpException : public ChunkOpException { 76 : public: 77 0 : explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s){}; 78 : }; 79 : /** 80 : * @brief Internal Exception for all chunk metadata operations. 81 : */ 82 : class ChunkMetaOpException : public ChunkOpException { 83 : public: 84 0 : explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s){}; 85 : }; 86 : /** 87 : * @brief Base class (using CRTP idiom) for all chunk operations. 88 : * 89 : * This class is not thread-safe. 90 : * @internal 91 : * Each I/O operation, i.e., an write or read RPC request, operating on one or 92 : * multiple chunks is represented by a corresponding ChunkOperation object. To 93 : * keep conflicting operations of other I/O requests on the same chunk in order, 94 : * Argobots tasklets are used. Tasklets are lightweight threads compared to 95 : * User-Level Threads (ULTs). When ULTs run in an ES, their execution may be 96 : * interleaved inside an ES because they can yield control to the scheduler or 97 : * another ULT. If this happens during a write, for example, and data is written 98 : * after one another while sitting in the queue, data might get written in the 99 : * wrong order. Tasklets are an efficient way to prevent this. 100 : * 101 : * Each ChunkOperation includes the path to the directory where all chunks are 102 : * located, a number of tasks (one for each chunk), and their corresponding 103 : * eventuals (one for each task). ABT_eventuals offer a similar concept as 104 : * std::future to provide call-back functionality. 105 : * 106 : * Truncate requests also create a ChunkOperation since it requires removing a 107 : * number of chunks and must honor the same order of operations to chunks. 108 : * 109 : * In the future, additional optimizations can be made since atomicity of the 110 : * tasklets might be too long if they depend on the results of a, e.g., pread(). 111 : * Therefore, a queue per chunk could be beneficial (this has not been tested 112 : * yet). 113 : * 114 : * Note, at this time, CRTP is only required for `cancel_all_tasks()`. 115 : * 116 : * @endinternal 117 : * @tparam OperationType for write, read, and truncate. 118 : */ 119 : template <class OperationType> 120 : class ChunkOperation { 121 : 122 : protected: 123 : const std::string path_; //!< Path to the chunk directory of the file 124 : 125 : std::vector<ABT_task> abt_tasks_; //!< Tasklets operating on the file 126 : std::vector<ABT_eventual> 127 : task_eventuals_; //!< Eventuals for tasklet callbacks 128 : 129 : public: 130 : /** 131 : * @brief Constructor for a single chunk operation. 132 : * @param path Path to chunk directory 133 : */ 134 : explicit ChunkOperation(const std::string& path) 135 : : ChunkOperation(path, 1){}; 136 : 137 : /** 138 : * @brief Constructor to initialize tasklet and eventual lists. 139 : * @param path Path to chunk directory 140 : * @param n Number of chunk operations by I/O request 141 : */ 142 142 : ChunkOperation(std::string path, size_t n) : path_(std::move(path)) { 143 : // Knowing n beforehand is important and cannot be dynamic. Otherwise 144 : // eventuals cause seg faults 145 71 : abt_tasks_.resize(n); 146 71 : task_eventuals_.resize(n); 147 71 : }; 148 : /** 149 : * Destructor calls cancel_all_tasks to clean up all used resources. 150 : */ 151 71 : ~ChunkOperation() { 152 71 : cancel_all_tasks(); 153 71 : } 154 : 155 : /** 156 : * @brief Cancels all tasks in-flight and free resources. 157 : */ 158 : void 159 71 : cancel_all_tasks() { 160 71 : GKFS_DATA->spdlogger()->trace("{}() enter", __func__); 161 247 : for(auto& task : abt_tasks_) { 162 176 : if(task) { 163 176 : ABT_task_cancel(task); 164 176 : ABT_task_free(&task); 165 : } 166 : } 167 247 : for(auto& eventual : task_eventuals_) { 168 176 : if(eventual) { 169 0 : ABT_eventual_reset(eventual); 170 0 : ABT_eventual_free(&eventual); 171 : } 172 : } 173 71 : abt_tasks_.clear(); 174 71 : task_eventuals_.clear(); 175 71 : static_cast<OperationType*>(this)->clear_task_args(); 176 71 : } 177 : }; 178 : 179 : /** 180 : * @brief Chunk operation class for truncate operations. 181 : * 182 : * Note, a truncate operation is a special case and forced to only use a single 183 : * task. 184 : */ 185 : class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> { 186 : friend class ChunkOperation<ChunkTruncateOperation>; 187 : 188 : private: 189 : struct chunk_truncate_args { 190 : const std::string* path; //!< Path to affected chunk directory 191 : size_t size; //!< GekkoFS file offset (_NOT_ chunk file) to truncate to 192 : ABT_eventual eventual; //!< Attached eventual 193 : }; //!< Struct for a truncate operation 194 : 195 : struct chunk_truncate_args task_arg_ {}; //!< tasklet input struct 196 : /** 197 : * @brief Exclusively used by the Argobots tasklet. 198 : * @param _arg Pointer to input struct of type <chunk_truncate_args>. Error 199 : * code<int> is placed into eventual to signal its failure or success. 200 : */ 201 : static void 202 : truncate_abt(void* _arg); 203 : /** 204 : * @brief Resets the task_arg_ struct. 205 : */ 206 : void 207 : clear_task_args(); 208 : 209 : public: 210 : explicit ChunkTruncateOperation(const std::string& path); 211 : 212 2 : ~ChunkTruncateOperation() = default; 213 : 214 : /** 215 : * @brief Truncate request called by RPC handler function and launches a 216 : * non-blocking tasklet. 217 : * @param size GekkoFS file offset (_NOT_ chunk file) to truncate to 218 : * @throws ChunkMetaOpException 219 : */ 220 : void 221 : truncate(size_t size); 222 : /** 223 : * @brief Wait for the truncate tasklet to finish. 224 : * @return Error code for success (0) or failure 225 : */ 226 : int 227 : wait_for_task(); 228 : }; 229 : 230 : /** 231 : * @brief Chunk operation class for write operations with one object per write 232 : * RPC request. May involve multiple I/O task depending on the number of chunks 233 : * involved. 234 : */ 235 : class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> { 236 : friend class ChunkOperation<ChunkWriteOperation>; 237 : 238 : private: 239 : struct chunk_write_args { 240 : const std::string* path; //!< Path to affected chunk directory 241 : const char* buf; //!< Buffer for chunk 242 : gkfs::rpc::chnk_id_t chnk_id; //!< chunk id that is affected 243 : size_t size; //!< size to write for chunk 244 : off64_t off; //!< offset for individual chunk 245 : ABT_eventual eventual; //!< Attached eventual 246 : }; //!< Struct for an chunk write operation 247 : 248 : std::vector<struct chunk_write_args> task_args_; //!< tasklet input structs 249 : /** 250 : * @brief Exclusively used by the Argobots tasklet. 251 : * @param _arg Pointer to input struct of type <chunk_write_args>. Error 252 : * code<int> is placed into eventual to signal its failure or success. 253 : */ 254 : static void 255 : write_file_abt(void* _arg); 256 : /** 257 : * @brief Resets the task_arg_ struct. 258 : */ 259 : void 260 : clear_task_args(); 261 : 262 : public: 263 : ChunkWriteOperation(const std::string& path, size_t n); 264 : 265 41 : ~ChunkWriteOperation() = default; 266 : 267 : /** 268 : * @brief Write request called by RPC handler function and launches a 269 : * non-blocking tasklet. 270 : * @param idx Number of non-blocking write for write RPC request 271 : * @param chunk_id The affected chunk id 272 : * @param bulk_buf_ptr The buffer to write for the chunk 273 : * @param size Size to write for chunk 274 : * @param offset Offset for individual chunk 275 : * @throws ChunkWriteOpException 276 : */ 277 : void 278 : write_nonblock(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, 279 : size_t size, off64_t offset); 280 : 281 : /** 282 : * @brief Wait for all write tasklets to finish. 283 : * @return Pair for error code for success (0) or failure and written size 284 : */ 285 : std::pair<int, size_t> 286 : wait_for_tasks(); 287 : }; 288 : 289 : /** 290 : * @brief Chunk operation class for read operations with one object per read 291 : * RPC request. May involve multiple I/O task depending on the number of chunks 292 : * involved. 293 : */ 294 : class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> { 295 : friend class ChunkOperation<ChunkReadOperation>; 296 : 297 : private: 298 : struct chunk_read_args { 299 : const std::string* path; //!< Path to affected chunk directory 300 : char* buf; //!< Buffer for chunk 301 : gkfs::rpc::chnk_id_t chnk_id; //!< chunk id that is affected 302 : size_t size; //!< size to read from chunk 303 : off64_t off; //!< offset for individual chunk 304 : ABT_eventual eventual; //!< Attached eventual 305 : }; //!< Struct for an chunk read operation 306 : 307 : std::vector<struct chunk_read_args> task_args_; //!< tasklet input structs 308 : /** 309 : * @brief Exclusively used by the Argobots tasklet. 310 : * @param _arg Pointer to input struct of type <chunk_read_args>. Error 311 : * code<int> is placed into eventual to signal its failure or success. 312 : */ 313 : static void 314 : read_file_abt(void* _arg); 315 : /** 316 : * @brief Resets the task_arg_ struct. 317 : */ 318 : void 319 : clear_task_args(); 320 : 321 : public: 322 : struct bulk_args { 323 : margo_instance_id mid; //!< Margo instance ID of server 324 : hg_addr_t origin_addr; //!< abstract address of client 325 : hg_bulk_t origin_bulk_handle; //!< bulk handle from client 326 : std::vector<size_t>* origin_offsets; //!< offsets in origin buffer 327 : hg_bulk_t local_bulk_handle; //!< local bulk handle for PUSH 328 : std::vector<size_t>* local_offsets; //!< offsets in local buffer 329 : std::vector<uint64_t>* chunk_ids; //!< all chunk ids in this read 330 : }; //!< Struct to push read data to the client 331 : 332 : ChunkReadOperation(const std::string& path, size_t n); 333 : 334 28 : ~ChunkReadOperation() = default; 335 : 336 : /** 337 : * @brief Read request called by RPC handler function and launches a 338 : * non-blocking tasklet. 339 : * @param idx Number of non-blocking write for write RPC request 340 : * @param chunk_id The affected chunk id 341 : * @param bulk_buf_ptr The buffer for reading chunk 342 : * @param size Size to read from chunk 343 : * @param offset Offset for individual chunk 344 : * @throws ChunkReadOpException 345 : */ 346 : void 347 : read_nonblock(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, 348 : size_t size, off64_t offset); 349 : 350 : /** 351 : * @brief Waits for all local I/O operations to finish and push buffers back 352 : * to the daemon. 353 : * @param args Bulk_args for push transfer 354 : * @return Pair for error code for success (0) or failure and read size 355 : */ 356 : std::pair<int, size_t> 357 : wait_for_tasks_and_push_back(const bulk_args& args); 358 : }; 359 : 360 : } // namespace gkfs::data 361 : 362 : #endif // GEKKOFS_DAEMON_DATA_HPP