Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#ifndef GEKKOFS_DAEMON_DATA_HPP
#define GEKKOFS_DAEMON_DATA_HPP
#include <daemon/daemon.hpp>
#include <global/global_defs.hpp>
#include <string>
#include <vector>
extern "C" {
#include <abt.h>
#include <margo.h>
}
namespace gkfs {
namespace data {
/**
 * Root of the chunk-operation exception hierarchy.
 *
 * Carries a human-readable message and can be caught as std::runtime_error.
 */
class ChunkOpException : public std::runtime_error {
public:
    /**
     * @param s message later returned by what()
     */
    explicit ChunkOpException(const std::string& s)
        : std::runtime_error{s} {}
};
/**
 * Exception type for failures attributed to chunk write operations.
 *
 * Derives from ChunkOpException, so handlers for the base type also catch it.
 */
class ChunkWriteOpException : public ChunkOpException {
public:
    /**
     * @param s message later returned by what()
     */
    explicit ChunkWriteOpException(const std::string& s)
        : ChunkOpException{s} {}
};
/**
 * Exception type for failures attributed to chunk read operations.
 *
 * Derives from ChunkOpException, so handlers for the base type also catch it.
 */
class ChunkReadOpException : public ChunkOpException {
public:
    /**
     * @param s message later returned by what()
     */
    explicit ChunkReadOpException(const std::string& s)
        : ChunkOpException{s} {}
};
/**
 * Exception type for failures attributed to chunk metadata operations.
 *
 * Derives from ChunkOpException, so handlers for the base type also catch it.
 */
class ChunkMetaOpException : public ChunkOpException {
public:
    /**
     * @param s message later returned by what()
     */
    explicit ChunkMetaOpException(const std::string& s)
        : ChunkOpException{s} {}
};
/**
* Classes to encapsulate asynchronous chunk operations.
* All operations on chunk files must go through the Argobots' task queues.
* Otherwise operations may overtake operations in the queues.
* This applies to write, read, and truncate which may modify the middle of a chunk, essentially a write operation.
*
* Note: This class is not thread-safe.
*
* In the future, this class may be used to provide failure tolerance for IO tasks
*
* Base class using the CRTP idiom
template<class OperationType>
class ChunkOperation {
protected:
const std::string path_;
std::vector<ABT_task> abt_tasks_;
std::vector<ABT_eventual> task_eventuals_;
explicit ChunkOperation(const std::string& path) : ChunkOperation(path, 1) {};
ChunkOperation(std::string path, size_t n) : path_(std::move(path)) {
// Knowing n beforehand is important and cannot be dynamic. Otherwise eventuals cause seg faults
abt_tasks_.resize(n);
task_eventuals_.resize(n);
};
~ChunkOperation() {
cancel_all_tasks();
}
/**
* Cleans up and cancels all tasks in flight
*/
void cancel_all_tasks() {
GKFS_DATA->spdlogger()->trace("{}() enter", __func__);
for (auto& task : abt_tasks_) {
if (task) {
ABT_task_cancel(task);
ABT_task_free(&task);
}
}
for (auto& eventual : task_eventuals_) {
if (eventual) {
ABT_eventual_reset(eventual);
ABT_eventual_free(&eventual);
}
}
abt_tasks_.clear();
task_eventuals_.clear();
static_cast<OperationType*>(this)->clear_task_args();
}
/**
 * Chunk operation exposing a single truncate(size) entry point.
 *
 * Unlike the write and read operations in this file, it keeps exactly one
 * argument record (task_arg_) instead of a vector of them, and it is
 * constructed from a path only.
 */
class ChunkTruncateOperation : public ChunkOperation<ChunkTruncateOperation> {
    // Lets the CRTP base call the private clear_task_args() hook
    friend class ChunkOperation<ChunkTruncateOperation>;

private:
    // Argument record for the truncate task.
    // NOTE(review): field meanings are inferred from names and types only; confirm against truncate_abt().
    struct chunk_truncate_args {
        const std::string* path; // non-owning pointer to a path string
        size_t size;             // size value; presumably the one passed to truncate() — TODO confirm
        ABT_eventual eventual;   // Argobots eventual handle
    };

    // The single, value-initialized argument record of this operation
    struct chunk_truncate_args task_arg_{};

    // Static function taking an opaque pointer; presumably the Argobots task
    // body receiving a chunk_truncate_args* — TODO confirm in the definition
    static void truncate_abt(void* _arg);

