Commit 3f473537 authored by Ramon Nou's avatar Ramon Nou
Browse files

Resolve "refactor gfind / sfind"

parent 2a2b395b
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -8,7 +8,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## Unreleased
### New
  - Added cppcheck code checking capabilities ([!214](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/214))
  - Tests to cover proxy and malleability ([!222](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/222))
  - Tests to cover proxy and (malleability) ([!222](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/222))
  - New fd generation method ([!225](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/202))
    - Use LIBGKFS_PROTECT_FD=1 to enable the original method of assignment and protection.
    
@@ -18,6 +18,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
  - Returning EINVAL is better than ENOTSUP on internal readlink ([!223](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/223))
  - resolve_new accepts the parameter resolve_last_link ([!209](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/209))
    - This solves an issue with the CI when it looks at the gcl-build directory (mainly lxstat)
  - Refactor and modernize sfind and gfind ([!226](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/226))
  
### Fixed

+217 −143
Original line number Diff line number Diff line
@@ -39,21 +39,27 @@
/* Based on pfind from ior500 */
/* https://github.com/VI4IO/pfind/ */

#include <cmath>
#include <algorithm>
#include <cerrno>
#include <climits>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <getopt.h>
#include <iostream>
#include <mpi.h>
#include <limits>
#include <queue>
#include <regex.h>
#include <stdio.h>
#include <regex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <limits>
#include <cstdint>

#include <vector>
#include <mpi.h>
#include <cmath>
using namespace std;

