Commit e76adfc1 authored by Ramon Nou's avatar Ramon Nou
Browse files

First step to generalization

parent f3ecdcdb
Loading
Loading
Loading
Loading
Loading
+103 −0
Original line number Diff line number Diff line
/*
  Copyright 2018-2021, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2021, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This file is part of GekkoFS.

  GekkoFS is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  GekkoFS is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: GPL-3.0-or-later
*/

#ifndef GEKKOFS_METADATA_DB_HPP
#define GEKKOFS_METADATA_DB_HPP

#include <memory>
#include <spdlog/spdlog.h>
#include <rocksdb/db.h>
#include <daemon/backend/exceptions.hpp>
#include <tuple>
extern "C" {
#include <kreon.h>
}
namespace rdb = rocksdb;

namespace gkfs::metadata {

class MetadataDB {
private:
    std::unique_ptr<rdb::DB> db;
    rdb::Options options;
    rdb::WriteOptions write_opts;
    std::string path;
    std::shared_ptr<spdlog::logger> log_;

    // kreon
    klc_handle klc_db;
    klc_db_options klc_options;
    std::string klc_path;

    static void
    optimize_rocksdb_options(rdb::Options& options);

public:
    static inline void
    throw_rdb_status_excpt(const rdb::Status& s);

    explicit MetadataDB(const std::string& path);

    std::string
    get(const std::string& key) const;

    void
    put(const std::string& key, const std::string& val);

    void
    put_no_exist(const std::string& key, const std::string& val);

    void
    remove(const std::string& key);

    bool
    exists(const std::string& key);

    void
    update(const std::string& old_key, const std::string& new_key,
           const std::string& val);

    void
    increase_size(const std::string& key, size_t size, bool append);

    void
    decrease_size(const std::string& key, size_t size);

    std::vector<std::pair<std::string, bool>>
    get_dirents(const std::string& dir) const;

    std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_dirents_extended(const std::string& dir) const;

    void
    iterate_all();
};

} // namespace gkfs::metadata

#endif // GEKKOFS_METADATA_DB_HPP
+4 −0
Original line number Diff line number Diff line
@@ -32,10 +32,14 @@ target_sources(metadata_db
    PUBLIC
    ${INCLUDE_DIR}/daemon/backend/metadata/db.hpp
    ${INCLUDE_DIR}/daemon/backend/exceptions.hpp
    ${INCLUDE_DIR}/daemon/backend/metadata/kreon_backend.hpp
    ${INCLUDE_DIR}/daemon/backend/metadata/rocksdb_backend.hpp
    PRIVATE
    ${INCLUDE_DIR}/daemon/backend/metadata/merge.hpp
    ${CMAKE_CURRENT_LIST_DIR}/merge.cpp
    ${CMAKE_CURRENT_LIST_DIR}/db.cpp
    ${CMAKE_CURRENT_LIST_DIR}/kreon_backend.cpp
    ${CMAKE_CURRENT_LIST_DIR}/rocksdb_backend.cpp
    )

