Program Listing for File data.cpp
↰ Return to documentation for file (src/daemon/ops/data.cpp)
/*
Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS.
GekkoFS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
GekkoFS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
SPDX-License-Identifier: GPL-3.0-or-later
*/
#include <daemon/ops/data.hpp>
#include <daemon/backend/data/chunk_storage.hpp>
#include <common/arithmetic/arithmetic.hpp>
#include <utility>
extern "C" {
#include <mercury_types.h>
}
using namespace std;
namespace gkfs::data {
/* ------------------------------------------------------------------------
* -------------------------- TRUNCATE ------------------------------------
* ------------------------------------------------------------------------*/
/**
 * Argobots tasklet body that performs a truncate on the local chunk storage.
 *
 * Truncates the chunk that contains the new end-of-file (if the offset falls
 * inside a chunk) and removes all chunk files past it. The result (0 or a
 * positive errno value) is published through the eventual in the argument
 * struct so that wait_for_task() can collect it.
 *
 * @param _arg opaque pointer to a chunk_truncate_args struct (never null)
 */
void
ChunkTruncateOperation::truncate_abt(void* _arg) {
    // import pow2-optimized arithmetic functions
    using namespace gkfs::utils::arithmetic;
    assert(_arg);
    // Unpack args
    auto* arg = static_cast<struct chunk_truncate_args*>(_arg);
    const string& path = *(arg->path);
    const size_t size = arg->size;
    int err_response = 0;
    try {
        // get chunk from where to cut off
        auto chunk_id_start = block_index(size, gkfs::config::rpc::chunksize);
        // do not delete the chunk containing the new end-of-file when the
        // truncate offset falls in the middle of a chunk; shrink it instead
        auto left_pad = block_overrun(size, gkfs::config::rpc::chunksize);
        if(left_pad != 0) {
            GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start,
                                                      left_pad);
            chunk_id_start++;
        }
        // remove all chunk files past the (possibly shrunk) boundary chunk
        GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start);
    } catch(const ChunkStorageException& err) {
        GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
        err_response = err.code().value();
    } catch(const std::exception& err) {
        // include the exception message so unexpected failures are diagnosable
        GKFS_DATA->spdlogger()->error(
                "{}() Unexpected error truncating file '{}' to length '{}': {}",
                __func__, path, size, err.what());
        err_response = EIO;
    }
    ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response));
}
// Reset the single argument struct so this operation object can be reused
// for a subsequent truncate request.
void
ChunkTruncateOperation::clear_task_args() {
task_arg_ = {};
}
// A truncate operation always consists of exactly one task, hence the
// fixed task count of 1 passed to the ChunkOperation base.
ChunkTruncateOperation::ChunkTruncateOperation(const string& path)
: ChunkOperation{path, 1} {}
void
ChunkTruncateOperation::truncate(size_t size) {
assert(!task_eventuals_[0]);
GKFS_DATA->spdlogger()->trace(
"ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__,
path_, size);
// sizeof(int) comes from truncate's return type
auto abt_err = ABT_eventual_create(
sizeof(int), &task_eventuals_[0]); // truncate file return value
if(abt_err != ABT_SUCCESS) {
auto err_str = fmt::format(
"ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'",
__func__, abt_err);
throw ChunkMetaOpException(err_str);
}
auto& task_arg = task_arg_;
task_arg.path = &path_;
task_arg.size = size;
task_arg.eventual = task_eventuals_[0];
abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_,
&abt_tasks_[0]);
if(abt_err != ABT_SUCCESS) {
auto err_str = fmt::format(
"ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'",
__func__, abt_err);
throw ChunkMetaOpException(err_str);
}
}
/**
 * Block until the truncate task launched by truncate() has finished.
 *
 * Waits on the operation's single eventual, copies the task's result out of
 * the eventual's memory, and frees the eventual in every path.
 *
 * @return 0 on success, a positive errno value on failure (EIO if waiting on
 *         the eventual itself failed)
 */
int
ChunkTruncateOperation::wait_for_task() {
    GKFS_DATA->spdlogger()->trace(
            "ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_);
    int* task_result = nullptr;
    const auto abt_err =
            ABT_eventual_wait(task_eventuals_[0], (void**) &task_result);
    if(abt_err != ABT_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "ChunkTruncateOperation::{}() Error when waiting on ABT eventual",
                __func__);
        ABT_eventual_free(&task_eventuals_[0]);
        return EIO;
    }
    assert(task_result != nullptr);
    // copy the result out before releasing the eventual that backs it
    const int trunc_err = *task_result;
    ABT_eventual_free(&task_eventuals_[0]);
    return trunc_err;
}
/* ------------------------------------------------------------------------
* ----------------------------- WRITE ------------------------------------
* ------------------------------------------------------------------------*/
/**
 * Argobots tasklet body that writes one chunk to the local chunk storage.
 *
 * The number of bytes written (or a negated errno value on failure) is
 * published through the eventual in the argument struct so that
 * wait_for_tasks() can collect it.
 *
 * @param _arg opaque pointer to a chunk_write_args struct (never null)
 */
