Program Listing for File data.cpp

Return to documentation for file (src/daemon/ops/data.cpp)

/*
  Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This software was partially supported by
  the European Union’s Horizon 2020 JTI-EuroHPC research and
  innovation programme, by the project ADMIRE (Project ID: 956748,
  admire-eurohpc.eu)

  This project was partially promoted by the Ministry for Digital Transformation
  and the Civil Service, within the framework of the Recovery,
  Transformation and Resilience Plan - Funded by the European Union
  -NextGenerationEU.

  This file is part of GekkoFS.

  GekkoFS is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  GekkoFS is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: GPL-3.0-or-later
*/
#include <daemon/ops/data.hpp>
#include <daemon/backend/data/chunk_storage.hpp>
#include <common/arithmetic/arithmetic.hpp>
#include <utility>

extern "C" {
#include <mercury_types.h>
}

using namespace std;

namespace gkfs::data {

/* ------------------------------------------------------------------------
 * -------------------------- TRUNCATE ------------------------------------
 * ------------------------------------------------------------------------*/


/**
 * Argobots task body for a chunk truncate.
 *
 * Cuts the chunk files of `*arg->path` so that only `arg->size` bytes remain:
 * the chunk containing the new end is shrunk via truncate_chunk_file() (if the
 * size does not fall on a chunk boundary) and all following chunks are removed
 * via trim_chunk_space(). The result (0, the storage error code, or EIO) is
 * always published through `arg->eventual`.
 *
 * @param _arg pointer to a chunk_truncate_args (must not be null)
 */
void
ChunkTruncateOperation::truncate_abt(void* _arg) {

    // import pow2-optimized arithmetic functions
    using namespace gkfs::utils::arithmetic;

    assert(_arg);
    // Unpack args
    auto* arg = static_cast<struct chunk_truncate_args*>(_arg);
    const string& path = *(arg->path);
    const size_t size = arg->size;
    int err_response = 0;
    try {
        // get chunk from where to cut off
        auto chunk_id_start = block_index(size, gkfs::config::rpc::chunksize);
        // do not delete the last chunk if the new size lies in the middle of
        // it; shrink it to the remaining length instead
        auto left_pad = block_overrun(size, gkfs::config::rpc::chunksize);
        if(left_pad != 0) {
            GKFS_DATA->storage()->truncate_chunk_file(path, chunk_id_start,
                                                      left_pad);
            chunk_id_start++;
        }
        GKFS_DATA->storage()->trim_chunk_space(path, chunk_id_start);
    } catch(const ChunkStorageException& err) {
        GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
        err_response = err.code().value();
    } catch(const ::exception& err) {
        GKFS_DATA->spdlogger()->error(
                "{}() Unexpected error truncating file '{}' to length '{}': '{}'",
                __func__, path, size, err.what());
        err_response = EIO;
    } catch(...) {
        // Nothing may escape this task: otherwise the eventual below is never
        // set and the waiter in wait_for_task() blocks indefinitely.
        GKFS_DATA->spdlogger()->error(
                "{}() Unknown error truncating file '{}' to length '{}'",
                __func__, path, size);
        err_response = EIO;
    }
    ABT_eventual_set(arg->eventual, &err_response, sizeof(err_response));
}

void
ChunkTruncateOperation::clear_task_args() {
    // Reset the single task argument to a value-initialized state.
    task_arg_ = decltype(task_arg_){};
}

// A truncate operation uses a single task slot (index 0) of the base class.
ChunkTruncateOperation::ChunkTruncateOperation(const string& path)
    : ChunkOperation(path, 1) {}

void
ChunkTruncateOperation::truncate(size_t size) {
    assert(!task_eventuals_[0]);
    GKFS_DATA->spdlogger()->trace(
            "ChunkTruncateOperation::{}() enter: path '{}' size '{}'", __func__,
            path_, size);
    // sizeof(int) comes from truncate's return type
    auto abt_err = ABT_eventual_create(
            sizeof(int), &task_eventuals_[0]); // truncate file return value
    if(abt_err != ABT_SUCCESS) {
        auto err_str = fmt::format(
                "ChunkTruncateOperation::{}() Failed to create ABT eventual with abt_err '{}'",
                __func__, abt_err);
        throw ChunkMetaOpException(err_str);
    }

    auto& task_arg = task_arg_;
    task_arg.path = &path_;
    task_arg.size = size;
    task_arg.eventual = task_eventuals_[0];

    abt_err = ABT_task_create(RPC_DATA->io_pool(), truncate_abt, &task_arg_,
                              &abt_tasks_[0]);
    if(abt_err != ABT_SUCCESS) {
        auto err_str = fmt::format(
                "ChunkTruncateOperation::{}() Failed to create ABT task with abt_err '{}'",
                __func__, abt_err);
        throw ChunkMetaOpException(err_str);
    }
}


/**
 * Blocks until the truncate task has published its result, frees the
 * eventual and returns the task's error code (0 on success). Returns EIO if
 * waiting on the eventual itself fails.
 */
int
ChunkTruncateOperation::wait_for_task() {
    GKFS_DATA->spdlogger()->trace(
            "ChunkTruncateOperation::{}() enter: path '{}'", __func__, path_);

    auto& eventual = task_eventuals_[0];
    int* result = nullptr;
    if(ABT_eventual_wait(eventual, reinterpret_cast<void**>(&result)) !=
       ABT_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "ChunkTruncateOperation::{}() Error when waiting on ABT eventual",
                __func__);
        ABT_eventual_free(&eventual);
        return EIO;
    }
    assert(result != nullptr);
    // copy the value out before the eventual (and its buffer) is released
    const int trunc_err = *result;
    ABT_eventual_free(&eventual);
    return trunc_err;
}

