Commit b0cd95ae authored by Ramon Nou's avatar Ramon Nou
Browse files

Reattemp if there is no space

parent 30c0f5a0
Loading
Loading
Loading
Loading
+77 −59
Original line number Diff line number Diff line
@@ -904,75 +904,93 @@ forward_get_dirents_single(const string& path, int server) {
        return make_pair(EINVAL, nullptr);
    }

    auto large_buffer = std::unique_ptr<char[]>(
            new char[gkfs::config::rpc::dirents_buff_size]);
    const std::size_t per_host_buff_size = gkfs::config::rpc::dirents_buff_size;
    size_t buffer_size = gkfs::config::rpc::dirents_buff_size;
    auto large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);

    auto output_ptr = make_unique<
            vector<tuple<const std::string, bool, size_t, time_t>>>();
    int err = 0;
    const int max_retries = 2; // Prevent infinite loops

    std::vector<hermes::exposed_memory> exposed_buffers;
    exposed_buffers.reserve(1);
    for(int attempt = 0; attempt < max_retries; ++attempt) {

        // Expose the current buffer for RMA.
        // This needs to be done on each iteration because the buffer might be
        // reallocated.
        hermes::exposed_memory exposed_buffer;
        try {
        // FIX 1: Explicitly construct the std::vector for the template
        // function.
        exposed_buffers.emplace_back(ld_network_service->expose(
            exposed_buffer = ld_network_service->expose(
                    std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
                        large_buffer.get(), per_host_buff_size}},
                hermes::access_mode::write_only));
                            large_buffer.get(), buffer_size}},
                    hermes::access_mode::write_only);
        } catch(const std::exception& ex) {
        LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__,
            ex.what());
            LOG(ERROR,
                "{}() Failed to expose buffers for RMA on attempt {}. err '{}'",
                __func__, attempt, ex.what());
            return make_pair(EBUSY, nullptr);
        }

    auto err = 0;
    std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents_extended>> handles;
        // Send RPC (same as before, but now inside a loop)
        auto endp = CTX->hosts().at(targets[server]);
    gkfs::rpc::get_dirents_extended::input in(path, exposed_buffers[0]);
        gkfs::rpc::get_dirents_extended::input in(path, exposed_buffer);
        gkfs::rpc::get_dirents_extended::output out;

        try {
        LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[server]);
        handles.emplace_back(
                ld_network_service->post<gkfs::rpc::get_dirents_extended>(endp,
                                                                          in));
            LOG(DEBUG,
                "{}() Sending RPC to host '{}' (attempt {}, buffer size {})",
                __func__, targets[server], attempt + 1, buffer_size);

            auto handle =
                    ld_network_service->post<gkfs::rpc::get_dirents_extended>(
                            endp, in);
            out = handle.get().at(0);
        } catch(const std::exception& ex) {
        LOG(ERROR,
            "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
            __func__, path, targets[server], ex.what());
            LOG(ERROR, "{}() RPC post/get failed on attempt {}: {}", __func__,
                attempt, ex.what());
            err = EBUSY;
            break; // Fatal error, break the loop
        }

    if(!handles.empty()) {
        gkfs::rpc::get_dirents_extended::output out;
        try {
            out = handles[0].get().at(0);
            if(out.err() != 0) {
                LOG(ERROR,
                    "{}() Failed to retrieve dir entries from host '{}'. Server error '{}', path '{}'",
                    __func__, targets[server], strerror(out.err()), path);
        // === RETRY LOGIC ===
        if(out.err() == ENOBUFS) {
            size_t required_size = out.dirents_size();
            LOG(WARNING,
                "{}() Buffer too small. Server requested {} bytes. Retrying.",
                __func__, required_size);

            // Re-allocate the buffer to the exact size the server needs.
            buffer_size = required_size;
            large_buffer = std::unique_ptr<char[]>(new char[buffer_size]);

            // The `exposed_buffer` from this iteration will be destructed.
            // The loop will continue for the next attempt with the new buffer.
            continue;

        } else if(out.err() != 0) {
            // A different, fatal server-side error occurred.
            LOG(ERROR, "{}() Server returned a fatal error: {}", __func__,
                strerror(out.err()));
            err = out.err();
            } else {
            break; // Break the loop
        }

        // --- SUCCESS! ---
        // If we reach here, out.err() was 0.
        LOG(DEBUG, "{}() RPC successful. Decompressing data.", __func__);
        try {
                    // FIX 2: This call will now succeed because the function
                    // signature matches 'out's type.
                    auto entries_vector = decompress_and_parse_entries(
                            out, large_buffer.get());
                    output_ptr = make_unique<vector<
                            tuple<const std::string, bool, size_t, time_t>>>(
            auto entries_vector =
                    decompress_and_parse_entries(out, large_buffer.get());
            output_ptr = make_unique<
                    vector<tuple<const std::string, bool, size_t, time_t>>>(
                    std::move(entries_vector));
            err = 0; // Explicitly set success
        } catch(const std::exception& ex) {
                    LOG(ERROR,
                        "{}() Failed to decompress/parse entries from host '{}': {}",
                        __func__, targets[server], ex.what());
            LOG(ERROR, "{}() Failed to decompress/parse entries: {}", __func__,
                ex.what());
            err = EBADMSG;
        }
            }
        } catch(const std::exception& ex) {
            LOG(ERROR,
                "{}() Failed to get RPC output.. [path: {}, target host: {}] err '{}'",
                __func__, path, targets[server], ex.what());
            err = EBUSY;
        }

        break; // Success, so we must break the retry loop.
    }

    return make_pair(err, std::move(output_ptr));
+13 −8
Original line number Diff line number Diff line
@@ -799,29 +799,34 @@ rpc_srv_get_dirents_extended(hg_handle_t handle) {

    // Check if the final compressed data fits in the client's buffer
    if(client_bulk_size < compressed_size) {
        std::cout << "dirents_extended compression does not fit "
                  << client_bulk_size << " < " << compressed_size << std::endl;
        GKFS_DATA->spdlogger()->error(
                "{}() Compressed data ('{}' bytes) does not fit client buffer ('{}' bytes)",
                __func__, compressed_size, client_bulk_size);
        out.err = ENOBUFS;

        // Repurpose dirents_size to tell the client the required buffer size
        out.dirents_size = compressed_size;

        // Respond immediately WITHOUT performing the bulk transfer
        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
    }

    // === STEP 3: Transfer ONLY the compressed data ===
    void* bulk_buf;

    void* segment_ptr = compressed_data.data();
    size_t transfer_size = compressed_size;
    ret = margo_bulk_create(mid, 1, &bulk_buf, &transfer_size,


    ret = margo_bulk_create(mid, 1, &segment_ptr, &transfer_size,
                            HG_BULK_READ_ONLY, &bulk_handle);
    if(ret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to create bulk handle for compressed data",
                __func__);
                "{}() Failed to create zero-copy bulk handle", __func__);
        return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
    }


    // A single, safe memcpy of the compressed data.
    memcpy(bulk_buf, compressed_data.data(), compressed_size);

    GKFS_DATA->spdlogger()->trace(
            "{}() Serialized '{}' entries to '{}' bytes, compressed to '{}' bytes. Transferring now.",
            __func__, entries.size(), uncompressed_size, compressed_size);