void
ChunkWriteOperation::write_file_abt(void* _arg) {
    assert(_arg);
    // Unpack args
    auto* arg = static_cast<struct chunk_write_args*>(_arg);
    const string& path = *(arg->path);
    ssize_t wrote{0};
    try {
        wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf,
                                                  arg->size, arg->off);
    } catch(const ChunkStorageException& err) {
        GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
        wrote = -(err.code().value());
    } catch(const std::exception& err) {
        // include the exception message so unexpected failures are diagnosable
        GKFS_DATA->spdlogger()->error(
                "{}() Unexpected error writing chunk {} of file {}: {}",
                __func__, arg->chnk_id, path, err.what());
        wrote = -EIO;
    }
    ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote));
}
// Drop all per-task argument structs so this operation object can be reused
// for a subsequent write request.
void
ChunkWriteOperation::clear_task_args() {
task_args_.clear();
}
// A write operation consists of `n` chunk-write tasks; pre-size the argument
// vector so write_nonblock() can hand out stable pointers into it.
ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n)
: ChunkOperation{path, n} {
task_args_.resize(n);
}
void
ChunkWriteOperation::write_nonblock(size_t idx, const uint64_t chunk_id,
const char* bulk_buf_ptr, const size_t size,
const off64_t offset) {
assert(idx < task_args_.size());
GKFS_DATA->spdlogger()->trace(
"ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
__func__, idx, path_, size, offset);
// sizeof(ssize_t) comes from pwrite's return type
auto abt_err = ABT_eventual_create(
sizeof(ssize_t),
&task_eventuals_[idx]); // written file return value
if(abt_err != ABT_SUCCESS) {
auto err_str = fmt::format(
"ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'",
__func__, abt_err);
throw ChunkWriteOpException(err_str);
}
auto& task_arg = task_args_[idx];
task_arg.path = &path_;
task_arg.buf = bulk_buf_ptr;
task_arg.chnk_id = chunk_id;
task_arg.size = size;
task_arg.off = offset;
task_arg.eventual = task_eventuals_[idx];
abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt,
&task_args_[idx], &abt_tasks_[idx]);
if(abt_err != ABT_SUCCESS) {
auto err_str = fmt::format(
"ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'",
__func__, abt_err);
throw ChunkWriteOpException(err_str);
}
}
/**
 * Block until every chunk-write task launched via write_nonblock() finished.
 *
 * All eventuals are waited on and freed even after an error has been seen, so
 * no Argobots resources are leaked; results collected after an error are
 * discarded because the overall write has already failed.
 *
 * @return pair of (error code, bytes written); the error code is 0 on full
 *         success and a positive errno otherwise, in which case the byte
 *         count is forced to 0 since the on-disk data may be inconsistent
 */
pair<int, size_t>
ChunkWriteOperation::wait_for_tasks() {
    GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'",
                                  __func__, path_);
    int first_err = 0;
    size_t bytes_written = 0;
    for(auto& eventual : task_eventuals_) {
        ssize_t* task_result = nullptr;
        if(ABT_eventual_wait(eventual, (void**) &task_result) != ABT_SUCCESS) {
            GKFS_DATA->spdlogger()->error(
                    "ChunkWriteOperation::{}() Error when waiting on ABT eventual",
                    __func__);
            first_err = EIO;
            ABT_eventual_free(&eventual);
            continue;
        }
        // only accumulate results while no error has been encountered yet;
        // afterwards the loop merely drains and frees remaining eventuals
        if(first_err == 0) {
            assert(task_result != nullptr);
            if(*task_result < 0) {
                first_err = -(*task_result); // make error code > 0
            } else {
                bytes_written += *task_result;
            }
        }
        ABT_eventual_free(&eventual);
    }
    // a failed write leaves the data in an undefined state -> report 0 bytes
    if(first_err != 0)
        bytes_written = 0;
    return make_pair(first_err, bytes_written);
}
/* ------------------------------------------------------------------------
* -------------------------- READ ----------------------------------------
* ------------------------------------------------------------------------*/
/**
 * Argobots tasklet body that reads one chunk from the local chunk storage.
 *
 * The number of bytes read (or a negated errno value on failure) is published
 * through the eventual in the argument struct so that
 * wait_for_tasks_and_push_back() can collect it.
 *
 * @param _arg opaque pointer to a chunk_read_args struct (never null)
 */
