Commit 30c0f5a0 authored by Ramon Nou's avatar Ramon Nou
Browse files

adding compression to extended directory

parent e4e51f35
Loading
Loading
Loading
Loading
+38 −21
Original line number Diff line number Diff line
@@ -26,32 +26,49 @@
# SPDX-License-Identifier: GPL-3.0-or-later                                    #
################################################################################

#
# - Try to find Facebook zstd library
# This will define
# ZStd_FOUND
# ZStd_INCLUDE_DIR
# ZStd_LIBRARIES
#


# Standard names to search for
set(ZStd_NAMES zstd zstd_static)

find_path(ZStd_INCLUDE_DIR
          NAMES zstd.h
)
          PATH_SUFFIXES include)

find_library(ZStd_LIBRARY
    NAMES zstd
)
# Allow ZStd_LIBRARY to be set manually, as the location of the zstd library
if(NOT ZStd_LIBRARY)
  find_library(ZStd_LIBRARY_RELEASE
               NAMES ${ZStd_NAMES}
               PATH_SUFFIXES lib)
  
set(ZStd_LIBRARIES ${ZStd_LIBRARY})
set(ZStd_INCLUDE_DIRS ${ZStd_INCLUDE_DIR})

  include(SelectLibraryConfigurations)
  select_library_configurations(ZStd)
endif()

unset(ZStd_NAMES)

mark_as_advanced(ZStd_INCLUDE_DIR)

include(FindPackageHandleStandardArgs)
FIND_PACKAGE_HANDLE_STANDARD_ARGS(ZStd
        REQUIRED_VARS ZStd_LIBRARY ZStd_INCLUDE_DIR
        VERSION_VAR ZStd_VERSION_STRING)

if(ZStd_FOUND)
    set(ZStd_INCLUDE_DIRS ${ZStd_INCLUDE_DIR})

    if(NOT ZStd_LIBRARIES)
        set(ZStd_LIBRARIES ${ZStd_LIBRARY})
    endif()

    if(NOT TARGET ZStd::ZStd)
        add_library(ZStd::ZStd UNKNOWN IMPORTED)
        set_target_properties(ZStd::ZStd PROPERTIES
                INTERFACE_INCLUDE_DIRECTORIES "${ZStd_INCLUDE_DIRS}")
       
find_package_handle_standard_args(ZStd
    DEFAULT_MSG ZStd_LIBRARY ZStd_INCLUDE_DIR
)
        set_target_properties(ZStd::ZStd PROPERTIES
                IMPORTED_LOCATION "${ZStd_LIBRARY}")

mark_as_advanced(
    ZStd_LIBRARY
    ZStd_INCLUDE_DIR
)
 No newline at end of file
    endif()
endif()
+3 −0
Original line number Diff line number Diff line
@@ -161,6 +161,9 @@ find_package(Margo 0.14.0 REQUIRED)
message(STATUS "[${PROJECT_NAME}] Checking for syscall_intercept")
find_package(Syscall_intercept REQUIRED)

message(STATUS "[${PROJECT_NAME}] Checking for Zstd")
find_package(ZStd REQUIRED)

### AGIOS: required for scheduling I/O requests
if (GKFS_ENABLE_AGIOS)
    message(STATUS "[${PROJECT_NAME}] Checking for Agios")
+1 −0
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
set (CMAKE_CXX_STANDARD 17)

