diff --git a/CHANGELOG.md b/CHANGELOG.md index cfa921829567a98e2a6448d0af8de72e84911f4a..ec561aad514e65f35ec826efc81e5ed80a598d16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - SYS_lstat does not exists on some architectures, change to newfstatat ([!269](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/269)) - We cannot use lstat directly as may cause a recursion call on libc interception. - Un/Packing order of directory entries in compressed format was incorrect ([!281](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/281)) + - Fix pytorch mmap ([!291](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/291)) ## [0.9.5] - 2025-08 diff --git a/include/client/gkfs_functions.hpp b/include/client/gkfs_functions.hpp index b1056c7c272ce73f9cfbc81e14a7fc39d1de13db..cdfe357c97641de1eeee485e59ad928a0349321a 100644 --- a/include/client/gkfs_functions.hpp +++ b/include/client/gkfs_functions.hpp @@ -145,6 +145,7 @@ gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count, ssize_t gkfs_pread(int fd, void* buf, size_t count, off64_t offset); + ssize_t gkfs_read(int fd, void* buf, size_t count); @@ -192,6 +193,7 @@ gkfs_munmap(void* addr, size_t length); int gkfs_msync(void* addr, size_t length, int flags); + } // namespace gkfs::syscall // gkfs_getsingleserverdir is using extern "C" to demangle it for C usage diff --git a/include/client/hooks.hpp b/include/client/hooks.hpp index 1b15b0bd5f33d342bfb56508c487d885aa2dca2e..a2951478f08e36c759401f7fad760615294d9c9b 100644 --- a/include/client/hooks.hpp +++ b/include/client/hooks.hpp @@ -135,6 +135,15 @@ ssize_t hook_pwritev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt, unsigned long pos_l, unsigned long pos_h); +ssize_t +hook_sendfile(int out_fd, int in_fd, off_t* offset, size_t count); + +#ifdef SYS_copy_file_range +ssize_t +hook_copy_file_range(int fd_in, loff_t* off_in, int fd_out, loff_t* off_out, + size_t len, unsigned int flags); +#endif + int hook_unlinkat(int dirfd, const char* cpath, int flags); diff --git a/include/client/open_file_map.hpp b/include/client/open_file_map.hpp index 8dbed3c38071d7cafb2a88a035a3cd6c0fc13910..e4b6bd6092c96d1c13ae9fb27c11a42ad9f7ef04 100644 --- a/include/client/open_file_map.hpp +++ b/include/client/open_file_map.hpp @@ -63,6 +63,7 @@ enum class OpenFile_flags { cloexec, created, // indicates if the file was created during open creation_pending, // indicates if the file creation is delayed + trunc_pending, // indicates O_TRUNC has been deferred flag_count // this is purely used as a size variable of this enum class }; diff --git a/src/client/gkfs_data.cpp b/src/client/gkfs_data.cpp index 3533ce3ef851bfae49a714ced34f24b5ce2d6d73..5632dcc0ee3eb1edaa58c677581aa3b8d30f8a11 100644 --- a/src/client/gkfs_data.cpp +++ b/src/client/gkfs_data.cpp @@ -133,7 +133,6 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, } file.set_flag(gkfs::filemap::OpenFile_flags::creation_pending, false); } - // clear inline data cache as it is stale if(!file.inline_data().empty()) file.inline_data(""); @@ -331,14 +330,19 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, ssize_t gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count, off64_t offset, bool update_pos) { + const auto pos_before = file.pos(); #ifdef GKFS_ENABLE_CLIENT_METRICS auto start_t = std::chrono::high_resolution_clock::now(); auto written = gkfs_do_write(file, buf, count, offset, update_pos); CTX->write_metrics()->add_event(written, start_t); - return written; #else - return gkfs_do_write(file, buf, count, offset, update_pos); + auto written = gkfs_do_write(file, buf, count, offset, update_pos); #endif + LOG(DEBUG, + "{}() path '{}' count {} offset {} update_pos {} written {} pos_before {} pos_after {}", + __func__, file.path(), count, offset, update_pos, written, pos_before, + file.pos()); + return written; } /** @@ -353,8 +357,10 @@ gkfs_write_ws(gkfs::filemap::OpenFile& file, const char* buf, size_t count, ssize_t gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) { auto file = CTX->file_map()->get(fd); - if(!file) - return 0; + if(!file) { + errno = EBADF; + return -1; + } return gkfs_write_ws(*file, reinterpret_cast(buf), count, offset); } @@ -370,8 +376,10 @@ gkfs_pwrite(int fd, const void* buf, size_t count, off64_t offset) { ssize_t gkfs_write(int fd, const void* buf, size_t count) { auto gkfs_fd = CTX->file_map()->get(fd); - if(!gkfs_fd) - return 0; + if(!gkfs_fd) { + errno = EBADF; + return -1; + } // call pwrite and update pos auto ret = gkfs_write_ws(*gkfs_fd, reinterpret_cast(buf), count, gkfs_fd->pos(), true); @@ -391,8 +399,10 @@ ssize_t gkfs_pwritev(int fd, const struct iovec* iov, int iovcnt, off_t offset) { auto file = CTX->file_map()->get(fd); - if(!file) - return 0; + if(!file) { + errno = EBADF; + return -1; + } auto pos = offset; // keep track of current position ssize_t written = 0; ssize_t ret; @@ -432,8 +442,10 @@ ssize_t gkfs_writev(int fd, const struct iovec* iov, int iovcnt) { auto gkfs_fd = CTX->file_map()->get(fd); - if(!gkfs_fd) - return 0; + if(!gkfs_fd) { + errno = EBADF; + return -1; + } auto pos = gkfs_fd->pos(); // retrieve the current offset auto ret = gkfs_pwritev(fd, iov, iovcnt, pos); assert(ret != 0); @@ -529,28 +541,36 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, __func__); } - pair ret; - if(gkfs::config::proxy::fwd_io && CTX->use_proxy() && - count > gkfs::config::proxy::fwd_io_count_threshold) { - ret = gkfs::rpc::forward_read_proxy(file.path(), buf, offset, count); - } else { - std::set failed; // set with failed targets. - if(CTX->get_replicas() != 0) { + auto do_chunk_read = [&]() -> std::pair { + pair ret; + if(gkfs::config::proxy::fwd_io && CTX->use_proxy() && + count > gkfs::config::proxy::fwd_io_count_threshold) { + ret = gkfs::rpc::forward_read_proxy(file.path(), buf, offset, + count); + } else { + std::set failed; // set with failed targets. + if(CTX->get_replicas() != 0) { - ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, - CTX->get_replicas(), failed); - while(ret.first == EIO) { ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, CTX->get_replicas(), failed); - LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'", - ret.first); - } + while(ret.first == EIO) { + ret = gkfs::rpc::forward_read(file.path(), buf, offset, + count, CTX->get_replicas(), + failed); + LOG(WARNING, + "gkfs::rpc::forward_read() failed with ret '{}'", + ret.first); + } - } else { - ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, 0, - failed); + } else { + ret = gkfs::rpc::forward_read(file.path(), buf, offset, count, + 0, failed); + } } - } + return ret; + }; + + auto ret = do_chunk_read(); auto err = ret.first; if(err) { LOG(WARNING, "gkfs::rpc::forward_read() failed with ret '{}'", err); @@ -594,8 +614,10 @@ gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count, ssize_t gkfs_pread(int fd, void* buf, size_t count, off64_t offset) { auto gkfs_fd = CTX->file_map()->get(fd); - if(!gkfs_fd) - return 0; + if(!gkfs_fd) { + errno = EBADF; + return -1; + } return gkfs_read_ws(*gkfs_fd, reinterpret_cast(buf), count, offset); } @@ -610,8 +632,12 @@ gkfs_pread(int fd, void* buf, size_t count, off64_t offset) { ssize_t gkfs_read(int fd, void* buf, size_t count) { auto gkfs_fd = CTX->file_map()->get(fd); - if(!gkfs_fd) - return 0; + if(!gkfs_fd) { + errno = EBADF; + return -1; + } + LOG(DEBUG, "{}() reading path '{}' count {} pos {}", __func__, + gkfs_fd->path(), count, gkfs_fd->pos()); auto pos = gkfs_fd->pos(); // retrieve the current offset auto ret = gkfs_read_ws(*gkfs_fd, reinterpret_cast(buf), count, pos); // Update offset in file descriptor in the file map @@ -634,8 +660,10 @@ ssize_t gkfs_preadv(int fd, const struct iovec* iov, int iovcnt, off_t offset) { auto file = CTX->file_map()->get(fd); - if(!file) - return 0; + if(!file) { + errno = EBADF; + return -1; + } auto pos = offset; // keep track of current position ssize_t read = 0; ssize_t ret; @@ -675,8 +703,10 @@ ssize_t gkfs_readv(int fd, const struct iovec* iov, int iovcnt) { auto gkfs_fd = CTX->file_map()->get(fd); - if(!gkfs_fd) - return 0; + if(!gkfs_fd) { + errno = EBADF; + return -1; + } auto pos = gkfs_fd->pos(); // retrieve the current offset auto ret = gkfs_preadv(fd, iov, iovcnt, pos); assert(ret != 0); @@ -691,8 +721,8 @@ int gkfs_fsync(unsigned int fd) { auto file = CTX->file_map()->get(fd); if(!file) { - errno = 0; - return 0; + errno = EBADF; + return -1; } // flush write size cache to be server consistent if(CTX->use_write_size_cache()) { diff --git a/src/client/gkfs_functions.cpp b/src/client/gkfs_functions.cpp index a7f2b0822a75f584ff193d44383ecb1497d3bd1b..8df851d28280ba64e494ff0b69073f4f8cc24bf0 100644 --- a/src/client/gkfs_functions.cpp +++ b/src/client/gkfs_functions.cpp @@ -41,6 +41,8 @@ #include #include #include +#include +#include #include #include #include @@ -55,6 +57,13 @@ #include #include #include +#include +#include +#include +#include +#include +#include +#include #include #ifdef GKFS_ENABLE_CLIENT_METRICS @@ -65,6 +74,7 @@ extern "C" { #include // used for file types in the getdents{,64}() functions #include // used for definition of alignment macros +#include #include #include #include @@ -73,18 +83,23 @@ extern "C" { using namespace std; +namespace gkfs::syscall { + namespace { -// set to store void * addr, fd, length and offset -// set to store void * addr, fd, length, offset, prot -// Protected by mmap_set_mutex for thread-safe access from parallel Python -// threads -std::set> mmap_set; -std::mutex mmap_set_mutex; +struct MmapEntry { + void* addr; + size_t length; + std::string path; + off_t offset; + int prot; + int flags; +}; -} // namespace +std::mutex mmap_mtx; +std::unordered_map mmap_registry; -namespace gkfs::syscall { +} // namespace void* gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd, @@ -93,73 +108,142 @@ gkfs_mmap(void* addr, size_t length, int prot, int flags, int fd, errno = EINVAL; return MAP_FAILED; } - void* ptr = calloc(1, length); - if(ptr == nullptr) { + + auto gkfs_fd = CTX->file_map()->get(fd); + if(!gkfs_fd) { + return ::mmap(addr, length, prot, flags, fd, offset); + } + + std::string path = gkfs_fd->path(); + + // Allocate new anonymous, SHARED mapping so child processes inherit it if + // needed (MAP_SHARED | MAP_ANONYMOUS preserves it across fork if possible) + int map_flags = + MAP_ANONYMOUS | (flags & (MAP_SHARED | MAP_PRIVATE | MAP_FIXED)); + void* ptr = ::mmap(addr, length, prot | PROT_READ | PROT_WRITE, map_flags, + -1, 0); + if(ptr == MAP_FAILED) { return MAP_FAILED; } - // Pre-populate the buffer from the GekkoFS file (read-write mapping) - auto ret = gkfs::syscall::gkfs_pread(fd, ptr, length, offset); - if(ret == -1) { - free(ptr); + + // fault memory for RDMA pinning + std::memset(ptr, 0, length); + + // Call pread WITHOUT the lock so Mercury network threads do not deadlock + const auto seeded = gkfs_pread(fd, ptr, length, offset); + if(seeded < 0) { + ::munmap(ptr, length); return MAP_FAILED; } - // Register mapping under lock so concurrent threads don't race on mmap_set + + // Restrict protections if NOT requested to have write + if((prot & PROT_WRITE) == 0 && prot != (prot | PROT_READ | PROT_WRITE)) { + ::mprotect(ptr, length, prot); + } + + // Re-acquire lock to insert mapping { - std::lock_guard lock(mmap_set_mutex); - mmap_set.insert(std::make_tuple(ptr, fd, length, offset, prot)); + std::lock_guard lock(mmap_mtx); + mmap_registry[ptr] = {ptr, length, path, offset, prot, flags}; } + return ptr; } int -// cppcheck-suppress constParameterPointer gkfs_msync(void* addr, size_t length, int flags) { - std::lock_guard lock(mmap_set_mutex); - // Find by start address; msync may pass a sub-range length so we use the - // full mapping length stored in mmap_set. - auto it = std::find_if( - mmap_set.begin(), mmap_set.end(), - [addr](const auto& t) { return std::get<0>(t) == addr; }); - - if(it != mmap_set.end()) { - int fd = std::get<1>(*it); - size_t map_length = std::get<2>(*it); // use stored length, not caller's - off_t offset = std::get<3>(*it); - int prot = std::get<4>(*it); - if(prot & PROT_WRITE) { - gkfs::syscall::gkfs_pwrite(fd, addr, map_length, offset); + if(length == 0) { + errno = EINVAL; + return -1; + } + + std::string write_path; + void* write_addr = nullptr; + size_t write_len = 0; + off_t write_off = 0; + bool do_writeback = false; + + { + std::lock_guard lock(mmap_mtx); + auto it = mmap_registry.find(addr); + if(it != mmap_registry.end()) { + auto& entry = it->second; + + if((entry.prot & PROT_WRITE) && (entry.flags & MAP_SHARED)) { + do_writeback = true; + write_path = entry.path; + write_addr = entry.addr; + write_len = entry.length; + write_off = entry.offset; + } + } else { + return 0; // Not tracked } - return 0; } - return -1; -} + // Perform writeback without holding registry lock + if(do_writeback) { + int fd = gkfs_open(write_path, 0, O_WRONLY); + if(fd >= 0) { + gkfs_pwrite(fd, write_addr, write_len, write_off); + gkfs_close(fd); + } else { + LOG(ERROR, "{}() failed to open file for msync writeback: {}", + __func__, write_path); + } + } + + return 1; +} int gkfs_munmap(void* addr, size_t length) { - std::unique_lock lock(mmap_set_mutex); - auto it = std::find_if( - mmap_set.begin(), mmap_set.end(), - [&addr](const std::tuple& t) { - return std::get<0>(t) == addr; - }); - if(it != mmap_set.end()) { - int fd = std::get<1>(*it); - size_t map_length = std::get<2>(*it); - off_t offset = std::get<3>(*it); - int prot = std::get<4>(*it); - // Flush dirty pages back before freeing - if(prot & PROT_WRITE) { - gkfs::syscall::gkfs_pwrite(fd, addr, map_length, offset); + if(length == 0) { + errno = EINVAL; + return -1; + } + + std::string write_path; + void* write_addr = nullptr; + size_t write_len = 0; + off_t write_off = 0; + bool do_writeback = false; + + { + std::lock_guard lock(mmap_mtx); + auto it = mmap_registry.find(addr); + if(it != mmap_registry.end()) { + auto& entry = it->second; + + if((entry.prot & PROT_WRITE) && (entry.flags & MAP_SHARED)) { + do_writeback = true; + write_path = entry.path; + write_addr = entry.addr; + write_len = entry.length; + write_off = entry.offset; + } + + mmap_registry.erase(it); + } else { + return 0; // Not tracked } - mmap_set.erase(it); - lock.unlock(); // release lock before free to avoid holding it longer - free(addr); - return 0; } - return -1; -} + // Perform writeback exactly ONCE without holding the lock + if(do_writeback) { + int fd = gkfs_open(write_path, 0, O_WRONLY); + if(fd >= 0) { + gkfs_pwrite(fd, write_addr, write_len, write_off); + gkfs_close(fd); + } else { + LOG(ERROR, "{}() failed to open file for munmap writeback: {}", + __func__, write_path); + } + } + + ::munmap(addr, length); + return 1; +} int gkfs_utimensat(const std::string& path, const struct timespec times[2]) { @@ -225,4 +309,4 @@ gkfs_getsingleserverdir_filtered(const std::string& path, int server, path, server, start_key, filter_name, filter_size, filter_ctime); } -} // namespace gkfs::syscall \ No newline at end of file +} // namespace gkfs::syscall diff --git a/src/client/gkfs_libc.cpp b/src/client/gkfs_libc.cpp index b65c9b976a89a70a0689e5c753f46df4ce4189c0..ad1c443bd062118e62f5f005132dbc2a1f8a4311 100644 --- a/src/client/gkfs_libc.cpp +++ b/src/client/gkfs_libc.cpp @@ -42,9 +42,10 @@ #include #include #include -#include // For AT_FDCWD, O_* flags etc. -#include // For DIR, struct dirent -#include // For unshare, close, etc. +#include // For AT_FDCWD, O_* flags etc. +#include // For DIR, struct dirent +#include // For unshare, close, etc. +#include #include // For mode_t, struct stat #include // For off_t, ssize_t, etc. @@ -60,6 +61,8 @@ #include // For std::remove_if in get_open_fds (commented out) #include // For strcpy, strchr, strlen #include // For malloc, free, qsort +#include +#include // Linux Specific (consider guarding if portability is a high concern) #include // For _STAT_VER (might be glibc specific, ensure availability) @@ -77,6 +80,8 @@ #include #include +// Forward declaration removed + //========================= Global Atomics and Variables //=======================// @@ -276,20 +281,38 @@ static ResultEntry* results = nullptr; //===================================// void -gkfs_init_routine_placeholder() {} +gkfs_init_routine_placeholder() { + // no-op: init_preload() is invoked as a constructor + // (__attribute__((constructor))) Calling init_preload() here causes every + // intercepted libc function to re-enter the initialization body + // (atomic_exchange only guards pthread_atfork). +} void log_arguments(const char* symbol) { DEBUG_INFO("[BYPASS] {}", symbol); } +// Helper to safely handle null pointers for logging +template +const T& +safe_log_arg(const T& arg) { + return arg; +} + +static const char* +safe_log_arg(const char* arg) { + return arg ? arg : "(null)"; +} + // Variadic case: 1+ arguments template void log_arguments(const char* symbol, Args&&... args) { std::stringstream ss; ss << "[BYPASS] Calling " << symbol << " with arguments: "; - ((ss << "[" << typeid(Args).name() << "] " << args << " "), ...); + ((ss << "[" << typeid(Args).name() << "] " << safe_log_arg(args) << " "), + ...); DEBUG_INFO("{}", ss.str()); } @@ -299,13 +322,17 @@ void log_argumentsx(const char* symbol, Args&&... args) { std::stringstream ss; ss << "[BYPASS-ERROR] Calling " << symbol << " with arguments: "; - ((ss << "[" << typeid(Args).name() << "] " << args << " "), ...); + ((ss << "[" << typeid(Args).name() << "] " << safe_log_arg(args) << " "), + ...); DEBUG_INFO("{}", ss.str()); } // Convert stat to stat64 static void convert(struct stat* src, struct stat64* dest) { + if(dest == nullptr || src == nullptr) { + return; + } dest->st_dev = static_cast<__dev_t>(src->st_dev); dest->st_ino = static_cast<__ino64_t>(src->st_ino); dest->st_mode = static_cast<__mode_t>(src->st_mode); @@ -355,6 +382,68 @@ is_gkfs_fd(int fd) { return CTX->file_map()->exist(fd); } +constexpr size_t k_fd_copy_chunk_size = 1UL * 1024UL * 1024UL; + +static ssize_t +copy_between_fds(int out_fd, int in_fd, off64_t* in_off, off64_t* out_off, + size_t count) { + if(count == 0) { + return 0; + } + + std::vector buffer(std::min(count, k_fd_copy_chunk_size)); + size_t total = 0; + off64_t in_pos = in_off ? *in_off : 0; + off64_t out_pos = out_off ? *out_off : 0; + + while(total < count) { + const auto chunk = std::min(buffer.size(), count - total); + const ssize_t nread = + in_off ? pread64(in_fd, buffer.data(), chunk, in_pos) + : read(in_fd, buffer.data(), chunk); + if(nread < 0) { + return total > 0 ? static_cast(total) : -1; + } + if(nread == 0) { + break; + } + + if(in_off) { + in_pos += static_cast(nread); + } + + size_t consumed = 0; + while(consumed < static_cast(nread)) { + const ssize_t nwrite = + out_off ? pwrite64(out_fd, buffer.data() + consumed, + static_cast(nread) - consumed, + out_pos) + : write(out_fd, buffer.data() + consumed, + static_cast(nread) - consumed); + if(nwrite < 0) { + return total > 0 ? static_cast(total) : -1; + } + if(nwrite == 0) { + errno = EIO; + return total > 0 ? static_cast(total) : -1; + } + consumed += static_cast(nwrite); + if(out_off) { + out_pos += static_cast(nwrite); + } + } + total += consumed; + } + + if(in_off) { + *in_off = in_pos; + } + if(out_off) { + *out_off = out_pos; + } + return static_cast(total); +} + struct GkfsDir { // Hypothetical structure that might be used if DIR is cast int fd; char* path; @@ -490,6 +579,19 @@ DLSYM_WRAPPER(ssize_t, pwritev2, (int fd, const struct iovec* iov, int iovcnt, off_t offset, int flags), (fd, iov, iovcnt, offset, flags), "pwritev2") +DLSYM_WRAPPER(ssize_t, sendfile, + (int out_fd, int in_fd, off_t* offset, size_t count), + (out_fd, in_fd, offset, count), "sendfile") +#if defined(__USE_LARGEFILE64) || defined(_LARGEFILE64_SOURCE) || \ + defined(__linux__) +DLSYM_WRAPPER(ssize_t, sendfile64, + (int out_fd, int in_fd, off64_t* offset, size_t count), + (out_fd, in_fd, offset, count), "sendfile64") +#endif +DLSYM_WRAPPER(ssize_t, copy_file_range, + (int fd_in, off64_t* off_in, int fd_out, off64_t* off_out, + size_t len, unsigned int flags), + (fd_in, off_in, fd_out, off_out, len, flags), "copy_file_range") DLSYM_WRAPPER(off_t, lseek, (int fd, off_t offset, int whence), (fd, offset, whence), "lseek") DLSYM_WRAPPER(off64_t, lseek64, (int fd, off64_t offset, int whence), @@ -846,6 +948,7 @@ close_range(unsigned int low, unsigned int high, int flags) { ssize_t read(int fd, void* buf, size_t nbyte) { gkfs_init_routine_placeholder(); + DEBUG_INFO("read(fd={}, nbyte={})", fd, nbyte); GKFS_OPERATION(read, fd, buf, nbyte); GKFS_FALLBACK(read, fd, buf, nbyte); } @@ -860,6 +963,7 @@ write(int fd, const void* buf, size_t nbyte) { ssize_t pread(int fd, void* buf, size_t count, off_t offset) { gkfs_init_routine_placeholder(); + DEBUG_INFO("pread(fd={}, count={}, offset={})", fd, count, offset); GKFS_OPERATION(pread, fd, buf, count, offset); GKFS_FALLBACK(pread, fd, buf, count, offset); } @@ -874,6 +978,7 @@ pwrite(int fd, const void* buf, size_t count, off_t offset) { ssize_t pread64(int fd, void* buf, size_t count, off64_t offset) { gkfs_init_routine_placeholder(); + DEBUG_INFO("pread64(fd={}, count={}, offset={})", fd, count, offset); GKFS_OPERATION(pread, fd, buf, count, offset); // GekkoFS pread likely handles large offsets GKFS_FALLBACK(pread64, fd, buf, count, offset); @@ -933,6 +1038,65 @@ pwritev2(int fd, const struct iovec* iov, int iovcnt, off_t offset, int flags) { GKFS_FALLBACK(pwritev2, fd, iov, iovcnt, offset, flags) } +ssize_t +sendfile(int out_fd, int in_fd, off_t* offset, size_t count) { + gkfs_init_routine_placeholder(); + if(CTX->interception_enabled() && + (is_gkfs_fd(in_fd) || is_gkfs_fd(out_fd))) { + off64_t in_off = offset ? static_cast(*offset) : 0; + const auto ret = copy_between_fds( + out_fd, in_fd, offset ? &in_off : nullptr, nullptr, count); + if(ret >= 0 && offset) { + *offset = static_cast(in_off); + } + return ret; + } + GKFS_FALLBACK(sendfile, out_fd, in_fd, offset, count); +} + +#if defined(__USE_LARGEFILE64) || defined(_LARGEFILE64_SOURCE) || \ + defined(__linux__) +ssize_t +sendfile64(int out_fd, int in_fd, off64_t* offset, size_t count) { + gkfs_init_routine_placeholder(); + if(CTX->interception_enabled() && + (is_gkfs_fd(in_fd) || is_gkfs_fd(out_fd))) { + const auto ret = + copy_between_fds(out_fd, in_fd, offset, nullptr, count); + return ret; + } + GKFS_FALLBACK(sendfile64, out_fd, in_fd, offset, count); +} +#endif + +ssize_t +copy_file_range(int fd_in, off64_t* off_in, int fd_out, off64_t* off_out, + size_t len, unsigned int flags) { + gkfs_init_routine_placeholder(); + if(CTX->interception_enabled() && + (is_gkfs_fd(fd_in) || is_gkfs_fd(fd_out))) { + if(flags != 0) { + errno = EINVAL; + return -1; + } + off64_t in_off = off_in ? *off_in : 0; + off64_t out_off = off_out ? *off_out : 0; + const auto ret = + copy_between_fds(fd_out, fd_in, off_in ? &in_off : nullptr, + off_out ? &out_off : nullptr, len); + if(ret >= 0) { + if(off_in) { + *off_in = in_off; + } + if(off_out) { + *off_out = out_off; + } + } + return ret; + } + GKFS_FALLBACK(copy_file_range, fd_in, off_in, fd_out, off_out, len, flags); +} + off_t lseek(int fd, off_t offset, int whence) { gkfs_init_routine_placeholder(); @@ -1366,7 +1530,9 @@ version int mkdir(const char* path, mode_t mode) { gkfs_init_routine_placeholder(); - DEBUG_INFO("[MKDIR] Attempting to mkdir: {}", path); + if(path != nullptr) { + DEBUG_INFO("[MKDIR] Attempting to mkdir: {}", path); + } if(CTX->interception_enabled()) { std::string resolved; switch(resolve_gkfs_path(AT_FDCWD, path, resolved)) { @@ -2510,32 +2676,38 @@ fcntl(int fd, int cmd, ...) { void* mmap(void* addr, size_t length, int prot, int flags, int fd, off_t offset) { gkfs_init_routine_placeholder(); - // If fd is GekkoFS fd, GekkoFS needs to provide mmap support. - // This is complex: requires GekkoFS to manage memory regions or map its - // data. - GKFS_OPERATION(mmap, addr, length, prot, flags, fd, - offset); // gkfs_mmap - GKFS_FALLBACK(mmap, addr, length, prot, flags, fd, offset); + // Route GekkoFS fds through gkfs_mmap (calloc + pread based). + GKFS_OPERATION(mmap, addr, length, prot, flags, fd, offset); + // For non-GekkoFS fds use the raw kernel syscall to avoid any possible + // infinite recursion through dlsym(RTLD_NEXT, "mmap") on glibc builds + // where mmap/mmap64 are weak aliases pointing back to our interposer. + void* ret = reinterpret_cast( + syscall(SYS_mmap, addr, length, prot, flags, fd, offset)); + if(ret == reinterpret_cast(-1)) { + return MAP_FAILED; + } + return ret; } // mmap64 is the large-file alias for mmap on Linux x86_64. // Python's built-in mmap module and NumPy memmap call this variant directly, // bypassing plain mmap(). Without this interceptor, GekkoFS fds fall through // to the kernel and return ENXIO. -// -// IMPORTANT: We intentionally fall back to dlsym_mmap (not dlsym_mmap64) for -// non-GekkoFS fds. On 64-bit Linux, mmap64 is implemented as an alias of mmap -// in glibc — dlsym(RTLD_NEXT, "mmap64") can therefore resolve back to our own -// mmap64 interposer, causing infinite recursion → stack overflow → SIGSEGV. #if defined(__USE_LARGEFILE64) || defined(_LARGEFILE64_SOURCE) || \ defined(__linux__) void* mmap64(void* addr, size_t length, int prot, int flags, int fd, off_t offset) { gkfs_init_routine_placeholder(); - // Delegate to gkfs_mmap when fd belongs to GekkoFS, otherwise fallback. + // Route GekkoFS fds through gkfs_mmap. GKFS_OPERATION(mmap, addr, length, prot, flags, fd, offset); - // Fall back via the plain mmap dlsym wrapper — avoids infinite recursion. - GKFS_FALLBACK(mmap, addr, length, prot, flags, fd, offset); + // Same direct syscall fallback — mmap and mmap64 issue the same SYS_mmap + // on 64-bit Linux. Avoids the dlsym recursion risk entirely. + void* ret = reinterpret_cast( + syscall(SYS_mmap, addr, length, prot, flags, fd, offset)); + if(ret == reinterpret_cast(-1)) { + return MAP_FAILED; + } + return ret; } #endif @@ -2590,22 +2762,26 @@ fopen(const char* path, const char* mode) { int flags = 0; mode_t open_mode = 0666; // Default mode for creation - // Simplified mode parsing (from original, needs to be robust) - if(strchr(mode, 'a')) { - flags = O_WRONLY | O_CREAT | O_APPEND; - } else if(strchr(mode, 'w')) { - flags = O_WRONLY | O_CREAT | O_TRUNC; - } else if(strchr(mode, 'r')) { + // Handle fopen modes correctly according to POSIX + if(mode[0] == 'r') { flags = O_RDONLY; + } else if(mode[0] == 'w') { + flags = O_WRONLY | O_CREAT | O_TRUNC; + } else if(mode[0] == 'a') { + flags = O_WRONLY | O_CREAT | O_APPEND; } else { errno = EINVAL; return nullptr; - } // Invalid mode start + } - if(strchr(mode, '+')) { // r+, w+, a+ - flags &= ~(O_RDONLY | O_WRONLY); // Clear O_RDONLY/O_WRONLY - // if set by r/w/a - flags |= O_RDWR; + // Handle '+' for read/write + if(strchr(mode, '+')) { + flags = (flags & ~(O_RDONLY | O_WRONLY)) | O_RDWR; + } + + // Handle 'x' for exclusive creation (if GekkoFS supports it) + if(strchr(mode, 'x')) { + flags |= O_EXCL; } // 'b' (binary) is ignored on POSIX for open() flags. // 'x' (O_EXCL) could be handled if GekkoFS open supports it. @@ -2857,7 +3033,7 @@ fputs(const char* str, FILE* stream) { // stream->_flags |= _IO_ERR_SEEN; return EOF; } - return 0; // Success (non-negative value for fputs) + return 0; // Only one active write mapping per path expected } GKFS_FALLBACK(fputs, str, stream); } @@ -2999,7 +3175,9 @@ aio_error(const struct aiocb* aiocbp) { int mkstemp(char* templates) { gkfs_init_routine_placeholder(); - DEBUG_INFO("[BYPASS] mkstemp(template='{}')", templates); + if(templates != nullptr) { + DEBUG_INFO("[BYPASS] mkstemp(template='{}')", templates); + } GKFS_FALLBACK(mkstemp, templates); } @@ -3171,4 +3349,4 @@ _ZNSt10filesystem10remove_allERKNS_7__cxx114pathE( } } return real_std_fs_remove_all_ptr(p); -} \ No newline at end of file +} diff --git a/src/client/gkfs_metadata.cpp b/src/client/gkfs_metadata.cpp index 13b71fa84413df5516b4b982b3a6dea792de474d..4530885aaae2777b85fb5050458588713fcffd93 100644 --- a/src/client/gkfs_metadata.cpp +++ b/src/client/gkfs_metadata.cpp @@ -56,6 +56,7 @@ #include #include #include +#include #include #ifdef GKFS_ENABLE_CLIENT_METRICS @@ -330,9 +331,10 @@ gkfs_open(const std::string& path, mode_t mode, int flags) { md.inline_data(""); } // RENAMED OR SYMLINK NOT PROTECTED - return CTX->file_map()->add( - std::make_shared(new_path, - flags)); + auto file = std::make_shared(new_path, + flags); + auto fd = CTX->file_map()->add(file); + return fd; } } } @@ -517,6 +519,7 @@ gkfs_remove(const std::string& path) { errno = err; return -1; } + return 0; } @@ -664,7 +667,7 @@ gkfs_rename(const string& old_path, const string& new_path) { int gkfs_stat(const string& path, struct stat* buf, bool follow_links, bool bypass_rename) { - if(CTX->use_write_size_cache()) { + if(CTX->use_write_size_cache() && CTX->write_size_cache()) { auto err = CTX->write_size_cache()->flush(path, true).first; if(err) { LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__, @@ -724,7 +727,7 @@ gkfs_stat(const string& path, struct stat* buf, bool follow_links, int gkfs_statx(int dirfs, const std::string& path, int flags, unsigned int mask, struct statx* buf, bool follow_links) { - if(CTX->use_write_size_cache()) { + if(CTX->use_write_size_cache() && CTX->write_size_cache()) { auto err = CTX->write_size_cache()->flush(path, true).first; if(err) { LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__, @@ -869,9 +872,14 @@ gkfs_statvfs(struct statvfs* buf) { * @param whence * @return 0 on success, -1 on failure */ -off_t -gkfs_lseek(unsigned int fd, off_t offset, unsigned int whence) { - return gkfs_lseek(CTX->file_map()->get(fd), offset, whence); +off64_t +gkfs_lseek(unsigned int fd, off64_t offset, unsigned int whence) { + auto gkfs_fd = CTX->file_map()->get(fd); + if(!gkfs_fd) { + errno = EBADF; + return -1; + } + return gkfs_lseek(gkfs_fd, offset, whence); } /** @@ -902,7 +910,7 @@ gkfs_lseek(shared_ptr gkfs_fd, off_t offset, gkfs_fd->pos(gkfs_fd->pos() + offset); break; case SEEK_END: { - if(CTX->use_write_size_cache()) { + if(CTX->use_write_size_cache() && CTX->write_size_cache()) { CTX->write_size_cache()->flush(gkfs_fd->path()); } @@ -1019,7 +1027,7 @@ gkfs_truncate(const std::string& path, off_t length) { return -1; } - if(CTX->use_write_size_cache()) { + if(CTX->use_write_size_cache() && CTX->write_size_cache()) { auto err = CTX->write_size_cache()->flush(path, true).first; if(err) { LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__, @@ -1765,15 +1773,17 @@ int gkfs_close(unsigned int fd) { auto file = CTX->file_map()->get(fd); if(file) { + const auto path = file->path(); + if(file->get_flag(gkfs::filemap::OpenFile_flags::creation_pending)) { - gkfs_create(file->path(), file->mode()); + gkfs_create(path, file->mode()); file->set_flag(gkfs::filemap::OpenFile_flags::creation_pending, false); } // flush write size cache to be server consistent - if(CTX->use_write_size_cache()) { - auto err = CTX->write_size_cache()->flush(file->path(), true).first; + if(CTX->use_write_size_cache() && CTX->write_size_cache()) { + auto err = CTX->write_size_cache()->flush(path, true).first; if(err) { LOG(ERROR, "{}() write_size_cache() failed with err '{}'", __func__, err); @@ -1784,17 +1794,15 @@ gkfs_close(unsigned int fd) { if(CTX->use_dentry_cache() && gkfs::config::cache::clear_dentry_cache_on_close) { // clear cache for directory - if(CTX->file_map()->get(fd)->type() == - gkfs::filemap::FileType::directory) { - CTX->dentry_cache()->clear_dir( - CTX->file_map()->get(fd)->path()); + if(file->type() == gkfs::filemap::FileType::directory) { + CTX->dentry_cache()->clear_dir(path); } } if(CTX->protect_files_generator()) { - auto path = CTX->file_map()->get(fd)->path(); generate_lock_file(path, false); } + // No call to the daemon is required CTX->file_map()->remove(fd); return 0; diff --git a/src/client/hooks.cpp b/src/client/hooks.cpp index 9fb5faac360cff0ff0a9355b324b1f6d00dd4d1b..b6f37e7980ffb1c9c162e06575ed6eb0741fd034 100644 --- a/src/client/hooks.cpp +++ b/src/client/hooks.cpp @@ -47,7 +47,9 @@ #include +#include #include +#include #include @@ -67,6 +69,98 @@ with_errno(T ret) { return (ret < 0) ? -errno : ret; } +constexpr size_t k_fd_copy_chunk_size = 1UL * 1024UL * 1024UL; + +ssize_t +copy_between_fds(int out_fd, int in_fd, off64_t* in_off, off64_t* out_off, + size_t count) { + auto in_file = CTX->file_map()->get(in_fd); + auto out_file = CTX->file_map()->get(out_fd); + + std::vector buffer(std::min(count, k_fd_copy_chunk_size)); + size_t total = 0; + off64_t in_pos = in_off ? *in_off : 0; + off64_t out_pos = out_off ? *out_off : 0; + + auto read_once = [&](char* dst, size_t len, off64_t pos) -> ssize_t { + if(in_off) { + if(in_file) { + auto ret = gkfs::syscall::gkfs_pread(in_fd, dst, len, pos); + return ret < 0 ? -errno : ret; + } + return syscall_no_intercept_wrapper(SYS_pread64, in_fd, dst, len, + pos); + } + + if(in_file) { + auto ret = gkfs::syscall::gkfs_read(in_fd, dst, len); + return ret < 0 ? -errno : ret; + } + return syscall_no_intercept_wrapper(SYS_read, in_fd, dst, len); + }; + + auto write_once = [&](const char* src, size_t len, off64_t pos) -> ssize_t { + if(out_off) { + if(out_file) { + auto ret = gkfs::syscall::gkfs_pwrite(out_fd, src, len, pos); + return ret < 0 ? -errno : ret; + } + return syscall_no_intercept_wrapper(SYS_pwrite64, out_fd, src, len, + pos); + } + + if(out_file) { + auto ret = gkfs::syscall::gkfs_write(out_fd, src, len); + return ret < 0 ? -errno : ret; + } + return syscall_no_intercept_wrapper(SYS_write, out_fd, src, len); + }; + + while(total < count) { + const auto chunk = std::min(buffer.size(), count - total); + const auto nread = read_once(buffer.data(), chunk, in_pos); + if(nread < 0) { + return total > 0 ? static_cast(total) : nread; + } + if(nread == 0) { + break; // EOF + } + + if(in_off) { + in_pos += static_cast(nread); + } + + size_t consumed = 0; + while(consumed < static_cast(nread)) { + const auto nwrite = + write_once(buffer.data() + consumed, + static_cast(nread) - consumed, out_pos); + if(nwrite < 0) { + return total > 0 ? static_cast(total) : nwrite; + } + if(nwrite == 0) { + errno = EIO; + return total > 0 ? static_cast(total) : -EIO; + } + consumed += static_cast(nwrite); + if(out_off) { + out_pos += static_cast(nwrite); + } + } + + total += consumed; + } + + if(in_off) { + *in_off = in_pos; + } + if(out_off) { + *out_off = out_pos; + } + + return static_cast(total); +} + } // namespace namespace gkfs::hook { @@ -216,8 +310,8 @@ hook_fstat(unsigned int fd, struct stat* buf) { return -EFAULT; } - if(CTX->file_map()->exist(fd)) { - auto path = CTX->file_map()->get(fd)->path(); + if(auto file = CTX->file_map()->get(fd)) { + auto path = file->path(); if(gkfs::config::metadata::rename_support) { // Special case for fstat and rename, fd points to new file... // We can change file_map and recall @@ -277,7 +371,9 @@ hook_read(unsigned int fd, void* buf, size_t count) { return -EFAULT; } - if(CTX->file_map()->exist(fd)) { + if(auto file = CTX->file_map()->get(fd)) { + LOG(DEBUG, "{}() fd {} handled by GKFS path '{}'", __func__, fd, + file->path()); return with_errno(gkfs::syscall::gkfs_read(fd, buf, count)); } return syscall_no_intercept_wrapper(SYS_read, fd, buf, count); @@ -334,7 +430,9 @@ hook_write(unsigned int fd, const char* buf, size_t count) { return -EFAULT; } - if(CTX->file_map()->exist(fd)) { + if(auto file = CTX->file_map()->get(fd)) { + LOG(DEBUG, "{}() fd {} handled by GKFS path '{}'", __func__, fd, + file->path()); return with_errno(gkfs::syscall::gkfs_write(fd, buf, count)); } return syscall_no_intercept_wrapper(SYS_write, fd, buf, count); @@ -382,11 +480,73 @@ hook_pwritev(unsigned long fd, const struct iovec* iov, unsigned long iovcnt, return syscall_no_intercept_wrapper(SYS_pwritev, fd, iov, iovcnt, pos_l); } +ssize_t +hook_sendfile(int out_fd, int in_fd, off_t* offset, size_t count) { + LOG(DEBUG, "{}() called with out_fd: {}, in_fd: {}, offset: {}, count: {}", + __func__, out_fd, in_fd, fmt::ptr(offset), count); + + auto in_file = CTX->file_map()->get(in_fd); + auto out_file = CTX->file_map()->get(out_fd); + if(!in_file && !out_file) { +#ifdef SYS_sendfile + return syscall_no_intercept_wrapper(SYS_sendfile, out_fd, in_fd, offset, + count); +#else + return -ENOSYS; +#endif + } + + off64_t in_off = offset ? static_cast(*offset) : 0; + auto ret = copy_between_fds(out_fd, in_fd, offset ? &in_off : nullptr, + nullptr, count); + if(ret >= 0 && offset) { + *offset = static_cast(in_off); + } + return ret; +} + +#ifdef SYS_copy_file_range +ssize_t +hook_copy_file_range(int fd_in, loff_t* off_in, int fd_out, loff_t* off_out, + size_t len, unsigned int flags) { + LOG(DEBUG, + "{}() called with fd_in: {}, off_in: {}, fd_out: {}, off_out: {}, len: {}, flags: {}", + __func__, fd_in, fmt::ptr(off_in), fd_out, fmt::ptr(off_out), len, + flags); + + auto in_file = CTX->file_map()->get(fd_in); + auto out_file = CTX->file_map()->get(fd_out); + if(!in_file && !out_file) { + return syscall_no_intercept_wrapper(SYS_copy_file_range, fd_in, off_in, + fd_out, off_out, len, flags); + } + if(flags != 0) { + return -EINVAL; + } + + off64_t in_off = off_in ? static_cast(*off_in) : 0; + off64_t out_off = off_out ? static_cast(*off_out) : 0; + auto ret = copy_between_fds(fd_out, fd_in, off_in ? &in_off : nullptr, + off_out ? &out_off : nullptr, len); + if(ret >= 0) { + if(off_in) { + *off_in = static_cast(in_off); + } + if(off_out) { + *off_out = static_cast(out_off); + } + } + return ret; +} +#endif + int hook_unlinkat(int dirfd, const char* cpath, int flags) { - LOG(DEBUG, "{}() called with dirfd: {}, path: \"{}\", flags: {}", __func__, - dirfd, cpath, flags); + if(cpath != nullptr) { + LOG(DEBUG, "{}() called with dirfd: {}, path: \"{}\", flags: {}", + __func__, dirfd, cpath, flags); + } if((flags & ~AT_REMOVEDIR) != 0) { LOG(ERROR, "{}() Flags unknown: {}", __func__, flags); @@ -538,9 +698,11 @@ hook_faccessat(int dirfd, const char* cpath, int mode) { int hook_faccessat2(int dirfd, const char* cpath, int mode, int flags) { - LOG(DEBUG, - "{}() called with dirfd: '{}', path: '{}', mode: '{}', flags: '{}'", - __func__, dirfd, cpath, mode, flags); + if(cpath != nullptr) { + LOG(DEBUG, + "{}() called with dirfd: '{}', path: '{}', mode: '{}', flags: '{}'", + __func__, dirfd, cpath, mode, flags); + } std::string resolved; auto rstatus = CTX->relativize_fd_path(dirfd, cpath, resolved); @@ -743,12 +905,18 @@ hook_fchmod(unsigned int fd, mode_t mode) { int hook_chmod(const char* path, mode_t mode) { + if(path == nullptr) { + return -EFAULT; + } LOG(DEBUG, "{}() called with path: \"{}\", mode: {}", __func__, path, mode); return 0; } int hook_lchown(const char* path, uid_t owner, gid_t group) { + if(path == nullptr) { + return -EFAULT; + } LOG(DEBUG, "{}() called with path: \"{}\", owner: {}, group: {}", __func__, path, owner, group); @@ -768,6 +936,9 @@ hook_lchown(const char* path, uid_t owner, gid_t group) { int hook_chown(const char* path, uid_t owner, gid_t group) { + if(path == nullptr) { + return -EFAULT; + } LOG(DEBUG, "{}() called with path: \"{}\", owner: {}, group: {}", __func__, path, owner, group); @@ -801,10 +972,12 @@ hook_fchown(unsigned int fd, uid_t owner, gid_t group) { int hook_fchownat(int dirfd, const char* cpath, uid_t owner, gid_t group, int flags) { - LOG(DEBUG, - "{}() called with dirfd: {}, path: \"{}\", owner: {}, group: {}, " - "flags: '{}'", - __func__, dirfd, cpath, owner, group, flags); + if(cpath != nullptr) { + LOG(DEBUG, + "{}() called with dirfd: {}, path: \"{}\", owner: {}, group: {}, " + "flags: '{}'", + __func__, dirfd, cpath, owner, group, flags); + } std::string resolved; // Force follow to true for resolution check to ensure we find the object @@ -1331,7 +1504,9 @@ hook_mmap(void* addr, size_t length, int prot, int flags, int fd, "{}() called with addr '{}' length '{}' prot '{}' flags '{}' fd '{}' offset '{}'", __func__, fmt::ptr(addr), length, prot, flags, fd, offset); - if(CTX->file_map()->exist(fd)) { + if(auto file = CTX->file_map()->get(fd)) { + LOG(DEBUG, "{}() fd {} handled by GKFS path '{}'", __func__, fd, + file->path()); return gkfs::syscall::gkfs_mmap(addr, length, prot, flags, fd, offset); } return reinterpret_cast(syscall_no_intercept_wrapper( @@ -1343,9 +1518,12 @@ hook_munmap(void* addr, size_t length) { LOG(DEBUG, "{}() called with addr '{}' length '{}'", __func__, fmt::ptr(addr), length); - auto res = gkfs::syscall::gkfs_munmap(addr, length); - if(res == 0) - return res; + int res = gkfs::syscall::gkfs_munmap(addr, length); + if(res == 1) { + return 0; + } else if(res == -1) { + return -1; + } return syscall_no_intercept_wrapper(SYS_munmap, addr, length); } @@ -1354,9 +1532,12 @@ hook_msync(void* addr, size_t length, int flags) { LOG(DEBUG, "{}() called with addr '{}' length '{}' flags '{}'", __func__, fmt::ptr(addr), length, flags); - auto res = gkfs::syscall::gkfs_msync(addr, length, flags); - if(res == 0) - return res; + int res = gkfs::syscall::gkfs_msync(addr, length, flags); + if(res == 1) { + return 0; + } else if(res == -1) { + return -1; + } return syscall_no_intercept_wrapper(SYS_msync, addr, length, flags); } diff --git a/src/client/intercept.cpp b/src/client/intercept.cpp index ee6797ca61f54dbabf30fac72feb18585915365e..0cf78d905f677ca5fac1b6c336f0dc7c114ca913 100644 --- a/src/client/intercept.cpp +++ b/src/client/intercept.cpp @@ -695,6 +695,21 @@ hook(long syscall_number, long arg0, long arg1, long arg2, long arg3, long arg4, static_cast(arg3), static_cast(arg4)); break; +#ifdef SYS_sendfile + case SYS_sendfile: + *result = gkfs::hook::hook_sendfile( + static_cast(arg0), static_cast(arg1), + reinterpret_cast(arg2), static_cast(arg3)); + break; +#endif +#ifdef SYS_copy_file_range + case SYS_copy_file_range: + *result = gkfs::hook::hook_copy_file_range( + static_cast(arg0), reinterpret_cast(arg1), + static_cast(arg2), reinterpret_cast(arg3), + static_cast(arg4), static_cast(arg5)); + break; +#endif #ifdef SYS_unlink case SYS_unlink: *result = gkfs::hook::hook_unlinkat( diff --git a/src/client/preload.cpp b/src/client/preload.cpp index 9e9b80664a764c6eaebe57d6d929fca73476918e..11a7b4423d3fc5b473b50ff24ccc0527570eeff6 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include @@ -397,7 +398,7 @@ init_preload() { #endif CTX->init_logging(); - // from here ownwards it is safe to print messages + // from here onwards it is safe to print messages LOG(DEBUG, "Logging subsystem initialized"); // Kernel modules such as ib_uverbs may create fds in kernel space and pass @@ -414,8 +415,6 @@ init_preload() { CTX->protect_fds(true); LOG(INFO, "Protecting user fds"); } else { - // Another alternative is to use start issuing fds from gekko from a - // offset. but without protecting the FDs CTX->range_fd(gkfs::env::var_is_set(gkfs::env::RANGE_FD)); LOG(INFO, "Moving FDs to range"); } @@ -490,6 +489,7 @@ init_preload() { std::atexit(quick_exit_handler); } + /** * Called last when preload library is used with the LD_PRELOAD environment * variable diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp index a8d70a5672d8cd00ac78e88b7fa2f1f4caafa783..cc8466cd07833abe61d4e52fab34ce9d225408c1 100644 --- a/src/client/preload_context.cpp +++ b/src/client/preload_context.cpp @@ -322,9 +322,16 @@ PreloadContext::relativize_fd_path(int dirfd, const char* raw_path, // - GekkoFS fd → return its stored path as internal // - external/unknown fd → return fd_unknown so the hook forwards to kernel if(raw_path == nullptr) { - if(dirfd != AT_FDCWD && ofm_->exist(dirfd)) { - relative_path = ofm_->get(dirfd)->path(); - return RelativizeStatus::internal; + if((flags & AT_EMPTY_PATH) == 0) { + return RelativizeStatus::external; + } + + if(dirfd != AT_FDCWD) { + auto open_file = ofm_->get(dirfd); + if(open_file) { + relative_path = open_file->path(); + return RelativizeStatus::internal; + } } // dirfd is either AT_FDCWD (which requires a valid path) or an // external fd — let the kernel handle it. @@ -360,16 +367,22 @@ PreloadContext::relativize_fd_path(int dirfd, const char* raw_path, } else { // check if we have the AT_EMPTY_PATH flag // for fstatat. - if(flags & AT_EMPTY_PATH) { - relative_path = ofm_->get(dirfd)->path(); - return RelativizeStatus::internal; + if((flags & AT_EMPTY_PATH) != 0 && raw_path[0] == '\0') { + auto open_file = ofm_->get(dirfd); + if(open_file) { + relative_path = open_file->path(); + return RelativizeStatus::internal; + } } } // path is relative to fd - auto dir = ofm_->get_dir(dirfd); - if(dir == nullptr) { + auto open_file = ofm_->get(dirfd); + if(open_file == nullptr || + open_file->type() != gkfs::filemap::FileType::directory) { return RelativizeStatus::fd_not_a_dir; } + auto dir = + std::static_pointer_cast(open_file); path = mountdir_; path.append(dir->path()); path.push_back(gkfs::path::separator); @@ -403,13 +416,17 @@ PreloadContext::relativize_path(const char* raw_path, std::string path; - if(raw_path != nullptr && raw_path[0] != gkfs::path::separator) { - /* Path is not absolute, we need to prepend CWD; - * First reserve enough space to minimize memory copy - */ - path = gkfs::path::prepend_path(cwd_, raw_path); + if(raw_path != nullptr) { + if(raw_path[0] != gkfs::path::separator) { + /* Path is not absolute, we need to prepend CWD; + * First reserve enough space to minimize memory copy + */ + path = gkfs::path::prepend_path(cwd_, raw_path); + } else { + path = raw_path; + } } else { - path = raw_path; + return false; } auto [is_in_path, resolved_path] = diff --git a/src/client/preload_util.cpp b/src/client/preload_util.cpp index a9aeba4dfe701de06e1dc521698314eec8958032..27733afb416090030a3d2bae862ef8cd9480a6ba 100644 --- a/src/client/preload_util.cpp +++ b/src/client/preload_util.cpp @@ -247,7 +247,7 @@ get_metadata(const string& path, bool follow_links, bool include_inline) { std::string inline_data; int err{}; // Use file metadata from dentry cache if available - if(CTX->use_dentry_cache()) { + if(CTX->use_dentry_cache() && CTX->dentry_cache()) { // get parent and filename path to retrieve the cache entry std::filesystem::path p(path); auto parent = p.parent_path().string(); diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 1a6cb357f0370d2e99d714c8c803d174b8503aa1..23434fc3e68e1f9f700e30241aeadf75e8c77961 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -48,9 +48,18 @@ namespace gkfs::rpc { int forward_create(const std::string& path, const mode_t mode, const int copy) { + if(!CTX->distributor()) { + LOG(ERROR, "{}() Distributor not initialized!", __func__); + return ENOTCONN; + } auto endp = CTX->hosts().at( CTX->distributor()->locate_file_metadata(path, copy)); + if(!CTX->rpc_engine()) { + LOG(ERROR, "{}() RPC engine not initialized!", __func__); + return ENOTCONN; + } + gkfs::rpc::rpc_mk_node_in_t in; in.path = path; in.mode = mode; @@ -78,9 +87,18 @@ forward_create_write_inline(const std::string& path, mode_t mode, int forward_stat(const std::string& path, string& attr, string& inline_data, const int copy, const bool include_inline) { + if(!CTX->distributor()) { + LOG(ERROR, "{}() Distributor not initialized!", __func__); + return ENOTCONN; + } auto endp = CTX->hosts().at( CTX->distributor()->locate_file_metadata(path, copy)); + if(!CTX->rpc_engine()) { + LOG(ERROR, "{}() RPC engine not initialized!", __func__); + return ENOTCONN; + } + gkfs::rpc::rpc_path_only_in_t in; in.path = path; in.include_inline = include_inline; @@ -837,7 +855,7 @@ forward_get_dirents_filtered(const std::string& path, int server, } // reuse standard decompression for now as format is same - entries = gkfs::rpc::decompress_and_parse_entries_filtered( + entries = gkfs::rpc::decompress_and_parse_entries( out, large_buffer.get()); last_scanned_key = out.last_scanned_key; diff --git a/tests/apps/CMakeLists.txt b/tests/apps/CMakeLists.txt index 83fefaeaf4184ec162109529d190c0acd1b85c8c..6d73d88b2513de51aa0f91a2edf99e72bde9ca5c 100644 --- a/tests/apps/CMakeLists.txt +++ b/tests/apps/CMakeLists.txt @@ -62,6 +62,8 @@ gekko_add_test(wacomm wacomm.sh) gekko_add_test(lockfile lockfile.sh) +gekko_add_test(flex_mmap_local flex-mmap-local.sh) + # --- Installation of Test Scripts --- if(GKFS_INSTALL_TESTS) diff --git a/tests/apps/flex-mmap-local.sh b/tests/apps/flex-mmap-local.sh new file mode 100755 index 0000000000000000000000000000000000000000..84b39c8f49b2e94c5ee299c51ae62e687e366a8d --- /dev/null +++ b/tests/apps/flex-mmap-local.sh @@ -0,0 +1,111 @@ +#!/usr/bin/env bash +# flex-mmap-local.sh — CI test for GekkoFS mmap interception compatibility +# with NumPy memmap workloads (FlexLLM-style). +# +# Expected environment variables (all have CI-compatible defaults): +# IO : GekkoFS install prefix +# DAEMON : path to gkfs_daemon binary +# GKFS : path to libgkfs_intercept.so +# GKFS_LIBC : path to libgkfs_libc_intercept.so + +set -euo pipefail + +export IO="${IO:-/builds/gitlab/hpc/gekkofs/gkfs/install}" +export GKFS="${GKFS:-$IO/lib/libgkfs_intercept.so}" +export GKFS_LIBC="${GKFS_LIBC:-$IO/lib/libgkfs_libc_intercept.so}" +export DAEMON="${DAEMON:-$IO/bin/gkfs_daemon}" +export LD_LIBRARY_PATH=/root/wacommplusplus/build/external/lib:$IO/lib/:$LD_LIBRARY_PATH + +GKROOT="${GKROOT:-/tmp/flex-mmap-ci/root}" +MNT="${MNT:-/tmp/flex-mmap-ci/mnt}" +LOG_DIR="${LOG_DIR:-/tmp/flex-mmap-ci/logs}" + +GKFS_PRELOAD="${GKFS_PRELOAD:-$GKFS_LIBC}" +CLIENT_LOG_LEVEL="${CLIENT_LOG_LEVEL:-error}" +LOOPS="${LOOPS:-50}" +THREADS="${THREADS:-2}" +MB="${MB:-8}" +COPY_METHOD="${COPY_METHOD:-shutil}" +PATH_MODE="${PATH_MODE:-shared}" +HOLD_MS="${HOLD_MS:-1}" + +# Resolve the directory containing this script (source tree, not build tree) +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# ------------------------------------------------------------------- +# Set up a Python venv with numpy if the system python3 lacks it. +# This is necessary for CI images that ship only a bare python3. +# ------------------------------------------------------------------- +VENV_DIR="/tmp/flex-mmap-ci/venv" +if ! python3 -c "import numpy" 2>/dev/null; then + echo "numpy not found — creating venv and installing numpy..." + python3 -m venv "$VENV_DIR" + # shellcheck source=/dev/null + source "$VENV_DIR/bin/activate" + pip install --quiet numpy +else + # Activate an existing venv if present, else use system python3 + if [[ -f "$VENV_DIR/bin/activate" ]]; then + # shellcheck source=/dev/null + source "$VENV_DIR/bin/activate" + fi +fi +PYTHON_BIN="${PYTHON_BIN:-$(command -v python3)}" + +# ------------------------------------------------------------------- +# Prepare directories +# ------------------------------------------------------------------- +mkdir -p "$GKROOT" "$MNT" "$LOG_DIR" +rm -rf "${LOG_DIR:?}"/* + +cleanup() { + pkill -9 gkfs_daemon >/dev/null 2>&1 || true +} +trap cleanup EXIT + +cleanup +sleep 1 +rm -rf "${GKROOT:?}"/* "${MNT:?}"/* 2>/dev/null || true + +# ------------------------------------------------------------------- +# Start daemon +# ------------------------------------------------------------------- +"$DAEMON" -r "$GKROOT" -m "$MNT" \ + >"$LOG_DIR/daemon.out" 2>"$LOG_DIR/daemon.err" & +sleep 4 + +if ! pgrep -x gkfs_daemon >/dev/null 2>&1; then + echo "ERROR: gkfs_daemon failed to start. See $LOG_DIR/daemon.err" >&2 + cat "$LOG_DIR/daemon.err" >&2 + exit 1 +fi + +# ------------------------------------------------------------------- +# Run repro script +# ------------------------------------------------------------------- +set +e +LD_PRELOAD="$GKFS_PRELOAD" \ +LIBGKFS_LOG="$CLIENT_LOG_LEVEL" \ +LIBGKFS_LOG_OUTPUT="$LOG_DIR/client.log" \ +"$PYTHON_BIN" -u "${SCRIPT_DIR}/flex-mmap-repro.py" \ + --mount "$MNT" \ + --loops "$LOOPS" \ + --threads "$THREADS" \ + --mb "$MB" \ + --copy-method "$COPY_METHOD" \ + --path-mode "$PATH_MODE" \ + --hold-ms "$HOLD_MS" \ + >"$LOG_DIR/repro.out" 2>"$LOG_DIR/repro.err" +RC=$? +set -e + +echo "LOG_DIR=$LOG_DIR" +echo "REPRO_RC=$RC" +if [[ $RC -eq 0 ]]; then + echo "Flex mmap repro PASS" +else + echo "Flex mmap repro FAIL (RC=$RC)" + tail -20 "$LOG_DIR/repro.err" >&2 +fi + +exit $RC diff --git a/tests/apps/flex-mmap-repro.py b/tests/apps/flex-mmap-repro.py new file mode 100755 index 0000000000000000000000000000000000000000..12374ca247e96198b7668a790ba9a449d610e514 --- /dev/null +++ b/tests/apps/flex-mmap-repro.py @@ -0,0 +1,280 @@ +#!/usr/bin/env python3 +"""CPU-only repro for FlexLLM-style NumPy memmap header corruption on GKFS. + +This intentionally exercises: +1) copy a valid .npy into GKFS offload path, +2) map file with np.lib.format.open_memmap(), +3) trigger read() from offset 0 while mmap is active, +4) reopen memmap and verify NumPy magic stays valid. +""" + +from __future__ import annotations + +import argparse +import os +import queue +import random +import shutil +import sys +import tempfile +import threading +import time +from pathlib import Path + +import numpy as np + + +NUMPY_MAGIC = b"\x93NUMPY" + + +def build_source_file(src_file: Path, mb: int) -> None: + elems = (mb * 1024 * 1024) // 4 + arr = np.arange(elems, dtype=np.float32) + np.save(src_file, arr) + + +def check_magic(path: Path) -> tuple[bool, bytes]: + with open(path, "rb") as f: + head = f.read(8) + return head.startswith(NUMPY_MAGIC), head + + +def copy_via_rw_loop(src: Path, dst: Path) -> None: + """Deterministic copy path: avoid platform fast-copy syscalls.""" + # 4MB chunks + chk = 4 * 1024 * 1024 + with open(src, "rb") as fin, open(dst, "wb") as fout: + while data := fin.read(chk): + fout.write(data) + fout.flush() + + +def copy_via_mmap_loop(src: Path, dst: Path) -> None: + src_arr = np.lib.format.open_memmap(src, mode="c") + dst_arr = np.lib.format.open_memmap(dst, mode="w+", shape=src_arr.shape, dtype=src_arr.dtype) + dst_arr[:] = src_arr[:] + dst_arr.flush() + del dst_arr + del src_arr + + +def exercise_mapping(tid: int, iter_idx: int, dst: Path, + stop_event: threading.Event, errs: list[str], + lock: threading.Lock, hold_ms: int) -> bool: + try: + mm = np.lib.format.open_memmap(dst, mode="r+") + except Exception as exc: # noqa: BLE001 + with lock: + errs.append(f"thread {tid} iter {iter_idx} open_memmap(r+) failed: {exc}") + stop_event.set() + return False + + # read-triggered flush stimulus while mapping is alive + fd = os.open(dst, os.O_RDONLY) + try: + os.lseek(fd, 0, os.SEEK_SET) + _ = os.read(fd, 512 * 1024) + finally: + os.close(fd) + + _ = float(mm[0]) + if hold_ms > 0: + time.sleep((hold_ms / 1000.0) * (1.0 + random.random())) + del mm + + ok, head = check_magic(dst) + if not ok: + with lock: + errs.append( + f"thread {tid} iter {iter_idx} bad magic raw={head!r} file={dst}" + ) + stop_event.set() + return False + + try: + mm2 = np.lib.format.open_memmap(dst, mode="r") + _ = float(mm2[0]) + del mm2 + except Exception as exc: # noqa: BLE001 + with lock: + errs.append(f"thread {tid} iter {iter_idx} open_memmap(r) failed: {exc}") + stop_event.set() + return False + + return True + + +def contention_worker(tid: int, mount_offload: Path, src_file: Path, loops: int, + stop_event: threading.Event, errs: list[str], + lock: threading.Lock, copy_method: str, path_mode: str, + hold_ms: int) -> None: + if path_mode == "shared": + # Match FlexLLM offload naming/lifecycle: workers contend on the same + # rotating temp files. + local_paths = [mount_offload / "t_0", mount_offload / "t_1"] + else: + local_paths = [mount_offload / f"t_{tid}_0.npy", + mount_offload / f"t_{tid}_1.npy"] + + for i in range(loops): + if stop_event.is_set(): + return + # Keep workers intentionally unsynchronized over shared paths to expose + # mmap+truncate+read publication races. + dst = local_paths[(i + tid) % 2] + if copy_method == "shutil": + shutil.copyfile(src_file, dst) + else: + copy_via_rw_loop(src_file, dst) + if not exercise_mapping(tid, i, dst, stop_event, errs, lock, hold_ms): + return + + +def pipeline_producer(src_file: Path, loops: int, paths: list[Path], + copy_method: str, stop_event: threading.Event, + errs: list[str], lock: threading.Lock, + work_q: queue.Queue[tuple[int, Path] | None], + path_slots: dict[Path, threading.Semaphore], + consumers: int) -> None: + for i in range(loops): + if stop_event.is_set(): + break + dst = paths[i % len(paths)] + slot = path_slots[dst] + acquired = False + try: + slot.acquire() + acquired = True + if copy_method == "shutil": + shutil.copyfile(src_file, dst) + elif copy_method == "mmap": + copy_via_mmap_loop(src_file, dst) + else: + copy_via_rw_loop(src_file, dst) + work_q.put((i, dst)) + acquired = False + except Exception as exc: # noqa: BLE001 + with lock: + errs.append(f"producer iter {i} copy failed for {dst}: {exc}") + stop_event.set() + finally: + if acquired: + slot.release() + + for _ in range(consumers): + work_q.put(None) + + +def pipeline_consumer(tid: int, stop_event: threading.Event, errs: list[str], + lock: threading.Lock, + work_q: queue.Queue[tuple[int, Path] | None], + path_slots: dict[Path, threading.Semaphore], + hold_ms: int) -> None: + while True: + item = work_q.get() + if item is None: + work_q.task_done() + return + iter_idx, dst = item + try: + if stop_event.is_set(): + return + exercise_mapping(tid, iter_idx, dst, stop_event, errs, lock, hold_ms) + finally: + path_slots[dst].release() + work_q.task_done() + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--mount", required=True, help="GKFS mount directory") + ap.add_argument("--loops", type=int, default=300) + ap.add_argument("--threads", type=int, default=2) + ap.add_argument("--mb", type=int, default=32, help="source tensor size in MB") + ap.add_argument("--copy-method", choices=["rw", "shutil", "mmap"], default="shutil") + ap.add_argument("--path-mode", choices=["shared", "per-thread"], + default="shared") + ap.add_argument("--hold-ms", type=int, default=1, + help="extra mapping hold time to widen races") + ap.add_argument("--workflow", choices=["pipeline", "contention"], + default="pipeline") + args = ap.parse_args() + + mount = Path(args.mount) + offload = mount / "offload" + offload.mkdir(parents=True, exist_ok=True) + + tmpdir = Path(tempfile.mkdtemp(prefix="gkfs-flex-repro-")) + src = tmpdir / "src.npy" + build_source_file(src, args.mb) + + errs: list[str] = [] + lock = threading.Lock() + stop_event = threading.Event() + t0 = time.time() + if args.workflow == "contention": + threads = [ + threading.Thread( + target=contention_worker, + args=(t, offload, src, args.loops, stop_event, errs, lock, + args.copy_method, args.path_mode, args.hold_ms), + daemon=True, + ) + for t in range(args.threads) + ] + for t in threads: + t.start() + for t in threads: + t.join() + else: + if args.path_mode == "shared": + paths = [offload / "t_0", offload / "t_1"] + else: + paths = [offload / f"t_{i}_0.npy" for i in range(args.threads)] + paths.extend(offload / f"t_{i}_1.npy" for i in range(args.threads)) + + path_slots = {p: threading.Semaphore(1) for p in paths} + work_q: queue.Queue[tuple[int, Path] | None] = queue.Queue( + maxsize=max(8, args.threads * 4) + ) + producer = threading.Thread( + target=pipeline_producer, + args=(src, args.loops, paths, args.copy_method, stop_event, errs, + lock, work_q, path_slots, args.threads), + daemon=True, + ) + consumers = [ + threading.Thread( + target=pipeline_consumer, + args=(t, stop_event, errs, lock, work_q, path_slots, + args.hold_ms), + daemon=True, + ) + for t in range(args.threads) + ] + producer.start() + for t in consumers: + t.start() + producer.join() + work_q.join() + for t in consumers: + t.join() + + dt = time.time() - t0 + + if errs: + for e in errs: + print(e, file=sys.stderr) + print(f"FAILED after {dt:.2f}s", file=sys.stderr) + return 1 + + print( + f"PASS threads={args.threads} loops={args.loops} mb={args.mb} " + f"path_mode={args.path_mode} copy={args.copy_method} " + f"workflow={args.workflow} time={dt:.2f}s" + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())