Commit 06001b71 authored by Ramon Nou's avatar Ramon Nou
Browse files

sfind chunking

parent 85ae7f92
Loading
Loading
Loading
Loading
+91 −74
Original line number Diff line number Diff line
@@ -1070,24 +1070,27 @@ forward_get_dirents_single(const string& path, int server) {
        return make_pair(EINVAL, nullptr);
    }

    size_t buffer_size = gkfs::config::rpc::dirents_buff_size;
    auto large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);

    auto output_ptr = make_unique<
    auto all_entries = make_unique<
            vector<tuple<const std::string, bool, size_t, time_t>>>();
    int err = 0;
    string start_key = "";

    // Chunking loop: keep fetching until no more entries are returned
    while(true) {
        size_t buffer_size = gkfs::config::rpc::dirents_buff_size;
        auto large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);
        const int max_retries = 2; // Prevent infinite loops
        bool chunk_success = false;

        for(int attempt = 0; attempt < max_retries; ++attempt) {

            // Expose the current buffer for RMA.
        // This needs to be done on each iteration because the buffer might be
        // reallocated.
            hermes::exposed_memory exposed_buffer;
            try {
                exposed_buffer = ld_network_service->expose(
                    std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
                            large_buffer.get(), buffer_size}},
                        std::vector<hermes::mutable_buffer>{
                                hermes::mutable_buffer{large_buffer.get(),
                                                       buffer_size}},
                        hermes::access_mode::write_only);
            } catch(const std::exception& ex) {
                LOG(ERROR,
@@ -1097,7 +1100,8 @@ forward_get_dirents_single(const string& path, int server) {
            }

            auto endp = CTX->hosts().at(targets[server]);
        gkfs::rpc::get_dirents_extended::input in(path, exposed_buffer);
            gkfs::rpc::get_dirents_extended::input in(path, exposed_buffer,
                                                      start_key);
            gkfs::rpc::get_dirents_extended::output out;

            try {
@@ -1105,15 +1109,16 @@ forward_get_dirents_single(const string& path, int server) {
                    "{}() Sending RPC to host '{}' (attempt {}, buffer size {})",
                    __func__, targets[server], attempt + 1, buffer_size);

            auto handle =
                    ld_network_service->post<gkfs::rpc::get_dirents_extended>(
                auto handle = ld_network_service
                                      ->post<gkfs::rpc::get_dirents_extended>(
                                              endp, in);
                out = handle.get().at(0);
            } catch(const std::exception& ex) {
            LOG(ERROR, "{}() RPC post/get failed on attempt {}: {}", __func__,
                attempt, ex.what());
                LOG(ERROR, "{}() RPC post/get failed on attempt {}: {}",
                    __func__, attempt, ex.what());
                err = EBUSY;
            break; // Fatal error, break the loop
                break; // Fatal error, break retry loop but might continue chunk
                       // loop? No, EBUSY is bad.
            }

            // === RETRY LOGIC ===
@@ -1122,43 +1127,55 @@ forward_get_dirents_single(const string& path, int server) {
                LOG(WARNING,
                    "{}() Buffer too small. Server requested {} bytes. Retrying.",
                    __func__, required_size);

            // Re-allocate the buffer to the exact size the server needs.
                // Re-allocate
                buffer_size = required_size;
                large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);

            // The `exposed_buffer` from this iteration will be destructed.
            // The loop will continue for the next attempt with the new buffer.
                continue;

            } else if(out.err() != 0) {
            // A different, fatal server-side error occurred.
                LOG(ERROR, "{}() Server returned a fatal error: {}", __func__,
                    strerror(out.err()));
                err = out.err();
            break; // Break the loop
                break; // Break retry loop
            }

            // --- SUCCESS! ---
        // If we reach here, out.err() was 0.
            LOG(DEBUG, "{}() RPC successful. Decompressing data.", __func__);
            try {
                auto entries_vector =
                        decompress_and_parse_entries(out, large_buffer.get());
            output_ptr = make_unique<
                    vector<tuple<const std::string, bool, size_t, time_t>>>(
                    std::move(entries_vector));
            err = 0; // Explicitly set success

                if(entries_vector.empty()) {
                    // No more entries to fetch, we are done.
                    return make_pair(0, std::move(all_entries));
                }

                // Append to accumulator using move
                for(auto& e : entries_vector) {
                    // Check if we already have this entry (should not happen
                    // with correct start_key logic unless server returns
                    // start_key itself, but backend logic skips it).
                    all_entries->push_back(std::move(e));
                }

                // Update start_key for next chunk
                start_key = get<0>(all_entries->back());

                chunk_success = true; // Proceed to next chunk
                err = 0;
            } catch(const std::exception& ex) {
            LOG(ERROR, "{}() Failed to decompress/parse entries: {}", __func__,
                ex.what());
                LOG(ERROR, "{}() Failed to decompress/parse entries: {}",
                    __func__, ex.what());
                err = EBADMSG;
            }

        break; // Success, so we must break the retry loop.
            break; // Break retry loop
        }

    return make_pair(err, std::move(output_ptr));
        if(!chunk_success) {
            // If we exhausted retries or had fatal error
            return make_pair(err, nullptr);
        }
    }
}