/* ------------------------------------------------------------------------
 * ----------------------------- WRITE ------------------------------------
 * ------------------------------------------------------------------------*/

/**
 * Argobots task body for one chunk write.
 *
 * Calls storage()->write_chunk() with the chunk id, buffer, size and offset
 * taken from `arg` and always publishes an ssize_t through `arg->eventual`:
 * write_chunk()'s return value on success, or a negated error code
 * (the storage error code, or EIO) on failure.
 *
 * @param _arg pointer to a chunk_write_args (must not be null)
 */
void
ChunkWriteOperation::write_file_abt(void* _arg) {
    assert(_arg);
    // Unpack args
    auto* arg = static_cast<struct chunk_write_args*>(_arg);
    const string& path = *(arg->path);
    ssize_t wrote{0};
    try {
        wrote = GKFS_DATA->storage()->write_chunk(path, arg->chnk_id, arg->buf,
                                                  arg->size, arg->off);
    } catch(const ChunkStorageException& err) {
        GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
        wrote = -(err.code().value());
    } catch(const ::exception& err) {
        GKFS_DATA->spdlogger()->error(
                "{}() Unexpected error writing chunk {} of file {}: '{}'",
                __func__, arg->chnk_id, path, err.what());
        wrote = -EIO;
    } catch(...) {
        // Nothing may escape this task: otherwise the eventual below is never
        // set and the waiter in wait_for_tasks() blocks indefinitely.
        GKFS_DATA->spdlogger()->error(
                "{}() Unknown error writing chunk {} of file {}", __func__,
                arg->chnk_id, path);
        wrote = -EIO;
    }
    ABT_eventual_set(arg->eventual, &wrote, sizeof(wrote));
}

void
ChunkWriteOperation::clear_task_args() {
    // Drop every per-chunk task argument.
    task_args_.erase(task_args_.begin(), task_args_.end());
}

// Forwards (path, n) to the base ChunkOperation and sizes task_args_ so that
// each of the n tasks has its own argument slot.
ChunkWriteOperation::ChunkWriteOperation(const string& path, size_t n)
    : ChunkOperation(path, n) {
    task_args_.resize(n);
}

/**
 * Launches the asynchronous write of one chunk (task slot `idx`) on the I/O
 * pool. Results are collected later with wait_for_tasks().
 *
 * @throws ChunkWriteOpException if the ABT eventual or task cannot be created
 */