void
ChunkReadOperation::read_file_abt(void* _arg) {
    assert(_arg);
    // unpack args
    auto* arg = static_cast<struct chunk_read_args*>(_arg);
    const string& path = *(arg->path);
    ssize_t read = 0;
    try {
        // Under expected circumstances (error or no error) read_chunk will
        // signal the eventual
        read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf,
                                                arg->size, arg->off);
    } catch(const ChunkStorageException& err) {
        GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
        read = -(err.code().value());
    } catch(const std::exception& err) {
        // include the exception message so unexpected failures are diagnosable
        GKFS_DATA->spdlogger()->error(
                "{}() Unexpected error reading chunk {} of file {}: {}",
                __func__, arg->chnk_id, path, err.what());
        read = -EIO;
    }
    ABT_eventual_set(arg->eventual, &read, sizeof(read));
}
// Drop all per-task argument structs so this operation object can be reused
// for a subsequent read request.
void
ChunkReadOperation::clear_task_args() {
task_args_.clear();
}
// A read operation consists of `n` chunk-read tasks; pre-size the argument
// vector so read_nonblock() can hand out stable pointers into it.
ChunkReadOperation::ChunkReadOperation(const string& path, size_t n)
: ChunkOperation{path, n} {
task_args_.resize(n);
}
void
ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id,
char* bulk_buf_ptr, const size_t size,
const off64_t offset) {
assert(idx < task_args_.size());
GKFS_DATA->spdlogger()->trace(
"ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
__func__, idx, path_, size, offset);
// sizeof(ssize_t) comes from pread's return type
auto abt_err = ABT_eventual_create(
sizeof(ssize_t), &task_eventuals_[idx]); // read file return value
if(abt_err != ABT_SUCCESS) {
auto err_str = fmt::format(
"ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'",
__func__, abt_err);
throw ChunkReadOpException(err_str);
}
auto& task_arg = task_args_[idx];
task_arg.path = &path_;
task_arg.buf = bulk_buf_ptr;
task_arg.chnk_id = chunk_id;
task_arg.size = size;
task_arg.off = offset;
task_arg.eventual = task_eventuals_[idx];
abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt,
&task_args_[idx], &abt_tasks_[idx]);
if(abt_err != ABT_SUCCESS) {
auto err_str = fmt::format(
"ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'",
__func__, abt_err);
throw ChunkReadOpException(err_str);
}
}
/**
 * Block until every chunk-read task finished and push the read data back to
 * the client via Margo bulk transfers.
 *
 * All eventuals are waited on and freed even after an error has been seen, so
 * no Argobots resources are leaked. Once an error is encountered, no further
 * bulk transfers are issued because the overall result is already failed.
 *
 * @param args bulk transfer context (margo ids, bulk handles, per-chunk
 *             origin/local offsets); chunk_ids size must match the task count
 * @return pair of (error code, bytes read); the error code is 0 on full
 *         success and a positive errno otherwise, in which case the byte
 *         count is forced to 0 since the transferred data may be inconsistent
 */
pair<int, size_t>
ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) {
    GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'",
                                  __func__, path_);
    assert(args.chunk_ids->size() == task_args_.size());
    size_t total_read = 0;
    int io_err = 0;
    /*
     * Gather every eventual's result. Do not throw from here so that all
     * eventuals are properly waited on and freed. As soon as an error is
     * encountered, bulk transfers are no longer executed as the data would
     * be corrupted; the loop still runs to completion for cleanup.
     */
    for(uint64_t idx = 0; idx < task_args_.size(); idx++) {
        ssize_t* task_size = nullptr;
        auto abt_err =
                ABT_eventual_wait(task_eventuals_[idx], (void**) &task_size);
        if(abt_err != ABT_SUCCESS) {
            GKFS_DATA->spdlogger()->error(
                    "ChunkReadOperation::{}() Error when waiting on ABT eventual",
                    __func__);
            io_err = EIO;
            ABT_eventual_free(&task_eventuals_[idx]);
            continue;
        }
        // an error occurred earlier. stop processing but keep cleaning up
        if(io_err != 0) {
            ABT_eventual_free(&task_eventuals_[idx]);
            continue;
        }
        assert(task_size != nullptr);
        if(*task_size < 0) {
            // sparse regions do not have chunk files and are therefore skipped
            if(-(*task_size) == ENOENT) {
                ABT_eventual_free(&task_eventuals_[idx]);
                continue;
            }
            io_err = -(*task_size); // make error code > 0
        } else if(*task_size == 0) {
            // a read size of 0 is not an error and can happen when reading
            // at end-of-file; nothing to push back
        } else {
            // successful case, push read data back to client
            GKFS_DATA->spdlogger()->trace(
                    "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'",
                    __func__, path_, args.chunk_ids->at(idx),
                    args.origin_offsets->at(idx), args.local_offsets->at(idx),
                    *task_size);
            assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx));
            auto margo_err = margo_bulk_transfer(
                    args.mid, HG_BULK_PUSH, args.origin_addr,
                    args.origin_bulk_handle, args.origin_offsets->at(idx),
                    args.local_bulk_handle, args.local_offsets->at(idx),
                    *task_size);
            if(margo_err != HG_SUCCESS) {
                GKFS_DATA->spdlogger()->error(
                        "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'",
                        __func__, margo_err);
                io_err = EBUSY;
                // leak fix: this `continue` previously skipped the free below
                ABT_eventual_free(&task_eventuals_[idx]);
                continue;
            }
            total_read += *task_size;
        }
        ABT_eventual_free(&task_eventuals_[idx]);
    }
    // in case of error set read size to zero as data would be corrupted
    if(io_err != 0)
        total_read = 0;
    return make_pair(io_err, total_read);
}
} // namespace gkfs::data