add_executable(sfind sfind.cpp)
target_link_libraries(sfind PRIVATE ZStd::ZStd)
set_property(TARGET sfind PROPERTY POSITION_INDEPENDENT_CODE ON)
if(GKFS_INSTALL_TESTS)
    install(TARGETS sfind
+137 −64
Original line number Diff line number Diff line
@@ -21,7 +21,8 @@

using namespace std;

// (struct dirent_extended, pfind_options_t, pfind_runtime_options_t are the same)
// (struct dirent_extended, pfind_options_t, pfind_runtime_options_t are the
// same)
#pragma region structs
/* Minimal struct needed for io500 find */
struct dirent_extended {
@@ -34,8 +35,8 @@ struct dirent_extended {

/* Function exported from GekkoFS LD_PRELOAD */
extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended* dirp,
                        unsigned int count, int server) __attribute__((weak));
gkfs_getsingleserverdir(const char* path, struct dirent_extended** dirp,
                       int server) __attribute__((weak));

/* PFIND OPTIONS EXTENDED */
typedef struct {
@@ -97,52 +98,105 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {

    for(int i = 1; i < argc; ++i) {
        if(strcmp(argv[i], "-newer") == 0 && i + 1 < argc) {
            res->timestamp_file = argv[i + 1]; modified_argv[i][0] = 0; modified_argv[i + 1][0] = 0; ++i;
            res->timestamp_file = argv[i + 1];
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-size") == 0 && i + 1 < argc) {
            char* str = argv[i + 1]; char extension = str[strlen(str) - 1]; str[strlen(str) - 1] = 0;
            try { res->size = stoull(str); } catch(...) { pfind_abort("Invalid size: " + string(str)); }
            if (extension != 'c') pfind_abort("Unsupported extension for -size, only 'c' is supported");
            modified_argv[i][0] = 0; modified_argv[i + 1][0] = 0; ++i;
        } else if((strcmp(argv[i], "-name") == 0 || strcmp(argv[i], "-regex") == 0) && i + 1 < argc) {
            char* str = argv[i + 1];
            char extension = str[strlen(str) - 1];
            str[strlen(str) - 1] = 0;
            try {
                res->size = stoull(str);
            } catch(...) {
                pfind_abort("Invalid size: " + string(str));
            }
            if(extension != 'c')
                pfind_abort(
                        "Unsupported extension for -size, only 'c' is supported");
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if((strcmp(argv[i], "-name") == 0 ||
                   strcmp(argv[i], "-regex") == 0) &&
                  i + 1 < argc) {
            bool is_name = (strcmp(argv[i], "-name") == 0);
            string pattern = argv[i + 1];
            if(is_name) {
                res->name_pattern.clear();
                for(char c : pattern) {
                    if(c == '*') res->name_pattern += ".*";
                    else if(c == '.') res->name_pattern += "[.]";
                    else if(c != '"' && c != '\'') res->name_pattern += c;
                    if(c == '*')
                        res->name_pattern += ".*";
                    else if(c == '.')
                        res->name_pattern += "[.]";
                    else if(c != '"' && c != '\'')
                        res->name_pattern += c;
                }
            } else {
                res->name_pattern = pattern;
            }
            try { res->name_regex = regex(res->name_pattern); } catch(const regex_error& e) { pfind_abort("Invalid regex: " + string(e.what())); }
            modified_argv[i][0] = 0; modified_argv[i + 1][0] = 0; ++i;
            try {
                res->name_regex = regex(res->name_pattern);
            } catch(const regex_error& e) {
                pfind_abort("Invalid regex: " + string(e.what()));
            }
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-M") == 0 && i + 1 < argc) {
            res->mountdir = argv[i + 1]; modified_argv[i][0] = 0; modified_argv[i + 1][0] = 0; ++i;
            res->mountdir = argv[i + 1];
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-S") == 0 && i + 1 < argc) {
            try { res->num_servers = stoi(argv[i + 1]); } catch(...) { pfind_abort("Invalid server count: " + string(argv[i + 1])); }
            modified_argv[i][0] = 0; modified_argv[i + 1][0] = 0; ++i;
            try {
                res->num_servers = stoi(argv[i + 1]);
            } catch(...) {
                pfind_abort("Invalid server count: " + string(argv[i + 1]));
            }
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(res->workdir.empty() && argv[i][0] != '-') {
            res->workdir = argv[i]; modified_argv[i][0] = 0;
            res->workdir = argv[i];
            modified_argv[i][0] = 0;
        }
    }
    if(argc == 2 && res->workdir.empty()) { res->workdir = argv[1]; }
    if(argc == 2 && res->workdir.empty()) {
        res->workdir = argv[1];
    }
    const char* optstring = "CPs:r:vhD:xq:H:NM:S:";
    int c; optind = 1;
    int c;
    optind = 1;
    while((c = getopt(argc, modified_argv.data(), optstring)) != -1) {
        switch(c) {
            case 'h': print_help = true; break;
            case 'P': res->print_by_process = true; break;
            case 'C': res->just_count = true; break;
            case '?': exit(1);
            default: break;
            case 'h':
                print_help = true;
                break;
            case 'P':
                res->print_by_process = true;
                break;
            case 'C':
                res->just_count = true;
                break;
            case '?':
                exit(1);
            default:
                break;
        }
    }
    if (pfind_rank == 0 && print_help) { pfind_print_help(res); }
    if (print_help) { exit(0); }
    if(res->workdir.empty()) { pfind_abort("pfind <directory> is required"); }
    if(res->num_servers == 0) { pfind_abort("-S <num_servers> is required"); }
    if(pfind_rank == 0 && print_help) {
        pfind_print_help(res);
    }
    if(print_help) {
        exit(0);
    }
    if(res->workdir.empty()) {
        pfind_abort("pfind <directory> is required");
    }
    if(res->num_servers == 0) {
        pfind_abort("-S <num_servers> is required");
    }
    return res;
}
#pragma endregion
@@ -150,43 +204,55 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {
void
dirProcess(const string& path, unsigned long long& checked,
           unsigned long long& found, const pfind_options_t* opt) {
    const size_t buffer_size =
            (sizeof(struct dirent_extended) + 255) * 1024 * 10240;
    unique_ptr<char[]> buffer(new char[buffer_size]{});
    struct dirent_extended* entries = nullptr;

    // --- PARALLELIZATION LOGIC ---
    // Each process calculates its own range of servers to query.
    int servers_per_proc = opt->num_servers / pfind_size;
    int remainder = opt->num_servers % pfind_size;
    int start_server = pfind_rank * servers_per_proc + min(pfind_rank, remainder);
    int end_server = start_server + servers_per_proc + (pfind_rank < remainder ? 1 : 0);
    int start_server =
            pfind_rank * servers_per_proc + min(pfind_rank, remainder);
    int end_server =
            start_server + servers_per_proc + (pfind_rank < remainder ? 1 : 0);

    // Each process loops ONLY over its assigned servers
    for(int server = start_server; server < end_server; server++) {
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(),
                reinterpret_cast<struct dirent_extended*>(buffer.get()),
                buffer_size, server);
               &entries, server);

        if (n <= 0) continue;
        if(n <= 0)
            continue;

        unsigned long long total_size = 0;
        struct dirent_extended* temp = reinterpret_cast<struct dirent_extended*>(buffer.get());
        // --- Success! We can now iterate through the results ---
    char* ptr = reinterpret_cast<char*>(entries);
    int bytes_processed = 0;

        while(total_size < n) {
            if(strlen(temp->d_name) == 0 || temp->d_reclen == 0) break;

      
        while(bytes_processed < n) {
            struct dirent_extended* temp = reinterpret_cast<struct dirent_extended*>(ptr);
            if(strlen(temp->d_name) == 0 || temp->d_reclen == 0)
                break;
            if(temp->d_type != 1) {
                bool timeOK = opt->timestamp_file.empty() || ((uint64_t)temp->ctime >= runtime.ctime_min);
                bool sizeOK = (opt->size == std::numeric_limits<uint64_t>::max() || temp->size == opt->size);
                bool nameOK = opt->name_pattern.empty() || regex_search(temp->d_name, opt->name_regex);
                bool timeOK = opt->timestamp_file.empty() ||
                              ((uint64_t) temp->ctime >= runtime.ctime_min);
                bool sizeOK =
                        (opt->size == std::numeric_limits<uint64_t>::max() ||
                         temp->size == opt->size);
                bool nameOK = opt->name_pattern.empty() ||
                              regex_search(temp->d_name, opt->name_regex);

                if (timeOK && sizeOK && nameOK) found++;
                if(timeOK && sizeOK && nameOK)
                    found++;
                checked++;
            }
            total_size += temp->d_reclen;
            temp = reinterpret_cast<dirent_extended*>(reinterpret_cast<char*>(temp) + temp->d_reclen);
            bytes_processed += temp->d_reclen;
            ptr += temp->d_reclen;
       
        }
    }
    free(entries);
}

int
@@ -198,7 +264,8 @@ process_sequential_parallel(const pfind_options_t* opt) {
    if(!opt->timestamp_file.empty()) {
        struct stat timer_file;
        if(lstat(opt->timestamp_file.c_str(), &timer_file) != 0) {
            pfind_abort("Could not open timestamp file: " + opt->timestamp_file);
            pfind_abort("Could not open timestamp file: " +
                        opt->timestamp_file);
        }
        runtime.ctime_min = timer_file.st_ctime;
    }
@@ -207,12 +274,14 @@ process_sequential_parallel(const pfind_options_t* opt) {
    if(workdir.rfind(opt->mountdir, 0) == 0) {
        workdir = workdir.substr(opt->mountdir.length());
    }
    if(workdir.empty()) workdir = "/";
    if(workdir.empty())
        workdir = "/";

    dirProcess(workdir, local_checked, local_found, opt);

    // --- WRITE LOCAL RESULTS TO A UNIQUE FILE ---
    string output_filename = "gfind_results.rank-" + to_string(pfind_rank) + ".txt";
    string output_filename =
            "gfind_results.rank-" + to_string(pfind_rank) + ".txt";
    ofstream output_file(output_filename);
    if(!output_file.is_open()) {
        pfind_abort("Failed to open output file: " + output_filename);
@@ -220,7 +289,8 @@ process_sequential_parallel(const pfind_options_t* opt) {
    output_file << "MATCHED " << local_found << "/" << local_checked << endl;
    output_file.close();

    cout << "[Rank " << pfind_rank << "] Finished. Wrote results to " << output_filename << endl;
    cout << "[Rank " << pfind_rank << "] Finished. Wrote results to "
         << output_filename << endl;
    cout << "MATCHED " << local_found << "/" << local_checked << endl;
    return 0;
}
@@ -236,14 +306,16 @@ main(int argc, char** argv) {
            pfind_rank = stoi(env_rank);
            pfind_size = stoi(env_size);
        } catch(const std::exception& e) {
            cerr << "Could not parse SLURM environment variables: " << e.what() << endl;
            cerr << "Could not parse SLURM environment variables: " << e.what()
                 << endl;
            // Fallback to sequential
            pfind_rank = 0;
            pfind_size = 1;
        }
    } else {
        // Fallback for running outside of srun
        cout << "SLURM variables not found. Running in sequential mode (rank 0 of 1)." << endl;
        cout << "SLURM variables not found. Running in sequential mode (rank 0 of 1)."
             << endl;
        pfind_rank = 0;
        pfind_size = 1;
    }
@@ -253,7 +325,8 @@ main(int argc, char** argv) {

    // Check if GekkoFS function is available
    if(gkfs_getsingleserverdir == nullptr) {
        pfind_abort("GekkoFS functions not available. Is the library preloaded?");
        pfind_abort(
                "GekkoFS functions not available. Is the library preloaded?");
    }

    int result = process_sequential_parallel(opt);
+2 −2
Original line number Diff line number Diff line
@@ -198,7 +198,7 @@ gkfs_msync(void* addr, size_t length, int flags);

// gkfs_getsingleserverdir is using extern "C" to demangle it for C usage
extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended* dirp,
                        unsigned int count, int server);
gkfs_getsingleserverdir(const char* path, struct dirent_extended** dirp,
                        int server);

#endif // GEKKOFS_GKFS_FUNCTIONS_HPP
Loading