add_library(metadata_module
+18 −422
Original line number Diff line number Diff line
@@ -39,8 +39,7 @@ extern "C" {
#include <sys/stat.h>
}

//#define ROCKSDB 1
#define KREON 1


namespace gkfs::metadata {

@@ -50,95 +49,19 @@ namespace gkfs::metadata {
 */
MetadataDB::MetadataDB(const std::string& path) : path(path) {
#ifdef KREON
    // Kreon
    klc_path = path; // file is rocksdb
    int64_t size;

    int fd = open(klc_path.c_str(), O_RDONLY);
    if (fd == -1) {
        perror("open");
        exit(EXIT_FAILURE);
    }

    size = lseek(fd, 0, SEEK_END);
    if (size == -1) {
        printf("[%s:%s:%d] failed to determine volume size exiting...\n", __FILE__, __func__,
               __LINE__);
        perror("ioctl");
        exit(EXIT_FAILURE);
    }

    close(fd);
   db.reset(new KreonBackend(path));
#endif

    klc_options.create_flag = KLC_CREATE_DB;
    klc_options.db_name= "test";
    klc_options.volume_name = (char*)malloc(klc_path.size()+1);
    strcpy(klc_options.volume_name,klc_path.c_str());
    klc_options.volume_start = 0;
    klc_options.volume_size = size;
    klc_db = klc_open(&klc_options);
#ifdef ROCKSDB
   db.reset(new RocksDBBackend(path));
#endif
    /* Get logger instance and set it for data module and chunk storage */
    GKFS_METADATA_MOD->log(spdlog::get(GKFS_METADATA_MOD->LOGGER_NAME));
    assert(GKFS_METADATA_MOD->log());
    log_ = spdlog::get(GKFS_METADATA_MOD->LOGGER_NAME);
    assert(log_);
#ifdef ROCKSDB
    // Optimize RocksDB. This is the easiest way to get RocksDB to perform well
    options.IncreaseParallelism();
    options.OptimizeLevelStyleCompaction();
    // create the DB if it's not already present
    options.create_if_missing = true;
    options.merge_operator.reset(new MetadataMergeOperator);
    MetadataDB::optimize_rocksdb_options(options);
    write_opts.disableWAL = !(gkfs::config::rocksdb::use_write_ahead_log);
    rdb::DB* rdb_ptr = nullptr;
    auto s = rocksdb::DB::Open(options, path, &rdb_ptr);
    if(!s.ok()) {
        throw std::runtime_error("Failed to open RocksDB: " + s.ToString());
    }
    this->db.reset(rdb_ptr);
#endif
}

/**
 * Exception wrapper on Status object. Throws NotFoundException if
 * s.IsNotFound(), general DBException otherwise
 * @param RocksDB status
 * @throws DBException
 */
void
MetadataDB::throw_rdb_status_excpt(const rdb::Status& s) {
    assert(!s.ok());

    if(s.IsNotFound()) {
        throw NotFoundException(s.ToString());
    } else {
        throw DBException(s.ToString());
    }
}

/**
 * Convert a String to klc_key
 * @param key
 * @param klc_key struct
 */

inline void str2klc(const std::string& key, struct klc_key & K) {
    K.size = key.size()+1;
    K.data = key.data();
}

/**
 * Convert a String to klc_value
 * @param value
 * @param klc_value struct
 */

inline void str2klc(const std::string& value, struct klc_value & V) {
    V.size = value.size()+1;
    V.data = value.data();
}

/**
 * Gets a KV store value for a key
@@ -149,26 +72,9 @@ inline void str2klc(const std::string& value, struct klc_value & V) {
std::string
MetadataDB::get(const std::string& key) const {
    std::string val;
#ifdef KREON
    struct klc_key K;
    struct klc_value *V = NULL;
    str2klc(key, K);

    klc_ret_code ret = klc_get(klc_db, &K, &V);
    if (ret != KLC_SUCCESS) {
    val = db->get(key);

        rdb::Status s;
        s = rdb::Status::NotFound();
        MetadataDB::throw_rdb_status_excpt(s);
    }
    else { val = V->data; free(V); }
#endif
#ifdef ROCKSDB
    auto s = db->Get(rdb::ReadOptions(), key, &val);
    if(!s.ok()) {
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
    return val;
}

@@ -182,27 +88,8 @@ void
MetadataDB::put(const std::string& key, const std::string& val) {
    assert(gkfs::path::is_absolute(key));
    assert(key == "/" || !gkfs::path::has_trailing_slash(key));
#ifdef KREON
    struct klc_key_value key_value;

    str2klc(key, key_value.k);
    str2klc(val, key_value.v);


    klc_ret_code ret = klc_put(klc_db, &key_value);
    if (ret != KLC_SUCCESS) {
        rdb::Status s;
        s = rdb::Status::NotFound();
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
#ifdef ROCKSDB
    auto cop = CreateOperand(val);
    auto s = db->Merge(write_opts, key, cop.serialize());
    if(!s.ok()) {
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
    db->put(key, val);
}

/**
@@ -215,25 +102,7 @@ MetadataDB::put(const std::string& key, const std::string& val) {
void
MetadataDB::put_no_exist(const std::string& key, const std::string& val) {

#ifdef KREON
    struct klc_key_value key_value;

    str2klc(key, key_value.k);
    str2klc(val, key_value.v);

    klc_ret_code ret = klc_exists(klc_db, &key_value.k);

    if (ret == KLC_KEY_NOT_FOUND) {
        klc_put(klc_db, &key_value);
    }
    else
        throw ExistsException(key);
#endif
#ifdef ROCKSDB
    if(exists(key))
        throw ExistsException(key);
    put(key, val);
#endif
    db->put_no_exist(key,val);
}

/**
@@ -243,25 +112,9 @@ MetadataDB::put_no_exist(const std::string& key, const std::string& val) {
 */
void
MetadataDB::remove(const std::string& key) {
#ifdef KREON
    struct klc_key k;

    str2klc(key, k);
    klc_ret_code ret = klc_delete(klc_db, &k);
    db->remove(key);

    if (ret != KLC_SUCCESS)
    {
        rdb::Status s;
        s = rdb::Status::NotFound();
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
#ifdef ROCKSDB
    auto s = db->Delete(write_opts, key);
    if(!s.ok()) {
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
}

/**
@@ -273,29 +126,7 @@ MetadataDB::remove(const std::string& key) {
bool
MetadataDB::exists(const std::string& key) {

#ifdef KREON
    struct klc_key k;

    str2klc(key, k);

    klc_ret_code ret = klc_exists(klc_db, &k);
    if (ret == KLC_KEY_NOT_FOUND ) {
       return true;
    }
    else
        return false; //TODO it is not the only case, we can have errors
#endif
#ifdef ROCKSDB
    auto s = db->Get(rdb::ReadOptions(), key, &val);
    if(!s.ok()) {
        if(s.IsNotFound()) {
            return false;
        } else {
            MetadataDB::throw_rdb_status_excpt(s);
        }
    }
    return true;
#endif
    return db->exists(key);
}

/**
@@ -309,34 +140,7 @@ void
MetadataDB::update(const std::string& old_key, const std::string& new_key,
                   const std::string& val) {

    // TODO: Kreon does not have transactions/Batches
#ifdef KREON
    struct klc_key_value n_key_value;
    struct klc_key o_key;

    str2klc(new_key, n_key_value.k);
    str2klc(val, n_key_value.v);

    str2klc(old_key, o_key);

    klc_delete(klc_db, &o_key);
    klc_ret_code ret = klc_put(klc_db, &n_key_value);
    if (ret != KLC_SUCCESS) {
        rdb::Status s;
        s = rdb::Status::NotFound();
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
#ifdef ROCKSDB
    // TODO use rdb::Put() method
    rdb::WriteBatch batch;
    batch.Delete(old_key);
    batch.Put(new_key, val);
    auto s = db->Write(write_opts, &batch);
    if(!s.ok()) {
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
   db->update(old_key, new_key, val);
}

/**
@@ -349,21 +153,9 @@ MetadataDB::update(const std::string& old_key, const std::string& new_key,
 */
void
MetadataDB::increase_size(const std::string& key, size_t size, bool append) {
#ifdef ROCKSDB
    auto uop = IncreaseSizeOperand(size, append);
    auto s = db->Merge(write_opts, key, uop.serialize());
    if(!s.ok()) {
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
#ifdef KREON
    auto value = get(key);
    // Decompress string
    Metadata md(value);
    if (append) size=md.size()+size;
    md.size(size);
    update(key,key,md.serialize());
#endif

   db->increase_size(key, size, append);

}

/**
@@ -375,21 +167,8 @@ MetadataDB::increase_size(const std::string& key, size_t size, bool append) {
 */
void
MetadataDB::decrease_size(const std::string& key, size_t size) {
#ifdef ROCKSDB
    auto uop = DecreaseSizeOperand(size);
    auto s = db->Merge(write_opts, key, uop.serialize());
    if(!s.ok()) {
        MetadataDB::throw_rdb_status_excpt(s);
    }
#endif
#ifdef KREON
    auto value = get(key);
    // Decompress string
    Metadata md(value);
    md.size(size);
    update(key,key,md.serialize());
#endif

   db->decrease_size(key, size);
}

/**
@@ -408,89 +187,8 @@ MetadataDB::get_dirents(const std::string& dir) const {
        // add trailing slash only if missing and is not the root_folder "/"
        root_path.push_back('/');
    }
#ifdef ROCKSDB
    rocksdb::ReadOptions ropts;
    auto it = db->NewIterator(ropts);

    std::vector<std::pair<std::string, bool>> entries;
    std::cout << "getdirents RP:["<<root_path << "]" << std::endl;
    for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
        it->Next()) {

        if(it->key().size() == root_path.size()) {
            // we skip this path cause it is exactly the root_path
            continue;
        }

        /***** Get File name *****/
        auto name = it->key().ToString();
        std::cout << "getdirents RES:["<<name << "]" << std::endl;
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            continue;
        }
        // remove prefix
        name = name.substr(root_path.size());

        // relative path of directory entries must not be empty
        assert(!name.empty());

        Metadata md(it->value().ToString());
        auto is_dir = S_ISDIR(md.mode());

        entries.emplace_back(std::move(name), is_dir);
    }
    assert(it->status().ok());
#endif
#ifdef KREON
    struct klc_key K;
// One difference between Kreon vs ROCKSDB is that

    str2klc(root_path, K);

    klc_scanner S = klc_init_scanner(klc_db, &K, KLC_GREATER_OR_EQUAL);

    std::vector<std::pair<std::string, bool>> entries;

    while (klc_is_valid(S)) {
        struct klc_key K2 = klc_get_key(S);
        struct klc_value value = klc_get_value(S);

        std::string k(K2.data, K2.size);
        std::string v(value.data, value.size);
        if ( k.size() < root_path.size() || k.substr(0,root_path.size()) != root_path ) {
            break;
        }

        if(k.size() == root_path.size()) {
            klc_get_next(S);
            continue;
        }

        /***** Get File name *****/
        auto name = k;
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            klc_get_next(S);
            continue;
        }
        // remove prefix
        name = name.substr(root_path.size());

        // relative path of directory entries must not be empty
        assert(!name.empty());

        Metadata md(v);
        auto is_dir = S_ISDIR(md.mode());

        entries.emplace_back(std::move(name), is_dir);

        klc_get_next(S);
    }
    // If we don't close the scanner we cannot delete keys
    klc_close_scanner(S);
#endif
    return entries;
   return db->get_dirents(dir);
}

/**
@@ -509,93 +207,8 @@ MetadataDB::get_dirents_extended(const std::string& dir) const {
        // add trailing slash only if missing and is not the root_folder "/"
        root_path.push_back('/');
    }
#ifdef ROCKSDB
    rocksdb::ReadOptions ropts;
    auto it = db->NewIterator(ropts);

    std::vector<std::tuple<std::string, bool, size_t, time_t>> entries;

    for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
        it->Next()) {

        if(it->key().size() == root_path.size()) {
            // we skip this path cause it is exactly the root_path
            continue;
        }

        /***** Get File name *****/
        auto name = it->key().ToString();
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            continue;
        }
        // remove prefix
        name = name.substr(root_path.size());

        // relative path of directory entries must not be empty
        assert(!name.empty());

        Metadata md(it->value().ToString());
        auto is_dir = S_ISDIR(md.mode());

        entries.emplace_back(std::forward_as_tuple(std::move(name), is_dir,
                                                   md.size(), md.ctime()));
    }
    assert(it->status().ok());
