Line data Source code
1 : /*
2 : Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
3 : Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
4 :
5 : This software was partially supported by the
6 : EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
7 :
8 : This software was partially supported by the
9 : ADA-FS project under the SPPEXA project funded by the DFG.
10 :
11 : This file is part of GekkoFS.
12 :
13 : GekkoFS is free software: you can redistribute it and/or modify
14 : it under the terms of the GNU General Public License as published by
15 : the Free Software Foundation, either version 3 of the License, or
16 : (at your option) any later version.
17 :
18 : GekkoFS is distributed in the hope that it will be useful,
19 : but WITHOUT ANY WARRANTY; without even the implied warranty of
20 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 : GNU General Public License for more details.
22 :
23 : You should have received a copy of the GNU General Public License
24 : along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
25 :
26 : SPDX-License-Identifier: GPL-3.0-or-later
27 : */
28 : /**
29 : * @brief Member definitions for ChunkOperation classes.
30 : */
31 :
32 : #include <daemon/ops/data.hpp>
33 : #include <daemon/backend/data/chunk_storage.hpp>
34 : #include <common/arithmetic/arithmetic.hpp>
35 : #include <utility>
36 :
37 : extern "C" {
38 : #include <mercury_types.h>
39 : }
40 :
41 : using namespace std;
42 :
43 : namespace gkfs::data {
44 :
45 : /* ------------------------------------------------------------------------
46 : * -------------------------- TRUNCATE ------------------------------------
47 : * ------------------------------------------------------------------------*/
48 :
49 :
50 : /**
51 : * @internal
52 : * Exclusively used by the Argobots tasklet. Argument args has the following
53 : * fields:
54 : * const string* path;
55 : * size_t size;
56 : * ABT_eventual eventual;
57 : * This function is driven by the IO pool, so there is a maximum allowed
58 : * number of concurrent operations per daemon. A worked example follows below.
59 : * @endinternal
60 : */
61 : void
62 2 : ChunkTruncateOperation::truncate_abt(void* _arg) {
63 :
64 : // import pow2-optimized arithmetic functions
65 2 : using namespace gkfs::utils::arithmetic;
66 :
67 2 : assert(_arg);
68 : // Unpack args
69 2 : auto* arg = static_cast<struct chunk_truncate_args*>(_arg);
70 2 : const string& path = *(arg->path);
71 2 : const size_t size = arg->size;
72 2 : int err_response = 0;
73 2 : try {
74 : // get chunk from where to cut off
75 2 : auto chunk_id_start = block_index(size, gkfs::config::rpc::chunksize);
76 : // if the new size ends in the middle of a chunk, truncate that chunk instead of deleting it
77 2 : auto left_pad = block_overrun(size, gkfs::config::rpc::chunksize);
78 2 : if(left_pad != 0) {
79 2 : GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start,
80 : left_pad);
81 2 : chunk_id_start++;
82 : }
83 2 : GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start);
84 0 : } catch(const ChunkStorageException& err) {
85 0 : GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
86 0 : err_response = err.code().value();
87 0 : } catch(const std::exception& err) {
88 0 : GKFS_DATA->spdlogger()->error(
89 : "{}() Unexpected error truncating file '{}' to length '{}'",
90 0 : __func__, path, size);
91 0 : err_response = EIO;
92 : }
93 2 : ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response));
94 2 : }
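/*
 * Worked example (a sketch; assumes the default 512 KiB chunk size, i.e.
 * gkfs::config::rpc::chunksize == 524288, and that block_index()/block_overrun()
 * compute offset / chunksize and offset % chunksize; the size below is
 * illustrative only):
 *
 *   size_t size = 1310720;                            // truncate to 1.25 MiB
 *   auto chunk_id_start = block_index(size, 524288);  // == 2
 *   auto left_pad = block_overrun(size, 524288);      // == 262144
 *   // chunk file 2 is truncated to 262144 bytes and chunk files >= 3 are
 *   // removed via trim_chunk_space()
 */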
95 :
96 : void
97 2 : ChunkTruncateOperation::clear_task_args() {
98 2 : task_arg_ = {};
99 2 : }
100 :
101 2 : ChunkTruncateOperation::ChunkTruncateOperation(const string& path)
102 4 : : ChunkOperation{path, 1} {}
103 :
104 : /**
105 : * @internal
106 : * Starts a tasklet for the requested truncate. In essence, all chunk files
107 : * after the given offset are removed. Only one truncate call is allowed at a time.
108 : * @endinternal
109 : */
110 : void
111 2 : ChunkTruncateOperation::truncate(size_t size) {
112 2 : assert(!task_eventuals_[0]);
113 2 : GKFS_DATA->spdlogger()->trace(
114 : "ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__,
115 2 : path_, size);
116 : // sizeof(int) comes from truncate's return type
117 2 : auto abt_err = ABT_eventual_create(
118 2 : sizeof(int), &task_eventuals_[0]); // truncate file return value
119 2 : if(abt_err != ABT_SUCCESS) {
120 0 : auto err_str = fmt::format(
121 : "ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'",
122 0 : __func__, abt_err);
123 0 : throw ChunkMetaOpException(err_str);
124 : }
125 :
126 2 : auto& task_arg = task_arg_;
127 2 : task_arg.path = &path_;
128 2 : task_arg.size = size;
129 2 : task_arg.eventual = task_eventuals_[0];
130 :
131 2 : abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_,
132 2 : &abt_tasks_[0]);
133 2 : if(abt_err != ABT_SUCCESS) {
134 0 : auto err_str = fmt::format(
135 : "ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'",
136 0 : __func__, abt_err);
137 0 : throw ChunkMetaOpException(err_str);
138 : }
139 2 : }
140 :
141 :
142 : int
143 2 : ChunkTruncateOperation::wait_for_task() {
144 2 : GKFS_DATA->spdlogger()->trace(
145 2 : "ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_);
146 2 : int trunc_err = 0;
147 :
148 2 : int* task_err = nullptr;
149 2 : auto abt_err = ABT_eventual_wait(task_eventuals_[0], (void**) &task_err);
150 2 : if(abt_err != ABT_SUCCESS) {
151 0 : GKFS_DATA->spdlogger()->error(
152 : "ChunkTruncateOperation::{}() Error when waiting on ABT eventual",
153 0 : __func__);
154 0 : ABT_eventual_free(&task_eventuals_[0]);
155 0 : return EIO;
156 : }
157 2 : assert(task_err != nullptr);
158 2 : if(*task_err != 0) {
159 0 : trunc_err = *task_err;
160 : }
161 2 : ABT_eventual_free(&task_eventuals_[0]);
162 : return trunc_err;
163 : }
164 : }
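/*
 * Usage sketch (hypothetical caller, not part of this file): an RPC handler
 * receiving a truncate request would drive the operation roughly as follows,
 * with `path` and `size` taken from the decoded RPC input:
 *
 *   ChunkTruncateOperation op{path};
 *   op.truncate(size);             // enqueues truncate_abt() on the IO pool
 *   int err = op.wait_for_task();  // 0 on success, errno-style value on error
 */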
165 : /* ------------------------------------------------------------------------
166 : * ----------------------------- WRITE ------------------------------------
167 : * ------------------------------------------------------------------------*/
168 :
169 : /**
170 : * @internal
171 : * Used by an Argobots tasklet. Argument args has the following fields:
172 : * const string* path;
173 : * const char* buf;
174 : * gkfs::rpc::chnk_id_t chnk_id;
175 : * size_t size;
176 : * off64_t off;
177 : * ABT_eventual eventual;
178 : * This function is driven by the IO pool, so there is a maximum allowed
179 : * number of concurrent IO operations per daemon.
180 : * This function runs as a tasklet because it is not allowed to block.
181 : * The result encoding it uses is sketched below the function.
182 : * @endinternal
183 : */
184 : void
185 100 : ChunkWriteOperation::write_file_abt(void* _arg) {
186 100 : assert(_arg);
187 : // Unpack args
188 100 : auto* arg = static_cast<struct chunk_write_args*>(_arg);
189 100 : const string& path = *(arg->path);
190 100 : ssize_t wrote{0};
191 100 : try {
192 100 : wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf,
193 : arg->size, arg->off);
194 0 : } catch(const ChunkStorageException& err) {
195 0 : GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
196 0 : wrote = -(err.code().value());
197 0 : } catch(const std::exception& err) {
198 0 : GKFS_DATA->spdlogger()->error(
199 : "{}() Unexpected error writing chunk {} of file {}", __func__,
200 0 : arg->chnk_id, path);
201 0 : wrote = -EIO;
202 : }
203 100 : ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote));
204 100 : }
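/*
 * Result encoding (sketch): the tasklet stores an ssize_t into the eventual,
 * mirroring pwrite(): a non-negative value is the number of bytes written and
 * a negative value is -errno. The waiter decodes it roughly like this (this
 * mirrors wait_for_tasks() below; `e` is one eventual of this operation):
 *
 *   ssize_t* task_size = nullptr;
 *   ABT_eventual_wait(e, (void**) &task_size);
 *   if(*task_size < 0) {
 *       io_err = -(*task_size);        // errno-style error code
 *   } else {
 *       total_written += *task_size;   // bytes written to this chunk file
 *   }
 *   ABT_eventual_free(&e);
 *
 * The read path further down uses the same convention, with -ENOENT
 * additionally denoting a sparse region.
 */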
205 :
206 : void
207 41 : ChunkWriteOperation::clear_task_args() {
208 41 : task_args_.clear();
209 41 : }
210 :
211 41 : ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n)
212 82 : : ChunkOperation{path, n} {
213 41 : task_args_.resize(n);
214 41 : }
215 :
216 : /**
217 : * @internal
218 : * Writes the buffer of a single chunk, referenced by its ID, and puts the
219 : * task into the IO queue. On failure the write operation is aborted with an
220 : * exception and cleaned up. The caller may repeat a failed call (sketch below).
221 : * @endinternal
222 : */
223 : void
224 100 : ChunkWriteOperation::write_nonblock(size_t idx, const uint64_t chunk_id,
225 : const char* bulk_buf_ptr, const size_t size,
226 : const off64_t offset) {
227 100 : assert(idx < task_args_.size());
228 100 : GKFS_DATA->spdlogger()->trace(
229 : "ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
230 100 : __func__, idx, path_, size, offset);
231 : // sizeof(ssize_t) comes from pwrite's return type
232 100 : auto abt_err = ABT_eventual_create(
233 : sizeof(ssize_t),
234 100 : &task_eventuals_[idx]); // written file return value
235 100 : if(abt_err != ABT_SUCCESS) {
236 0 : auto err_str = fmt::format(
237 : "ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'",
238 0 : __func__, abt_err);
239 0 : throw ChunkWriteOpException(err_str);
240 : }
241 :
242 100 : auto& task_arg = task_args_[idx];
243 100 : task_arg.path = &path_;
244 100 : task_arg.buf = bulk_buf_ptr;
245 100 : task_arg.chnk_id = chunk_id;
246 100 : task_arg.size = size;
247 100 : task_arg.off = offset;
248 100 : task_arg.eventual = task_eventuals_[idx];
249 :
250 100 : abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt,
251 100 : &task_args_[idx], &abt_tasks_[idx]);
252 100 : if(abt_err != ABT_SUCCESS) {
253 0 : auto err_str = fmt::format(
254 : "ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'",
255 0 : __func__, abt_err);
256 0 : throw ChunkWriteOpException(err_str);
257 : }
258 100 : }
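/*
 * Usage sketch (hypothetical caller): the write RPC handler creates one
 * operation for n chunks and enqueues one tasklet per chunk before waiting.
 * chunk_ids, buf_ptrs, sizes, and offsets are illustrative arrays filled from
 * the decoded RPC input and bulk-transfer bookkeeping:
 *
 *   ChunkWriteOperation op{path, n};
 *   for(size_t idx = 0; idx < n; idx++)
 *       op.write_nonblock(idx, chunk_ids[idx], buf_ptrs[idx], sizes[idx],
 *                         offsets[idx]);
 *   auto [err, written] = op.wait_for_tasks();  // err == 0 on success
 */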
259 :
260 : pair<int, size_t>
261 41 : ChunkWriteOperation::wait_for_tasks() {
262 41 : GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'",
263 41 : __func__, path_);
264 41 : size_t total_written = 0;
265 41 : int io_err = 0;
266 : /*
267 : * Gather all eventuals' results. Do not throw here so that all eventuals
268 : * are properly cleaned up. On error, free the eventuals and report a
269 : * written size of 0, as the written data is corrupted.
270 : */
271 141 : for(auto& e : task_eventuals_) {
272 100 : ssize_t* task_size = nullptr;
273 100 : auto abt_err = ABT_eventual_wait(e, (void**) &task_size);
274 100 : if(abt_err != ABT_SUCCESS) {
275 0 : GKFS_DATA->spdlogger()->error(
276 : "ChunkWriteOperation::{}() Error when waiting on ABT eventual",
277 0 : __func__);
278 0 : io_err = EIO;
279 0 : ABT_eventual_free(&e);
280 0 : continue;
281 : }
282 100 : if(io_err != 0) {
283 0 : ABT_eventual_free(&e);
284 0 : continue;
285 : }
286 100 : assert(task_size != nullptr);
287 100 : if(*task_size < 0) {
288 0 : io_err = -(*task_size);
289 : } else {
290 100 : total_written += *task_size;
291 : }
292 100 : ABT_eventual_free(&e);
293 : }
294 : // in case of error set written size to zero as data would be corrupted
295 41 : if(io_err != 0)
296 0 : total_written = 0;
297 41 : return make_pair(io_err, total_written);
298 : }
299 :
300 : /* ------------------------------------------------------------------------
301 : * -------------------------- READ ----------------------------------------
302 : * ------------------------------------------------------------------------*/
303 :
304 : /**
305 : * @internal
306 : * Used by an Argobots tasklet. Argument args has the following fields:
307 : * const string* path;
308 : * char* buf;
309 : * gkfs::rpc::chnk_id_t chnk_id;
310 : * size_t size;
311 : * off64_t off;
312 : * ABT_eventual eventual;
313 : * This function is driven by the IO pool, so there is a maximum allowed
314 : * number of concurrent IO operations per daemon.
315 : * This function runs as a tasklet because it is not allowed to block.
316 : * Errors are encoded as in the write path; -ENOENT denotes a sparse region.
317 : * @endinternal
318 : */
319 : void
320 74 : ChunkReadOperation::read_file_abt(void* _arg) {
321 74 : assert(_arg);
322 : // unpack args
323 74 : auto* arg = static_cast<struct chunk_read_args*>(_arg);
324 74 : const string& path = *(arg->path);
325 74 : ssize_t read = 0;
326 74 : try {
327 : // read_chunk either returns the number of bytes read or throws; in
328 : // both cases (error or no error) the eventual is signaled below
329 74 : read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf,
330 : arg->size, arg->off);
331 28 : } catch(const ChunkStorageException& err) {
332 14 : GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
333 14 : read = -(err.code().value());
334 0 : } catch(const std::exception& err) {
335 0 : GKFS_DATA->spdlogger()->error(
336 : "{}() Unexpected error reading chunk {} of file {}", __func__,
337 0 : arg->chnk_id, path);
338 0 : read = -EIO;
339 : }
340 74 : ABT_eventual_set(arg->eventual, &read, sizeof(read));
341 74 : }
342 :
343 : void
344 28 : ChunkReadOperation::clear_task_args() {
345 28 : task_args_.clear();
346 28 : }
347 :
348 28 : ChunkReadOperation::ChunkReadOperation(const string& path, size_t n)
349 56 : : ChunkOperation{path, n} {
350 28 : task_args_.resize(n);
351 28 : }
352 :
353 : /**
354 : * @internal
355 : * Reads a single chunk, referenced by its ID, into the given buffer and puts
356 : * the task into the IO queue. On failure the read operation is aborted with an
357 : * exception and cleaned up. The caller may repeat a failed call (sketch below).
358 : * @endinternal
359 : */
360 : void
361 74 : ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id,
362 : char* bulk_buf_ptr, const size_t size,
363 : const off64_t offset) {
364 74 : assert(idx < task_args_.size());
365 74 : GKFS_DATA->spdlogger()->trace(
366 : "ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
367 74 : __func__, idx, path_, size, offset);
368 : // sizeof(ssize_t) comes from pread's return type
369 74 : auto abt_err = ABT_eventual_create(
370 74 : sizeof(ssize_t), &task_eventuals_[idx]); // read file return value
371 74 : if(abt_err != ABT_SUCCESS) {
372 0 : auto err_str = fmt::format(
373 : "ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'",
374 0 : __func__, abt_err);
375 0 : throw ChunkReadOpException(err_str);
376 : }
377 :
378 74 : auto& task_arg = task_args_[idx];
379 74 : task_arg.path = &path_;
380 74 : task_arg.buf = bulk_buf_ptr;
381 74 : task_arg.chnk_id = chunk_id;
382 74 : task_arg.size = size;
383 74 : task_arg.off = offset;
384 74 : task_arg.eventual = task_eventuals_[idx];
385 :
386 74 : abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt,
387 74 : &task_args_[idx], &abt_tasks_[idx]);
388 74 : if(abt_err != ABT_SUCCESS) {
389 0 : auto err_str = fmt::format(
390 : "ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'",
391 0 : __func__, abt_err);
392 0 : throw ChunkReadOpException(err_str);
393 : }
394 74 : }
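/*
 * Usage sketch (hypothetical caller): the read RPC handler enqueues one
 * tasklet per requested chunk and then pushes the data back to the client in
 * a single call. `args` is the bulk_args descriptor (margo instance, origin
 * address, bulk handles, and per-chunk offsets) prepared by the handler;
 * chunk_ids, buf_ptrs, sizes, and offsets are illustrative arrays:
 *
 *   ChunkReadOperation op{path, n};
 *   for(size_t idx = 0; idx < n; idx++)
 *       op.read_nonblock(idx, chunk_ids[idx], buf_ptrs[idx], sizes[idx],
 *                        offsets[idx]);
 *   auto [err, read] = op.wait_for_tasks_and_push_back(args);
 */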
395 :
396 : pair<int, size_t>
397 28 : ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) {
398 28 : GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'",
399 28 : __func__, path_);
400 28 : assert(args.chunk_ids->size() == task_args_.size());
401 : size_t total_read = 0;
402 : int io_err = 0;
403 :
404 : /*
405 : * Gather all eventuals' results. Do not throw here so that all eventuals
406 : * are properly cleaned up. As soon as an error is encountered, no further
407 : * bulk transfers are executed, as the data would be corrupted. The loop
408 : * continues until all eventuals have been waited on and freed.
409 : */
410 102 : for(uint64_t idx = 0; idx < task_args_.size(); idx++) {
411 74 : ssize_t* task_size = nullptr;
412 74 : auto abt_err =
413 74 : ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size);
414 74 : if(abt_err != ABT_SUCCESS) {
415 0 : GKFS_DATA->spdlogger()->error(
416 : "ChunkReadOperation::{}() Error when waiting on ABT eventual",
417 0 : __func__);
418 0 : io_err = EIO;
419 0 : ABT_eventual_free(&task_eventuals_[idx]);
420 24 : continue;
421 : }
422 : // an error occurred: stop processing but keep cleaning up
423 74 : if(io_err != 0) {
424 0 : ABT_eventual_free(&task_eventuals_[idx]);
425 0 : continue;
426 : }
427 74 : assert(task_size != nullptr);
428 74 : if(*task_size < 0) {
429 : // sparse regions do not have chunk files and are therefore skipped
430 14 : if(-(*task_size) == ENOENT) {
431 14 : ABT_eventual_free(&task_eventuals_[idx]);
432 14 : continue;
433 : }
434 0 : io_err = -(*task_size); // make error code > 0
435 60 : } else if(*task_size == 0) {
436 : // a read size of 0 is not an error and can happen when reading at the
437 : // end of the file
438 10 : ABT_eventual_free(&task_eventuals_[idx]);
439 10 : continue;
440 : } else {
441 : // successful case, push read data back to client
442 50 : GKFS_DATA->spdlogger()->trace(
443 : "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'",
444 50 : __func__, path_, args.chunk_ids->at(idx),
445 50 : args.origin_offsets->at(idx), args.local_offsets->at(idx),
446 50 : *task_size);
447 50 : assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx));
448 150 : auto margo_err = margo_bulk_transfer(
449 50 : args.mid, HG_BULK_PUSH, args.origin_addr,
450 50 : args.origin_bulk_handle, args.origin_offsets->at(idx),
451 50 : args.local_bulk_handle, args.local_offsets->at(idx),
452 50 : *task_size);
453 50 : if(margo_err != HG_SUCCESS) {
454 0 : GKFS_DATA->spdlogger()->error(
455 : "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'",
456 0 : __func__, margo_err);
457 0 : io_err = EBUSY;
458 0 : continue;
459 : }
460 50 : total_read += *task_size;
461 : }
462 50 : ABT_eventual_free(&task_eventuals_[idx]);
463 : }
464 : // in case of error set read size to zero as data would be corrupted
465 28 : if(io_err != 0)
466 0 : total_read = 0;
467 28 : return make_pair(io_err, total_read);
468 : }
469 :
470 : } // namespace gkfs::data