Commit 4caffc64 authored by Ramon Nou
Browse files

Added new compression config.hpp

parent 818abe25
Loading
Loading
Loading
Loading
+62 −59
Original line number Diff line number Diff line
@@ -70,8 +70,8 @@ struct dirent_extended {

/* Function exported from GekkoFS LD_PRELOAD */
extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended* dirp,
                        unsigned int count, int server) __attribute__((weak));
gkfs_getsingleserverdir(const char* path, struct dirent_extended** dirp,
                        int server) __attribute__((weak));

/* PFIND OPTIONS EXTENDED */
typedef struct {
@@ -356,9 +356,9 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {
}



// Helper to broadcast a C++ string
void bcast_string(string& s, int root) {
void
bcast_string(string& s, int root) {
    int len = 0;
    if(pfind_rank == root) {
        len = s.length();
@@ -369,7 +369,8 @@ void bcast_string(string& s, int root) {
}

// Broadcast all options from rank 0 to other processes
void bcast_options(pfind_options_t* opt) {
void
bcast_options(pfind_options_t* opt) {
    // Broadcast simple POD types
    MPI_Bcast(&opt->just_count, 1, MPI_CXX_BOOL, 0, MPI_COMM_WORLD);
    MPI_Bcast(&opt->size, 1, MPI_UINT64_T, 0, MPI_COMM_WORLD);
@@ -395,62 +396,58 @@ void bcast_options(pfind_options_t* opt) {
void
dirProcess(const string& path, unsigned long long& checked,
           unsigned long long& found, const pfind_options_t* opt) {
    const size_t buffer_size =
            (sizeof(struct dirent_extended) + 255) * 1024 * 10240;

    unique_ptr<char[]> buffer(new char[buffer_size]{});

    // --- PARALLELIZATION LOGIC ---
    // Each process calculates its own range of servers to query.
    int servers_per_proc = opt->num_servers / pfind_size;
    int remainder = opt->num_servers % pfind_size;
    int start_server = pfind_rank * servers_per_proc + min(pfind_rank, remainder);
    int end_server = start_server + servers_per_proc + (pfind_rank < remainder ? 1 : 0);
    int start_server =
            pfind_rank * servers_per_proc + min(pfind_rank, remainder);
    int end_server =
            start_server + servers_per_proc + (pfind_rank < remainder ? 1 : 0);

    if(opt->verbosity > 0) {
        cout << "[Rank " << pfind_rank << "] Processing servers " << start_server 
             << " to " << end_server - 1 << endl;
        cout << "[Rank " << pfind_rank << "] Processing servers "
             << start_server << " to " << end_server - 1 << endl;
    }

    // Each process loops ONLY over its assigned servers
    for(int server = start_server; server < end_server; server++) {
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(),
                reinterpret_cast<struct dirent_extended*>(buffer.get()),
                buffer_size, server);
        struct dirent_extended* entries = nullptr;
        long unsigned int n =
                gkfs_getsingleserverdir(path.c_str(), &entries, server);

        if(n <= 0) { // Handle empty or error cases
            if(entries)
                free(entries);
            continue;
        }

        unsigned long long total_size = 0;
        char* ptr = reinterpret_cast<char*>(entries);
        int bytes_processed = 0;
        while(bytes_processed < (int) n) {
            struct dirent_extended* temp =
                reinterpret_cast<struct dirent_extended*>(buffer.get());
                
        while(total_size < n) {
            // Safety checks to prevent infinite loops on malformed data
            if(strlen(temp->d_name) == 0 || temp->d_reclen == 0) {
                    reinterpret_cast<struct dirent_extended*>(ptr);
            if(temp->d_reclen == 0)
                break;
            }

            // Process files only, skip directories
            if(temp->d_type != 1) {
                /* Find filtering */
                bool timeOK = opt->timestamp_file.empty() || ((uint64_t)temp->ctime >= runtime.ctime_min);
                bool sizeOK = (opt->size == std::numeric_limits<uint64_t>::max() || temp->size == opt->size);
                bool nameOK = opt->name_pattern.empty() || regex_search(temp->d_name, opt->name_regex);

                if (timeOK && sizeOK && nameOK) {
                bool timeOK = opt->timestamp_file.empty() ||
                              ((uint64_t) temp->ctime >= runtime.ctime_min);
                bool sizeOK =
                        (opt->size == std::numeric_limits<uint64_t>::max() ||
                         temp->size == opt->size);
                bool nameOK = opt->name_pattern.empty() ||
                              regex_search(temp->d_name, opt->name_regex);

                if(timeOK && sizeOK && nameOK)
                    found++;
            }
            checked++;
            bytes_processed += temp->d_reclen;
            ptr += temp->d_reclen;
        }

            // Unconditionally advance to the next record
            total_size += temp->d_reclen;
            temp = reinterpret_cast<dirent_extended*>(
                    reinterpret_cast<char*>(temp) + temp->d_reclen);
        }
        free(entries);
    }
}

@@ -465,7 +462,8 @@ process_parallel(const pfind_options_t* opt) {
        if(pfind_rank == 0) {
            struct stat timer_file;
            if(lstat(opt->timestamp_file.c_str(), &timer_file) != 0) {
                cerr << "Could not open: \"" << opt->timestamp_file << "\", error: " << strerror(errno) << endl;
                cerr << "Could not open: \"" << opt->timestamp_file
                     << "\", error: " << strerror(errno) << endl;
                MPI_Abort(MPI_COMM_WORLD, 1);
            }
            runtime.ctime_min = timer_file.st_ctime;
@@ -482,7 +480,8 @@ process_parallel(const pfind_options_t* opt) {
        workdir = "/";
    }

    // Each process calls dirProcess, which will handle its assigned subset of servers
    // Each process calls dirProcess, which will handle its assigned subset of
    // servers
    dirProcess(workdir, local_checked, local_found, opt);


@@ -490,8 +489,10 @@ process_parallel(const pfind_options_t* opt) {
    unsigned long long global_checked = 0;


    MPI_Reduce(&local_found, &global_found, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
    MPI_Reduce(&local_checked, &global_checked, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
    MPI_Reduce(&local_found, &global_found, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM,
               0, MPI_COMM_WORLD);
    MPI_Reduce(&local_checked, &global_checked, 1, MPI_UNSIGNED_LONG_LONG,
               MPI_SUM, 0, MPI_COMM_WORLD);


    if(pfind_rank == 0) {
@@ -529,7 +530,9 @@ main(int argc, char** argv) {

    // Check if the GekkoFS function is available (e.g., via LD_PRELOAD)
    if(gkfs_getsingleserverdir == nullptr) {
        if(pfind_rank == 0) cerr << "Error: GekkoFS functions not available. Is the library preloaded?" << endl;
        if(pfind_rank == 0)
            cerr << "Error: GekkoFS functions not available. Is the library preloaded?"
                 << endl;
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

+4 −2
Original line number Diff line number Diff line
@@ -160,7 +160,7 @@ namespace rpc {
constexpr auto chunksize = 524288; // in bytes (e.g., 524288 == 512KB)
// size of preallocated buffer to hold directory entries in rpc call
constexpr auto dirents_buff_size = (8 * 1024 * 1024);       // 8 mega
constexpr auto dirents_buff_size_proxy = (128 * 1024 * 1024); // 128 mega
constexpr auto dirents_buff_size_proxy = (8 * 1024 * 1024); // 8 mega
/*
 * Indicates the number of concurrent progress to drive I/O operations of chunk
 * files to and from local file systems. The value is directly mapped to created
@@ -171,6 +171,8 @@ constexpr auto daemon_io_xstreams = 8;
constexpr auto daemon_handler_xstreams = 4;
// Number of threads used for RPC handlers at the proxy
constexpr auto proxy_handler_xstreams = 3;
// Enable compression for directory entries transfer
constexpr auto use_dirents_compression = false;
} // namespace rpc

namespace rocksdb {
+10 −0
Original line number Diff line number Diff line
@@ -1564,10 +1564,20 @@ gkfs_rmdir(const std::string& path) {
    }
    assert(ret.second);
    auto open_dir = ret.second;
    // LOG all entries
    LOG(DEBUG, "rmdir check - entries in dir '{} {}':", path, open_dir->size());
    for(size_t i = 0; i < open_dir->size(); i++) {
        auto de = open_dir->getdent(i);
        LOG(DEBUG, "rmdir check - name: {} type: {}", de.name(),
            (de.type() == gkfs::filemap::FileType::directory) ? "dir"
                                                              : "regular");
    }
    // only . and .. should be present
    if(open_dir->size() != 2) {
        errno = ENOTEMPTY;
        return -1;
    }

#endif
    if(gkfs::config::proxy::fwd_remove && CTX->use_proxy()) {
        err = gkfs::rpc::forward_remove_proxy(path, true);
+130 −63
Original line number Diff line number Diff line
@@ -664,8 +664,8 @@ forward_get_metadentry_size(const std::string& path, const int copy) {

/**
 * Send an RPC request to receive all entries of a directory.
 * @param open_dir
 * @return error code
 * @param path
 * @return pair<error code, shared_ptr<OpenDir>>
 */
pair<int, shared_ptr<gkfs::filemap::OpenDir>>
forward_get_dirents(const string& path) {
@@ -675,14 +675,14 @@ forward_get_dirents(const string& path) {
            __func__);
    }

    LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
    LOG(DEBUG, "{}() enter for path '{}'", __func__, path);

    auto const targets = CTX->distributor()->locate_directory_metadata();

    /* preallocate receiving buffer. The actual size is not known yet.
     *
     * On C++14 make_unique function also zeroes the newly allocated buffer.
     * It turns out that this operation is increadibly slow for such a big
     * It turns out that this operation is incredibly slow for such a big
     * buffer. Moreover we don't need a zeroed buffer here.
     */
    auto large_buffer = std::unique_ptr<char[]>(
@@ -745,23 +745,78 @@ forward_get_dirents(const string& path) {
    open_dir->add(".", gkfs::filemap::FileType::directory);
    open_dir->add("..", gkfs::filemap::FileType::directory);

    // Helper lambda to deserialize data
    auto deserialize_dirents = [&](void* buffer_ptr, size_t num_entries) {
    /**
     * Helper lambda to deserialize the buffer received from the daemon.
     *
     * @param buffer_ptr Pointer to the buffer containing the data.
     * @param num_entries_or_size If compression is on: Byte size of compressed
     * data. If compression is off: Number of entries (count).
     */
    auto deserialize_dirents = [&](void* buffer_ptr,
                                   size_t num_entries_or_size) {
        if(gkfs::config::rpc::use_dirents_compression) {
            // --- Compressed path (AOS layout) ---
            // In this mode, num_entries_or_size is the BYTE SIZE of the
            // compressed data
            size_t capacity = num_entries_or_size;
            unsigned long long uncompressed_size =
                    ZSTD_getFrameContentSize(buffer_ptr, capacity);
            LOG(DEBUG,
                "{}() Zstd compressed dirents size: {}, uncompressed size: {}",
                __func__, capacity, uncompressed_size);

            if(uncompressed_size == ZSTD_CONTENTSIZE_ERROR ||
               uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
                LOG(ERROR, "{}() Zstd error getting content size", __func__);
                return;
            }

            std::vector<char> decomp(uncompressed_size);
            size_t ret = ZSTD_decompress(decomp.data(), uncompressed_size,
                                         buffer_ptr, capacity);

            if(ZSTD_isError(ret)) {
                LOG(ERROR, "{}() Zstd decompression error: {}", __func__,
                    ZSTD_getErrorName(ret));
                return;
            }

            char* ptr = decomp.data();
            char* end = ptr + uncompressed_size;

            while(ptr < end) {
                // Format: [bool is_dir][null-term string name]
                bool is_dir = *reinterpret_cast<bool*>(ptr);
                ptr += sizeof(bool);

                std::string name(ptr);
                ptr += name.size() + 1; // Advance past name + \0
                LOG(DEBUG, "{}() Retrieved dirent: '{}' is_dir: {}", __func__,
                    name, is_dir);
                open_dir->add(name, is_dir ? gkfs::filemap::FileType::directory
                                           : gkfs::filemap::FileType::regular);
            }
        } else {
            // --- Legacy uncompressed path (SOA layout) ---
            // In this mode, num_entries_or_size is the COUNT of entries
            size_t num_entries = num_entries_or_size;

            bool* bool_ptr = reinterpret_cast<bool*>(buffer_ptr);
            char* names_ptr = reinterpret_cast<char*>(buffer_ptr) +
                              (num_entries * sizeof(bool));

        for(std::size_t j = 0; j < num_entries; j++) {
            for(size_t j = 0; j < num_entries; j++) {
                gkfs::filemap::FileType ftype =
                        (*bool_ptr) ? gkfs::filemap::FileType::directory
                                    : gkfs::filemap::FileType::regular;
                bool_ptr++;

            auto name = std::string(names_ptr);
                std::string name(names_ptr);
                names_ptr += name.size() + 1;

                open_dir->add(name, ftype);
            }
        }
    };

    // wait for RPC responses
@@ -772,29 +827,29 @@ forward_get_dirents(const string& path) {
        try {
            out = handles[i].get().at(0);

            // skip processing if there was an error during send
            // skip processing dirent data if there was an error during send
            if(send_error)
                continue;

            // --- RETRY LOGIC START ---
            // --- Retry Logic for ENOBUFS ---
            if(out.err() == ENOBUFS) {
                // The buffer was too small. The daemon returns the required
                // size in dirents_size (in bytes).
                // size in dirents_size.
                size_t required_size = out.dirents_size();
                LOG(DEBUG,
                    "{}() Buffer too small for host '{}'. Required: {}, Available: {}. Retrying...",
                    __func__, targets[i], required_size, per_host_buff_size);

                // Allocate exact size needed
                auto retry_buffer =
                auto retry_buf =
                        std::unique_ptr<char[]>(new char[required_size]);

                // Expose new buffer
                hermes::exposed_memory retry_exposed_buffer;
                // Expose new buffer for RMA
                hermes::exposed_memory retry_exp;
                try {
                    retry_exposed_buffer = ld_network_service->expose(
                    retry_exp = ld_network_service->expose(
                            std::vector<hermes::mutable_buffer>{
                                    hermes::mutable_buffer{retry_buffer.get(),
                                    hermes::mutable_buffer{retry_buf.get(),
                                                           required_size}},
                            hermes::access_mode::write_only);
                } catch(const std::exception& ex) {
@@ -804,14 +859,14 @@ forward_get_dirents(const string& path) {
                    continue;
                }

                // Resend RPC
                // Resend RPC to the specific host
                auto endp = CTX->hosts().at(targets[i]);
                gkfs::rpc::get_dirents::input in(path, retry_exposed_buffer);
                gkfs::rpc::get_dirents::input retry_in(path, retry_exp);

                try {
                    auto retry_out =
                            ld_network_service
                                    ->post<gkfs::rpc::get_dirents>(endp, in)
                    auto retry_out = ld_network_service
                                             ->post<gkfs::rpc::get_dirents>(
                                                     endp, retry_in)
                                             .get()
                                             .at(0);
                    if(retry_out.err() != 0) {
@@ -821,20 +876,19 @@ forward_get_dirents(const string& path) {
                        continue;
                    }

                    // Success on retry
                    deserialize_dirents(retry_buffer.get(),
                    // Success on retry: deserialize data
                    deserialize_dirents(retry_buf.get(),
                                        retry_out.dirents_size());

                } catch(const std::exception& ex) {
                    LOG(ERROR, "{}() Retry RPC failed for host '{}'. err '{}'",
                        __func__, targets[i], ex.what());
                    err = EBUSY;
                    continue;
                }
                continue; // Done with this host
            }
            // --- RETRY LOGIC END ---

            // Normal error check
            if(out.err() != 0) {
                LOG(ERROR,
                    "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
@@ -850,9 +904,7 @@ forward_get_dirents(const string& path) {
            continue;
        }

        // Standard success path
        // each server wrote information to its pre-defined region in
        // large_buffer
        // Standard success path (Initial buffer was large enough)
        assert(exposed_buffers[i].count() == 1);
        void* base_ptr = exposed_buffers[i].begin()->data();

@@ -861,6 +913,7 @@ forward_get_dirents(const string& path) {
    return make_pair(err, open_dir);
}


// This function takes the RPC output and the received buffer, and returns the
// parsed entries.
std::vector<std::tuple<const std::string, bool, size_t, time_t>>
@@ -874,12 +927,18 @@ decompress_and_parse_entries(const gkfs::rpc::get_dirents_extended::output& out,
        return {}; // No entries, return empty vector
    }

    const char* p = nullptr;
    const char* end = nullptr;
    std::vector<char> decompressed_data;

    if(gkfs::config::rpc::use_dirents_compression) {
        // === STEP 1: Discover the original size from the Zstd frame header ===
        const unsigned long long uncompressed_size =
                ZSTD_getFrameContentSize(compressed_buffer, out.dirents_size());

        if(uncompressed_size == ZSTD_CONTENTSIZE_ERROR) {
        throw std::runtime_error("Received data is not a valid Zstd frame.");
            throw std::runtime_error(
                    "Received data is not a valid Zstd frame.");
        }
        if(uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
            throw std::runtime_error(
@@ -887,23 +946,31 @@ decompress_and_parse_entries(const gkfs::rpc::get_dirents_extended::output& out,
        }

        // === STEP 2: Decompress the data into a new buffer ===
    std::vector<char> decompressed_data(uncompressed_size);
        decompressed_data.resize(uncompressed_size);
        const size_t result_size =
                ZSTD_decompress(decompressed_data.data(), uncompressed_size,
                                compressed_buffer, out.dirents_size());

        if(ZSTD_isError(result_size)) {
        throw std::runtime_error("Zstd decompression failed: " +
            throw std::runtime_error(
                    "Zstd decompression failed: " +
                    std::string(ZSTD_getErrorName(result_size)));
        }
        if(result_size != uncompressed_size) {
            throw std::runtime_error("Decompression size mismatch.");
        }

    // === STEP 3: Parse the decompressed, raw data stream ===
        p = decompressed_data.data();
        end = p + uncompressed_size;
    } else {
        // No compression: Data is raw in the input buffer
        p = static_cast<const char*>(compressed_buffer);
        end = p + out.dirents_size();
    }

    // === STEP 3: Parse the data stream ===
    // AOS Layout: [bool is_dir][size_t size][time_t ctime][string name\0]
    std::vector<std::tuple<const std::string, bool, size_t, time_t>> entries;
    const char* p = decompressed_data.data();
    const char* end = p + decompressed_data.size();

    while(p < end) {
        // Read is_dir
+41 −28
Original line number Diff line number Diff line
@@ -216,12 +216,18 @@ decompress_and_parse_entries(
        return {}; // No entries, return empty vector
    }

    const char* p = nullptr;
    const char* end = nullptr;
    std::vector<char> decompressed_data;

    if(gkfs::config::rpc::use_dirents_compression) {
        // === STEP 1: Discover the original size from the Zstd frame header ===
        const unsigned long long uncompressed_size =
                ZSTD_getFrameContentSize(compressed_buffer, out.dirents_size());

        if(uncompressed_size == ZSTD_CONTENTSIZE_ERROR) {
        throw std::runtime_error("Received data is not a valid Zstd frame.");
            throw std::runtime_error(
                    "Received data is not a valid Zstd frame.");
        }
        if(uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
            throw std::runtime_error(
@@ -229,23 +235,31 @@ decompress_and_parse_entries(
        }

        // === STEP 2: Decompress the data into a new buffer ===
    std::vector<char> decompressed_data(uncompressed_size);
        decompressed_data.resize(uncompressed_size);
        const size_t result_size =
                ZSTD_decompress(decompressed_data.data(), uncompressed_size,
                                compressed_buffer, out.dirents_size());

        if(ZSTD_isError(result_size)) {
        throw std::runtime_error("Zstd decompression failed: " +
            throw std::runtime_error(
                    "Zstd decompression failed: " +
                    std::string(ZSTD_getErrorName(result_size)));
        }
        if(result_size != uncompressed_size) {
            throw std::runtime_error("Decompression size mismatch.");
        }

    // === STEP 3: Parse the decompressed, raw data stream ===
        p = decompressed_data.data();
        end = p + uncompressed_size;
    } else {
        // No compression: Data is raw in the input buffer
        p = static_cast<const char*>(compressed_buffer);
        end = p + out.dirents_size();
    }

    // === STEP 3: Parse the data stream ===
    // AOS Layout: [bool is_dir][size_t size][time_t ctime][string name\0]
    std::vector<std::tuple<const std::string, bool, size_t, time_t>> entries;
    const char* p = decompressed_data.data();
    const char* end = p + decompressed_data.size();

    while(p < end) {
        // Read is_dir
@@ -270,7 +284,6 @@ decompress_and_parse_entries(
    return entries;
}


pair<int, unique_ptr<vector<tuple<const std::string, bool, size_t, time_t>>>>
forward_get_dirents_single_proxy(const string& path, int server) {
    LOG(DEBUG, "{}() enter for path '{}', server '{}'", __func__, path, server);
@@ -334,9 +347,9 @@ forward_get_dirents_single_proxy(const string& path, int server) {
        }

        // --- SUCCESS ---
        LOG(DEBUG, "{}() RPC to proxy successful. Decompressing data.",
            __func__);
        LOG(DEBUG, "{}() RPC to proxy successful. Processing data.", __func__);
        try {
            // decompress_and_parse_entries handles the config toggle internally
            auto entries_vector =
                    decompress_and_parse_entries(out, large_buffer.get());
            output_ptr = make_unique<
@@ -344,7 +357,7 @@ forward_get_dirents_single_proxy(const string& path, int server) {
                    std::move(entries_vector));
            err = 0;
        } catch(const std::exception& ex) {
            LOG(ERROR, "{}() Failed to decompress/parse entries from proxy: {}",
            LOG(ERROR, "{}() Failed to process entries from proxy: {}",
                __func__, ex.what());
            err = EBADMSG;
        }
Loading