    // Cleanup hook invoked by ChunkOperation::cancel_all_tasks()
    void clear_task_args();

public:
    /**
     * @param path path string forwarded to the definition in the source file
     */
    explicit ChunkTruncateOperation(const std::string& path);

    ~ChunkTruncateOperation() = default;

    /**
     * Truncate entry point; defined in the corresponding source file.
     * @param size size value for the truncation
     */
    void truncate(size_t size);
};
/**
 * Chunk operation exposing non-blocking writes plus a wait call.
 *
 * Holds one chunk_write_args record per task in task_args_. The constructor
 * takes the path and a count n.
 */
class ChunkWriteOperation : public ChunkOperation<ChunkWriteOperation> {
    // Lets the CRTP base call the private clear_task_args() hook
    friend class ChunkOperation<ChunkWriteOperation>;

private:
    // Argument record for a single write task.
    // NOTE(review): field meanings are inferred from names and types only; confirm against write_file_abt().
    struct chunk_write_args {
        const std::string* path;      // non-owning pointer to a path string
        const char* buf;              // read-only byte buffer; presumably the data to write
        gkfs::rpc::chnk_id_t chnk_id; // chunk identifier
        size_t size;                  // byte count
        off64_t off;                  // offset; presumably within the chunk — TODO confirm
        ABT_eventual eventual;        // Argobots eventual handle
    };

    // Argument records, one per write task
    std::vector<struct chunk_write_args> task_args_;

    // Static function taking an opaque pointer; presumably the Argobots task
    // body receiving a chunk_write_args* — TODO confirm in the definition
    static void write_file_abt(void* _arg);

    // Cleanup hook invoked by ChunkOperation::cancel_all_tasks()
    void clear_task_args();

public:
    /**
     * @param path path string forwarded to the definition in the source file
     * @param n    count; presumably the number of write tasks — TODO confirm
     */
    ChunkWriteOperation(const std::string& path, size_t n);

    ~ChunkWriteOperation() = default;

    /**
     * Non-blocking write entry point; defined in the corresponding source file.
     * @param idx          task index
     * @param chunk_id     chunk identifier
     * @param bulk_buf_ptr read-only source buffer
     * @param size         byte count
     * @param offset       offset value
     */
    void write_nonblock(size_t idx, uint64_t chunk_id, const char* bulk_buf_ptr, size_t size, off64_t offset);

    /**
     * Wait entry point; defined in the corresponding source file.
     * @return an (int, size_t) pair; presumably an error code and the total
     *         number of bytes written — TODO confirm
     */
    std::pair<int, size_t> wait_for_tasks();
};
class ChunkReadOperation : public ChunkOperation<ChunkReadOperation> {
friend class ChunkOperation<ChunkReadOperation>;
private:
// Argument record for a single read task; task_args_ holds a vector of these.
// NOTE(review): field meanings are inferred from names and types only; confirm against read_file_abt().
struct chunk_read_args {
// Non-owning pointer to a path string
const std::string* path;
// Mutable byte buffer (non-const, in contrast to chunk_write_args::buf); presumably the read destination
char* buf;
// Chunk identifier
gkfs::rpc::chnk_id_t chnk_id;
// Byte count; presumably the number of bytes to read — TODO confirm
size_t size;
// Offset; presumably within the chunk — TODO confirm
off64_t off;
// Argobots eventual handle
ABT_eventual eventual;
};
std::vector<struct chunk_read_args> task_args_;
static void read_file_abt(void* _arg);
void clear_task_args();
public:
// Bundle of handles plus offset and chunk-id lists; all vectors are held by non-owning pointer.
// NOTE(review): the code using this struct is not visible here; the per-field notes are
// inferred from names and types only — confirm against the caller.
struct bulk_args {
// Margo instance id
margo_instance_id mid;
// Mercury address; presumably of the transfer's origin — TODO confirm
hg_addr_t origin_addr;
// Mercury bulk handle on the origin side
hg_bulk_t origin_bulk_handle;
// Offsets; presumably into the origin bulk region, one per chunk — TODO confirm
std::vector<size_t>* origin_offsets;
// Mercury bulk handle on the local side
hg_bulk_t local_bulk_handle;
// Offsets; presumably into the local bulk region, one per chunk — TODO confirm
std::vector<size_t>* local_offsets;
// Chunk identifiers
std::vector<uint64_t>* chunk_ids;
};
ChunkReadOperation(const std::string& path, size_t n);
~ChunkReadOperation() = default;
void read_nonblock(size_t idx, uint64_t chunk_id, char* bulk_buf_ptr, size_t size, off64_t offset);