void
ChunkWriteOperation::write_nonblock(size_t idx, const uint64_t chunk_id,
                                    const char* bulk_buf_ptr, const size_t size,
                                    const off64_t offset) {
    assert(idx < task_args_.size());
    GKFS_DATA->spdlogger()->trace(
            "ChunkWriteOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
            __func__, idx, path_, size, offset);

    // The eventual transports write_file_abt()'s ssize_t result back to the
    // waiter.
    auto& eventual = task_eventuals_[idx];
    auto abt_err = ABT_eventual_create(sizeof(ssize_t), &eventual);
    if(abt_err != ABT_SUCCESS) {
        throw ChunkWriteOpException(fmt::format(
                "ChunkWriteOperation::{}() Failed to create ABT eventual with abt_err '{}'",
                __func__, abt_err));
    }

    // Fill in the argument slot the task will read asynchronously.
    auto& slot = task_args_[idx];
    slot.path = &path_;
    slot.buf = bulk_buf_ptr;
    slot.chnk_id = chunk_id;
    slot.size = size;
    slot.off = offset;
    slot.eventual = eventual;

    abt_err = ABT_task_create(RPC_DATA->io_pool(), write_file_abt, &slot,
                              &abt_tasks_[idx]);
    if(abt_err != ABT_SUCCESS) {
        throw ChunkWriteOpException(fmt::format(
                "ChunkWriteOperation::{}() Failed to create ABT task with abt_err '{}'",
                __func__, abt_err));
    }
}

/**
 * Waits for every write task in submission order and frees all eventuals.
 * Never throws, so that cleanup always completes.
 *
 * @return {error code, bytes written}. Once any task (or an eventual wait)
 *         fails, the error is recorded, later results are ignored, and the
 *         byte count is reported as 0 because the written data is incomplete.
 */
pair<int, size_t>
ChunkWriteOperation::wait_for_tasks() {
    GKFS_DATA->spdlogger()->trace("ChunkWriteOperation::{}() enter: path '{}'",
                                  __func__, path_);
    size_t bytes_sum = 0;
    int err = 0;
    for(auto& eventual : task_eventuals_) {
        ssize_t* result = nullptr;
        const auto abt_err =
                ABT_eventual_wait(eventual, reinterpret_cast<void**>(&result));
        if(abt_err != ABT_SUCCESS) {
            GKFS_DATA->spdlogger()->error(
                    "ChunkWriteOperation::{}() Error when waiting on ABT eventual",
                    __func__);
            err = EIO;
        } else if(err == 0) {
            assert(result != nullptr);
            if(*result < 0)
                err = static_cast<int>(-(*result));
            else
                bytes_sum += *result;
        }
        ABT_eventual_free(&eventual);
    }
    if(err != 0)
        bytes_sum = 0;
    return make_pair(err, bytes_sum);
}

/* ------------------------------------------------------------------------
 * -------------------------- READ ----------------------------------------
 * ------------------------------------------------------------------------*/

/**
 * Argobots task body for one chunk read.
 *
 * Calls storage()->read_chunk() with the chunk id, buffer, size and offset
 * taken from `arg` and always publishes an ssize_t through `arg->eventual`:
 * read_chunk()'s return value on success, or a negated error code
 * (the storage error code, or EIO) on failure.
 *
 * @param _arg pointer to a chunk_read_args (must not be null)
 */
void
ChunkReadOperation::read_file_abt(void* _arg) {
    assert(_arg);
    // unpack args
    auto* arg = static_cast<struct chunk_read_args*>(_arg);
    const string& path = *(arg->path);
    ssize_t read = 0;
    try {
        read = GKFS_DATA->storage()->read_chunk(path, arg->chnk_id, arg->buf,
                                                arg->size, arg->off);
    } catch(const ChunkStorageException& err) {
        GKFS_DATA->spdlogger()->error("{}() {}", __func__, err.what());
        read = -(err.code().value());
    } catch(const ::exception& err) {
        GKFS_DATA->spdlogger()->error(
                "{}() Unexpected error reading chunk {} of file {}: '{}'",
                __func__, arg->chnk_id, path, err.what());
        read = -EIO;
    } catch(...) {
        // Nothing may escape this task: otherwise the eventual below is never
        // set and the waiter blocks indefinitely.
        GKFS_DATA->spdlogger()->error(
                "{}() Unknown error reading chunk {} of file {}", __func__,
                arg->chnk_id, path);
        read = -EIO;
    }
    // Every path above reaches this point, so the eventual is always signaled
    // (by this function, not by read_chunk()).
    ABT_eventual_set(arg->eventual, &read, sizeof(read));
}