#endif
#ifdef KREON
    struct klc_key K;

    str2klc(root_path, K);

    klc_scanner S = klc_init_scanner(klc_db, &K, KLC_GREATER_OR_EQUAL);

    std::vector<std::tuple<std::string, bool, size_t, time_t>> entries;

    while (klc_is_valid(S)) {
        struct klc_key K2 = klc_get_key(S);
        struct klc_value value = klc_get_value(S);

        std::string k(K2.data, K2.size);
        std::string v(value.data, value.size);

        if ( k.size() < root_path.size() || k.substr(0,root_path.size()) != root_path ) {
            break;
        }

        if(k.size() == root_path.size()) {
            if (klc_get_next(S) && !klc_is_valid(S))
                break;
            continue;
        }

        /***** Get File name *****/
        auto name = k;
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            if (klc_get_next(S) && !klc_is_valid(S))
                break;
            continue;
        }
        // remove prefix
        name = name.substr(root_path.size());

        // relative path of directory entries must not be empty
        assert(!name.empty());

        Metadata md(v);
        auto is_dir = S_ISDIR(md.mode());

        entries.emplace_back(std::forward_as_tuple(std::move(name), is_dir,
                                                   md.size(), md.ctime()));

        if (klc_get_next(S) && !klc_is_valid(S))
            break;
    }
    // If we don't close the scanner we cannot delete keys
    klc_close_scanner(S);
#endif
    return entries;
   return db->get_dirents_extended(dir);
}


@@ -605,25 +218,8 @@ MetadataDB::get_dirents_extended(const std::string& dir) const {
 */
void
MetadataDB::iterate_all() {
    std::string key;
    std::string val;
    // Do RangeScan on parent inode
    auto iter = db->NewIterator(rdb::ReadOptions());
    for(iter->SeekToFirst(); iter->Valid(); iter->Next()) {
        key = iter->key().ToString();
        val = iter->value().ToString();
    }
    db->iterate_all();
}

/**
 * Called when RocksDB connection is established.
 * Used for setting KV store settings
 * see here: https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
 * @param options
 */
void
MetadataDB::optimize_rocksdb_options(rdb::Options& options) {
    options.max_successive_merges = 128;
}

} // namespace gkfs::metadata
+439 −0

File added.

Preview size limit exceeded, changes collapsed.

+344 −0

File added.

Preview size limit exceeded, changes collapsed.