Commit 79551419 authored by Ramon Nou's avatar Ramon Nou
Browse files

sfind with server filtering

parent 0d3251dd
Loading
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -23,8 +23,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
    - Enable inline data for small files (`LIBGKFS_USE_INLINE_DATA`).
    - Create write optimization (`LIBGKFS_CREATE_WRITE_OPTIMIZATION`).
    - Read inline prefetch (`LIBGKFS_READ_INLINE_PREFETCH`).
    - Dirents compression (`LIBGKFS_USE_DIRENTS_COMPRESSION`).
    - Dirents compression (`LIBGKFS_USE_DIRENTS_COMPRESSION and GKFS_DAEMON_USE_DIRENTS_COMPRESSION`). 
    - Dirents buffer size control (`LIBGKFS_DIRENTS_BUFF_SIZE`).
    - New sfind filtering in the server side
  - Added new tests (and enabling failing ones) to increase coverage


+1 −1
Original line number Diff line number Diff line
@@ -634,7 +634,7 @@ Using two environment variables
- `GKFS_DAEMON_LOG_LEVEL` - Log level of the daemon. Available levels are: `off`, `critical`, `err`, `warn`, `info`, `debug`, `trace`.
#### Optimization
- `GKFS_DAEMON_USE_INLINE_DATA` - Enable inline data storage (default: ON).
- `GKFS_USE_DIRENTS_COMPRESSION` - Enable compression for directory entries (default: OFF).
- `GKFS_DAEMON_USE_DIRENTS_COMPRESSION` - Enable compression for directory entries (default: OFF).
### Proxy
#### Logging
- `GKFS_PROXY_LOG_PATH` - Path to the log file of the proxy.
+47 −23
Original line number Diff line number Diff line
@@ -36,38 +36,62 @@ GKFS_FIND=~/ADMIRE/iodeps/bin/sfind

srun -N $NUM_NODES -n $GKFS_FIND_PROCESS --overlap --overcommit --mem=0 --oversubscribe --export=ALL,LD_PRELOAD=${GKFS} $GKFS_FIND $@ -M $GKFS_MNT -S $GKFS_SERVERS

#!/bin/bash
# scripts/aggregate_sfind_results.sh
# Robustly aggregates gfind_results.rank-*.txt files

# Initialize total counters
total_found=0
total_checked=0

# Check if any result files exist
if ! ls gfind_results.rank-*.txt 1> /dev/null 2>&1; then
# Enable nullglob so *.txt doesn't return literal string if no files match
shopt -s nullglob
files=(gfind_results.rank-*.txt)