void
ChunkReadOperation::clear_task_args() {
    // Drop every per-chunk task argument.
    task_args_.erase(task_args_.begin(), task_args_.end());
}

// Forwards (path, n) to the base ChunkOperation and sizes task_args_ so that
// each of the n tasks has its own argument slot.
ChunkReadOperation::ChunkReadOperation(const string& path, size_t n)
    : ChunkOperation(path, n) {
    task_args_.resize(n);
}

/**
 * Launches the asynchronous read of one chunk (task slot `idx`) on the I/O
 * pool. Results are consumed later with wait_for_tasks_and_push_back().
 *
 * @throws ChunkReadOpException if the ABT eventual or task cannot be created
 */
void
ChunkReadOperation::read_nonblock(size_t idx, const uint64_t chunk_id,
                                  char* bulk_buf_ptr, const size_t size,
                                  const off64_t offset) {
    assert(idx < task_args_.size());
    GKFS_DATA->spdlogger()->trace(
            "ChunkReadOperation::{}() enter: idx '{}' path '{}' size '{}' offset '{}'",
            __func__, idx, path_, size, offset);

    // The eventual transports read_file_abt()'s ssize_t result back to the
    // waiter.
    auto& eventual = task_eventuals_[idx];
    auto abt_err = ABT_eventual_create(sizeof(ssize_t), &eventual);
    if(abt_err != ABT_SUCCESS) {
        throw ChunkReadOpException(fmt::format(
                "ChunkReadOperation::{}() Failed to create ABT eventual with abt_err '{}'",
                __func__, abt_err));
    }

    // Fill in the argument slot the task will read asynchronously; the
    // transfer flag is used by the spin-polling waiter.
    auto& slot = task_args_[idx];
    slot.path = &path_;
    slot.buf = bulk_buf_ptr;
    slot.chnk_id = chunk_id;
    slot.size = size;
    slot.off = offset;
    slot.eventual = eventual;
    slot.bulk_transfer_done = false;

    abt_err = ABT_task_create(RPC_DATA->io_pool(), read_file_abt, &slot,
                              &abt_tasks_[idx]);
    if(abt_err != ABT_SUCCESS) {
        throw ChunkReadOpException(fmt::format(
                "ChunkReadOperation::{}() Failed to create ABT task with abt_err '{}'",
                __func__, abt_err));
    }
}

/**
 * Collects the results of all read tasks, pushes successfully read data back
 * to the client via margo_bulk_transfer(), and frees every eventual.
 *
 * Does not throw, so that all eventuals are always cleaned up. Negative task
 * results equal to -ENOENT (sparse regions without chunk files) and
 * zero-length reads (end-of-file) are skipped. Any other negative result, a
 * failed eventual test/wait (EIO) or a failed bulk transfer (EBUSY) records an
 * error. After that, no further bulk transfers are issued, but the remaining
 * tasks are still awaited and their eventuals freed.
 *
 * With gkfs::config::io::spin_lock_read, eventuals are polled and consumed
 * in completion order. Otherwise they are awaited in submission order.
 *
 * @return {error code, bytes pushed}; the byte count is 0 on error
 */
