Verified Commit b2b74091 authored by Marc Vef's avatar Marc Vef
Browse files

Adding directory entry cache avoiding stat RPCs. Flushed for dir on dir close.

This uses the extended dir RPC call.
parent 22eda4c1
Loading
Loading
Loading
Loading
+30 −5
Original line number Diff line number Diff line
@@ -30,17 +30,37 @@
#ifndef GKFS_CLIENT_CACHE
#define GKFS_CLIENT_CACHE

#include <client/open_file_map.hpp>

#include <ctime>
#include <functional>
#include <string>
#include <unordered_map>
#include <mutex>
#include <optional>
#include <cstdint>

namespace gkfs::cache {

struct cache_entry {
    gkfs::filemap::FileType file_type;
    uint64_t size;
    time_t ctime;
};

class Cache {
private:
    std::unordered_map<std::string, std::string> entries_;
    std::unordered_map<uint32_t, std::unordered_map<std::string, cache_entry>>
            entries_;
    std::unordered_map<std::string, uint32_t> entry_dir_id_;
    std::mutex mtx_;
    std::hash<std::string> str_hash;

    uint32_t
    gen_dir_id(const std::string& dir_path);

    uint32_t
    get_dir_id(const std::string& dir_path);

public:
    Cache() = default;
@@ -48,18 +68,23 @@ public:
    virtual ~Cache() = default;

    void
    insert(const std::string& key, const std::string& value);
    insert(const std::string& parent_dir, const std::string name,
           const cache_entry value);

    std::optional<cache_entry>
    get(const std::string& parent_dir, const std::string& name);

    std::optional<std::string>
    get(const std::string& key);
    void
    clear_dir(const std::string& dir_path);

    void
    remove(const std::string& key);
    dump_cache_to_log(const std::string& dir_path);

    void
    clear();
};


} // namespace gkfs::cache

#endif // GKFS_CLIENT_CACHE
+1 −0
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ static constexpr auto METRICS_IP_PORT = ADD_PREFIX("METRICS_IP_PORT");

static constexpr auto NUM_REPL = ADD_PREFIX("NUM_REPL");
static constexpr auto PROXY_PID_FILE = ADD_PREFIX("PROXY_PID_FILE");
static constexpr auto DIR_CACHE = ADD_PREFIX("ENABLE_DIR_CACHE");

} // namespace gkfs::env

+67 −10
Original line number Diff line number Diff line
@@ -28,38 +28,95 @@
*/

#include <client/cache.hpp>
#include <client/preload.hpp>
#include <client/preload_util.hpp>
#include <client/logging.hpp>

#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

