Commit 3d7335cb authored by Ramon Nou's avatar Ramon Nou
Browse files

new sfind approach

parent 1167c20e
Loading
Loading
Loading
Loading
Loading
+144 −226
Original line number Diff line number Diff line
@@ -36,9 +36,6 @@
  SPDX-License-Identifier: GPL-3.0-or-later
*/

/* Based on pfind from ior500 */
/* https://github.com/VI4IO/pfind/ */

#include <algorithm>
#include <cerrno>
#include <climits>
@@ -58,12 +55,11 @@
#include <sys/types.h>
#include <unistd.h>
#include <vector>
#include <mpi.h>
#include <cmath>
#include <mpi.h> // Include OpenMPI header

using namespace std;

/* Minimal struct needed for io500 find */
/* We could also do the filtering on the server */
struct dirent_extended {
    size_t size;
    time_t ctime;
@@ -72,14 +68,12 @@ struct dirent_extended {
    char d_name[1];
};

/* Function exported from GekkoFS LD_PRELOAD, code needs to be compiled with
 * -fPIC, if not will segfault */
/* Function exported from GekkoFS LD_PRELOAD */
extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended* dirp,
                        unsigned int count, int server) __attribute__((weak));

/* PFIND OPTIONS EXTENDED We need to add the GekkoFS mount dir and the number of
 * servers */