pair<int, size_t>
ChunkReadOperation::wait_for_tasks_and_push_back(const bulk_args& args) {
    GKFS_DATA->spdlogger()->trace("ChunkReadOperation::{}() enter: path '{}'",
                                  __func__, path_);
    assert(args.chunk_ids->size() == task_args_.size());
    size_t total_read = 0;
    int io_err = 0;
    // __func__ inside the lambda below would name the lambda's operator(),
    // so keep this function's name for log messages.
    const char* const func_name = __func__;

    // Consumes the result of one finished task and always frees its eventual.
    auto consume_result = [&](uint64_t idx, const ssize_t* task_size) {
        // once an error occurred, remaining results are only cleaned up
        if(io_err == 0) {
            assert(task_size != nullptr);
            if(*task_size < 0) {
                // sparse regions do not have chunk files and are therefore
                // skipped; any other negative value is an error
                if(-(*task_size) != ENOENT)
                    io_err = -(*task_size); // make error code > 0
            } else if(*task_size > 0) {
                // successful case, push read data back to client
                // (a read size of 0 is not an error: end-of-file)
                GKFS_DATA->spdlogger()->trace(
                        "ChunkReadOperation::{}() BULK_TRANSFER_PUSH file '{}' chnkid '{}' origin offset '{}' local offset '{}' transfersize '{}'",
                        func_name, path_, args.chunk_ids->at(idx),
                        args.origin_offsets->at(idx),
                        args.local_offsets->at(idx), *task_size);
                assert(task_args_[idx].chnk_id == args.chunk_ids->at(idx));
                auto margo_err = margo_bulk_transfer(
                        args.mid, HG_BULK_PUSH, args.origin_addr,
                        args.origin_bulk_handle, args.origin_offsets->at(idx),
                        args.local_bulk_handle, args.local_offsets->at(idx),
                        *task_size);
                if(margo_err != HG_SUCCESS) {
                    GKFS_DATA->spdlogger()->error(
                            "ChunkReadOperation::{}() Failed to margo_bulk_transfer with margo err: '{}'",
                            func_name, margo_err);
                    io_err = EBUSY;
                } else {
                    total_read += *task_size;
                }
            }
        }
        // task_size points into the eventual's buffer: free only after use.
        // Freeing here also covers the bulk-transfer failure path, which
        // previously skipped the free.
        ABT_eventual_free(&task_eventuals_[idx]);
    };

    if(gkfs::config::io::spin_lock_read) {
        size_t finished = 0;
        while(finished != task_args_.size()) {
            for(uint64_t idx = 0; idx < task_args_.size(); idx++) {
                if(task_args_[idx].bulk_transfer_done)
                    continue;
                ssize_t* task_size = nullptr;
                int is_ready = 0;
                auto abt_err = ABT_eventual_test(
                        task_eventuals_[idx], (void**) &task_size, &is_ready);
                if(abt_err != ABT_SUCCESS) {
                    GKFS_DATA->spdlogger()->error(
                            "ChunkReadOperation::{}() Error when testing on ABT eventual",
                            __func__);
                    io_err = EIO;
                    // Retire only this slot. Forcing the counter to the total
                    // (as before) could end the loop while other tasks were
                    // still running, or overshoot it and re-test freed
                    // eventuals on later passes.
                    ABT_eventual_free(&task_eventuals_[idx]);
                } else if(is_ready == ABT_FALSE) {
                    // not ready yet, try next
                    continue;
                } else {
                    consume_result(idx, task_size);
                }
                task_args_[idx].bulk_transfer_done = true;
                finished++;
            }
        }
    } else {
        for(uint64_t idx = 0; idx < task_args_.size(); idx++) {
            ssize_t* task_size = nullptr;
            auto abt_err = ABT_eventual_wait(task_eventuals_[idx],
                                             (void**) &task_size);
            if(abt_err != ABT_SUCCESS) {
                GKFS_DATA->spdlogger()->error(
                        "ChunkReadOperation::{}() Error when waiting on ABT eventual",
                        __func__);
                io_err = EIO;
                ABT_eventual_free(&task_eventuals_[idx]);
                continue;
            }
            consume_result(idx, task_size);
        }
    }

    // in case of error set read size to zero as data would be corrupted
    if(io_err != 0)
        total_read = 0;
    return make_pair(io_err, total_read);
}

} // namespace gkfs::data