Newer
Older
Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#include <daemon/backend/metadata/db.hpp>
#include <daemon/backend/metadata/merge.hpp>
#include <daemon/backend/exceptions.hpp>
#include <global/metadata.hpp>
extern "C" {
MetadataDB::MetadataDB(const std::string& path) : path(path) {
// Optimize RocksDB. This is the easiest way to get RocksDB to perform well
options.IncreaseParallelism();
options.OptimizeLevelStyleCompaction();
// create the DB if it's not already present
options.create_if_missing = true;
options.merge_operator.reset(new MetadataMergeOperator);
MetadataDB::optimize_rocksdb_options(options);
write_opts.disableWAL = !(gkfs::config::rocksdb::use_write_ahead_log);
auto s = rocksdb::DB::Open(options, path, &rdb_ptr);
throw std::runtime_error("Failed to open RocksDB: " + s.ToString());
}
this->db.reset(rdb_ptr);
}
void
MetadataDB::throw_rdb_status_excpt(const rdb::Status& s) {
assert(!s.ok());
throw NotFoundException(s.ToString());
} else {
throw DBException(s.ToString());
}
}
std::string
MetadataDB::get(const std::string& key) const {
std::string val;
auto s = db->Get(rdb::ReadOptions(), key, &val);
MetadataDB::throw_rdb_status_excpt(s);
}
return val;
void
MetadataDB::put(const std::string& key, const std::string& val) {
assert(gkfs::path::is_absolute(key));
assert(key == "/" || !gkfs::path::has_trailing_slash(key));
auto cop = CreateOperand(val);
auto s = db->Merge(write_opts, key, cop.serialize());
MetadataDB::throw_rdb_status_excpt(s);
void
MetadataDB::remove(const std::string& key) {
auto s = db->Delete(write_opts, key);
MetadataDB::throw_rdb_status_excpt(s);
}
bool
MetadataDB::exists(const std::string& key) {
std::string val;
auto s = db->Get(rdb::ReadOptions(), key, &val);
if(!s.ok()) {
if(s.IsNotFound()) {
return false;
} else {
MetadataDB::throw_rdb_status_excpt(s);
}
}
return true;
}
/**
* Updates a metadentry atomically and also allows to change keys
* @param old_key
* @param new_key
* @param val
* @return
*/
void
MetadataDB::update(const std::string& old_key, const std::string& new_key,
const std::string& val) {
// TODO use rdb::Put() method
rdb::WriteBatch batch;
batch.Delete(old_key);
batch.Put(new_key, val);
auto s = db->Write(write_opts, &batch);
MetadataDB::throw_rdb_status_excpt(s);
}
void
MetadataDB::increase_size(const std::string& key, size_t size, bool append) {
auto uop = IncreaseSizeOperand(size, append);
auto s = db->Merge(write_opts, key, uop.serialize());
MetadataDB::throw_rdb_status_excpt(s);
void
MetadataDB::decrease_size(const std::string& key, size_t size) {
auto uop = DecreaseSizeOperand(size);
auto s = db->Merge(write_opts, key, uop.serialize());
MetadataDB::throw_rdb_status_excpt(s);
}
}
/**
* Return all the first-level entries of the directory @dir
*
* @return vector of pair <std::string name, bool is_dir>,
* where name is the name of the entries and is_dir
* is true in the case the entry is a directory.
*/
std::vector<std::pair<std::string, bool>>
MetadataDB::get_dirents(const std::string& dir) const {
assert(gkfs::path::is_absolute(root_path));
// add trailing slash if missing
if(!gkfs::path::has_trailing_slash(root_path) && root_path.size() != 1) {
// add trailing slash only if missing and is not the root_folder "/"
root_path.push_back('/');
}
rocksdb::ReadOptions ropts;
auto it = db->NewIterator(ropts);
std::vector<std::pair<std::string, bool>> entries;
for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
it->Next()) {
if(it->key().size() == root_path.size()) {
// we skip this path cause it is exactly the root_path
continue;
}
/***** Get File name *****/
auto name = it->key().ToString();
if(name.find_first_of('/', root_path.size()) != std::string::npos) {
// skip stuff deeper then one level depth
continue;
}
// remove prefix
name = name.substr(root_path.size());
// relative path of directory entries must not be empty
assert(!name.empty());
Metadata md(it->value().ToString());
auto is_dir = S_ISDIR(md.mode());
entries.emplace_back(std::move(name), is_dir);
}
assert(it->status().ok());
return entries;
}
Ramon Nou
committed
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/**
* Return all the first-level entries of the directory @dir
*
* @return vector of pair <std::string name, bool is_dir - size - ctime>,
* where name is the name of the entries and is_dir
* is true in the case the entry is a directory.
*/
std::vector<std::tuple<std::string, bool, size_t, time_t>>
MetadataDB::get_dirents_extended(const std::string& dir) const {
auto root_path = dir;
assert(gkfs::path::is_absolute(root_path));
// add trailing slash if missing
if(!gkfs::path::has_trailing_slash(root_path) && root_path.size() != 1) {
// add trailing slash only if missing and is not the root_folder "/"
root_path.push_back('/');
}
rocksdb::ReadOptions ropts;
auto it = db->NewIterator(ropts);
std::vector<std::tuple<std::string, bool, size_t, time_t>> entries;
for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
it->Next()) {
if(it->key().size() == root_path.size()) {
// we skip this path cause it is exactly the root_path
continue;
}
/***** Get File name *****/
auto name = it->key().ToString();
if(name.find_first_of('/', root_path.size()) != std::string::npos) {
// skip stuff deeper then one level depth
continue;
}
// remove prefix
name = name.substr(root_path.size());
// relative path of directory entries must not be empty
assert(!name.empty());
Metadata md(it->value().ToString());
auto is_dir = S_ISDIR(md.mode());
entries.emplace_back(std::forward_as_tuple(std::move(name), is_dir,
md.size(), md.ctime()));
}
assert(it->status().ok());
return entries;
}
/**
* Code example for iterating all entries in KV store. This is for debug only as
* it is too expensive
*/
void
MetadataDB::iterate_all() {
std::string key;
std::string val;
// Do RangeScan on parent inode
auto iter = db->NewIterator(rdb::ReadOptions());
for(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
key = iter->key().ToString();
val = iter->value().ToString();
}
}
void
MetadataDB::optimize_rocksdb_options(rdb::Options& options) {
options.max_successive_merges = 128;
}
Ramon Nou
committed
} // namespace gkfs::metadata