/* PFIND OPTIONS EXTENDED */
typedef struct {
    string workdir;
    bool just_count = false;
@@ -108,22 +102,21 @@ typedef struct {

/**
 * Mutable state shared by one run of the tool.
 *
 * Declared as a named struct rather than `typedef struct { ... } name;`:
 * an unnamed class that gets its name for linkage purposes from a typedef
 * may not have default member initializers from C++20 on, and Clang
 * already warns about it in earlier modes.
 */
struct pfind_runtime_options_t {
    // Lower bound for entry ctime; filled from the st_ctime of the timestamp
    // file and compared against each directory entry's ctime.
    uint64_t ctime_min = 0;
    // NOTE(review): the three fields below are not referenced in the visible
    // part of this file; presumably kept for parity with pfind - confirm.
    double stonewall_endtime = 0.0;
    FILE* logfile = nullptr;
    bool needs_stat = false;
};

static pfind_runtime_options_t runtime;

int pfind_rank = 0;
int pfind_size = 1;

static pfind_options_t* opt;

/**
 * Report a fatal error and terminate the whole MPI job.
 *
 * @param str human-readable description of the failure.
 *
 * The message is printed by whichever rank calls this function: callers such
 * as the regex construction in bcast_options() run on non-zero ranks only,
 * so restricting the output to rank 0 would make those failures silent.
 *
 * This function never returns.
 */
[[noreturn]] void
pfind_abort(const string& str) {
    cerr << "ERROR: " << str << endl;
    // MPI_Abort asks the runtime to tear down every rank of the communicator;
    // a plain exit() here would leave the peers blocked in their next
    // collective call.
    MPI_Abort(MPI_COMM_WORLD, 1);
    exit(1); // MPI_Abort should terminate, but exit is a fallback
}


@@ -144,13 +137,12 @@ pfind_print_help(const pfind_options_t* res) {
           res->name_pattern.c_str(), res->num_servers, res->mountdir.c_str());
}

MPI_Comm pfind_com;

pfind_options_t*
pfind_parse_args(int argc, char** argv, int force_print_help, MPI_Comm com) {
    MPI_Comm_rank(com, &pfind_rank);
    MPI_Comm_size(com, &pfind_size);
    pfind_com = com;
pfind_parse_args(int argc, char** argv, bool force_print_help) {

    pfind_rank = 0;
    pfind_size = 1;

    auto res = new pfind_options_t();

@@ -353,11 +345,6 @@ pfind_parse_args(int argc, char** argv, int force_print_help, MPI_Comm com) {
    if(print_help) {
        if(pfind_rank == 0)
            pfind_print_help(res);
        int init;
        MPI_Initialized(&init);
        if(init) {
            MPI_Finalize();
        }
        exit(0);
    }

@@ -368,93 +355,99 @@ pfind_parse_args(int argc, char** argv, int force_print_help, MPI_Comm com) {
    return res;
}

/**
 * Master side: broadcast a new path to all ranks of MPI_COMM_WORLD.
 *
 * Wire format (must match recv_newPath): one MPI_INT holding the byte count
 * including the terminating NUL, followed by that many MPI_CHAR bytes.
 *
 * @param path path to announce; taken by value so that its buffer can be
 *             handed to MPI_Bcast without casting away constness.
 */
void
send_newPath(string path) {
    // The length travels as MPI_INT, so the variable must really be an int;
    // broadcasting a size_t object as one MPI_INT only transmits part of it.
    // Paths are assumed to be far shorter than INT_MAX bytes.
    int count = static_cast<int>(path.size() + 1);
    MPI_Bcast(&count, 1, MPI_INT, 0, MPI_COMM_WORLD);
    // &path[0] points at size() characters followed by the NUL terminator.
    MPI_Bcast(&path[0], count, MPI_CHAR, 0, MPI_COMM_WORLD);
}

/**
 * Worker side: receive the next path broadcast by rank 0.
 *
 * Counterpart of send_newPath: first an MPI_INT byte count (including the
 * terminating NUL), then the characters themselves.
 *
 * @return the received path, or the literal "Terminate" when the announced
 *         count is zero (or negative), meaning there is no new path.
 */
string
recv_newPath() {
    int count = 0;
    MPI_Bcast(&count, 1, MPI_INT, 0, MPI_COMM_WORLD);
    // A non-positive count carries no payload; a negative one would also make
    // the vector allocation below throw.
    if(count <= 0)
        return "Terminate";
    std::vector<char> buf(count);
    MPI_Bcast(buf.data(), count, MPI_CHAR, 0, MPI_COMM_WORLD);
    // The payload includes the NUL terminator. Cut the string at the first
    // NUL so the result has no embedded '\0' (which would break path.back()
    // checks and later concatenations); the search is bounded by the buffer.
    return std::string(buf.begin(), std::find(buf.begin(), buf.end(), '\0'));
}

/* Client Processing a path.
 * We increment local checked/found based on the filters
 * Each client sends the request to a subset of GekkoFS servers.
 * We use 102400 (plus space from 255 chars paths) so it is nearly 1M files per
 * server, which is enough for most cases
 *
 */


// Broadcast a C++ string from `root` to every rank of MPI_COMM_WORLD.
// The length is sent first so that receivers can size `s` before the
// character payload arrives; on non-root ranks the previous content of `s`
// is replaced.
void bcast_string(string& s, int root) {
    // Only the root knows the real length; everybody else starts at zero and
    // learns it from the first broadcast.
    int nbytes = (pfind_rank == root) ? static_cast<int>(s.length()) : 0;
    MPI_Bcast(&nbytes, 1, MPI_INT, root, MPI_COMM_WORLD);

    s.resize(nbytes);
    char* payload = &s[0];
    MPI_Bcast(payload, nbytes, MPI_CHAR, root, MPI_COMM_WORLD);
}

// Replicate the options held by rank 0 on every other rank of
// MPI_COMM_WORLD. Scalar fields are broadcast directly, string fields go
// through bcast_string, and finally each non-root rank compiles its own
// name regex from the pattern it received.
void bcast_options(pfind_options_t* opt) {
    const int root = 0;

    // Fixed-size members
    MPI_Bcast(&opt->just_count, 1, MPI_CXX_BOOL, root, MPI_COMM_WORLD);
    MPI_Bcast(&opt->size, 1, MPI_UINT64_T, root, MPI_COMM_WORLD);
    MPI_Bcast(&opt->num_servers, 1, MPI_INT, root, MPI_COMM_WORLD);
    MPI_Bcast(&opt->verbosity, 1, MPI_INT, root, MPI_COMM_WORLD);

    // Variable-length members; the order must be identical on every rank
    // because each call is a collective operation.
    for (string* field : {&opt->workdir, &opt->timestamp_file,
                          &opt->name_pattern, &opt->mountdir}) {
        bcast_string(*field, root);
    }

    // The compiled regex itself is not transmitted: ranks other than 0
    // rebuild it locally when a pattern was given.
    const bool rebuild_regex = (pfind_rank != 0) && !opt->name_pattern.empty();
    if (!rebuild_regex) {
        return;
    }
    try {
        opt->name_regex = regex(opt->name_pattern);
    } catch (const regex_error& err) {
        pfind_abort("Invalid regex for name given: " + string(err.what()));
    }
}

void
dirProcess(const string path, unsigned long long& checked,
           unsigned long long& found, queue<string>& dirs,
           unsigned int world_rank, unsigned int world_size,
           const pfind_options_t* opt) {
dirProcess(const string& path, unsigned long long& checked,
           unsigned long long& found, const pfind_options_t* opt) {
    const size_t buffer_size =
            (sizeof(struct dirent_extended) + 255) * 1024 * 100;
    unique_ptr<struct dirent_extended[]> getdir(
            new struct dirent_extended
                    [buffer_size / (sizeof(struct dirent_extended) + 255)]{});


    int servers_per_node = ceil(opt->num_servers / (world_size - 1));
    if(servers_per_node == 0)
        servers_per_node++;
    for(int it = 0; it < servers_per_node; it++) {
        auto server = (world_rank - 1) * servers_per_node + it;
        if(server >= (unsigned int) opt->num_servers)
            break;
            (sizeof(struct dirent_extended) + 255) * 1024 * 10240;

        unsigned long long total_size = 0;
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(), getdir.get(), buffer_size, server);
    unique_ptr<char[]> buffer(new char[buffer_size]{});

        struct dirent_extended* temp = getdir.get();
    // --- PARALLELIZATION LOGIC ---
    // Each process calculates its own range of servers to query.
    int servers_per_proc = opt->num_servers / pfind_size;
    int remainder = opt->num_servers % pfind_size;
    int start_server = pfind_rank * servers_per_proc + min(pfind_rank, remainder);
    int end_server = start_server + servers_per_proc + (pfind_rank < remainder ? 1 : 0);

        while(total_size < n) {
            if(strlen(temp->d_name) == 0)
                break;
    if (opt->verbosity > 0) {
        cout << "[Rank " << pfind_rank << "] Processing servers " << start_server 
             << " to " << end_server - 1 << endl;
    }

            total_size += temp->d_reclen;
    // Each process loops ONLY over its assigned servers
    for(int server = start_server; server < end_server; server++) {
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(),
                reinterpret_cast<struct dirent_extended*>(buffer.get()),
                buffer_size, server);
        
            /* Queue directory to process */
            if(temp->d_type == 1) {
                string slash;
                if(path.back() != '/')
                    slash = "/";
                checked++;
                dirs.push(path + slash + temp->d_name);
                temp = reinterpret_cast<dirent_extended*>(
                        reinterpret_cast<char*>(temp) + temp->d_reclen);
        if (n <= 0) { // Handle empty or error cases
            continue;
        }

            /* Find filtering */
            bool timeOK = true;
            if(!opt->timestamp_file.empty()) {
                if((uint64_t) temp->ctime < runtime.ctime_min)
                    timeOK = false;
        unsigned long long total_size = 0;
        struct dirent_extended* temp =
                reinterpret_cast<struct dirent_extended*>(buffer.get());
                
        while(total_size < n) {
            // Safety checks to prevent infinite loops on malformed data
            if(strlen(temp->d_name) == 0 || temp->d_reclen == 0) {
                break;
            }

            if(timeOK && (temp->size == opt->size ||
                          opt->size == std::numeric_limits<uint64_t>::max())) {
                if(opt->name_pattern.empty() ||
                   regex_search(temp->d_name, opt->name_regex)) {
            // Process files only, skip directories
            if(temp->d_type != 1) { 
                /* Find filtering */
                bool timeOK = opt->timestamp_file.empty() || ((uint64_t)temp->ctime >= runtime.ctime_min);
                bool sizeOK = (opt->size == std::numeric_limits<uint64_t>::max() || temp->size == opt->size);
                bool nameOK = opt->name_pattern.empty() || regex_search(temp->d_name, opt->name_regex);

                if (timeOK && sizeOK && nameOK) {
                    found++;
                }
                checked++;
            }

            checked++;
            // Unconditionally advance to the next record
            total_size += temp->d_reclen;
            temp = reinterpret_cast<dirent_extended*>(
                    reinterpret_cast<char*>(temp) + temp->d_reclen);
        }
@@ -462,28 +455,25 @@ dirProcess(const string path, unsigned long long& checked,
}

int
process(char* processor_name, int world_rank, int world_size,
        const pfind_options_t* opt) {
    // Print off a hello world message
process_parallel(const pfind_options_t* opt) {
    unsigned long long local_found = 0;
    unsigned long long local_checked = 0;
    runtime = {}; // Initialize runtime options

    // INIT PFIND
    runtime = {};
    /* Get timestamp file */
    /* Get timestamp file, broadcast from rank 0 */
    if(!opt->timestamp_file.empty()) {
        if(pfind_rank == 0) {
            struct stat timer_file;
            if(lstat(opt->timestamp_file.c_str(), &timer_file) != 0) {
                printf("Could not open: \"%s\", error: %s",
                       opt->timestamp_file.c_str(), strerror(errno));
                pfind_abort("\n");
                cerr << "Could not open: \"" << opt->timestamp_file << "\", error: " << strerror(errno) << endl;
                MPI_Abort(MPI_COMM_WORLD, 1);
            }
            runtime.ctime_min = timer_file.st_ctime;
        }
        MPI_Bcast(&runtime.ctime_min, 1, MPI_INT, 0, pfind_com);
        // Broadcast the timestamp to all processes
        MPI_Bcast(&runtime.ctime_min, 1, MPI_UINT64_T, 0, MPI_COMM_WORLD);
    }

    if(world_rank == 0) {
        queue<string> dirs;
    string workdir = opt->workdir;
    if(workdir.rfind(opt->mountdir, 0) == 0) {
        workdir = workdir.substr(opt->mountdir.length());
@@ -491,98 +481,21 @@ process(char* processor_name, int world_rank, int world_size,
    if(workdir.empty()) {
        workdir = "/";
    }
        dirs.push(workdir);

        do {
            std::string processpath = dirs.front();
            dirs.pop();
            send_newPath(processpath);

            auto received_strings = true;

            for(auto i = 1; i < world_size; i++) {
                received_strings = true;
                while(received_strings) {
                    received_strings = false;

                    MPI_Status mpistatus;
                    MPI_Probe(i, 0, MPI_COMM_WORLD, &mpistatus);

                    int count;
                    MPI_Get_count(&mpistatus, MPI_CHAR, &count);

                    std::vector<char> buf(count);
                    MPI_Recv(buf.data(), count, MPI_CHAR, i, 0, MPI_COMM_WORLD,
                             &mpistatus);

                    if(count == 0) {
                        continue;
                    }
                    std::string s(buf.begin(), buf.end());
                    dirs.push(s);
                    received_strings = true;
                }
            }
        } while(!dirs.empty());


        auto count = 0;
        MPI_Bcast(&count, 1, MPI_INT, 0, MPI_COMM_WORLD);

        MPI_Barrier(MPI_COMM_WORLD);
    // Each process calls dirProcess, which will handle its assigned subset of servers
    dirProcess(workdir, local_checked, local_found, opt);

        unsigned long long* Array_checked = (unsigned long long*) malloc(
                sizeof(unsigned long long) * world_size);
        unsigned long long* Array_found = (unsigned long long*) malloc(
                sizeof(unsigned long long) * world_size);
        unsigned long long checked = 0;
        unsigned long long found = 0;

        MPI_Gather(&checked, 1, MPI_UNSIGNED_LONG_LONG, Array_checked, 1,
                   MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD);
        MPI_Gather(&found, 1, MPI_UNSIGNED_LONG_LONG, Array_found, 1,
                   MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD);
    unsigned long long global_found = 0;
    unsigned long long global_checked = 0;
    
        for(int i = 0; i < world_size; i++) {
            checked += Array_checked[i];
            found += Array_found[i];
        }

        cout << "MATCHED " << found << "/" << checked << endl;
    }

    else {
        unsigned long long checked = 0;
        unsigned long long found = 0;
        while(1) {

            string toProcess = recv_newPath();
            if(toProcess == "Terminate") {
                break;
            }
            // cout << "REceived " << toProcess << " --- " << world_rank <<
            // endl;
            queue<string> dirs;
    MPI_Reduce(&local_found, &global_found, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
    MPI_Reduce(&local_checked, &global_checked, 1, MPI_UNSIGNED_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);

            dirProcess(toProcess, checked, found, dirs, world_rank, world_size,
                       opt);
            // Send NEW DIRS to master
            while(!dirs.empty()) {
                string s = dirs.front();
                dirs.pop();
                // cout << world_rank << " --> Sending " << s << endl;
                MPI_Send((void*) s.c_str(), (s.size() + 1), MPI_CHAR, 0, 0,
                         MPI_COMM_WORLD);
            }
            // cout << world_rank << " --> Sending 0 " << endl;
            MPI_Send((void*) 0, 0, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
        }

        MPI_Barrier(MPI_COMM_WORLD);
        MPI_Gather(&checked, 1, MPI_UNSIGNED_LONG_LONG, nullptr, 1,
                   MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD);
        MPI_Gather(&found, 1, MPI_UNSIGNED_LONG_LONG, nullptr, 1,
                   MPI_UNSIGNED_LONG_LONG, 0, MPI_COMM_WORLD);
    if (pfind_rank == 0) {
        cout << "MATCHED " << global_found << "/" << global_checked << endl;
    }

    return 0;
@@ -590,37 +503,42 @@ process(char* processor_name, int world_rank, int world_size,

/**
 * Program entry point.
 *
 * Initializes MPI, lets rank 0 parse the command line, replicates the
 * resulting options on all ranks, verifies that the GekkoFS entry point is
 * available and runs the parallel search.
 *
 * @return the value returned by process_parallel(), or 0 after printing help.
 */
int
main(int argc, char** argv) {
    // --- MPI INITIALIZATION ---
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &pfind_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &pfind_size);

    for(int i = 0; i < argc; ++i) {
        if(strcmp(argv[i], "--help") == 0) {
            if(pfind_rank == 0) {
                // pfind_parse_args handles printing help. Should it return
                // instead of exiting, release the options it allocated.
                // NOTE(review): when it exits by itself, rank 0 leaves
                // without calling MPI_Finalize - confirm this is acceptable
                // for the launcher in use.
                delete pfind_parse_args(argc, argv, true);
            }
            MPI_Finalize();
            return 0;
        }
    }

    // Only rank 0 parses the command line; the other ranks start from a
    // default-constructed object that bcast_options() fills in. Allocating
    // in one branch or the other avoids leaking a default object on rank 0.
    if(pfind_rank == 0) {
        opt = pfind_parse_args(argc, argv, false);
    } else {
        opt = new pfind_options_t();
    }

    // pfind_parse_args resets pfind_rank/pfind_size to 0/1. Restore the real
    // values, otherwise rank 0 would believe it is the only process and
    // query every server in addition to the shares of the other ranks.
    MPI_Comm_rank(MPI_COMM_WORLD, &pfind_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &pfind_size);

    // --- BROADCAST CONFIGURATION ---
    bcast_options(opt);

    // Check if the GekkoFS function is available (e.g., via LD_PRELOAD).
    // The symbol is declared weak, so it is null when nothing provides it.
    if(gkfs_getsingleserverdir == nullptr) {
        if(pfind_rank == 0) {
            cerr << "Error: GekkoFS functions not available. Is the library preloaded?" << endl;
        }
        MPI_Abort(MPI_COMM_WORLD, 1);
    }

    // --- RUN PARALLEL PROCESSING ---
    int result = process_parallel(opt);

    delete opt;
    opt = nullptr;

    // --- MPI FINALIZATION ---
    MPI_Finalize();
    return result;
}
 No newline at end of file
+12 −15
Original line number Diff line number Diff line
@@ -372,34 +372,31 @@ dirProcess(const string& path, unsigned long long& checked,
           unsigned int world_rank, unsigned int world_size,
           const pfind_options_t* opt) {
    const size_t buffer_size =
            (sizeof(struct dirent_extended) + 255) * 1024 * 100;
    unique_ptr<struct dirent_extended[]> getdir(
            new struct dirent_extended
                    [buffer_size / (sizeof(struct dirent_extended) + 255)]{});
            (sizeof(struct dirent_extended) + 255) * 1024 * 10240;

    unique_ptr<char[]> buffer(new char[buffer_size]{});


    // cout << "PROCESSING " << world_rank << "/"<< world_size << " = " << path
    // << endl;

    for(int server = 0; server < opt->num_servers; server++) {
        unsigned long long total_size = 0;
        // We get the whole directory tree from the server
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(), getdir.get(), buffer_size, server);

        struct dirent_extended* temp = getdir.get();
                path.c_str(),
                reinterpret_cast<struct dirent_extended*>(buffer.get()),
                buffer_size, server);

        struct dirent_extended* temp =
                reinterpret_cast<struct dirent_extended*>(buffer.get());
        while(total_size < n) {
            if(strlen(temp->d_name) == 0)
                break;

            total_size += temp->d_reclen;

            /* Queue directory to process */
            // We don't need to process directories
            if(temp->d_type == 1) {
                string slash;
                if(path.back() != '/')
                    slash = "/";
                checked++;
                dirs.push(path + slash + temp->d_name);
                temp = reinterpret_cast<dirent_extended*>(
                        reinterpret_cast<char*>(temp) + temp->d_reclen);
                continue;
@@ -456,7 +453,7 @@ process(const pfind_options_t* opt) {
        workdir = "/";
    }
    dirs.push(workdir);

    // This loop will be only one shot as we get all the subdirs at once.
    while(!dirs.empty()) {
        string processpath = dirs.front();
        dirs.pop();
+12 −0
Original line number Diff line number Diff line
@@ -170,6 +170,18 @@ public:
    [[nodiscard]] std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_dirents_extended(const std::string& dir) const;


    /**
     * @brief Return all file names and modes for all the entries of the
     * given directory including their sizes and creation time.
     * @param dir directory prefix string
     * @return vector of tuples <std::string name, bool is_dir, size_t size,
     *         time_t ctime>, where name is the name of the entry and is_dir
     *         is true in the case the entry is a directory.
     */
    [[nodiscard]] std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_all_dirents_extended(const std::string& dir) const;

    /**
     * @brief Iterate over complete database, note ONLY used for debugging and
     * is therefore unused.
+8 −0
Original line number Diff line number Diff line
@@ -83,6 +83,9 @@ public:
    virtual std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_dirents_extended(const std::string& dir) const = 0;

    virtual std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_all_dirents_extended(const std::string& dir) const = 0;

    virtual void*
    iterate_all() const = 0;

@@ -150,6 +153,11 @@ public:
        return static_cast<T const&>(*this).get_dirents_extended_impl(dir);
    }

    std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_all_dirents_extended(const std::string& dir) const {
        return static_cast<T const&>(*this).get_all_dirents_extended_impl(dir);
    }

    void*
    iterate_all() const {
        return static_cast<T const&>(*this).iterate_all_impl();
+3 −0
Original line number Diff line number Diff line
@@ -196,6 +196,9 @@ public:
    std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_dirents_extended_impl(const std::string& dir) const;

    std::vector<std::tuple<std::string, bool, size_t, time_t>>
    get_all_dirents_extended_impl(const std::string& root_path) const;

    /**
     * Code example for iterating all entries in KV store. This is for debug
     * only as it is too expensive
Loading