Commit ba63a434 authored by Ramon Nou's avatar Ramon Nou
Browse files

rocksdb tune and async

parent 6c9df78e
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -34,7 +34,7 @@ GKFS_FIND_PROCESS=10
GKFS_SERVERS=$SLURM_JOB_NUM_NODES
GKFS_FIND=sfind

srun -N $NUM_NODES -n $GKFS_FIND_PROCESS --overlap --overcommit --mem=0 --oversubscribe --export=ALL,LD_PRELOAD=${GKFS} $GKFS_FIND $@ -M $GKFS_MNT -S $GKFS_SERVERS
srun -N $NUM_NODES -n $GKFS_FIND_PROCESS --overlap --overcommit --mem=0 --oversubscribe --export=ALL,LD_PRELOAD=${GKFS} $GKFS_FIND $@ -M $GKFS_MNT -S $GKFS_SERVERS --server-side -C

#!/bin/bash
# scripts/aggregate_sfind_results.sh
+2 −1
Original line number Diff line number Diff line
@@ -54,7 +54,8 @@ forward_get_metadentry_size_proxy(const std::string& path);

std::pair<int, std::unique_ptr<std::vector<std::tuple<
                       const std::string, unsigned char, size_t, time_t>>>>
forward_get_dirents_single_proxy_v2(const std::string& path, int server);
forward_get_dirents_single_proxy_v2(const std::string& path, int server,
                                    const std::string& start_key = "");

} // namespace gkfs::rpc