+9 −4
Original line number Diff line number Diff line
@@ -194,7 +194,9 @@ MetadataDB::get_dirents(const std::string& dir) const {
}

std::vector<std::tuple<std::string, bool, size_t, time_t>>
MetadataDB::get_dirents_extended(const std::string& dir) const {
MetadataDB::get_dirents_extended(const std::string& dir,
                                 const std::string& start_key,
                                 size_t max_entries) const {
    auto root_path = dir;
    assert(gkfs::path::is_absolute(root_path));
    // add trailing slash if missing
@@ -203,18 +205,21 @@ MetadataDB::get_dirents_extended(const std::string& dir) const {
        root_path.push_back('/');
    }

    return backend_->get_dirents_extended(root_path);
    return backend_->get_dirents_extended(root_path, start_key, max_entries);
}

std::vector<std::tuple<std::string, bool, size_t, time_t>>
MetadataDB::get_all_dirents_extended(const std::string& dir) const {
MetadataDB::get_all_dirents_extended(const std::string& dir,
                                     const std::string& start_key,
                                     size_t max_entries) const {
    auto root_path = dir;
    assert(gkfs::path::is_absolute(root_path));
    if(!gkfs::path::has_trailing_slash(root_path) && root_path.size() != 1) {
        // add trailing slash only if missing and is not the root_folder "/"
        root_path.push_back('/');
    }
    return backend_->get_all_dirents_extended(root_path);
    return backend_->get_all_dirents_extended(root_path, start_key,
                                              max_entries);
}

/**
+36 −6
Original line number Diff line number Diff line
@@ -391,15 +391,30 @@ RocksDBBackend::get_dirents_impl(const std::string& dir) const {
 *         is true in the case the entry is a directory.
 */
std::vector<std::tuple<std::string, bool, size_t, time_t>>
RocksDBBackend::get_dirents_extended_impl(const std::string& dir) const {
RocksDBBackend::get_dirents_extended_impl(const std::string& dir,
                                          const std::string& start_key,
                                          size_t max_entries) const {
    auto root_path = dir;
    rocksdb::ReadOptions ropts;
    auto it = db_->NewIterator(ropts);

    std::vector<std::tuple<std::string, bool, size_t, time_t>> entries;

    for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
        it->Next()) {
    if(start_key.empty()) {
        it->Seek(root_path);
    } else {
        auto key = root_path + start_key;
        it->Seek(key);
        if(it->Valid() && it->key().ToString() == key) {
            it->Next();
        }
    }

    for(; it->Valid() && it->key().starts_with(root_path); it->Next()) {

        if(max_entries > 0 && entries.size() >= max_entries) {
            break;
        }

        if(it->key().size() == root_path.size()) {
            // we skip this path cause it is exactly the root_path
@@ -442,15 +457,30 @@ RocksDBBackend::get_dirents_extended_impl(const std::string& dir) const {

// Return all the extended entries with root in the path specified
std::vector<std::tuple<std::string, bool, size_t, time_t>>
RocksDBBackend::get_all_dirents_extended_impl(const std::string& dir) const {
RocksDBBackend::get_all_dirents_extended_impl(const std::string& dir,
                                              const std::string& start_key,
                                              size_t max_entries) const {
    auto root_path = dir;
    rocksdb::ReadOptions ropts;
    auto it = db_->NewIterator(ropts);

    std::vector<std::tuple<std::string, bool, size_t, time_t>> entries;

    for(it->Seek(root_path); it->Valid() && it->key().starts_with(root_path);
        it->Next()) {
    if(start_key.empty()) {
        it->Seek(root_path);
    } else {
        auto key = root_path + start_key;
        it->Seek(key);
        if(it->Valid() && it->key().ToString() == key) {
            it->Next();
        }
    }

    for(; it->Valid() && it->key().starts_with(root_path); it->Next()) {

        if(max_entries > 0 && entries.size() >= max_entries) {
            break;
        }

        if(it->key().size() == root_path.size()) {
            // we skip this path cause it is exactly the root_path
+63 −19
Original line number Diff line number Diff line
@@ -810,8 +810,16 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {

    // Get directory entries from local DB
    vector<tuple<string, bool, size_t, time_t>> entries{};
    // Estimate max entries to avoid huge vector
    // Minimum entry size is roughly 40+ bytes (metadata + min name).
    // Using a conservative 200 bytes per entry to balance RPC calls.
    size_t max_entries = client_bulk_size / 200;
    if(max_entries == 0)
        max_entries = 1;

    try {
        entries = gkfs::metadata::get_all_dirents_extended(in.path);
        entries = gkfs::metadata::get_all_dirents_extended(
                in.path, in.start_key, max_entries);
    } catch(const ::exception& e) {
        GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
                                      __func__, e.what());
@@ -826,9 +834,10 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {

    // Serialize data into a vector
    std::vector<char> uncompressed_data;
    // Optimization: Reserve a reasonable starting size to avoid reallocations
    // Assuming avg filename length of 32 + metadata sizes
    uncompressed_data.reserve(entries.size() * 48);
    // Optimization: Reserve a reasonable starting size
    uncompressed_data.reserve(client_bulk_size);

    size_t entries_serialized = 0;

    for(const auto& e : entries) {
        const auto& name = get<0>(e);
@@ -836,8 +845,40 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {
        size_t file_size = get<2>(e);
        time_t ctime = get<3>(e);

        // Append data fields sequentially into the vector. The client will
        // parse in this exact order.
        // Calculate size of this entry
        size_t entry_size = sizeof(bool) + sizeof(size_t) + sizeof(time_t) +
                            name.length() + 1;

        // Check if this entry fits
        if(gkfs::config::rpc::use_dirents_compression) {
            size_t current_uncompressed_size = uncompressed_data.size();
            size_t new_uncompressed_size =
                    current_uncompressed_size + entry_size;
            size_t compressed_bound = ZSTD_compressBound(new_uncompressed_size);
            if(compressed_bound > client_bulk_size) {
                if(entries_serialized == 0) {
                    out.err = ENOBUFS;
                    out.dirents_size = compressed_bound;
                    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
                } else {
                    // Buffer full, stop here
                    break;
                }
            }
        } else {
            if(uncompressed_data.size() + entry_size > client_bulk_size) {
                if(entries_serialized == 0) {
                    out.err = ENOBUFS;
                    out.dirents_size = uncompressed_data.size() + entry_size;
                    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
                } else {
                    // Buffer full, stop here
                    break;
                }
            }
        }

        // Append data fields sequentially into the vector
        const char* bool_p = reinterpret_cast<const char*>(&is_dir);
        uncompressed_data.insert(uncompressed_data.end(), bool_p,
                                 bool_p + sizeof(bool));
@@ -853,6 +894,8 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {
        // Append string and null terminator
        uncompressed_data.insert(uncompressed_data.end(), name.c_str(),
                                 name.c_str() + name.length() + 1);

        entries_serialized++;
    }

    const size_t uncompressed_size = uncompressed_data.size();
@@ -880,11 +923,19 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {
            return gkfs::rpc::cleanup_respond(&handle, &in, &out);
        }

        // Check fits in client buffer
        // Double check fits (should match bound check roughly)
        if(client_bulk_size < compressed_size) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Compressed data ('{}' bytes) does not fit client buffer ('{}' bytes)",
                    "{}() Compressed data ('{}' bytes) does not fit client buffer ('{}' bytes) after check!",
                    __func__, compressed_size, client_bulk_size);
            // Should not happen if bound check logic was correct, but Zstd
            // bound is upper limit. If we really messed up, return ENOBUFS but
            // this implies logic error or edge case. Since we return what we
            // have, we might have been too aggressive. But we used
            // CompressBound in loop check, so it should be fine. However, if
            // entries_serialized > 0, we can't easily backup unless we retry
            // compression with fewer entries. Given bound check, this is
            // unlikely.
            out.err = ENOBUFS;
            out.dirents_size = compressed_size;
            return gkfs::rpc::cleanup_respond(&handle, &in, &out);
@@ -895,25 +946,18 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {

        GKFS_DATA->spdlogger()->trace(
                "{}() Serialized '{}' entries to '{}' bytes, compressed to '{}' bytes.",
                __func__, entries.size(), uncompressed_size, compressed_size);
                __func__, entries_serialized, uncompressed_size,
                compressed_size);

    } else {
        // === Compression Disabled ===
        if(client_bulk_size < uncompressed_size) {
            GKFS_DATA->spdlogger()->error(
                    "{}() Uncompressed data ('{}' bytes) does not fit client buffer ('{}' bytes)",
                    __func__, uncompressed_size, client_bulk_size);
            out.err = ENOBUFS;
            out.dirents_size = uncompressed_size;
            return gkfs::rpc::cleanup_respond(&handle, &in, &out);
        }

        // Size check done in loop
        segment_ptr = uncompressed_data.data();
        transfer_size = uncompressed_size;

        GKFS_DATA->spdlogger()->trace(
                "{}() Serialized '{}' entries to '{}' bytes (Compression disabled).",
                __func__, entries.size(), uncompressed_size);
                __func__, entries_serialized, uncompressed_size);
    }

    // Create a zero-copy bulk handle that wraps our data vector for the push
+7 −4
Original line number Diff line number Diff line
@@ -66,13 +66,16 @@ get_dirents(const std::string& dir) {
}

std::vector<std::tuple<std::string, bool, size_t, time_t>>
get_dirents_extended(const std::string& dir, const std::string& start_key,
                     size_t max_entries) {
    // Thin forwarding wrapper over the metadata DB: paginated extended
    // dirent listing of 'dir', resuming after 'start_key' and capped at
    // 'max_entries' entries per call.
    return GKFS_DATA->mdb()->get_dirents_extended(dir, start_key, max_entries);
}

std::vector<std::tuple<std::string, bool, size_t, time_t>>
get_all_dirents_extended(const std::string& dir, const std::string& start_key,
                         size_t max_entries) {
    // Thin forwarding wrapper over the metadata DB: recursive paginated
    // extended dirent listing rooted at 'dir', resuming after 'start_key'
    // and capped at 'max_entries' entries per call.
    return GKFS_DATA->mdb()->get_all_dirents_extended(dir, start_key,
                                                      max_entries);
}

void