Commit e7866dd9 authored by Ramon Nou's avatar Ramon Nou
Browse files

All rocksdb operations are transferred to KREON (change define in db.cpp to enable/disable)

parent d9fdbbdf
Loading
Loading
Loading
Loading
Loading
+145 −29
Original line number Diff line number Diff line
@@ -118,6 +118,28 @@ MetadataDB::throw_rdb_status_excpt(const rdb::Status& s) {
    }
}

/**
 * Convert a String to klc_key
 * @param key
 * @param klc_key struct
 */

inline void str2klc(const std::string& key, struct klc_key & K) {
    K.size = key.size()+1;
    K.data = key.data();
}

/**
 * Convert a String to klc_value
 * @param value
 * @param klc_value struct
 */

inline void str2klc(const std::string& value, struct klc_value & V) {
    V.size = value.size()+1;
    V.data = value.data();
}

/**
 * Gets a KV store value for a key
 * @param key
@@ -130,8 +152,7 @@ MetadataDB::get(const std::string& key) const {
#ifdef KREON
    struct klc_key K;
    struct klc_value *V = NULL;
    K.size = key.length();
    K.data = key.c_str();
    str2klc(key, K);

    klc_ret_code ret = klc_get(klc_db, &K, &V);
    if (ret != KLC_SUCCESS) {
@@ -164,10 +185,9 @@ MetadataDB::put(const std::string& key, const std::string& val) {
#ifdef KREON
    struct klc_key_value key_value;

    key_value.k.size = key.length();
    key_value.k.data = key.c_str();
    key_value.v.size = val.length()+1;
    key_value.v.data = val.c_str();
    str2klc(key, key_value.k);
    str2klc(val, key_value.v);


    klc_ret_code ret = klc_put(klc_db, &key_value);
    if (ret != KLC_SUCCESS) {
@@ -198,10 +218,8 @@ MetadataDB::put_no_exist(const std::string& key, const std::string& val) {
#ifdef KREON
    struct klc_key_value key_value;

    key_value.k.size = key.size();
    key_value.k.data = key.c_str();
    key_value.v.size = val.length()+1;
    key_value.v.data = val.c_str();
    str2klc(key, key_value.k);
    str2klc(val, key_value.v);

    klc_ret_code ret = klc_exists(klc_db, &key_value.k);

@@ -228,10 +246,10 @@ MetadataDB::remove(const std::string& key) {
#ifdef KREON
    struct klc_key k;

    k.size = key.size();
    k.data = key.c_str();
    str2klc(key, k);
    klc_ret_code ret = klc_delete(klc_db, &k);
    if (!ret == KLC_SUCCESS)

    if (ret != KLC_SUCCESS)
    {
        rdb::Status s;
        s = rdb::Status::NotFound();
@@ -254,15 +272,13 @@ MetadataDB::remove(const std::string& key) {
 */