+7 −3
Original line number Diff line number Diff line
@@ -105,6 +105,7 @@ decompress_and_parse_entries(const OutputOrErr& out,

    std::vector<std::tuple<const std::string, unsigned char, size_t, time_t>>
            entries;
    entries.reserve(static_cast<size_t>((end - p) / 32) + 1);

    while(p < end) {
        if(p + sizeof(unsigned char) > end) {
@@ -112,7 +113,8 @@ decompress_and_parse_entries(const OutputOrErr& out,
                __func__);
            break;
        }
        unsigned char type = *reinterpret_cast<const unsigned char*>(p);
        unsigned char type = 0;
        std::memcpy(&type, p, sizeof(unsigned char));
        p += sizeof(unsigned char);

        if(p + sizeof(size_t) > end) {
@@ -120,7 +122,8 @@ decompress_and_parse_entries(const OutputOrErr& out,
                __func__);
            break;
        }
        size_t file_size = *reinterpret_cast<const size_t*>(p);
        size_t file_size = 0;
        std::memcpy(&file_size, p, sizeof(size_t));
        p += sizeof(size_t);

        if(p + sizeof(time_t) > end) {
@@ -128,7 +131,8 @@ decompress_and_parse_entries(const OutputOrErr& out,
                __func__);
            break;
        }
        time_t ctime = *reinterpret_cast<const time_t*>(p);
        time_t ctime = 0;
        std::memcpy(&ctime, p, sizeof(time_t));
        p += sizeof(time_t);

        // Name is null-terminated. We must check we don't go past 'end'.
+29 −13
Original line number Diff line number Diff line
@@ -62,14 +62,12 @@ DentryCache::gen_dir_id(const std::string& dir_path) {

uint32_t
DentryCache::get_dir_id(const std::string& dir_path) {
    // check if id already exists in map and return
    if(entry_dir_id_.find(dir_path) != entry_dir_id_.end()) {
        return entry_dir_id_[dir_path];
    auto id_it = entry_dir_id_.find(dir_path);
    if(id_it != entry_dir_id_.end()) {
        return id_it->second;
    }
    // otherwise generate one
    auto dir_id = gen_dir_id(dir_path);
    entry_dir_id_.emplace(dir_path, dir_id);
    return dir_id;
    return entry_dir_id_.try_emplace(dir_path, dir_id).first->second;
}


@@ -78,18 +76,30 @@ DentryCache::insert(const std::string& parent_dir, const std::string& name,
                    const cache_entry value) {
    std::lock_guard<std::mutex> const lock(mtx_);
    auto dir_id = get_dir_id(parent_dir);
    entries_[dir_id].emplace(name, value);
    auto [entries_it, inserted] = entries_.try_emplace(
            dir_id, std::unordered_map<std::string, cache_entry>{});
    (void) inserted;
    entries_it->second.emplace(name, value);
}

std::optional<cache_entry>
DentryCache::get(const std::string& parent_dir, const std::string& name) {
    std::lock_guard<std::mutex> const lock(mtx_);
    auto dir_id = get_dir_id(parent_dir);
    if(entries_[dir_id].find(name) != entries_[dir_id].end()) {
        return entries_[dir_id][name];
    } else {
    auto id_it = entry_dir_id_.find(parent_dir);
    if(id_it == entry_dir_id_.end()) {
        return {};
    }

    auto entries_it = entries_.find(id_it->second);
    if(entries_it == entries_.end()) {
        return {};
    }

    auto entry_it = entries_it->second.find(name);
    if(entry_it == entries_it->second.end()) {
        return {};
    }
    return entry_it->second;
}

void
@@ -116,8 +126,14 @@ DentryCache::dump_cache_to_log(const std::string& dir_path) {
            dir_path);
        return;
    }
    auto dir_id = id_it->second;
    for(auto& [name, entry] : entries_[dir_id]) {
    auto entry_it = entries_.find(id_it->second);
    if(entry_it == entries_.end()) {
        LOG(INFO, "{}(): Cache contents for dir path '{}' NONE", __func__,
            dir_path);
        return;
    }

    for(auto& [name, entry] : entry_it->second) {
        // log entry
        LOG(INFO,
            "{}(): Cache contents for dir path '{}' -> name '{}' type '{}' size '{}' ctime '{}'",
+149 −37
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@
*/

#include <config.hpp>
#include <algorithm>
#include <numeric>

#include <client/preload.hpp>
@@ -48,6 +49,7 @@
#include <client/rpc/forward_metadata_proxy.hpp>
#include <client/rpc/forward_data.hpp>
#include <client/rpc/forward_data_proxy.hpp>
#include <client/rpc/utils.hpp>
#include <client/open_dir.hpp>
#include <client/cache.hpp>
#include <string>
@@ -55,10 +57,12 @@
#include <set>
#include <tuple>
#include <thread>
#include <future>
#include <unordered_map>

#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_types_thallium.hpp>
#include <common/path_util.hpp>
#include <thallium.hpp>
#ifdef GKFS_ENABLE_CLIENT_METRICS
#include <common/msgpack_util.hpp>
#endif
@@ -1138,48 +1142,34 @@ gkfs_opendir(const std::string& path) {
    // metadata this is used in get_metadata() later to avoid stat RPCs
    if(CTX->use_dentry_cache()) {
        ret.second = make_shared<gkfs::filemap::OpenDir>(path);
        std::vector<std::future<pair<
                int, unique_ptr<vector<tuple<const basic_string<char>,
                                             unsigned char, size_t, time_t>>>>>>
                dcache_futures;

        LOG(DEBUG,
            "{}() Sending async dirents for path '{}' to '{}' daemons ...",
            "{}() Sending async dirents for path '{}' to '{}' targets ...",
            __func__, path, CTX->hosts().size());
        // Launch RPC calls asynchronously
        // We need to filter the results from the dentry cache as
        // forward_get_dirents_single gathers all the files
        for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
            dcache_futures.push_back(std::async(std::launch::async, [&, i]() {
                if(gkfs::config::proxy::fwd_get_dirents_single &&
                   CTX->use_proxy()) {
                    return gkfs::rpc::forward_get_dirents_single_proxy_v2(path,
                                                                          i);
                } else {
                    return gkfs::rpc::forward_get_dirents_single(path, i);
                }
            }));
        }

        int cnt = 0;
        // Collect and process results
        ret.second->add(".", gkfs::filemap::FileType::directory);
        ret.second->add("..", gkfs::filemap::FileType::directory);
        for(auto& fut : dcache_futures) {
            auto res = fut.get(); // Wait for the RPC result

            if(res.first != 0) {
                ret.first = res.first;
                LOG(ERROR, "{}() RPC failed with error: {}", __func__,
                    res.first);
                continue;
            }
            if(!res.second) {
                LOG(ERROR, "{}() RPC returned null entries vector", __func__);
                ret.first = EIO;
                continue;
            }

            auto& open_dir = *res.second;
            for(auto& dentry : open_dir) {
        // Targets to query for this directory, as reported by the distributor.
        // Each element is later used both as an index into CTX->hosts() and
        // as the server id handed to the proxy.
        const auto targets = CTX->distributor()->locate_directory_metadata();
        const auto target_count = targets.size();
        // Go through the proxy only if both the compile-time option and the
        // runtime context enable it.
        const bool use_proxy_dirents =
                gkfs::config::proxy::fwd_get_dirents_single && CTX->use_proxy();
        // Clamp to at least one byte so a configured size of 0 never yields an
        // empty buffer to expose.
        const size_t initial_buffer_size =
                std::max<size_t>(size_t{1}, CTX->dirents_buff_size());

        // One receive buffer per target; buffers[i] belongs to targets[i].
        std::vector<std::vector<char>> buffers(target_count);
        // Bulk handles and pending responses. The three waiter_* vectors are
        // appended to together, so index k in each describes the same posted
        // request (its response, its buffer slot, and its server).
        std::vector<thallium::bulk> exposed_buffers;
        std::vector<thallium::async_response> waiters;
        std::vector<size_t> waiter_buffer_ids;
        std::vector<uint64_t> waiter_servers;
        // At most one request is posted per target, so reserve once up front.
        exposed_buffers.reserve(target_count);
        waiters.reserve(target_count);
        waiter_buffer_ids.reserve(target_count);
        waiter_servers.reserve(target_count);

        auto consume_entries = [&](const auto& open_dir) {
            for(const auto& dentry : open_dir) {
                // type returns as unsigned char
                LOG(DEBUG, "name: {} type: {} size: {} ctime: {}",
                    get<0>(dentry), static_cast<int>(get<1>(dentry)),
@@ -1199,7 +1189,129 @@ gkfs_opendir(const std::string& path) {
                                                      get<3>(dentry)});
                cnt++;
            }
        };

        // Post one non-blocking dirents RPC per target, either through the
        // proxy or directly to the daemon. For every request that was posted
        // successfully, waiters / waiter_buffer_ids / waiter_servers get one
        // entry each, so they stay index-aligned. A failure to post is logged,
        // recorded as EBUSY in ret.first, and the remaining targets are still
        // tried.
        //
        // waiter_servers always stores the real server id (targets[i]), never
        // the loop index: the collection phase passes it to the synchronous
        // helpers, which must contact the same server the async request was
        // sent to.
        if(use_proxy_dirents) {
            auto proxy_rpc = CTX->ipc_engine()->define(
                    gkfs::rpc::tag::client_proxy_get_dirents_extended);

            for(uint64_t i = 0; i < target_count; ++i) {
                const auto server_id = targets[i];
                buffers[i].resize(initial_buffer_size);
                std::vector<std::pair<void*, std::size_t>> segments = {
                        {buffers[i].data(), buffers[i].size()}};
                try {
                    // Expose the receive buffer so the remote side can write
                    // the directory entries into it.
                    exposed_buffers.emplace_back(CTX->ipc_engine()->expose(
                            segments, thallium::bulk_mode::read_write));
                    gkfs::rpc::rpc_client_proxy_get_dirents_in_t in;
                    in.path = path;
                    in.server_id = server_id;
                    // Empty start key: request the listing from the beginning.
                    in.start_key = "";
                    in.bulk_handle = exposed_buffers.back();
                    waiters.push_back(
                            proxy_rpc.on(CTX->proxy_host()).async(in));
                    waiter_buffer_ids.push_back(i);
                    waiter_servers.push_back(server_id);
                } catch(const std::exception& ex) {
                    LOG(ERROR, "{}() Failed to post proxy dirents RPC: {}",
                        __func__, ex.what());
                    ret.first = EBUSY;
                }
            }
        } else {
            auto get_dirents_rpc = CTX->rpc_engine()->define(
                    gkfs::rpc::tag::get_dirents_extended);
            // The direct RPC is sent with a trailing '/' on the directory
            // path; a single-character path (e.g. "/") is left untouched.
            std::string root_path = path;
            if(root_path.length() > 1 && root_path.back() != '/') {
                root_path += '/';
            }

            for(uint64_t i = 0; i < target_count; ++i) {
                const auto server_id = targets[i];
                buffers[i].resize(initial_buffer_size);
                std::vector<std::pair<void*, std::size_t>> segments = {
                        {buffers[i].data(), buffers[i].size()}};
                try {
                    exposed_buffers.emplace_back(CTX->rpc_engine()->expose(
                            segments, thallium::bulk_mode::read_write));

                    gkfs::rpc::rpc_get_dirents_in_t in;
                    in.path = root_path;
                    // Empty start key: request the listing from the beginning.
                    in.start_key = "";
                    in.bulk_handle = exposed_buffers.back();
                    waiters.push_back(
                            get_dirents_rpc.on(CTX->hosts().at(server_id))
                                    .async(in));
                    waiter_buffer_ids.push_back(i);
                    // Record the id of the host actually contacted (not the
                    // index into targets) so follow-up requests hit the same
                    // daemon.
                    waiter_servers.push_back(server_id);
                } catch(const std::exception& ex) {
                    LOG(ERROR, "{}() Failed to post dirents RPC to {}: {}",
                        __func__, server_id, ex.what());
                    ret.first = EBUSY;
                }
            }
        }

        // Collect the responses of all posted requests in posting order and
        // feed the received entries into consume_entries. An error on one
        // server is stored in ret.first (overwriting any earlier error) and
        // processing continues with the next server.
        for(size_t i = 0; i < waiters.size(); ++i) {
            // Server and buffer slot recorded when request i was posted.
            auto server = waiter_servers[i];
            auto buffer_id = waiter_buffer_ids[i];
            try {
                // Blocks until the response for this request has arrived.
                gkfs::rpc::rpc_get_dirents_out_t out = waiters[i].wait();
                // ENOBUFS: presumably the exposed buffer was too small for the
                // reply (TODO confirm against the daemon handler). Nothing is
                // parsed from the buffer; the listing for this server is
                // fetched again through the synchronous helper, without a
                // start key.
                if(out.err == ENOBUFS) {
                    auto fallback_res =
                            use_proxy_dirents
                                    ? gkfs::rpc::
                                              forward_get_dirents_single_proxy_v2(
                                                      path, server)
                                    : gkfs::rpc::forward_get_dirents_single(
                                              path, server);
                    // A null result with error code 0 is reported as EIO.
                    if(fallback_res.first != 0 || !fallback_res.second) {
                        ret.first =
                                fallback_res.first ? fallback_res.first : EIO;
                        LOG(ERROR,
                            "{}() Fallback dirents RPC failed for server {} with error {}",
                            __func__, server, ret.first);
                        continue;
                    }
                    consume_entries(*fallback_res.second);
                    continue;
                }

                // Any other non-zero error code: record it and skip this
                // server.
                if(out.err != 0) {
                    ret.first = out.err;
                    LOG(ERROR,
                        "{}() Async dirents RPC failed for server {} with error {}",
                        __func__, server, out.err);
                    continue;
                }

                // Decode the entries that were written into this request's
                // buffer.
                auto entries = gkfs::rpc::decompress_and_parse_entries(
                        out, buffers[buffer_id].data());
                consume_entries(entries);
                // Whenever the first batch is non-empty, a follow-up request
                // is issued with the last received name as start key.
                // Presumably the helper returns the entries after that key
                // (TODO confirm); the response itself is not inspected for a
                // "more data" indicator here.
                if(!entries.empty()) {
                    const auto& last_key = std::get<0>(entries.back());
                    auto remainder_res =
                            use_proxy_dirents
                                    ? gkfs::rpc::
                                              forward_get_dirents_single_proxy_v2(
                                                      path, server, last_key)
                                    : gkfs::rpc::forward_get_dirents_single(
                                              path, server, last_key);
                    // A null result with error code 0 is reported as EIO.
                    // Entries already consumed from the first batch are kept.
                    if(remainder_res.first != 0 || !remainder_res.second) {
                        ret.first =
                                remainder_res.first ? remainder_res.first : EIO;
                        LOG(ERROR,
                            "{}() Failed to fetch remaining dirents for server {}: {}",
                            __func__, server, ret.first);
                        continue;
                    }
                    consume_entries(*remainder_res.second);
                }
            } catch(const std::exception& ex) {
                // Exceptions thrown inside the try block (waiting, decoding or
                // the follow-up calls) are logged and mapped to EBUSY; the
                // loop then moves on to the next request.
                LOG(ERROR, "{}() Failed waiting async dirents response: {}",
                    __func__, ex.what());
                ret.first = EBUSY;
            }
        }

        LOG(DEBUG, "{}() Unpacked dirents for path '{}' counted '{}' entries",
            __func__, path, cnt);
    } else {
Loading