/*
Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS.
GekkoFS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
GekkoFS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with GekkoFS. If not, see .
SPDX-License-Identifier: GPL-3.0-or-later
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
extern "C" {
#include
}
std::recursive_mutex parallax_mutex_;
namespace gkfs::metadata {
/**
* @brief Destroy the Kreon Backend:: Kreon Backend object
* We remove the file, too large for the CI.
* TODO: Insert option
*/
ParallaxBackend::~ParallaxBackend() {
par_close(par_db_);
}
/**
* Called when the daemon is started: Connects to the KV store
* @param path where KV store data is stored
*/
ParallaxBackend::ParallaxBackend(const std::string& path)
: par_path_(std::move(path)) {
// We try to open options.yml if it exists, if not we create it by default
int options = open("options.yml", O_RDWR | O_CREAT, 0644);
int64_t sizeOptions;
sizeOptions = lseek(options, 0, SEEK_END);
if(sizeOptions == 0) {
std::string optcontent =
"level0_size: 64\ngc_interval: 10\ngrowth_factor: 4\nmedium_log_LRU_cache_size: 400\nlevel_medium_inplace: 3\n";
write(options, optcontent.c_str(), optcontent.length());
}
close(options);
int64_t size;
int fd = open(par_path_.c_str(), O_RDWR | O_CREAT, 0644);
if(fd == -1) {
perror("open");
exit(EXIT_FAILURE);
}
// Check size if we want to reuse it
size = lseek(fd, 0, SEEK_END);
if(size == -1) {
printf("[%s:%s:%d] failed to determine volume size exiting...\n",
__FILE__, __func__, __LINE__);
perror("ioctl");
exit(EXIT_FAILURE);
}
if(size == 0) {
size = GKFS_DATA->parallax_size_md();
lseek(fd, size - 1, SEEK_SET);
std::string tmp = "x";
write(fd, tmp.c_str(), 1);
close(fd);
// We format the database TODO this doesn't work kv_format.parallax is
// not in path
std::string cmd = "kv_format.parallax --device " + par_path_ +
" --max_regions_num 1 ";
system(cmd.c_str());
}
par_options_.create_flag = PAR_CREATE_DB;
par_options_.db_name = "test";
par_options_.volume_name = (char*) malloc(par_path_.size() + 1);
strcpy(par_options_.volume_name, par_path_.c_str());
par_options_.volume_start = 0;
par_options_.volume_size = 0;
par_db_ = par_open(&par_options_);
}
/**
* Exception wrapper on Status object. Throws NotFoundException if
* s == "Not Found", general DBException otherwise
* @param String with status
* @throws DBException
*/
void
ParallaxBackend::throw_status_excpt(const std::string& s) {
if(s == "Not Found") {
throw NotFoundException(s);
} else {
throw DBException(s);
}
}
/**
* Convert a String to klc_key
* @param key
* @param par_key struct
*/
inline void
ParallaxBackend::str2par(const std::string& key, struct par_key& K) const {
K.size = key.size();
K.data = key.c_str();
}
/**
* Convert a String to klc_value
* @param value
* @param par_value struct
*/
inline void
ParallaxBackend::str2par(const std::string& value, struct par_value& V) const {
V.val_size = value.size() + 1;
V.val_buffer = (char*) value.c_str();
}
/**
* Gets a KV store value for a key
* @param key
* @return value
* @throws DBException on failure, NotFoundException if entry doesn't exist
*/
std::string
ParallaxBackend::get_impl(const std::string& key) const {
std::string val;
struct par_key K;
struct par_value V;
V.val_buffer = NULL;
str2par(key, K);
par_ret_code ret = par_get(par_db_, &K, &V);
if(ret != PAR_SUCCESS) {
throw_status_excpt("Not Found");
} else {
val = V.val_buffer;
free(V.val_buffer);
}
return val;
}
/**
* Puts an entry into the KV store
* @param key
* @param val
* @throws DBException on failure
*/
void
ParallaxBackend::put_impl(const std::string& key, const std::string& val) {
struct par_key_value key_value;
str2par(key, key_value.k);
str2par(val, key_value.v);
par_ret_code ret = par_put(par_db_, &key_value);
if(ret != PAR_SUCCESS) {
throw_status_excpt("Not Found");
}
}
/**
* Puts an entry into the KV store if it doesn't exist. This function does not
* use a mutex.
* @param key
* @param val
* @throws DBException on failure, ExistException if entry already exists
*/
void
ParallaxBackend::put_no_exist_impl(const std::string& key,
const std::string& val) {
struct par_key_value key_value;
str2par(key, key_value.k);
str2par(val, key_value.v);
par_ret_code ret = par_exists(par_db_, &key_value.k);
if(ret == PAR_KEY_NOT_FOUND) {
par_put(par_db_, &key_value);
} else
throw ExistsException(key);
}
/**
* Removes an entry from the KV store
* @param key
* @throws DBException on failure, NotFoundException if entry doesn't exist
*/
void
ParallaxBackend::remove_impl(const std::string& key) {
struct par_key k;
str2par(key, k);
par_ret_code ret = par_delete(par_db_, &k);
if(ret != PAR_SUCCESS) {
throw_status_excpt("Not Found");
}
}
/**
* checks for existence of an entry
* @param key
* @return true if exists
* @throws DBException on failure
*/
bool
ParallaxBackend::exists_impl(const std::string& key) {
struct par_key k;
str2par(key, k);
par_ret_code ret = par_exists(par_db_, &k);
if(ret == PAR_KEY_NOT_FOUND) {
return true;
}
return false; // TODO it is not the only case, we can have errors
}
/**
* Updates a metadentry atomically and also allows to change keys
* @param old_key
* @param new_key
* @param val
* @throws DBException on failure, NotFoundException if entry doesn't exist
*/
void
ParallaxBackend::update_impl(const std::string& old_key,
const std::string& new_key,
const std::string& val) {
// TODO: Check Parallax transactions/Batches
struct par_key_value n_key_value;
struct par_key o_key;
str2par(new_key, n_key_value.k);
str2par(val, n_key_value.v);
str2par(old_key, o_key);
par_delete(par_db_, &o_key);
par_ret_code ret = par_put(par_db_, &n_key_value);
if(ret != PAR_SUCCESS) {
throw_status_excpt("Not Found");
}
}
/**
* Updates the size on the metadata
* Operation. E.g., called before a write() call
* @param key
* @param size
* @param append
* @throws DBException on failure
*/
void
ParallaxBackend::increase_size_impl(const std::string& key, size_t size,
bool append) {
lock_guard lock_guard(parallax_mutex_);
auto value = get(key);
// Decompress string
Metadata md(value);
if(append)
size += md.size();
md.size(size);
update(key, key, md.serialize());
}
/**
* Decreases the size on the metadata
* Operation E.g., called before a truncate() call
* @param key
* @param size
* @throws DBException on failure
*/
void
ParallaxBackend::decrease_size_impl(const std::string& key, size_t size) {
lock_guard lock_guard(parallax_mutex_);
auto value = get(key);
// Decompress string
Metadata md(value);
md.size(size);
update(key, key, md.serialize());
}
/**
* Return all the first-level entries of the directory @dir
*
* @return vector of pair ,
* where name is the name of the entries and is_dir
* is true in the case the entry is a directory.
*/
std::vector>
ParallaxBackend::get_dirents_impl(const std::string& dir) const {
auto root_path = dir;
struct par_key K;
str2par(root_path, K);
par_scanner S = par_init_scanner(par_db_, &K, PAR_GREATER_OR_EQUAL);
std::vector> entries;
while(par_is_valid(S)) {
struct par_key K2 = par_get_key(S);
struct par_value value = par_get_value(S);
std::string k(K2.data, K2.size);
std::string v(value.val_buffer, value.val_size);
if(k.size() < root_path.size() ||
k.substr(0, root_path.size()) != root_path) {
break;
}
if(k.size() == root_path.size()) {
par_get_next(S);
continue;
}
/***** Get File name *****/
auto name = k;
if(name.find_first_of('/', root_path.size()) != std::string::npos) {
// skip stuff deeper then one level depth
par_get_next(S);
continue;
}
// remove prefix
name = name.substr(root_path.size());
// relative path of directory entries must not be empty
assert(!name.empty());
Metadata md(v);
auto is_dir = S_ISDIR(md.mode());
entries.emplace_back(std::move(name), is_dir);
par_get_next(S);
}
// If we don't close the scanner we cannot delete keys
par_close_scanner(S);
return entries;
}
/**
* Return all the first-level entries of the directory @dir
*
* @return vector of pair ,
* where name is the name of the entries and is_dir
* is true in the case the entry is a directory.
*/
std::vector>
ParallaxBackend::get_dirents_extended_impl(const std::string& dir) const {
auto root_path = dir;
// assert(gkfs::path::is_absolute(root_path));
// add trailing slash if missing
if(!gkfs::path::has_trailing_slash(root_path) && root_path.size() != 1) {
// add trailing slash only if missing and is not the root_folder "/"
root_path.push_back('/');
}
struct par_key K;
str2par(root_path, K);
par_scanner S = par_init_scanner(par_db_, &K, PAR_GREATER_OR_EQUAL);
std::vector> entries;
while(par_is_valid(S)) {
struct par_key K2 = par_get_key(S);
struct par_value value = par_get_value(S);
std::string k(K2.data, K2.size);
std::string v(value.val_buffer, value.val_size);
if(k.size() < root_path.size() ||
k.substr(0, root_path.size()) != root_path) {
break;
}
if(k.size() == root_path.size()) {
if(par_get_next(S) && !par_is_valid(S))
break;
continue;
}
/***** Get File name *****/
auto name = k;
if(name.find_first_of('/', root_path.size()) != std::string::npos) {
// skip stuff deeper then one level depth
if(par_get_next(S) && !par_is_valid(S))
break;
continue;
}
// remove prefix
name = name.substr(root_path.size());
// relative path of directory entries must not be empty
assert(!name.empty());
Metadata md(v);
auto is_dir = S_ISDIR(md.mode());
entries.emplace_back(std::forward_as_tuple(std::move(name), is_dir,
md.size(), md.ctime()));
if(par_get_next(S) && !par_is_valid(S))
break;
}
// If we don't close the scanner we cannot delete keys
par_close_scanner(S);
return entries;
}
/**
* Code example for iterating all entries in KV store. This is for debug only as
* it is too expensive
*/
void
ParallaxBackend::iterate_all_impl() const {}
} // namespace gkfs::metadata