Commit f9c50fdd authored by Ramon Nou's avatar Ramon Nou
Browse files

Parallel sfind per server

parent b0cd95ae
Loading
Loading
Loading
Loading
+160 −68
Original line number Diff line number Diff line
@@ -18,11 +18,11 @@
#include <unistd.h>
#include <vector>
#include <cstdlib>   // For getenv
#include <pthread.h> // Include Pthreads header
#include <atomic>    // For atomic counters

using namespace std;

// (struct dirent_extended, pfind_options_t, pfind_runtime_options_t are the
// same)
#pragma region structs
/* Minimal struct needed for io500 find */
struct dirent_extended {
@@ -61,17 +61,18 @@ typedef struct {
#pragma endregion

static pfind_runtime_options_t runtime;
int pfind_rank = 0;
int pfind_size = 1;
static pfind_options_t* opt;

// Global rank/size for logging and work distribution
int sfind_rank = 0;
int sfind_size = 1;

// Print a fatal error tagged with this process's rank and terminate.
// Marked [[noreturn]] because exit(1) never returns to the caller.
// (Diff residue removed: the old pfind_rank line duplicated the new
// sfind_rank line, which would have printed the error twice.)
[[noreturn]] void
pfind_abort(const string& str) {
    cerr << "ERROR [Rank " << sfind_rank << "]: " << str << endl;
    exit(1);
}

// (pfind_print_help and pfind_parse_args are identical to the original)
#pragma region parsing
static void
pfind_print_help(const pfind_options_t* res) {
@@ -95,7 +96,6 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {
    auto res = new pfind_options_t();
    bool print_help = force_print_help;
    vector<char*> modified_argv(argv, argv + argc);

    for(int i = 1; i < argc; ++i) {
        if(strcmp(argv[i], "-newer") == 0 && i + 1 < argc) {
            res->timestamp_file = argv[i + 1];
@@ -185,10 +185,9 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {
                break;
        }
    }
    if(pfind_rank == 0 && print_help) {
        pfind_print_help(res);
    }
    if(print_help) {
        if(sfind_rank == 0)
            pfind_print_help(res);
        exit(0);
    }
    if(res->workdir.empty()) {
@@ -201,64 +200,88 @@ pfind_parse_args(int argc, char** argv, bool force_print_help) {
}
#pragma endregion

void
dirProcess(const string& path, unsigned long long& checked,
           unsigned long long& found, const pfind_options_t* opt) {
    struct dirent_extended* entries = nullptr;
// Per-worker-thread context handed to worker_routine() via pthread_create.
// All pointers reference state owned by the spawning function and must
// outlive the worker threads.
struct ThreadData {
    int thread_id;                // worker index, used in log/warning messages
    const pfind_options_t* opt;   // parsed find options (filters to apply)
    const string* workdir;        // directory path queried on each server
    queue<int>* server_queue;     // shared queue of pending server ids
    pthread_mutex_t* queue_mutex; // protects server_queue
    atomic<unsigned long long>* total_found;   // global matched-entry counter
    atomic<unsigned long long>* total_checked; // global examined-entry counter
};

    // --- PARALLELIZATION LOGIC ---
    // Each process calculates its own range of servers to query.
    int servers_per_proc = opt->num_servers / pfind_size;
    int remainder = opt->num_servers % pfind_size;
    int start_server =
            pfind_rank * servers_per_proc + min(pfind_rank, remainder);
    int end_server =
            start_server + servers_per_proc + (pfind_rank < remainder ? 1 : 0);
// The function each worker thread will execute
void*
worker_routine(void* arg) {
    ThreadData* data = static_cast<ThreadData*>(arg);
    unsigned long long local_found = 0;
    unsigned long long local_checked = 0;

    while(true) {
        int server_id = -1;
        pthread_mutex_lock(data->queue_mutex);
        if(!data->server_queue->empty()) {
            server_id = data->server_queue->front();
            data->server_queue->pop();
        }
        pthread_mutex_unlock(data->queue_mutex);

    // Each process loops ONLY over its assigned servers
    for(int server = start_server; server < end_server; server++) {
        long unsigned int n = gkfs_getsingleserverdir(
                path.c_str(),
               &entries, server);
        if(server_id == -1)
            break;

        if(n <= 0)
        struct dirent_extended* entries = nullptr;
        long n = gkfs_getsingleserverdir(data->workdir->c_str(), &entries,
                                         server_id);

        if(n <= 0) {
            if(n < 0) {
                cerr << "Warning: Rank " << sfind_rank << " Thread "
                     << data->thread_id << " received error from server "
                     << server_id << endl;
            }
            if(entries)
                free(entries);
            continue;
        }

        // --- Success! We can now iterate through the results ---
        char* ptr = reinterpret_cast<char*>(entries);
        int bytes_processed = 0;


      
        while(bytes_processed < n) {
            struct dirent_extended* temp = reinterpret_cast<struct dirent_extended*>(ptr);
            if(strlen(temp->d_name) == 0 || temp->d_reclen == 0)
            struct dirent_extended* temp =
                    reinterpret_cast<struct dirent_extended*>(ptr);
            if(temp->d_reclen == 0)
                break;

            if(temp->d_type != 1) {
                bool timeOK = opt->timestamp_file.empty() ||
                bool timeOK = data->opt->timestamp_file.empty() ||
                              ((uint64_t) temp->ctime >= runtime.ctime_min);
                bool sizeOK =
                        (opt->size == std::numeric_limits<uint64_t>::max() ||
                         temp->size == opt->size);
                bool nameOK = opt->name_pattern.empty() ||
                              regex_search(temp->d_name, opt->name_regex);
                bool sizeOK = (data->opt->size ==
                                       std::numeric_limits<uint64_t>::max() ||
                               temp->size == data->opt->size);
                bool nameOK = data->opt->name_pattern.empty() ||
                              regex_search(temp->d_name, data->opt->name_regex);

                if(timeOK && sizeOK && nameOK)
                    found++;
                checked++;
                    local_found++;
            }
            local_checked++;
            bytes_processed += temp->d_reclen;
            ptr += temp->d_reclen;
       
        }
        }
        free(entries);
    }

    // Atomically add local results to the global counters
    data->total_found->fetch_add(local_found);
    data->total_checked->fetch_add(local_checked);

    return nullptr;
}

int
process_sequential_parallel(const pfind_options_t* opt) {
    unsigned long long local_found = 0;
    unsigned long long local_checked = 0;
process_parallel_pthreads(const pfind_options_t* opt) {
    atomic<unsigned long long> global_found(0);
    atomic<unsigned long long> global_checked(0);
    runtime = {};

    if(!opt->timestamp_file.empty()) {
@@ -277,59 +300,128 @@ process_sequential_parallel(const pfind_options_t* opt) {
    if(workdir.empty())
        workdir = "/";

    dirProcess(workdir, local_checked, local_found, opt);
    // --- 1. Calculate this process's subset of servers ---
    int servers_per_proc = opt->num_servers / sfind_size;
    int remainder = opt->num_servers % sfind_size;
    int start_server =
            sfind_rank * servers_per_proc + min(sfind_rank, remainder);
    int end_server =
            start_server + servers_per_proc + (sfind_rank < remainder ? 1 : 0);
    int num_servers_for_this_rank = end_server - start_server;

    if(num_servers_for_this_rank <= 0) {
        cout << "[Rank " << sfind_rank << "] No servers to process. Exiting."
             << endl;
        return 0;
    }

    // --- 2. Populate the local work queue with this rank's servers ---
    queue<int> server_queue;
    for(int i = start_server; i < end_server; ++i) {
        server_queue.push(i);
    }

    // --- Pthreads Setup ---
    int num_threads = 20; // Default number of parallel requests
    const char* env_threads = getenv("SFIND_NUM_THREADS");
    if(env_threads) {
        try {
            num_threads = stoi(env_threads);
        } catch(...) { /* ignore */
        }
    }
    num_threads = min(num_threads, num_servers_for_this_rank);

    cout << "[Rank " << sfind_rank << "] Processing servers " << start_server
         << "-" << end_server - 1 << " using " << num_threads << " threads."
         << endl;

    // --- WRITE LOG TO A UNIQUE FILE ---
    string output_log = "gfind_log.rank-" + to_string(sfind_rank) + ".txt";
    ofstream output_file2(output_log);
    if(!output_file2.is_open()) {
        pfind_abort("Failed to open output file: " + output_log);
    }
    output_file2 << "[Rank " << sfind_rank << "] Processing servers "
                 << start_server << "-" << end_server - 1 << " using "
                 << num_threads << " threads." << endl;
    output_file2.close();

    pthread_mutex_t queue_mutex;
    pthread_mutex_init(&queue_mutex, nullptr);
    vector<pthread_t> threads(num_threads);
    vector<ThreadData> thread_args(num_threads);

    for(int i = 0; i < num_threads; ++i) {
        thread_args[i] = {i,
                          opt,
                          &workdir,
                          &server_queue,
                          &queue_mutex,
                          &global_found,
                          &global_checked};
        pthread_create(&threads[i], nullptr, worker_routine, &thread_args[i]);
    }

    for(int i = 0; i < num_threads; ++i) {
        pthread_join(threads[i], nullptr);
    }
    pthread_mutex_destroy(&queue_mutex);

    // Results are already aggregated via atomics.
    // Each process prints its own sub-total. The final aggregation must be done
    // externally (e.g., with a script). We can use the files or the stdout.
    cout << "MATCHED " << global_found << "/" << global_checked << endl;

    // --- WRITE LOCAL RESULTS TO A UNIQUE FILE ---
    string output_filename =
            "gfind_results.rank-" + to_string(pfind_rank) + ".txt";
            "gfind_results.rank-" + to_string(sfind_rank) + ".txt";
    ofstream output_file(output_filename);
    if(!output_file.is_open()) {
        pfind_abort("Failed to open output file: " + output_filename);
    }
    output_file << "MATCHED " << local_found << "/" << local_checked << endl;
    output_file << "MATCHED " << global_found << "/" << global_checked << endl;
    output_file.close();

    cout << "[Rank " << pfind_rank << "] Finished. Wrote results to "
         << output_filename << endl;
    cout << "MATCHED " << local_found << "/" << local_checked << endl;
    return 0;
}

int
main(int argc, char** argv) {
    // --- Get Rank/Size from Environment Variables ---
    // --- Get Rank/Size from SLURM Environment Variables ---
    const char* env_rank = getenv("SLURM_PROCID");
    const char* env_size = getenv("SLURM_NPROCS");

    if(env_rank && env_size) {
        try {
            pfind_rank = stoi(env_rank);
            pfind_size = stoi(env_size);
            sfind_rank = stoi(env_rank);
            sfind_size = stoi(env_size);
        } catch(const std::exception& e) {
            cerr << "Could not parse SLURM environment variables: " << e.what()
                 << endl;
            // Fallback to sequential
            pfind_rank = 0;
            pfind_size = 1;
                 << ". Falling back to sequential mode." << endl;
            sfind_rank = 0;
            sfind_size = 1;
        }
    } else {
        // Fallback for running outside of srun
        if(getenv("SLURM_JOB_ID")) {
            cerr << "Warning: SLURM_JOB_ID is set, but SLURM_PROCID/SLURM_NPROCS are not. Are you running with `srun`?"
                 << endl;
        }
        cout << "SLURM variables not found. Running in sequential mode (rank 0 of 1)."
             << endl;
        pfind_rank = 0;
        pfind_size = 1;
        sfind_rank = 0;
        sfind_size = 1;
    }

    // Each process parses its own arguments. This is simple and correct.
    opt = pfind_parse_args(argc, argv, false);

    // Check if GekkoFS function is available
    if(gkfs_getsingleserverdir == nullptr) {
        pfind_abort(
                "GekkoFS functions not available. Is the library preloaded?");
    }

    int result = process_sequential_parallel(opt);
    int result = process_parallel_pthreads(opt);

    delete opt;
    return result;
+2 −3
Original line number Diff line number Diff line
@@ -47,9 +47,8 @@ std::pair<int, off64_t>
forward_update_metadentry_size(const std::string& path, const size_t size,
                               const off64_t offset, const bool append_flag);

std::pair<int, size_t>
forward_get_dirents_single(const std::string& path, int server, void* buf,
                           const size_t bulk_size);
std::pair<int, std::vector<char>>
forward_get_dirents_single(const std::string& path, int server);

} // namespace gkfs::rpc

+8 −3
Original line number Diff line number Diff line
@@ -1478,6 +1478,8 @@ gkfs_opendir(const std::string& path) {
            "{}() Sending async dirents for path '{}' to '{}' daemons ...",
            __func__, path, CTX->hosts().size());
        // Launch RPC calls asynchronously
        // We need to filter the results from the dentry cache as
        // forward_get_dirents_single gathers all the files
        for(uint64_t i = 0; i < CTX->hosts().size(); i++) {
            dcache_futures.push_back(std::async(std::launch::async, [&, i]() {
                if(gkfs::config::proxy::fwd_get_dirents_single &&
@@ -1502,6 +1504,11 @@ gkfs_opendir(const std::string& path) {
                    get<3>(dentry));
                auto ftype = get<1>(dentry) ? gkfs::filemap::FileType::directory
                                            : gkfs::filemap::FileType::regular;
                // if the name includes a / skip it (as it belongs to a
                // subdirectory)
                if(get<0>(dentry).find('/') != std::string::npos) {
                    continue;
                }
                // filename, is_dir, size, ctime
                ret.second->add(get<0>(dentry), ftype);
                CTX->dentry_cache()->insert(
@@ -2010,13 +2017,12 @@ extern "C" int
gkfs_getsingleserverdir(const char* path, struct dirent_extended** dirp,
                        int server) {

    // --- 1. Initial Sanity Checks and Setup ---
    // The user must provide a valid pointer-to-a-pointer.
    if(dirp == nullptr) {
        errno = EINVAL;
        return -1;
    }
    // Initialize the user's pointer to NULL. This is a safe default.

    *dirp = nullptr;

    // --- 2. Fetch Data from RPC (Unchanged) ---
@@ -2037,7 +2043,6 @@ gkfs_getsingleserverdir(const char* path, struct dirent_extended** dirp,

    auto& open_dir = *ret.second;

    // --- 3. Handle Empty Directory Case ---
    if(open_dir.empty()) {
        return 0; // Success, 0 bytes written, *dirp is already NULL.
    }
+0 −1
Original line number Diff line number Diff line
@@ -930,7 +930,6 @@ forward_get_dirents_single(const string& path, int server) {
            return make_pair(EBUSY, nullptr);
        }

        // Send RPC (same as before, but now inside a loop)
        auto endp = CTX->hosts().at(targets[server]);
        gkfs::rpc::get_dirents_extended::input in(path, exposed_buffer);
        gkfs::rpc::get_dirents_extended::output out;
+135 −100
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@
#include <client/logging.hpp>

#include <common/rpc/rpc_util.hpp>

#include <zstd.h>
using namespace std;

namespace gkfs {
@@ -201,121 +201,156 @@ forward_get_metadentry_size_proxy(const std::string& path) {
    }
}


// This function takes the RPC output and the received buffer, and returns the
// parsed entries.
//
// Wire format per entry (tightly packed by the server):
//   [bool is_dir][size_t file_size][time_t ctime][NUL-terminated name]
// NOTE(review): the fixed-width fields are read through reinterpret_cast on a
// packed stream, which assumes the platform tolerates unaligned loads (true
// on x86) — confirm for other targets.
//
// Throws std::runtime_error on server error, invalid/undersized Zstd frame,
// or a truncated/corrupt entry stream.
std::vector<std::tuple<const std::string, bool, size_t, time_t>>
decompress_and_parse_entries(
        const gkfs::rpc::get_dirents_extended_proxy::output& out,
        const void* compressed_buffer) {
    if(out.err() != 0) {
        throw std::runtime_error("Server returned an error: " +
                                 std::to_string(out.err()));
    }
    if(out.dirents_size() == 0) {
        return {}; // No entries, return empty vector
    }

    // === STEP 1: Discover the original size from the Zstd frame header ===
    const unsigned long long uncompressed_size =
            ZSTD_getFrameContentSize(compressed_buffer, out.dirents_size());

    if(uncompressed_size == ZSTD_CONTENTSIZE_ERROR) {
        throw std::runtime_error("Received data is not a valid Zstd frame.");
    }
    if(uncompressed_size == ZSTD_CONTENTSIZE_UNKNOWN) {
        throw std::runtime_error(
                "Zstd frame content size is unknown and was not written in the frame.");
    }

    // === STEP 2: Decompress the data into a new buffer ===
    std::vector<char> decompressed_data(uncompressed_size);
    const size_t result_size =
            ZSTD_decompress(decompressed_data.data(), uncompressed_size,
                            compressed_buffer, out.dirents_size());

    if(ZSTD_isError(result_size)) {
        throw std::runtime_error("Zstd decompression failed: " +
                                 std::string(ZSTD_getErrorName(result_size)));
    }
    if(result_size != uncompressed_size) {
        throw std::runtime_error("Decompression size mismatch.");
    }

    // === STEP 3: Parse the decompressed, raw data stream ===
    std::vector<std::tuple<const std::string, bool, size_t, time_t>> entries;
    const char* p = decompressed_data.data();
    const char* end = p + decompressed_data.size();

    // Fixed-size portion of every serialized entry.
    const size_t header_size = sizeof(bool) + sizeof(size_t) + sizeof(time_t);

    while(p < end) {
        // Guard against a truncated/corrupt stream: without this check a
        // short final record would make the reads below run past the buffer.
        if(static_cast<size_t>(end - p) < header_size) {
            throw std::runtime_error(
                    "Corrupt dirent stream: truncated entry header.");
        }

        // Read is_dir
        bool is_dir = *reinterpret_cast<const bool*>(p);
        p += sizeof(bool);

        // Read file_size
        size_t file_size = *reinterpret_cast<const size_t*>(p);
        p += sizeof(size_t);

        // Read ctime
        time_t ctime = *reinterpret_cast<const time_t*>(p);
        p += sizeof(time_t);

        // Read name: locate the NUL terminator without running past `end`
        // (std::string(p) alone would read out of bounds if it is missing).
        const char* name_end = p;
        while(name_end < end && *name_end != '\0') {
            ++name_end;
        }
        if(name_end == end) {
            throw std::runtime_error(
                    "Corrupt dirent stream: unterminated entry name.");
        }
        std::string name(p, name_end);
        p = name_end + 1;

        entries.emplace_back(name, is_dir, file_size, ctime);
    }

    return entries;
}


// Ask the proxy for the (compressed) directory listing of `path` held by a
// single server, and return the parsed entries.
//
// The RMA receive buffer starts at the configured optimistic size; if the
// proxy answers ENOBUFS it reports the size it actually needs in
// dirents_size(), and we retry once with a buffer of exactly that size
// (max_retries bounds the loop). Returns {0, entries} on success, or an
// errno-style code with an empty/null vector on failure.
//
// NOTE(review): this is the post-commit retry-loop implementation with the
// interleaved pre-commit (single-shot, manual-parsing) lines removed.
pair<int, unique_ptr<vector<tuple<const std::string, bool, size_t, time_t>>>>
forward_get_dirents_single_proxy(const string& path, int server) {

    LOG(DEBUG, "{}() enter for path '{}', server '{}'", __func__, path, server);
    auto endp = CTX->proxy_host();

    // Start with the default optimistic buffer size
    size_t buffer_size = gkfs::config::rpc::dirents_buff_size_proxy;
    auto large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);

    auto output_ptr = make_unique<
            vector<tuple<const std::string, bool, size_t, time_t>>>();
    int err = 0;
    const int max_retries = 2; // Prevent infinite loops

    for(int attempt = 0; attempt < max_retries; ++attempt) {
        // Expose the (possibly re-allocated) receive buffer for RMA writes.
        hermes::exposed_memory exposed_buffer;
        try {
            exposed_buffer = ld_proxy_service->expose(
                    std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
                            large_buffer.get(), buffer_size}},
                    hermes::access_mode::write_only);
        } catch(const std::exception& ex) {
            LOG(ERROR, "{}() Failed to expose buffer on attempt {}: '{}'",
                __func__, attempt, ex.what());
            return make_pair(EBUSY, nullptr);
        }

        gkfs::rpc::get_dirents_extended_proxy::input in(path, server,
                                                        exposed_buffer);
        gkfs::rpc::get_dirents_extended_proxy::output out;

        try {
            LOG(DEBUG, "{}() Sending IPC to proxy (attempt {}, buffer size {})",
                __func__, attempt + 1, buffer_size);
            auto handle = ld_proxy_service
                                  ->post<gkfs::rpc::get_dirents_extended_proxy>(
                                          endp, in);
            out = handle.get().at(0);
        } catch(const std::exception& ex) {
            LOG(ERROR, "{}() RPC to proxy failed on attempt {}: {}", __func__,
                attempt, ex.what());
            err = EBUSY;
            break; // Fatal error, break retry loop
        }

        // --- RETRY LOGIC ---
        if(out.err() == ENOBUFS) {
            // On ENOBUFS the proxy reports the required buffer size in
            // dirents_size(); reallocate and try again.
            size_t required_size = out.dirents_size();
            LOG(WARNING,
                "{}() Buffer too small. Proxy requested {} bytes. Retrying.",
                __func__, required_size);

            buffer_size = required_size;
            large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);
            continue; // Continue to the next attempt with the new buffer
        } else if(out.err() != 0) {
            LOG(ERROR, "{}() Proxy returned a fatal error: {}", __func__,
                strerror(out.err()));
            err = out.err();
            break; // Break the loop
        }

        // --- SUCCESS ---
        LOG(DEBUG, "{}() RPC to proxy successful. Decompressing data.",
            __func__);
        try {
            auto entries_vector =
                    decompress_and_parse_entries(out, large_buffer.get());
            output_ptr = make_unique<
                    vector<tuple<const std::string, bool, size_t, time_t>>>(
                    std::move(entries_vector));
            err = 0;
        } catch(const std::exception& ex) {
            LOG(ERROR, "{}() Failed to decompress/parse entries from proxy: {}",
                __func__, ex.what());
            err = EBADMSG;
        }
        break; // Success, break the retry loop
    }

    return make_pair(err, std::move(output_ptr));
}

Loading