Program Listing for File data.hpp
↰ Return to documentation for file (include/daemon/ops/data.hpp
)
/*
Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS.
GekkoFS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
GekkoFS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
SPDX-License-Identifier: GPL-3.0-or-later
*/
#ifndef GEKKOFS_DAEMON_DATA_HPP
#define GEKKOFS_DAEMON_DATA_HPP
#include <daemon/daemon.hpp>
#include <common/common_defs.hpp>
#include <string>
#include <vector>
extern "C" {
#include <abt.h>
#include <margo.h>
}
namespace gkfs::data {
class ChunkOpException : public std::runtime_error {
public:
explicit ChunkOpException(const std::string& s) : std::runtime_error(s){};
};
class ChunkWriteOpException : public ChunkOpException {
public:
explicit ChunkWriteOpException(const std::string& s)
: ChunkOpException(s){};
};
class ChunkReadOpException : public ChunkOpException {
public:
explicit ChunkReadOpException(const std::string& s) : ChunkOpException(s){};
};
class ChunkMetaOpException : public ChunkOpException {
public:
explicit ChunkMetaOpException(const std::string& s) : ChunkOpException(s){};
};
template <class OperationType>
class ChunkOperation {
protected:
const std::string path_;
std::vector<ABT_task> abt_tasks_;
std::vector<ABT_eventual>
task_eventuals_;
public:
explicit ChunkOperation(const std::string& path)
: ChunkOperation(path, 1){};
ChunkOperation(std::string path, size_t n) : path_(std::move(path)) {
// Knowing n beforehand is important and cannot be dynamic. Otherwise
// eventuals cause seg faults
abt_tasks_.resize(n);
task_eventuals_.resize(n);
};
~ChunkOperation() {
cancel_all_tasks();
}
void
cancel_all_tasks() {
GKFS_DATA->spdlogger()->trace("{}() enter", __func__);
for(auto& task : abt_tasks_) {
if(task) {
ABT_task_cancel(task);
ABT_task_free(&task);
}
}
for(auto& eventual : task_eventuals_) {
if(eventual) {
ABT_eventual_reset(eventual);
ABT_eventual_free(&eventual);
}
}
abt_tasks_.clear();
task_eventuals_.clear();
static_cast<OperationType*>(this)->clear_task_args();
}
};
class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> {
friend class ChunkOperation<ChunkTruncateOperation>;
private:
struct chunk_truncate_args {
const std::string* path;
size_t size;
ABT_eventual eventual;
};
struct chunk_truncate_args task_arg_ {};
static void
truncate_abt(void* _arg);
void
clear_task_args();
public:
explicit ChunkTruncateOperation(const std::string& path);
~ChunkTruncateOperation() = default;
void
truncate(size_t size);
int
wait_for_task();
};
class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> {
friend class ChunkOperation<ChunkWriteOperation>;
private:
struct chunk_write_args {
const std::string* path;
const char* buf;
gkfs::rpc::chnk_id_t chnk_id;
size_t size;
off64_t off;
ABT_eventual eventual;
};
std::vector<struct chunk_write_args> task_args_;
static void
write_file_abt(void* _arg);
void
clear_task_args();
public:
ChunkWriteOperation(const std::string& path, size_t n);
~ChunkWriteOperation() = default;
void
write_nonblock(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr,
size_t size, off64_t offset);
std::pair<int, size_t>
wait_for_tasks();
};
class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> {
friend class ChunkOperation<ChunkReadOperation>;
private:
struct chunk_read_args {
const std::string* path;
char* buf;
gkfs::rpc::chnk_id_t chnk_id;
size_t size;
off64_t off;
ABT_eventual eventual;
};
std::vector<struct chunk_read_args> task_args_;
static void
read_file_abt(void* _arg);
void
clear_task_args();
public:
struct bulk_args {
margo_instance_id mid;
hg_addr_t origin_addr;
hg_bulk_t origin_bulk_handle;
std::vector<size_t>* origin_offsets;
hg_bulk_t local_bulk_handle;
std::vector<size_t>* local_offsets;
std::vector<uint64_t>* chunk_ids;
};
ChunkReadOperation(const std::string& path, size_t n);
~ChunkReadOperation() = default;
void
read_nonblock(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr,
size_t size, off64_t offset);
std::pair<int, size_t>
wait_for_tasks_and_push_back(const bulk_args& args);
};
} // namespace gkfs::data
#endif // GEKKOFS_DAEMON_DATA_HPP