/* Minimal struct needed for io500 find */
@@ -75,52 +81,54 @@ gkfs_getsingleserverdir(const char* path, struct dirent_extended* dirp,
/* PFIND OPTIONS, EXTENDED: we need to add the GekkoFS mount dir and the number
 * of servers. */
typedef struct {
    char* workdir;
    int just_count;
    int print_by_process;
    char* results_dir;
    int stonewall_timer;
    int print_rates;

    char* timestamp_file;
    char* name_pattern;
    regex_t name_regex;
    uint64_t size;

    int num_servers;
    char* mountdir;
    string workdir;
    bool just_count = false;
    bool print_by_process = false;
    string results_dir;
    int stonewall_timer = 0;
    bool print_rates = false;

    string timestamp_file;
    string name_pattern;
    regex name_regex;
    uint64_t size = std::numeric_limits<uint64_t>::max();

    int num_servers = 0;
    string mountdir;
    // optimizing parameters NOT USED
    int queue_length;
    int max_entries_per_iter;
    int steal_from_next;            // if true, then steal from the next process
    int parallel_single_dir_access; // if 1, use hashing to parallelize single
                                    // directory access, if 2 sequential
                                    // increment

    int verbosity;
    int queue_length = 100000;
    int max_entries_per_iter = 1000;
    bool steal_from_next = false; // if true, then steal from the next process
    int parallel_single_dir_access = 0; // if 1, use hashing to parallelize
                                        // single directory access, if 2
                                        // sequential increment

    int verbosity = 0;
} pfind_options_t;

typedef struct {
    uint64_t ctime_min;
    double stonewall_endtime;
    FILE* logfile;
    int needs_stat;
    uint64_t ctime_min = 0;
    double stonewall_endtime = 0.0;
    FILE* logfile = nullptr;
    bool needs_stat = false;
} pfind_runtime_options_t;

static pfind_runtime_options_t runtime;

int pfind_rank;
int pfind_rank = 0;
int pfind_size = 1;

static pfind_options_t* opt;

void
pfind_abort(const string str) {
    printf("%s", str.c_str());
[[noreturn]] void
pfind_abort(const string& str) {
    cerr << str << endl;
    exit(1);
}


static void
pfind_print_help(pfind_options_t* res) {
pfind_print_help(const pfind_options_t* res) {
    printf("pfind \nSynopsis:\n"
           "pfind <workdir> [-newer <timestamp file>] [-size <size>c] [-name "
           "<substr>] [-regex <regex>] [-S <numserver>] [-M <mountdir>]\n"
@@ -132,158 +140,214 @@ pfind_print_help(pfind_options_t* res) {
           "Optional flags\n"
           "\t-h: prints the help\n"
           "\t--help: prints the help without initializing MPI\n",
           res->workdir, res->timestamp_file, res->name_pattern,
           res->num_servers, res->mountdir);
           res->workdir.c_str(), res->timestamp_file.c_str(),
           res->name_pattern.c_str(), res->num_servers, res->mountdir.c_str());
}

MPI_Comm pfind_com;
int pfind_size;

pfind_options_t*
pfind_parse_args(int argc, char** argv, int force_print_help, MPI_Comm com) {
    MPI_Comm_rank(com, &pfind_rank);
    MPI_Comm_size(com, &pfind_size);
    pfind_com = com;

    pfind_options_t* res = (pfind_options_t*) malloc(sizeof(pfind_options_t));
    memset(res, 0, sizeof(pfind_options_t));
    int print_help = force_print_help;
    auto res = new pfind_options_t();

    bool print_help = force_print_help;

    res->workdir = nullptr;
    res->results_dir = nullptr;
    res->verbosity = 0;
    res->timestamp_file = nullptr;
    res->name_pattern = nullptr;
    res->size = std::numeric_limits<uint64_t>::max();
    res->queue_length = 100000;
    res->max_entries_per_iter = 1000;
    char* firstarg = nullptr;
    vector<char*> modified_argv(argv, argv + argc);

    // when we find special args, we process them
    // but we need to replace them with 0 so that getopt will ignore them
    // and getopt will continue to process beyond them
    for(auto i = 1; i < argc - 1; i++) {
    for(int i = 1; i < argc - 1; ++i) {
        if(strcmp(argv[i], "-newer") == 0) {
            res->timestamp_file = strdup(argv[i + 1]);
            argv[i][0] = 0;
            argv[++i][0] = 0;
            res->timestamp_file = argv[i + 1];
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-size") == 0) {
            char* str = argv[i + 1];
            char extension = str[strlen(str) - 1];
            str[strlen(str) - 1] = 0;
            res->size = atoll(str);
            try {
                res->size = stoull(str);
            } catch(const invalid_argument& e) {
                pfind_abort("Invalid size argument: " + string(str) + "\n");
            } catch(const out_of_range& e) {
                pfind_abort("Size argument out of range: " + string(str) +
                            "\n");
            }

            switch(extension) {
                case 'c':
                    break;
                default:
                    pfind_abort("Unsupported exension for -size\n");
                    pfind_abort("Unsupported extension for -size\n");
            }
            argv[i][0] = 0;
            argv[++i][0] = 0;
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-name") == 0) {
            res->name_pattern = (char*) malloc(strlen(argv[i + 1]) * 4 + 100);
            // transform a traditional name pattern to a regex:
            char* str = argv[i + 1];
            char* out = res->name_pattern;
            int pos = 0;
            for(unsigned i = 0; i < strlen(str); i++) {
                if(str[i] == '*') {
                    pos += sprintf(out + pos, ".*");
                } else if(str[i] == '.') {
                    pos += sprintf(out + pos, "[.]");
                } else if(str[i] == '"' || str[i] == '\"') {
                    // erase the "
                } else {
                    out[pos] = str[i];
                    pos++;
            string pattern = argv[i + 1];
            res->name_pattern.clear();
            res->name_pattern.reserve(pattern.length() * 4 +
                                      100); // pre-allocate for expansion

            for(char c : pattern) {
                switch(c) {
                    case '*':
                        res->name_pattern += ".*";
                        break;
                    case '.':
                        res->name_pattern += "[.]";
                        break;
                    case '"':
                    case '\'':
                        // erase quotes
                        break;
                    default:
                        res->name_pattern += c;
                        break;
                }
            }
            out[pos] = 0;

            auto ret = regcomp(&res->name_regex, res->name_pattern, 0);
            if(ret) {
                pfind_abort("Invalid regex for name given\n");
            try {
                res->name_regex = regex(res->name_pattern);
            } catch(const regex_error& e) {
                pfind_abort("Invalid regex for name given: " +
                            string(e.what()) + "\n");
            }
            argv[i][0] = 0;
            argv[++i][0] = 0;
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-regex") == 0) {
            res->name_pattern = strdup(argv[i + 1]);
            auto ret = regcomp(&res->name_regex, res->name_pattern, 0);
            if(ret) {
                pfind_abort("Invalid regex for name given\n");
            }
            argv[i][0] = 0;
            argv[++i][0] = 0;
            res->name_pattern = argv[i + 1];
            try {
                res->name_regex = regex(res->name_pattern);
            } catch(const regex_error& e) {
                pfind_abort("Invalid regex for name given: " +
                            string(e.what()) + "\n");
            }
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-M") == 0) {
            res->mountdir = strdup(argv[i + 1]);
            argv[i][0] = 0;
            argv[++i][0] = 0;
            res->mountdir = argv[i + 1];
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(strcmp(argv[i], "-S") == 0) {
            res->num_servers = atoi(argv[i + 1]);
            argv[i][0] = 0;
            argv[++i][0] = 0;
        } else if(!firstarg) {
            firstarg = strdup(argv[i]);
            argv[i][0] = 0;
            try {
                res->num_servers = stoi(argv[i + 1]);
            } catch(const invalid_argument& e) {
                pfind_abort("Invalid number of servers: " +
                            string(argv[i + 1]) + "\n");
            } catch(const out_of_range& e) {
                pfind_abort("Number of servers out of range: " +
                            string(argv[i + 1]) + "\n");
            }
            modified_argv[i][0] = 0;
            modified_argv[i + 1][0] = 0;
            ++i;
        } else if(res->workdir.empty()) {
            res->workdir = argv[i];
            modified_argv[i][0] = 0;
        }
    }
    if(argc == 2) {
        firstarg = strdup(argv[1]);

    if(argc == 2 && res->workdir.empty()) {
        res->workdir = argv[1];
    }


    const char* optstring = "CPs:r:vhD:xq:H:NM:S:";
    int c;
    while((c = getopt(argc, argv, "CPs:r:vhD:xq:H:NM:S:")) != -1) {
    optind = 1; // Reset getopt's internal index for repeated calls.
    while((c = getopt(argc, modified_argv.data(), optstring)) != -1) {
        if(c == -1) {
            break;
        }

        switch(c) {
            case 'H':
                res->parallel_single_dir_access = atoi(optarg);
                try {
                    res->parallel_single_dir_access = stoi(optarg);
                } catch(const invalid_argument& e) {
                    pfind_abort("Invalid parallel_single_dir_access: " +
                                string(optarg) + "\n");
                } catch(const out_of_range& e) {
                    pfind_abort("parallel_single_dir_access out of range: " +
                                string(optarg) + "\n");
                }
                break;
            case 'N':
                res->steal_from_next = 1;
                res->steal_from_next = true;
                break;
            case 'x':
                /* ignore fake arg that we added when we processed the extra
                 * args */
                break;
            case 'P':
                res->print_by_process = 1;
                res->print_by_process = true;
                break;
            case 'C':
                res->just_count = 1;
                res->just_count = true;
                break;
            case 'D':
                if(strcmp(optarg, "rates") == 0) {
                    res->print_rates = 1;
                    res->print_rates = true;
                } else {
                    pfind_abort("Unsupported debug flag\n");
                }
                break;
            case 'h':
                print_help = 1;
                print_help = true;
                break;
            case 'r':
                res->results_dir = strdup(optarg);
                res->results_dir = optarg;
                break;
            case 'q':
                res->queue_length = atoi(optarg);
                break;
                try {
                    res->queue_length = stoi(optarg);
                } catch(const invalid_argument& e) {
                    pfind_abort("Invalid queue length: " + string(optarg) +
                                "\n");
                } catch(const out_of_range& e) {
                    pfind_abort("Queue length out of range: " + string(optarg) +
                                "\n");
                }
                if(res->queue_length < 10) {
                    pfind_abort("Queue must be at least 10 elements!\n");
                }
                break;
            case 's':
                res->stonewall_timer = atol(optarg);
                try {
                    res->stonewall_timer = stoi(optarg);
                } catch(const invalid_argument& e) {
                    pfind_abort("Invalid stonewall timer: " + string(optarg) +
                                "\n");
                } catch(const out_of_range& e) {
                    pfind_abort("Stonewall timer out of range: " +
                                string(optarg) + "\n");
                }
                break;
            case 'v':
                res->verbosity++;
                break;
            case 0:
                break;
            case '?':
                // getopt already prints an error message
                exit(1);
            default:
                pfind_abort("Unhandled option: " + string(1, (char) c) + "\n");
        }
    }

    if(res->verbosity > 2 && pfind_rank == 0) {
        printf("Regex: %s\n", res->name_pattern);
        printf("Regex: %s\n", res->name_pattern.c_str());
    }

    if(print_help) {
@@ -297,10 +361,9 @@ pfind_parse_args(int argc, char** argv, int force_print_help, MPI_Comm com) {
        exit(0);
    }

    if(!firstarg) {
    if(res->workdir.empty()) {
        pfind_abort("Error: pfind <directory>\n");
    }
    res->workdir = firstarg;

    return res;
}
@@ -336,12 +399,12 @@ void
dirProcess(const string path, unsigned long long& checked,
           unsigned long long& found, queue<string>& dirs,
           unsigned int world_rank, unsigned int world_size,
           pfind_options_t* opt) {
    struct dirent_extended* getdir = (struct dirent_extended*) malloc(
            (sizeof(struct dirent_extended) + 255) * 1024 * 100);
    memset(getdir, 0, (sizeof(struct dirent_extended) + 255) * 1024 * 100);
    // cout << "PROCESSING " << world_rank << "/"<< world_size << " = " << path
    // << endl;
           const pfind_options_t* opt) {
    const size_t buffer_size =
            (sizeof(struct dirent_extended) + 255) * 1024 * 100;
    unique_ptr<struct dirent_extended[]> getdir(
            new struct dirent_extended
                    [buffer_size / (sizeof(struct dirent_extended) + 255)]{});


    int servers_per_node = ceil(opt->num_servers / (world_size - 1));
@@ -353,19 +416,21 @@ dirProcess(const string path, unsigned long long& checked,
            break;

        unsigned long long total_size = 0;
        auto n = gkfs_getsingleserverdir(
                path.c_str(), getdir,
                (sizeof(struct dirent_extended) + 255) * 1024 * 100, server);
        struct dirent_extended* temp = getdir;
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(), getdir.get(), buffer_size, server);

        struct dirent_extended* temp = getdir.get();

        while(total_size < (unsigned long long) n) {
        while(total_size < n) {
            if(strlen(temp->d_name) == 0)
                break;

            total_size += temp->d_reclen;

            /* Queue directory to process */
            if(temp->d_type == 1) {
                string slash;
                if(path[path.size() - 1] != '/')
                if(path.back() != '/')
                    slash = "/";
                checked++;
                dirs.push(path + slash + temp->d_name);
@@ -373,17 +438,22 @@ dirProcess(const string path, unsigned long long& checked,
                        reinterpret_cast<char*>(temp) + temp->d_reclen);
                continue;
            }

            /* Find filtering */
            auto timeOK = true;
            if(opt->timestamp_file) {
            bool timeOK = true;
            if(!opt->timestamp_file.empty()) {
                if((uint64_t) temp->ctime < runtime.ctime_min)
                    timeOK = false;
            }
            if(timeOK and (temp->size == opt->size or
                           opt->size == std::numeric_limits<uint64_t>::max()))
                if(!(opt->name_pattern &&
                     regexec(&opt->name_regex, temp->d_name, 0, nullptr, 0)))

            if(timeOK && (temp->size == opt->size ||
                          opt->size == std::numeric_limits<uint64_t>::max())) {
                if(opt->name_pattern.empty() ||
                   regex_search(temp->d_name, opt->name_regex)) {
                    found++;
                }
            }

            checked++;
            temp = reinterpret_cast<dirent_extended*>(
                    reinterpret_cast<char*>(temp) + temp->d_reclen);
@@ -393,18 +463,18 @@ dirProcess(const string path, unsigned long long& checked,

int
process(char* processor_name, int world_rank, int world_size,
        pfind_options_t* opt) {
        const pfind_options_t* opt) {
    // Print off a hello world message

    // INIT PFIND
    memset(&runtime, 0, sizeof(pfind_runtime_options_t));
    runtime = {};
    /* Get timestamp file */
    if(opt->timestamp_file) {
    if(!opt->timestamp_file.empty()) {
        if(pfind_rank == 0) {
            static struct stat timer_file{};
            if(lstat(opt->timestamp_file, &timer_file) != 0) {
                printf("Could not open: \"%s\", error: %s", opt->timestamp_file,
                       strerror(errno));
            struct stat timer_file;
            if(lstat(opt->timestamp_file.c_str(), &timer_file) != 0) {
                printf("Could not open: \"%s\", error: %s",
                       opt->timestamp_file.c_str(), strerror(errno));
                pfind_abort("\n");
            }
            runtime.ctime_min = timer_file.st_ctime;
@@ -415,9 +485,12 @@ process(char* processor_name, int world_rank, int world_size,
    if(world_rank == 0) {
        queue<string> dirs;
        string workdir = opt->workdir;
        workdir = workdir.substr(strlen(opt->mountdir), workdir.size());
        if(workdir.size() == 0)
        if(workdir.rfind(opt->mountdir, 0) == 0) {
            workdir = workdir.substr(opt->mountdir.length());
        }
        if(workdir.empty()) {
            workdir = "/";
        }
        dirs.push(workdir);

        do {
@@ -522,8 +595,9 @@ main(int argc, char** argv) {
        if(strcmp(argv[i], "--help") == 0) {
            argv[i][0] = 0;
            pfind_rank = 0;
            pfind_parse_args(argc, argv, 1, MPI_COMM_SELF);
            exit(0);
            opt = pfind_parse_args(argc, argv, 1, MPI_COMM_SELF);
            delete opt;
            return 0;
        }
    }

@@ -546,7 +620,7 @@ main(int argc, char** argv) {
    MPI_Get_processor_name(processor_name, &name_len);

    process(processor_name, world_rank, world_size, opt);

    delete opt;
    // Finalize the MPI environment.
    MPI_Finalize();
}
+227 −162

File changed.

Preview size limit exceeded, changes collapsed.

+4 −8
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ def test_malleability(gkfwd_daemon_factory, gkfs_client, gkfs_shell):
    import time
    d00 = gkfwd_daemon_factory.create()
    # Add "#FS_INSTANCE_END" in the file with name d00.hostfile
    d01 = gkfwd_daemon_factory.create()
   

    time.sleep(10)
    with open(d00.hostfile, 'a') as f:
@@ -65,24 +65,20 @@ def test_malleability(gkfwd_daemon_factory, gkfs_client, gkfs_shell):

    # Create content 

    d02 = gkfwd_daemon_factory.create()
    d01 = gkfwd_daemon_factory.create()
    time.sleep(10)
    cmd = gkfs_shell.gkfs_malleability('expand','status')
    assert cmd.exit_code == 0
    assert cmd.stdout.decode() == "No expansion running/finished.\n"
    
    cmd = gkfs_shell.gkfs_malleability('expand','start', timeout=340)
    assert cmd.stdout.decode() == "Expansion process from 2 nodes to 3 nodes launched...\n"
    assert cmd.exit_code == 0
    
    time.sleep(20)

    cmd = gkfs_shell.gkfs_malleability('expand','finalize')
    assert cmd.stdout.decode() == "Expand finalize 0\n"
    assert cmd.exit_code == 0
    
    d00.shutdown()
    d01.shutdown()
    d02.shutdown()

    
    
 No newline at end of file