Commit 695bd442 authored by Ramon Nou's avatar Ramon Nou
Browse files

added direct read

parent 193d7801
Loading
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -84,6 +84,8 @@ static constexpr auto DATA_DISTRIBUTION_HOSTS =

static constexpr auto NUM_REPL = ADD_PREFIX("NUM_REPL");
static constexpr auto PROXY_PID_FILE = ADD_PREFIX("PROXY_PID_FILE");
static constexpr auto FUZZY_DIRECT_READ = ADD_PREFIX("FUZZY_DIRECT_READ");
static constexpr auto DAEMON_ROOTDIR = ADD_PREFIX("DAEMON_ROOTDIR");
namespace cache {
static constexpr auto DENTRY = ADD_PREFIX("DENTRY_CACHE");
static constexpr auto WRITE_SIZE = ADD_PREFIX("WRITE_SIZE_CACHE");
+3 −0
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@
#define GEKKOFS_CONFIG_HPP

#include <common/cmake_configure.hpp>
#include <string>

// environment prefixes (are concatenated in env module at compile time)
#define CLIENT_ENV_PREFIX "LIBGKFS_"
@@ -190,6 +191,8 @@ inline bool use_data_compression = false;

// Enable fuzzy relocation (optimistic local read)
inline bool fuzzy_relocation = false;
inline bool fuzzy_direct_read = false;
inline std::string daemon_rootdir = "";
} // namespace rpc

namespace rocksdb {
+5 −0
Original line number Diff line number Diff line
@@ -90,6 +90,11 @@ PreloadContext::PreloadContext()
    use_compression_ =
            gkfs::env::get_var(gkfs::env::DATA_COMPRESSION, "OFF") == "ON";

    gkfs::config::rpc::fuzzy_direct_read =
            gkfs::env::get_var(gkfs::env::FUZZY_DIRECT_READ, "OFF") == "ON";
    gkfs::config::rpc::daemon_rootdir =
            gkfs::env::get_var(gkfs::env::DAEMON_ROOTDIR, "");

    const std::string env_dirents_buff_size =
            gkfs::env::get_var(gkfs::env::DIRENTS_BUFF_SIZE);
    if(!env_dirents_buff_size.empty()) {
+64 −0
Original line number Diff line number Diff line
@@ -53,6 +53,11 @@
#include <map>
#include <iostream>

#include <sys/syscall.h>
#include <fcntl.h>
#include <unistd.h>
#include <algorithm>

using namespace std;

namespace gkfs::rpc {
@@ -424,6 +429,65 @@ forward_read(const string& path, void* buf, const off64_t offset,
       failed.empty()) {
        auto local_host_id = CTX->local_host_id();
        if(local_host_id != static_cast<unsigned int>(-1)) {
            // Direct Read Optimization
            if(gkfs::config::rpc::fuzzy_direct_read &&
               !gkfs::config::rpc::daemon_rootdir.empty()) {
                bool direct_success = true;
                size_t total_read = 0;
                auto ptr = static_cast<char*>(buf);
                auto current_offset = offset;

                for(uint64_t i = chnk_start; i <= chnk_end; ++i) {
                    // Calculate path: <daemon_rootdir>/data/chunks/<dir>/<id>
                    auto path_copy = path.substr(1);
                    std::replace(path_copy.begin(), path_copy.end(), '/', ':');
                    auto chunk_path = fmt::format(
                            "{}/data/chunks/{}/{}",
                            gkfs::config::rpc::daemon_rootdir, path_copy, i);

                    off64_t chnk_off = 0;
                    size_t chnk_sz = gkfs::config::rpc::chunksize;

                    if(i == chnk_start) {
                        chnk_off =
                                current_offset % gkfs::config::rpc::chunksize;
                        chnk_sz -= chnk_off;
                    }
                    // If it is the last chunk, we need to adjust size
                    if(i == chnk_end) {
                        size_t remaining = read_size - total_read;
                        if(remaining < chnk_sz)
                            chnk_sz = remaining;
                    }

                    long fd = ::syscall(SYS_open, chunk_path.c_str(), O_RDONLY);
                    if(fd < 0) {
                        direct_success = false;
                        break;
                    }

                    long bytes =
                            ::syscall(SYS_pread64, fd, ptr, chnk_sz, chnk_off);
                    ::syscall(SYS_close, fd);

                    if(bytes != static_cast<long>(chnk_sz)) {
                        direct_success = false;
                        break;
                    }

                    ptr += bytes;
                    total_read += bytes;
                    current_offset += bytes;
                }

                if(direct_success) {
                    // Check if we missed any bytes?
                    // Loop condition `i <= chnk_end` covers all chunks.
                    // The last chunk size adjustment ensures we read exactly
                    // `read_size`.
                    return std::make_pair(0, total_read);
                }
            }

            auto read_rpc = CTX->rpc_engine()->define(gkfs::rpc::tag::read);

+54 −0
Original line number Diff line number Diff line
@@ -98,3 +98,57 @@ def test_fuzzy_relocation(test_workspace, gkfs_client, gkfs_shell):
    finally:
        for d in daemons:
            d.shutdown()

def test_fuzzy_direct_read(test_workspace, gkfs_client, gkfs_shell):
    """
    Verify fuzzy data relocation with direct read optimization.
    Using 1 daemon to ensure 'local' is well-defined.
    """
    interface = "lo"
    daemons = []
    
    # Client's hosts file
    client_hosts_file = test_workspace.twd / "gkfs_hosts.txt"
    if client_hosts_file.exists():
        client_hosts_file.unlink()

    try:
        # Start 1 daemon
        d = CustomDaemon(interface, test_workspace, 0, 1)
        d.run()
        daemons.append(d)
        
        # Write hosts file
        client_hosts_file.write_text(d.hostfile.read_text())
        
        file01 = test_workspace.mountdir / "file01"
        
        # Configure client environment for Normal Write
        # We don't enable direct read for write (it's only for read in forward_read)
        # But we enable fuzzy relocation (base flag) just in case, though write is standard.
        gkfs_client._patched_env['GKFS_HOSTS_FILE'] = str(client_hosts_file)
        gkfs_shell._patched_env['GKFS_HOSTS_FILE'] = str(client_hosts_file)
        
        # Touch and Write
        ret_touch = gkfs_shell.touch(file01)
        assert ret_touch.exit_code == 0
        
        ret_write = gkfs_client.write_validate(file01, 1024 * 1024) # 1MB
        assert ret_write.retval == 0

        # Now Configure Direct Read
        gkfs_client._patched_env['LIBGKFS_FUZZY_RELOCATION'] = 'ON'
        gkfs_client._patched_env['LIBGKFS_FUZZY_DIRECT_READ'] = 'ON'
        # Point to the daemon's root directory for direct access
        gkfs_client._patched_env['LIBGKFS_DAEMON_ROOTDIR'] = str(d.rootdir)
        
        # Read file
        # This should trigger the direct read path in forward_data.cpp
        ret_read = gkfs_client.read(file01, 1024 * 1024)
        assert ret_read.retval == 1024 * 1024
        # We assume correctness if it returns success. 
        # (Ideally we'd trace syscalls, but checking success + correct size is good start)

    finally:
        for d in daemons:
            d.shutdown()