if [ ${#files[@]} -eq 0 ]; then
    echo "No result files found (gfind_results.rank-*.txt)."
    exit 1
fi

# Loop through all result files
for file in gfind_results.rank-*.txt; do
    # Read the line "MATCHED found/checked" from the file
    # and extract the numbers.
    read -r _ found_str checked_str < "$file"
echo "Found ${#files[@]} result files. Aggregating..."

for file in "${files[@]}"; do
    if [ ! -s "$file" ]; then
         echo "Warning: File $file is empty or missing. Skipping."
         continue
    fi

    # Read the line. Using -r to prevent backslash interpretation.
    if read -r line < "$file"; then
        # Expected format: MATCHED <found>/<checked>
        # Example: MATCHED 123/4567
        
        # Remove prefix "MATCHED " if present
        if [[ "$line" == MATCHED* ]]; then
            val_str="${line#MATCHED }"
        else
            # Try to handle cases where MATCHED might be missing or different
            val_str="$line" 
        fi
        
    # Use cut to handle the "found/checked" format
    found=$(echo "$found_str" | cut -d'/' -f1)
    checked=$(echo "$checked_str") # this will be the same as found_str's second part
        # Split by '/'
        # found is everything before /
        found="${val_str%%/*}"
        # checked is everything after /
        checked="${val_str##*/}"
        
    # Bash arithmetic to add to totals
        # Validate that they are numbers
        if [[ "$found" =~ ^[0-9]+$ ]] && [[ "$checked" =~ ^[0-9]+$ ]]; then
            total_found=$((total_found + found))
            total_checked=$((total_checked + checked))
        else
            echo "Error: Invalid number format in $file: '$line' -> found='$found' checked='$checked'"
            # If set -e is active in parent script, we might want to exit? 
            # Or just warn and continue. Warn is safer for now.
        fi
    else
        echo "Warning: Could not read line from $file"
    fi
done

# Print the final aggregated result
echo "MATCHED ${total_found}/${total_checked}"

# Optional: Clean up the intermediate files
# Uncomment the line below if you want to automatically remove the partial results
rm gfind_results.rank-*.txt
exit 0

+147 −36
Original line number Diff line number Diff line
@@ -38,11 +38,19 @@ extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended** dirp,
                        int server) __attribute__((weak));

extern "C" ssize_t
gkfs_getsingleserverdir_filtered(const char* path,
                                 struct dirent_extended** dirp, int server,
                                 const char* start_key, const char* filter_name,
                                 int64_t filter_size, int64_t filter_ctime,
                                 char** last_key_out, uint64_t* total_checked_out) __attribute__((weak));

/* PFIND OPTIONS EXTENDED */
typedef struct {
    string workdir;
    bool just_count = false;
    bool print_by_process = false;
    bool server_side = false;

    int stonewall_timer = 0;
    bool print_rates = false;
@@ -78,7 +86,7 @@ static void
pfind_print_help(const pfind_options_t* res) {
    printf("pfind \nSynopsis:\n"
           "pfind <workdir> [-newer <timestamp file>] [-size <size>c] [-name "
           "<substr>] [-regex <regex>] [-S <numserver>] [-M <mountdir>]\n"
           "<substr>] [-regex <regex>] [-S <numserver>] [-M <mountdir>] [-C] [-P] [--server-side]\n"
           "\tworkdir = \"%s\"\n"
           "\t-newer = \"%s\"\n"
           "\t-name|-regex = \"%s\"\n"
@@ -86,7 +94,10 @@ pfind_print_help(const pfind_options_t* res) {
           "\t-M: mountdir = \"%s\"\n"
           "Optional flags\n"
           "\t-h: prints the help\n"
           "\t--help: prints the help without initializing MPI\n",
           "\t--help: prints the help without initializing MPI\n"
           "\t-C: just count\n"
           "\t-P: print by process\n"
           "\t--server-side: enable server-side filtering\n",
           res->workdir.c_str(), res->timestamp_file.c_str(),
           res->name_pattern.c_str(), res->num_servers, res->mountdir.c_str());
}
@@ -157,6 +168,9 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "--server-side") == 0) {
            res->server_side = true;
            modified_argv[i][0] = 0;
        } else if(res->workdir.empty() && argv[i][0] != '-') {
            res->workdir = argv[i];
            modified_argv[i][0] = 0;
@@ -216,6 +230,7 @@ worker_routine(void* arg) {
    ThreadData* data = static_cast<ThreadData*>(arg);
    unsigned long long local_found = 0;
    unsigned long long local_checked = 0;
    const size_t max_retries = 3;

    while(true) {
        int server_id = -1;
@@ -229,6 +244,99 @@ worker_routine(void* arg) {
        if(server_id == -1)
            break;

        // Check if we should use server-side filtering
        bool use_server_filter = data->opt->server_side;

        if(use_server_filter && !gkfs_getsingleserverdir_filtered) {
             cerr << "Warning: --server-side requested but server-side filtering function is not available. Falling back to client-side." << endl;
             use_server_filter = false;
        }

        if(use_server_filter) {
            char* last_key = nullptr;
            while(true) {
                struct dirent_extended* entries = nullptr;
                char* new_last_key = nullptr;
                int64_t f_size = (data->opt->size ==
                                  std::numeric_limits<uint64_t>::max())
                                         ? -1
                                         : (int64_t) data->opt->size;
                int64_t f_ctime = (runtime.ctime_min == 0)
                                          ? -1
                                          : (int64_t) runtime.ctime_min;

                ssize_t n = -1;
                uint64_t server_checked_count = 0;

                for(size_t i = 0; i < max_retries; ++i) {
                    n = gkfs_getsingleserverdir_filtered(
                            data->workdir->c_str(), &entries, server_id,
                            last_key ? last_key : "",
                            data->opt->name_pattern.c_str(), f_size, f_ctime,
                            &new_last_key, &server_checked_count);
                    if(n >= 0)
                        break;
                    // simple retry delay could be added here
                }
                


                if(n < 0) {
                    cerr << "Warning: Rank " << sfind_rank << " Thread "
                         << data->thread_id << " received error from server "
                         << server_id << " during filtered scan." << endl;
                    if(entries)
                        free(entries);
                    if(last_key)
                        free(last_key);
                    if(new_last_key)
                        free(new_last_key);
                    break; // Skip this server
                }

                local_checked += server_checked_count;

                if(n > 0 && entries) {
                    char* ptr = reinterpret_cast<char*>(entries);
                    int bytes_processed = 0;
                    while(bytes_processed < n) {
                        struct dirent_extended* temp =
                                reinterpret_cast<struct dirent_extended*>(ptr);
                        if(temp->d_reclen == 0)
                            break;

                        local_found++;

                        // Print entry if not just counting
                        if(!data->opt->just_count) {
                            string p_path = (data->workdir->back() == '/') ? *data->workdir : *data->workdir + "/";
                            p_path += temp->d_name;
                            cout << data->opt->mountdir << p_path << endl;
                        }

                        bytes_processed += temp->d_reclen;
                        ptr += temp->d_reclen;
                    }
                    free(entries);
                } else {
                    if(entries)
                        free(entries);
                }

                if(last_key)
                    free(last_key);
                last_key = new_last_key;

                // If last_key is empty, we are done
                if(last_key == nullptr || last_key[0] == '\0') {
                    if(last_key)
                        free(last_key);
                    break;
                }
            }

        } else {
            // Fallback to client-side filtering (get all)
            struct dirent_extended* entries = nullptr;
            long n = gkfs_getsingleserverdir(data->workdir->c_str(), &entries,
                                             server_id);
@@ -255,10 +363,12 @@ worker_routine(void* arg) {
                if(temp->d_type != 1) {
                    bool timeOK = data->opt->timestamp_file.empty() ||
                                  ((uint64_t) temp->ctime >= runtime.ctime_min);
                bool sizeOK = (data->opt->size ==
                    bool sizeOK =
                            (data->opt->size ==
                                     std::numeric_limits<uint64_t>::max() ||
                             temp->size == data->opt->size);
                bool nameOK = data->opt->name_pattern.empty() ||
                    bool nameOK =
                            data->opt->name_pattern.empty() ||
                            regex_search(temp->d_name, data->opt->name_regex);

                    if(timeOK && sizeOK && nameOK)
@@ -270,6 +380,7 @@ worker_routine(void* arg) {
            }
            free(entries);
        }
    }

    // Atomically add local results to the global counters
    data->total_found->fetch_add(local_found);
+2 −2
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ target_sources(
    preload_context.hpp
    preload_util.hpp
    cache.hpp
    rpc/rpc_types.hpp

    rpc/forward_metadata.hpp
    rpc/forward_data.hpp
    syscalls/args.hpp
@@ -70,7 +70,7 @@ target_sources(
    preload_context.hpp
    preload_util.hpp
    cache.hpp
    rpc/rpc_types.hpp

    rpc/forward_metadata.hpp
    rpc/forward_data.hpp
    rpc/forward_malleability.hpp
Loading