bool
MetadataDB::exists(const std::string& key) {
    std::string val;
#ifdef KREON
    struct klc_key_value key_value;

    key_value.k.size = key.size();
    key_value.k.data = key.c_str();
#ifdef KREON
    struct klc_key k;

    str2klc(key, k);

    klc_ret_code ret = klc_exists(klc_db, &key_value.k);
    klc_ret_code ret = klc_exists(klc_db, &k);
    if (ret == KLC_KEY_NOT_FOUND ) {
       return true;
    }
@@ -298,13 +314,10 @@ MetadataDB::update(const std::string& old_key, const std::string& new_key,
    struct klc_key_value n_key_value;
    struct klc_key o_key;

    n_key_value.k.size = new_key.size();
    n_key_value.k.data = new_key.c_str();
    n_key_value.v.size = val.length()+1;
    n_key_value.v.data = val.c_str();
    str2klc(new_key, n_key_value.k);
    str2klc(val, n_key_value.v);

    o_key.size = old_key.size();
    o_key.data = old_key.c_str();
    str2klc(old_key, o_key);

    klc_delete(klc_db, &o_key);
    klc_ret_code ret = klc_put(klc_db, &n_key_value);
@@ -394,12 +407,12 @@ MetadataDB::get_dirents(const std::string& dir) const {
        // add trailing slash only if missing and is not the root_folder "/"
        root_path.push_back('/');
    }

#ifdef ROCKSDB
    rocksdb::ReadOptions ropts;
    auto it = db->NewIterator(ropts);

    std::vector<std::pair<std::string, bool>> entries;

    std::cout << "getdirents RP:["<<root_path << "]" << std::endl;
    for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
        it->Next()) {

@@ -410,6 +423,7 @@ MetadataDB::get_dirents(const std::string& dir) const {

        /***** Get File name *****/
        auto name = it->key().ToString();
        std::cout << "getdirents RES:["<<name << "]" << std::endl;
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            continue;
@@ -426,6 +440,55 @@ MetadataDB::get_dirents(const std::string& dir) const {
        entries.emplace_back(std::move(name), is_dir);
    }
    assert(it->status().ok());
#endif
#ifdef KREON
    struct klc_key K;
// One difference between Kreon vs ROCKSDB is that

    str2klc(root_path, K);

    klc_scanner S = klc_init_scanner(klc_db, &K, KLC_GREATER_OR_EQUAL);

    std::vector<std::pair<std::string, bool>> entries;

    while (klc_is_valid(S)) {
        struct klc_key K2 = klc_get_key(S);
        struct klc_value value = klc_get_value(S);

        std::string k(K2.data, K2.size);
        std::string v(value.data, value.size);
        if ( k.size() < root_path.size() || k.substr(0,root_path.size()) != root_path ) {
            break;
        }

        if(k.size() == root_path.size()) {
            klc_get_next(S);
            continue;
        }

        /***** Get File name *****/
        auto name = k;
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            klc_get_next(S);
            continue;
        }
        // remove prefix
        name = name.substr(root_path.size());

        // relative path of directory entries must not be empty
        assert(!name.empty());

        Metadata md(v);
        auto is_dir = S_ISDIR(md.mode());

        entries.emplace_back(std::move(name), is_dir);

        klc_get_next(S);
    }
    // If we don't close the scanner we cannot delete keys
    klc_close_scanner(S);
#endif
    return entries;
}

@@ -445,7 +508,7 @@ MetadataDB::get_dirents_extended(const std::string& dir) const {
        // add trailing slash only if missing and is not the root_folder "/"
        root_path.push_back('/');
    }

#ifdef ROCKSDB
    rocksdb::ReadOptions ropts;
    auto it = db->NewIterator(ropts);

@@ -478,6 +541,59 @@ MetadataDB::get_dirents_extended(const std::string& dir) const {
                                                   md.size(), md.ctime()));
    }
    assert(it->status().ok());
#endif
#ifdef KREON
    struct klc_key K;

    str2klc(root_path, K);

    klc_scanner S = klc_init_scanner(klc_db, &K, KLC_GREATER_OR_EQUAL);

    std::vector<std::tuple<std::string, bool, size_t, time_t>> entries;

    while (klc_is_valid(S)) {
        struct klc_key K2 = klc_get_key(S);
        struct klc_value value = klc_get_value(S);

        std::string k(K2.data, K2.size);
        std::string v(value.data, value.size);

        if ( k.size() < root_path.size() || k.substr(0,root_path.size()) != root_path ) {
            break;
        }

        if(k.size() == root_path.size()) {
            if (klc_get_next(S) && !klc_is_valid(S))
                break;
            continue;
        }

        /***** Get File name *****/
        auto name = k;
        if(name.find_first_of('/', root_path.size()) != std::string::npos) {
            // skip stuff deeper then one level depth
            if (klc_get_next(S) && !klc_is_valid(S))
                break;
            continue;
        }
        // remove prefix
        name = name.substr(root_path.size());

        // relative path of directory entries must not be empty
        assert(!name.empty());

        Metadata md(v);
        auto is_dir = S_ISDIR(md.mode());

        entries.emplace_back(std::forward_as_tuple(std::move(name), is_dir,
                                                   md.size(), md.ctime()));

        if (klc_get_next(S) && !klc_is_valid(S))
            break;
    }
    // If we don't close the scanner we cannot delete keys
    klc_close_scanner(S);
#endif
    return entries;
}