namespace gkfs::cache {

uint32_t
Cache::gen_dir_id(const std::string& dir_path) {
    return str_hash(dir_path);
}

uint32_t
Cache::get_dir_id(const std::string& dir_path) {
    // check if id already exists in map and return
    if(entry_dir_id_.find(dir_path) != entry_dir_id_.end()) {
        return entry_dir_id_[dir_path];
    }
    // otherwise generate one
    auto dir_id = gen_dir_id(dir_path);
    entry_dir_id_.emplace(dir_path, dir_id);
    return dir_id;
}


void
Cache::insert(const std::string& key, const std::string& value) {
Cache::insert(const std::string& parent_dir, const std::string name,
              const cache_entry value) {
    std::lock_guard<std::mutex> const lock(mtx_);
    entries_[key] = value;
    auto dir_id = get_dir_id(parent_dir);
    entries_[dir_id].emplace(name, value);
}

std::optional<std::string>
Cache::get(const std::string& key) {
std::optional<cache_entry>
Cache::get(const std::string& parent_dir, const std::string& name) {
    std::lock_guard<std::mutex> const lock(mtx_);
    // return key if found
    if(entries_.find(key) != entries_.end()) {
        return entries_[key];
    }
    auto dir_id = get_dir_id(parent_dir);
    if(entries_[dir_id].find(name) != entries_[dir_id].end()) {
        return entries_[dir_id][name];
    } else {
        return {};
    }
}

void
Cache::remove(const std::string& key) {
Cache::clear_dir(const std::string& dir_path) {
    std::lock_guard<std::mutex> const lock(mtx_);
    entries_.erase(key);

    auto id_it = entry_dir_id_.find(dir_path);
    if(id_it == entry_dir_id_.end()) {
        return;
    }
    auto entry_it = entries_.find(id_it->second);
    if(entry_it != entries_.end()) {
        entries_.erase(entry_it);
    }
    entry_dir_id_.erase(id_it);
}

void
Cache::dump_cache_to_log(const std::string& dir_path) {
    std::lock_guard<std::mutex> const lock(mtx_);
    auto id_it = entry_dir_id_.find(dir_path);
    if(id_it == entry_dir_id_.end()) {
        LOG(INFO, "{}(): Cache contents for dir path '{}' NONE", __func__,
            dir_path);
        return;
    }
    auto dir_id = id_it->second;
    for(auto& [name, entry] : entries_[dir_id]) {
        // log entry
        LOG(INFO,
            "{}(): Cache contents for dir path '{}' -> name '{}' is_dir '{}' size '{}' ctime '{}'",
            __func__, dir_path, name,
            entry.file_type == gkfs::filemap::FileType::directory, entry.size,
            entry.ctime);
    }
}

void
Cache::clear() {
    std::lock_guard<std::mutex> const lock(mtx_);
    entries_.clear();
    entry_dir_id_.clear();
}

} // namespace gkfs::cache
+48 −4
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@
#include <client/rpc/forward_data.hpp>
#include <client/rpc/forward_data_proxy.hpp>
#include <client/open_dir.hpp>
#include <client/cache.hpp>

#include <common/path_util.hpp>
#ifdef GKFS_ENABLE_CLIENT_METRICS
@@ -1326,8 +1327,45 @@ gkfs_opendir(const std::string& path) {
        errno = ENOTDIR;
        return -1;
    }

    auto ret = gkfs::rpc::forward_get_dirents(path);
    pair<int, shared_ptr<gkfs::filemap::OpenDir>> ret{};
    // Use cache: Aka get all entries from all servers for the basic metadata
    // this is used in get_metadata() later to avoid stat RPCs
    if(CTX->use_cache()) {
        ret.second = make_shared<gkfs::filemap::OpenDir>(path);
        // TODO parallelize
        for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
            pair<int, unique_ptr<vector<tuple<const basic_string<char>, bool,
                                              size_t, time_t>>>>
                    res{};
            if(gkfs::config::proxy::fwd_get_dirents_single &&
               CTX->use_proxy()) {
                res = gkfs::rpc::forward_get_dirents_single_proxy(path, i);
            } else {
                res = gkfs::rpc::forward_get_dirents_single(path, i);
            }
            //            auto res = gkfs::rpc::forward_get_dirents_single(path,
            //            i);
            auto& open_dir = *res.second;
            for(auto& dentry : open_dir) {
                // type returns as a boolean. true if it is a directory
                LOG(DEBUG, "name: {} type: {} size: {} ctime: {}",
                    get<0>(dentry), get<1>(dentry), get<2>(dentry),
                    get<3>(dentry));
                auto ftype = get<1>(dentry) ? gkfs::filemap::FileType::directory
                                            : gkfs::filemap::FileType::regular;
                // filename, is_dir, size, ctime
                ret.second->add(get<0>(dentry), ftype);
                CTX->cache()->insert(path, get<0>(dentry),
                                     gkfs::cache::cache_entry{ftype,
                                                              get<2>(dentry),
                                                              get<3>(dentry)});
            }
            ret.first = res.first;
        }
        //        CTX->cache()->dump_cache_to_log(path);
    } else {
        ret = gkfs::rpc::forward_get_dirents(path);
    }
    auto err = ret.first;
    if(err) {
        errno = err;
@@ -1389,7 +1427,6 @@ gkfs_rmdir(const std::string& path) {
 */
int
gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {

    // Get opendir object (content was downloaded with opendir() call)
    auto open_dir = CTX->file_map()->get_dir(fd);
    if(open_dir == nullptr) {
@@ -1464,7 +1501,6 @@ gkfs_getdents(unsigned int fd, struct linux_dirent* dirp, unsigned int count) {
int
gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
                unsigned int count) {

    auto open_dir = CTX->file_map()->get_dir(fd);
    if(open_dir == nullptr) {
        // Cast did not succeeded: open_file is a regular file
@@ -1533,6 +1569,14 @@ gkfs_getdents64(unsigned int fd, struct linux_dirent64* dirp,
int
gkfs_close(unsigned int fd) {
    if(CTX->file_map()->exist(fd)) {
        if(CTX->use_cache()) {
            // clear cache for directory
            if(CTX->file_map()->get(fd)->type() ==
               gkfs::filemap::FileType::directory) {
                CTX->cache()->clear_dir(CTX->file_map()->get(fd)->path());
            }
            //            CTX->cache()->dump_cache_to_log(CTX->file_map()->get(fd)->path());
        }
        // No call to the daemon is required
        CTX->file_map()->remove(fd);
        return 0;
+11 −9
Original line number Diff line number Diff line
@@ -258,6 +258,7 @@ init_environment() {
#endif
        CTX->distributor(distributor);
    }
    if(gkfs::env::var_is_set(gkfs::env::DIR_CACHE)) {
        try {
            LOG(INFO, "Initializing client caching...");
            auto cache = std::make_shared<gkfs::cache::Cache>();
@@ -268,6 +269,7 @@ init_environment() {
            exit_error_msg(EXIT_FAILURE,
                           "Failed to initialize cache: "s + e.what());
        }
    }

    LOG(INFO, "Retrieving file system configuration...");

Loading