From 221716ecb391a770284314c5856424001623c4a5 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 11 Mar 2026 10:17:26 +0100 Subject: [PATCH 1/2] rocksdb tune and async --- examples/gfind/pfind.sh | 2 +- include/client/rpc/utils.hpp | 10 +- src/client/cache.cpp | 42 ++- src/client/gkfs_metadata.cpp | 167 +++++++++--- src/client/rpc/forward_metadata.cpp | 254 +++++++----------- .../backend/metadata/rocksdb_backend.cpp | 33 ++- src/daemon/handler/srv_metadata.cpp | 44 +-- 7 files changed, 320 insertions(+), 232 deletions(-) diff --git a/examples/gfind/pfind.sh b/examples/gfind/pfind.sh index 443d5cbd8..585c3623f 100755 --- a/examples/gfind/pfind.sh +++ b/examples/gfind/pfind.sh @@ -34,7 +34,7 @@ GKFS_FIND_PROCESS=10 GKFS_SERVERS=$SLURM_JOB_NUM_NODES GKFS_FIND=sfind -srun -N $NUM_NODES -n $GKFS_FIND_PROCESS --overlap --overcommit --mem=0 --oversubscribe --export=ALL,LD_PRELOAD=${GKFS} $GKFS_FIND $@ -M $GKFS_MNT -S $GKFS_SERVERS +srun -N $NUM_NODES -n $GKFS_FIND_PROCESS --overlap --overcommit --mem=0 --oversubscribe --export=ALL,LD_PRELOAD=${GKFS} $GKFS_FIND $@ -M $GKFS_MNT -S $GKFS_SERVERS --server-side -C #!/bin/bash # scripts/aggregate_sfind_results.sh diff --git a/include/client/rpc/utils.hpp b/include/client/rpc/utils.hpp index 15c5dd29c..80bd300a7 100644 --- a/include/client/rpc/utils.hpp +++ b/include/client/rpc/utils.hpp @@ -105,6 +105,7 @@ decompress_and_parse_entries(const OutputOrErr& out, std::vector> entries; + entries.reserve(static_cast((end - p) / 32) + 1); while(p < end) { if(p + sizeof(unsigned char) > end) { @@ -112,7 +113,8 @@ decompress_and_parse_entries(const OutputOrErr& out, __func__); break; } - unsigned char type = *reinterpret_cast(p); + unsigned char type = 0; + std::memcpy(&type, p, sizeof(unsigned char)); p += sizeof(unsigned char); if(p + sizeof(size_t) > end) { @@ -120,7 +122,8 @@ decompress_and_parse_entries(const OutputOrErr& out, __func__); break; } - size_t file_size = *reinterpret_cast(p); + size_t file_size = 0; + std::memcpy(&file_size, p, sizeof(size_t)); p += sizeof(size_t); if(p + sizeof(time_t) > end) { @@ -128,7 +131,8 @@ decompress_and_parse_entries(const OutputOrErr& out, __func__); break; } - time_t ctime = *reinterpret_cast(p); + time_t ctime = 0; + std::memcpy(&ctime, p, sizeof(time_t)); p += sizeof(time_t); // Name is null-terminated. We must check we don't go past 'end'. diff --git a/src/client/cache.cpp b/src/client/cache.cpp index 50401c8f5..070f3cdbe 100644 --- a/src/client/cache.cpp +++ b/src/client/cache.cpp @@ -62,14 +62,12 @@ DentryCache::gen_dir_id(const std::string& dir_path) { uint32_t DentryCache::get_dir_id(const std::string& dir_path) { - // check if id already exists in map and return - if(entry_dir_id_.find(dir_path) != entry_dir_id_.end()) { - return entry_dir_id_[dir_path]; + auto id_it = entry_dir_id_.find(dir_path); + if(id_it != entry_dir_id_.end()) { + return id_it->second; } - // otherwise generate one auto dir_id = gen_dir_id(dir_path); - entry_dir_id_.emplace(dir_path, dir_id); - return dir_id; + return entry_dir_id_.try_emplace(dir_path, dir_id).first->second; } @@ -78,18 +76,30 @@ DentryCache::insert(const std::string& parent_dir, const std::string& name, const cache_entry value) { std::lock_guard const lock(mtx_); auto dir_id = get_dir_id(parent_dir); - entries_[dir_id].emplace(name, value); + auto [entries_it, inserted] = entries_.try_emplace( + dir_id, std::unordered_map{}); + (void) inserted; + entries_it->second.emplace(name, value); } std::optional DentryCache::get(const std::string& parent_dir, const std::string& name) { std::lock_guard const lock(mtx_); - auto dir_id = get_dir_id(parent_dir); - if(entries_[dir_id].find(name) != entries_[dir_id].end()) { - return entries_[dir_id][name]; - } else { + auto id_it = entry_dir_id_.find(parent_dir); + if(id_it == entry_dir_id_.end()) { + return {}; + } + + auto entries_it = entries_.find(id_it->second); + if(entries_it == entries_.end()) { + return {}; + } + + auto entry_it = entries_it->second.find(name); + if(entry_it == entries_it->second.end()) { return {}; } + return entry_it->second; } void @@ -116,8 +126,14 @@ DentryCache::dump_cache_to_log(const std::string& dir_path) { dir_path); return; } - auto dir_id = id_it->second; - for(auto& [name, entry] : entries_[dir_id]) { + auto entry_it = entries_.find(id_it->second); + if(entry_it == entries_.end()) { + LOG(INFO, "{}(): Cache contents for dir path '{}' NONE", __func__, + dir_path); + return; + } + + for(auto& [name, entry] : entry_it->second) { // log entry LOG(INFO, "{}(): Cache contents for dir path '{}' -> name '{}' type '{}' size '{}' ctime '{}'", diff --git a/src/client/gkfs_metadata.cpp b/src/client/gkfs_metadata.cpp index ce2fd576d..f228b4fe5 100644 --- a/src/client/gkfs_metadata.cpp +++ b/src/client/gkfs_metadata.cpp @@ -38,6 +38,7 @@ */ #include +#include #include #include @@ -48,6 +49,7 @@ #include #include #include +#include #include #include #include @@ -55,10 +57,12 @@ #include #include #include -#include #include +#include +#include #include +#include #ifdef GKFS_ENABLE_CLIENT_METRICS #include #endif @@ -1138,48 +1142,34 @@ gkfs_opendir(const std::string& path) { // metadata this is used in get_metadata() later to avoid stat RPCs if(CTX->use_dentry_cache()) { ret.second = make_shared(path); - std::vector, - unsigned char, size_t, time_t>>>>>> - dcache_futures; + LOG(DEBUG, - "{}() Sending async dirents for path '{}' to '{}' daemons ...", + "{}() Sending async dirents for path '{}' to '{}' targets ...", __func__, path, CTX->hosts().size()); - // Launch RPC calls asynchronously - // We need to filter the results from the dentry cache as - // forward_get_dirents_single gathers all the files - for(uint64_t i = 0; i < CTX->hosts().size(); i++) { - dcache_futures.push_back(std::async(std::launch::async, [&, i]() { - if(gkfs::config::proxy::fwd_get_dirents_single && - CTX->use_proxy()) { - return gkfs::rpc::forward_get_dirents_single_proxy_v2(path, - i); - } else { - return gkfs::rpc::forward_get_dirents_single(path, i); - } - })); - } + int cnt = 0; // Collect and process results ret.second->add(".", gkfs::filemap::FileType::directory); ret.second->add("..", gkfs::filemap::FileType::directory); - for(auto& fut : dcache_futures) { - auto res = fut.get(); // Wait for the RPC result - - if(res.first != 0) { - ret.first = res.first; - LOG(ERROR, "{}() RPC failed with error: {}", __func__, - res.first); - continue; - } - if(!res.second) { - LOG(ERROR, "{}() RPC returned null entries vector", __func__); - ret.first = EIO; - continue; - } - - auto& open_dir = *res.second; - for(auto& dentry : open_dir) { + const auto targets = CTX->distributor()->locate_directory_metadata(); + const auto target_count = targets.size(); + const bool use_proxy_dirents = + gkfs::config::proxy::fwd_get_dirents_single && CTX->use_proxy(); + const size_t initial_buffer_size = + std::max(size_t{1}, CTX->dirents_buff_size()); + + std::vector> buffers(target_count); + std::vector exposed_buffers; + std::vector waiters; + std::vector waiter_buffer_ids; + std::vector waiter_servers; + exposed_buffers.reserve(target_count); + waiters.reserve(target_count); + waiter_buffer_ids.reserve(target_count); + waiter_servers.reserve(target_count); + + auto consume_entries = [&](const auto& open_dir) { + for(const auto& dentry : open_dir) { // type returns as unsigned char LOG(DEBUG, "name: {} type: {} size: {} ctime: {}", get<0>(dentry), static_cast(get<1>(dentry)), @@ -1199,7 +1189,110 @@ gkfs_opendir(const std::string& path) { get<3>(dentry)}); cnt++; } + }; + + if(use_proxy_dirents) { + auto proxy_rpc = CTX->ipc_engine()->define( + gkfs::rpc::tag::client_proxy_get_dirents_extended); + + for(uint64_t i = 0; i < target_count; ++i) { + buffers[i].resize(initial_buffer_size); + std::vector> segments = { + {buffers[i].data(), buffers[i].size()}}; + try { + exposed_buffers.emplace_back(CTX->ipc_engine()->expose( + segments, thallium::bulk_mode::read_write)); + gkfs::rpc::rpc_client_proxy_get_dirents_in_t in; + in.path = path; + in.server_id = targets[i]; + in.start_key = ""; + in.bulk_handle = exposed_buffers.back(); + waiters.push_back( + proxy_rpc.on(CTX->proxy_host()).async(in)); + waiter_buffer_ids.push_back(i); + waiter_servers.push_back(targets[i]); + } catch(const std::exception& ex) { + LOG(ERROR, "{}() Failed to post proxy dirents RPC: {}", + __func__, ex.what()); + ret.first = EBUSY; + } + } + } else { + auto get_dirents_rpc = CTX->rpc_engine()->define( + gkfs::rpc::tag::get_dirents_extended); + std::string root_path = path; + if(root_path.length() > 1 && root_path.back() != '/') { + root_path += '/'; + } + + for(uint64_t i = 0; i < target_count; ++i) { + buffers[i].resize(initial_buffer_size); + std::vector> segments = { + {buffers[i].data(), buffers[i].size()}}; + try { + exposed_buffers.emplace_back(CTX->rpc_engine()->expose( + segments, thallium::bulk_mode::read_write)); + + gkfs::rpc::rpc_get_dirents_in_t in; + in.path = root_path; + in.start_key = ""; + in.bulk_handle = exposed_buffers.back(); + waiters.push_back( + get_dirents_rpc.on(CTX->hosts().at(targets[i])) + .async(in)); + waiter_buffer_ids.push_back(i); + waiter_servers.push_back(i); + } catch(const std::exception& ex) { + LOG(ERROR, "{}() Failed to post dirents RPC to {}: {}", + __func__, i, ex.what()); + ret.first = EBUSY; + } + } + } + + for(size_t i = 0; i < waiters.size(); ++i) { + auto server = waiter_servers[i]; + auto buffer_id = waiter_buffer_ids[i]; + try { + gkfs::rpc::rpc_get_dirents_out_t out = waiters[i].wait(); + if(out.err == ENOBUFS) { + auto fallback_res = + use_proxy_dirents + ? gkfs::rpc:: + forward_get_dirents_single_proxy_v2( + path, server) + : gkfs::rpc::forward_get_dirents_single( + path, server); + if(fallback_res.first != 0 || !fallback_res.second) { + ret.first = + fallback_res.first ? fallback_res.first : EIO; + LOG(ERROR, + "{}() Fallback dirents RPC failed for server {} with error {}", + __func__, server, ret.first); + continue; + } + consume_entries(*fallback_res.second); + continue; + } + + if(out.err != 0) { + ret.first = out.err; + LOG(ERROR, + "{}() Async dirents RPC failed for server {} with error {}", + __func__, server, out.err); + continue; + } + + auto entries = gkfs::rpc::decompress_and_parse_entries( + out, buffers[buffer_id].data()); + consume_entries(entries); + } catch(const std::exception& ex) { + LOG(ERROR, "{}() Failed waiting async dirents response: {}", + __func__, ex.what()); + ret.first = EBUSY; + } } + LOG(DEBUG, "{}() Unpacked dirents for path '{}' counted '{}' entries", __func__, path, cnt); } else { diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index be074d23c..8f0cc4c5c 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -238,24 +238,18 @@ forward_mk_symlink(const std::string& path, const std::string& target_path) { } // function matches the standard rpc_get_dirents_out_t -inline std::vector> -decompress_and_parse_entries_standard( - const gkfs::rpc::rpc_get_dirents_out_t& out, - const void* compressed_buffer) { - // Duplicated: 'out' type differs (proxy vs daemon) - +template +std::pair +decompress_dirents_payload(const OutputType& out, const void* compressed_buffer, + std::vector& decompressed_data) { if(out.err != 0) { throw std::runtime_error("Server returned an error: " + std::to_string(out.err)); } if(out.dirents_size == 0) { - return {}; // No entries, return empty vector + return {nullptr, 0}; } - const char* p = nullptr; - const char* end = nullptr; - std::vector decompressed_data; - if(gkfs::config::rpc::use_dirents_compression) { const unsigned long long uncompressed_size = ZSTD_getFrameContentSize(compressed_buffer, out.dirents_size); @@ -283,100 +277,29 @@ decompress_and_parse_entries_standard( throw std::runtime_error("Decompression size mismatch."); } - p = decompressed_data.data(); - end = p + uncompressed_size; + return {decompressed_data.data(), + static_cast(uncompressed_size)}; } else { - p = static_cast(compressed_buffer); - end = p + out.dirents_size; + return {static_cast(compressed_buffer), out.dirents_size}; } - - std::vector> - entries; - entries.reserve(out.dirents_size); // Approx - - while(p < end) { - if(p + sizeof(unsigned char) > end) { - LOG(ERROR, "{}() Unexpected end of buffer while parsing type", - __func__); - break; - } - unsigned char type = *reinterpret_cast(p); - p += sizeof(unsigned char); - - // Name is null-terminated. We must check we don't go past 'end'. - size_t name_len = strnlen(p, end - p); - if(p + name_len >= end) { - LOG(ERROR, "{}() Unexpected end of buffer while parsing name", - __func__); - break; - } - - std::string name(p, name_len); - p += name_len + 1; - - if(!name.empty()) { - entries.emplace_back(name, type, 0, 0); - } - } - - return entries; } -// Helper for filtered entries which include size and ctime inline std::vector> -decompress_and_parse_entries_filtered( - const gkfs::rpc::rpc_get_dirents_filtered_out_t& out, +decompress_and_parse_entries_standard( + const gkfs::rpc::rpc_get_dirents_out_t& out, const void* compressed_buffer) { - - if(out.err != 0) { - throw std::runtime_error("Server returned an error: " + - std::to_string(out.err)); - } - if(out.dirents_size == 0) { - return {}; // No entries, return empty vector - } - - const char* p = nullptr; - const char* end = nullptr; std::vector decompressed_data; - - if(gkfs::config::rpc::use_dirents_compression) { - const unsigned long long uncompressed_size = - ZSTD_getFrameContentSize(compressed_buffer, out.dirents_size); - - if(uncompressed_size == ZSTD_CONTENTSIZE_ERROR) { - throw std::runtime_error( - "Received data is not a valid Zstd frame."); - } - if(uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) { - throw std::runtime_error( - "Zstd frame content size is unknown and was not written in the frame."); - } - - decompressed_data.resize(uncompressed_size); - const size_t result_size = - ZSTD_decompress(decompressed_data.data(), uncompressed_size, - compressed_buffer, out.dirents_size); - - if(ZSTD_isError(result_size)) { - throw std::runtime_error( - "Zstd decompression failed: " + - std::string(ZSTD_getErrorName(result_size))); - } - if(result_size != uncompressed_size) { - throw std::runtime_error("Decompression size mismatch."); - } - - p = decompressed_data.data(); - end = p + uncompressed_size; - } else { - p = static_cast(compressed_buffer); - end = p + out.dirents_size; + auto [payload, payload_size] = decompress_dirents_payload( + out, compressed_buffer, decompressed_data); + if(payload_size == 0) { + return {}; } + const char* p = payload; + const char* end = payload + payload_size; + std::vector> entries; - // We don't know exact count, but can optimize reserve? - // entries.reserve(out.dirents_size / 32); + entries.reserve((payload_size / 32U) + 1U); while(p < end) { if(p + sizeof(unsigned char) > end) { @@ -387,22 +310,6 @@ decompress_and_parse_entries_filtered( unsigned char type = *reinterpret_cast(p); p += sizeof(unsigned char); - if(p + sizeof(size_t) > end) { - LOG(ERROR, "{}() Unexpected end of buffer while parsing size", - __func__); - break; - } - size_t size = *reinterpret_cast(p); - p += sizeof(size_t); - - if(p + sizeof(time_t) > end) { - LOG(ERROR, "{}() Unexpected end of buffer while parsing ctime", - __func__); - break; - } - time_t ctime = *reinterpret_cast(p); - p += sizeof(time_t); - // Name is null-terminated. We must check we don't go past 'end'. size_t name_len = strnlen(p, end - p); if(p + name_len >= end) { @@ -415,7 +322,7 @@ decompress_and_parse_entries_filtered( p += name_len + 1; if(!name.empty()) { - entries.emplace_back(name, type, size, ctime); + entries.emplace_back(name, type, 0, 0); } } @@ -682,40 +589,69 @@ forward_update_metadentry_size(const string& path, const size_t size, int err = 0; off64_t ret_offset = 0; bool valid = false; + bool primary_valid = false; + off64_t primary_offset = 0; + + struct pending_update { + int copy; + thallium::async_response response; + }; + std::vector waiters; + waiters.reserve(num_copies + 1); + + auto update_rpc = + CTX->rpc_engine()->define(gkfs::rpc::tag::update_metadentry_size); for(auto copy = 0; copy < num_copies + 1; copy++) { auto endp = CTX->hosts().at( CTX->distributor()->locate_file_metadata(path, copy)); + gkfs::rpc::rpc_update_metadentry_size_in_t in; + in.path = path; + in.size = size; + in.offset = offset; + in.append = append_flag; + in.clear_inline = clear_inline_flag; + try { - gkfs::rpc::rpc_update_metadentry_size_in_t in; - in.path = path; - in.size = size; - in.offset = offset; - in.append = append_flag; - in.clear_inline = clear_inline_flag; - - auto update_rpc = CTX->rpc_engine()->define( - gkfs::rpc::tag::update_metadentry_size); - gkfs::rpc::rpc_update_metadentry_size_out_t out = - update_rpc.on(endp)(in); + waiters.push_back({copy, update_rpc.on(endp).async(in)}); + } catch(const std::exception& ex) { + LOG(ERROR, "{}() posting rpc for path '{}' replica {} failed: {}", + __func__, path, copy, ex.what()); + err = EBUSY; + } + } + for(auto& waiter : waiters) { + try { + gkfs::rpc::rpc_update_metadentry_size_out_t out = + waiter.response.wait(); if(out.err == 0) { valid = true; - ret_offset = out.ret_offset; + if(waiter.copy == 0) { + primary_valid = true; + primary_offset = out.ret_offset; + } + if(!primary_valid) { + ret_offset = out.ret_offset; + } } else { err = out.err; } } catch(const std::exception& ex) { LOG(ERROR, "{}() getting rpc output for path '{}' replica {} failed: {}", - __func__, path, copy, ex.what()); - // Continue to other replicas + __func__, path, waiter.copy, ex.what()); + err = EBUSY; } } if(!valid) - return make_pair(err, 0); + return make_pair(err ? err : EIO, 0); + + if(primary_valid) { + ret_offset = primary_offset; + } return make_pair(err, ret_offset); } @@ -738,43 +674,49 @@ forward_get_dirents_single(const string& path, int server, vector>>(); int err = 0; string start_key = start_key_arg; + auto endp = CTX->hosts().at(targets[server]); + auto get_dirents_rpc = + CTX->rpc_engine()->define(gkfs::rpc::tag::get_dirents_extended); + + // Ensure path ends with / for get_dirents prefix check + std::string root_path = path; + if(root_path.length() > 1 && root_path.back() != '/') { + root_path += '/'; + } + + thread_local std::vector large_buffer; + size_t buffer_size = + std::max(std::size_t{1}, CTX->dirents_buff_size()); + if(large_buffer.size() < buffer_size) { + large_buffer.resize(buffer_size); + } + thallium::bulk exposed_buffer; + bool expose_needed = true; // Chunking loop while(true) { - size_t buffer_size = CTX->dirents_buff_size(); - auto large_buffer = std::unique_ptr(new char[buffer_size]); const int max_retries = 2; bool chunk_success = false; - // Ensure path ends with / for get_dirents prefix check - std::string root_path = path; - if(root_path.length() > 1 && root_path.back() != '/') { - root_path += '/'; - } - for(int attempt = 0; attempt < max_retries; ++attempt) { - // Expose buffer - std::vector> segments = { - std::make_pair(large_buffer.get(), buffer_size)}; - thallium::bulk exposed_buffer; - try { - exposed_buffer = CTX->rpc_engine()->expose( - segments, thallium::bulk_mode::read_write); - } catch(const std::exception& e) { - LOG(ERROR, "Failed to expose buffer: {}", e.what()); - return make_pair(EBUSY, nullptr); + if(expose_needed) { + std::vector> segments = { + std::make_pair(large_buffer.data(), buffer_size)}; + try { + exposed_buffer = CTX->rpc_engine()->expose( + segments, thallium::bulk_mode::read_write); + expose_needed = false; + } catch(const std::exception& e) { + LOG(ERROR, "Failed to expose buffer: {}", e.what()); + return make_pair(EBUSY, nullptr); + } } - auto endp = CTX->hosts().at(targets[server]); - gkfs::rpc::rpc_get_dirents_in_t in; in.path = root_path; in.start_key = start_key; in.bulk_handle = exposed_buffer; - auto get_dirents_rpc = CTX->rpc_engine()->define( - gkfs::rpc::tag::get_dirents_extended); - try { gkfs::rpc::rpc_get_dirents_out_t out = get_dirents_rpc.on(endp)(in); @@ -784,9 +726,15 @@ forward_get_dirents_single(const string& path, int server, LOG(WARNING, "{}() Buffer too small. Server requested {} bytes. Retrying.", __func__, required_size); + if(required_size <= buffer_size) { + err = ENOBUFS; + break; + } buffer_size = required_size; - large_buffer = - std::unique_ptr(new char[buffer_size]); + if(large_buffer.size() < buffer_size) { + large_buffer.resize(buffer_size); + } + expose_needed = true; continue; // Retry with new buffer size } else if(out.err != 0) { err = out.err; @@ -794,7 +742,7 @@ forward_get_dirents_single(const string& path, int server, } auto current_entries = gkfs::rpc::decompress_and_parse_entries( - out, large_buffer.get()); + out, large_buffer.data()); if(current_entries.empty()) { return make_pair(0, std::move(all_entries)); diff --git a/src/daemon/backend/metadata/rocksdb_backend.cpp b/src/daemon/backend/metadata/rocksdb_backend.cpp index 0790ae284..abf326a8a 100644 --- a/src/daemon/backend/metadata/rocksdb_backend.cpp +++ b/src/daemon/backend/metadata/rocksdb_backend.cpp @@ -48,6 +48,11 @@ #include #include #include +#include +#include +#include +#include +#include extern "C" { #include } @@ -637,11 +642,6 @@ RocksDBBackend::get_dirents_filtered_impl( if(use_regex) { if(!std::regex_match(relative_name, name_regex)) { matched = false; - GKFS_DATA->spdlogger()->error("DEBUG: '{}' NO MATCH regex '{}'", - relative_name, filter_name); - } else { - GKFS_DATA->spdlogger()->error("DEBUG: '{}' MATCHED regex '{}'", - relative_name, filter_name); } } if(matched && filter_size != -1 && md.size() != (size_t) filter_size) { @@ -723,7 +723,30 @@ RocksDBBackend::db_size_impl() const { */ void RocksDBBackend::optimize_database_impl() { + constexpr size_t kib = 1024; + constexpr size_t mib = 1024 * kib; + options_.max_successive_merges = 128; + options_.level_compaction_dynamic_level_bytes = true; + options_.max_background_jobs = + std::max(4, static_cast(std::thread::hardware_concurrency())); + options_.bytes_per_sync = 1 * mib; + + // Metadata workloads are point-lookups and small updates heavy. + options_.write_buffer_size = 64 * mib; + options_.max_write_buffer_number = 4; + options_.min_write_buffer_number_to_merge = 2; + options_.target_file_size_base = 64 * mib; + options_.max_bytes_for_level_base = 256 * mib; + options_.optimize_filters_for_hits = true; + + rdb::BlockBasedTableOptions table_options; + table_options.block_size = 16 * kib; + table_options.cache_index_and_filter_blocks = true; + table_options.pin_l0_filter_and_index_blocks_in_cache = true; + table_options.block_cache = rdb::NewLRUCache(128 * mib); + table_options.filter_policy.reset(rdb::NewBloomFilterPolicy(10, false)); + options_.table_factory.reset(rdb::NewBlockBasedTableFactory(table_options)); } diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 37457227b..c6081deed 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -56,6 +56,8 @@ #include #include #include +#include +#include using namespace std; // namespace { @@ -510,6 +512,11 @@ get_dirents_helper(const std::shared_ptr& engine, size_t uncompressed_size = 0; size_t entries_serialized = 0; std::vector uncompressed_data; + auto append_bytes = [&uncompressed_data](const void* src, size_t len) { + const auto old_size = uncompressed_data.size(); + uncompressed_data.resize(old_size + len); + std::memcpy(uncompressed_data.data() + old_size, src, len); + }; if(client_bulk_size > 0) uncompressed_data.reserve(client_bulk_size); // Hint for reservation @@ -518,6 +525,16 @@ get_dirents_helper(const std::shared_ptr& engine, if constexpr(std::is_same_v>) { + if(client_bulk_size == 0) { + const auto estimated_size = + std::accumulate(entries.begin(), entries.end(), size_t{0}, + [](size_t sum, const auto& e) { + return sum + sizeof(unsigned char) + + sizeof(size_t) + sizeof(time_t) + + std::get<0>(e).size() + 1; + }); + uncompressed_data.reserve(estimated_size); + } for(const auto& e : entries) { const auto& name = std::get<0>(e); @@ -563,20 +580,10 @@ get_dirents_helper(const std::shared_ptr& engine, } // Serialize - const char* type_p = reinterpret_cast(&type); - uncompressed_data.insert(uncompressed_data.end(), type_p, - type_p + sizeof(unsigned char)); - - const char* size_p = reinterpret_cast(&file_size); - uncompressed_data.insert(uncompressed_data.end(), size_p, - size_p + sizeof(size_t)); - - const char* time_p = reinterpret_cast(&ctime); - uncompressed_data.insert(uncompressed_data.end(), time_p, - time_p + sizeof(time_t)); - - uncompressed_data.insert(uncompressed_data.end(), name.c_str(), - name.c_str() + name.length() + 1); + append_bytes(&type, sizeof(unsigned char)); + append_bytes(&file_size, sizeof(size_t)); + append_bytes(&ctime, sizeof(time_t)); + append_bytes(name.c_str(), name.length() + 1); entries_serialized++; } @@ -593,11 +600,8 @@ get_dirents_helper(const std::shared_ptr& engine, for(const auto& e : entries) { unsigned char type = e.second; - const char* type_p = reinterpret_cast(&type); - uncompressed_data.insert(uncompressed_data.end(), type_p, - type_p + sizeof(unsigned char)); - uncompressed_data.insert(uncompressed_data.end(), e.first.c_str(), - e.first.c_str() + e.first.length() + 1); + append_bytes(&type, sizeof(unsigned char)); + append_bytes(e.first.c_str(), e.first.length() + 1); entries_serialized++; } } @@ -1105,4 +1109,4 @@ rpc_srv_read_data_inline(const tl::request& req, // } // namespace -// Removed all DEFINE_MARGO_RPC_HANDLER as per instruction. \ No newline at end of file +// Removed all DEFINE_MARGO_RPC_HANDLER as per instruction. -- GitLab From 1c8196b1662ad52102a23f0d8ea4434a63b2ca9d Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Wed, 11 Mar 2026 10:45:13 +0100 Subject: [PATCH 2/2] fix chunking --- include/client/rpc/forward_metadata_proxy.hpp | 3 ++- src/client/gkfs_metadata.cpp | 19 +++++++++++++++++++ src/client/rpc/forward_metadata_proxy.cpp | 7 ++++--- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/include/client/rpc/forward_metadata_proxy.hpp b/include/client/rpc/forward_metadata_proxy.hpp index 9c165a22f..f3a57f6a6 100644 --- a/include/client/rpc/forward_metadata_proxy.hpp +++ b/include/client/rpc/forward_metadata_proxy.hpp @@ -54,7 +54,8 @@ forward_get_metadentry_size_proxy(const std::string& path); std::pair>>> -forward_get_dirents_single_proxy_v2(const std::string& path, int server); +forward_get_dirents_single_proxy_v2(const std::string& path, int server, + const std::string& start_key = ""); } // namespace gkfs::rpc diff --git a/src/client/gkfs_metadata.cpp b/src/client/gkfs_metadata.cpp index f228b4fe5..426d0f397 100644 --- a/src/client/gkfs_metadata.cpp +++ b/src/client/gkfs_metadata.cpp @@ -1286,6 +1286,25 @@ gkfs_opendir(const std::string& path) { auto entries = gkfs::rpc::decompress_and_parse_entries( out, buffers[buffer_id].data()); consume_entries(entries); + if(!entries.empty()) { + const auto& last_key = std::get<0>(entries.back()); + auto remainder_res = + use_proxy_dirents + ? gkfs::rpc:: + forward_get_dirents_single_proxy_v2( + path, server, last_key) + : gkfs::rpc::forward_get_dirents_single( + path, server, last_key); + if(remainder_res.first != 0 || !remainder_res.second) { + ret.first = + remainder_res.first ? remainder_res.first : EIO; + LOG(ERROR, + "{}() Failed to fetch remaining dirents for server {}: {}", + __func__, server, ret.first); + continue; + } + consume_entries(*remainder_res.second); + } } catch(const std::exception& ex) { LOG(ERROR, "{}() Failed waiting async dirents response: {}", __func__, ex.what()); diff --git a/src/client/rpc/forward_metadata_proxy.cpp b/src/client/rpc/forward_metadata_proxy.cpp index 41b1bb37b..4833a6bf7 100644 --- a/src/client/rpc/forward_metadata_proxy.cpp +++ b/src/client/rpc/forward_metadata_proxy.cpp @@ -143,7 +143,8 @@ forward_get_metadentry_size_proxy(const std::string& path) { pair>>> -forward_get_dirents_single_proxy_v2(const string& path, int server) { +forward_get_dirents_single_proxy_v2(const string& path, int server, + const std::string& start_key_arg) { LOG(DEBUG, "{}() enter for path '{}', server '{}'", __func__, path, server); auto endp = CTX->proxy_host(); @@ -151,7 +152,7 @@ forward_get_dirents_single_proxy_v2(const string& path, int server) { auto all_entries = make_unique< vector>>(); int err = 0; - string start_key = ""; + string start_key = start_key_arg; // Chunking loop: keep fetching until no more entries are returned while(true) { @@ -252,4 +253,4 @@ forward_get_dirents_single_proxy_v2(const string& path, int server) { } } // namespace rpc -} // namespace gkfs \ No newline at end of file +} // namespace gkfs -- GitLab