Commit 27598ccf authored by Ramon Nou's avatar Ramon Nou
Browse files

Retry on standard directory

parent f9c50fdd
Loading
Loading
Loading
Loading
+79 −36
Original line number Diff line number Diff line
@@ -740,27 +740,96 @@ forward_get_dirents(const string& path) {

    auto send_error = err != 0;
    auto open_dir = make_shared<gkfs::filemap::OpenDir>(path);

    // Add special files
    open_dir->add(".", gkfs::filemap::FileType::directory);
    open_dir->add("..", gkfs::filemap::FileType::directory);

    // Helper lambda to deserialize data
    auto deserialize_dirents = [&](void* buffer_ptr, size_t num_entries) {
        bool* bool_ptr = reinterpret_cast<bool*>(buffer_ptr);
        char* names_ptr = reinterpret_cast<char*>(buffer_ptr) +
                          (num_entries * sizeof(bool));

        for(std::size_t j = 0; j < num_entries; j++) {
            gkfs::filemap::FileType ftype =
                    (*bool_ptr) ? gkfs::filemap::FileType::directory
                                : gkfs::filemap::FileType::regular;
            bool_ptr++;

            auto name = std::string(names_ptr);
            names_ptr += name.size() + 1;

            open_dir->add(name, ftype);
        }
    };

    // wait for RPC responses
    for(std::size_t i = 0; i < handles.size(); ++i) {

        gkfs::rpc::get_dirents::output out;

        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            out = handles[i].get().at(0);
            // skip processing dirent data if there was an error during send
            // In this case all responses are gathered but their contents
            // skipped

            // skip processing if there was an error during send
            if(send_error)
                continue;

            // --- RETRY LOGIC START ---
            if(out.err() == ENOBUFS) {
                // The buffer was too small. The daemon returns the required size in dirents_size (in bytes).
                size_t required_size = out.dirents_size();
                LOG(DEBUG, "{}() Buffer too small for host '{}'. Required: {}, Available: {}. Retrying...",
                    __func__, targets[i], required_size, per_host_buff_size);

                // Allocate exact size needed
                auto retry_buffer = std::unique_ptr<char[]>(new char[required_size]);
                
                // Expose new buffer
                hermes::exposed_memory retry_exposed_buffer;
                try {
                    retry_exposed_buffer = ld_network_service->expose(
                        std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
                            retry_buffer.get(), required_size}},
                        hermes::access_mode::write_only);
                } catch(const std::exception& ex) {
                    LOG(ERROR, "{}() Failed to expose retry buffer. err '{}'", __func__, ex.what());
                    err = EBUSY;
                    continue;
                }

                // Resend RPC
                auto endp = CTX->hosts().at(targets[i]);
                gkfs::rpc::get_dirents::input in(path, retry_exposed_buffer);
                
                try {
                    auto retry_out = ld_network_service->post<gkfs::rpc::get_dirents>(endp, in).get().at(0);
                    if (retry_out.err() != 0) {
                        LOG(ERROR, "{}() Retry failed on host '{}'. Error '{}'", 
                            __func__, targets[i], strerror(retry_out.err()));
                        err = retry_out.err();
                        continue;
                    }
                    
                    // Success on retry
                    deserialize_dirents(retry_buffer.get(), retry_out.dirents_size());
                    
                } catch (const std::exception& ex) {
                     LOG(ERROR, "{}() Retry RPC failed for host '{}'. err '{}'",
                        __func__, targets[i], ex.what());
                    err = EBUSY;
                    continue;
                }
                continue; // Done with this host
            }
            // --- RETRY LOGIC END ---

            if(out.err() != 0) {
                LOG(ERROR,
                    "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
                    __func__, targets[i], strerror(out.err()), path);
                err = out.err();
                // We need to gather all responses before exiting
                continue;
            }
        } catch(const std::exception& ex) {
@@ -768,42 +837,16 @@ forward_get_dirents(const string& path) {
                "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
                __func__, path, targets[i], ex.what());
            err = EBUSY;
            // We need to gather all responses before exiting
            continue;
        }

        // Standard success path
        // each server wrote information to its pre-defined region in
        // large_buffer, recover it by computing the base_address for each
        // particular server and adding the appropriate offsets
        // large_buffer
        assert(exposed_buffers[i].count() == 1);
        void* base_ptr = exposed_buffers[i].begin()->data();
        
        bool* bool_ptr = reinterpret_cast<bool*>(base_ptr);
        char* names_ptr = reinterpret_cast<char*>(base_ptr) +
                          (out.dirents_size() * sizeof(bool));
        // Add special files like an standard fs.
        open_dir->add(".", gkfs::filemap::FileType::directory);
        open_dir->add("..", gkfs::filemap::FileType::directory);
        for(std::size_t j = 0; j < out.dirents_size(); j++) {

            gkfs::filemap::FileType ftype =
                    (*bool_ptr) ? gkfs::filemap::FileType::directory
                                : gkfs::filemap::FileType::regular;
            bool_ptr++;

            // Check that we are not outside the recv_buff for this specific
            // host
            assert((names_ptr - reinterpret_cast<char*>(base_ptr)) > 0);
            assert(static_cast<unsigned long int>(
                           names_ptr - reinterpret_cast<char*>(base_ptr)) <
                   per_host_buff_size);

            auto name = std::string(names_ptr);
            // number of characters in entry + \0 terminator
            names_ptr += name.size() + 1;

            open_dir->add(name, ftype);
        }
        deserialize_dirents(base_ptr, out.dirents_size());
    }
    return make_pair(err, open_dir);
}
+2 −0
Original line number Diff line number Diff line
@@ -601,6 +601,8 @@ rpc_srv_get_dirents(hg_handle_t handle) {
        GKFS_DATA->spdlogger()->error(
                "{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
                __func__, bulk_size, out_size);
        // Return the required size to the client
        out.dirents_size = out_size;
        out.err = ENOBUFS;
        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
    }