From e7c42f6e9cde994fafc7fe581681648d1fb7ba69 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 10 Mar 2026 07:48:28 +0100 Subject: [PATCH 1/6] atomic and LOG changed (fuse) --- include/client/fuse/fuse_client.hpp | 14 ++- src/client/fuse/fuse_client.cpp | 158 ++++++++++++++-------------- 2 files changed, 94 insertions(+), 78 deletions(-) diff --git a/include/client/fuse/fuse_client.hpp b/include/client/fuse/fuse_client.hpp index a9e9db022..8fbc0298a 100644 --- a/include/client/fuse/fuse_client.hpp +++ b/include/client/fuse/fuse_client.hpp @@ -72,6 +72,7 @@ extern "C" { #include #include #include +#include #ifdef __FreeBSD__ #include @@ -111,7 +112,18 @@ struct _uintptr_to_must_hold_fuse_ino_t_dummy_struct { struct Inode { std::string path; - uint64_t lookup_count; + std::atomic lookup_count; + + Inode() : lookup_count(0) {} + Inode(const std::string& p, uint64_t lc) : path(p), lookup_count(lc) {} + Inode(const Inode& other) : path(other.path), lookup_count(other.lookup_count.load()) {} + Inode& operator=(const Inode& other) { + if (this != &other) { + path = other.path; + lookup_count.store(other.lookup_count.load()); + } + return *this; + } }; enum { diff --git a/src/client/fuse/fuse_client.cpp b/src/client/fuse/fuse_client.cpp index 647023329..425e09472 100644 --- a/src/client/fuse/fuse_client.cpp +++ b/src/client/fuse/fuse_client.cpp @@ -48,17 +48,6 @@ static std::unordered_map local_fifos; static fuse_ino_t next_ino = 2; // reserve 1 for root static const std::string fifo_path = "/tmp/gekkofs_fifos/"; -#ifdef GKFS_DEBUG_BUILD -#define DEBUG_INFO(ud, fmt, ...) \ - do { \ - if((ud) && (ud)->debug) { \ - fuse_log(FUSE_LOG_DEBUG, "[DEBUG] " fmt "\n", ##__VA_ARGS__); \ - } \ - } while(0) -#else -#define DEBUG_INFO(...) 
/* No debug output */ -#endif - #include namespace gkfs::syscall { @@ -75,7 +64,8 @@ alloc_inode(const std::string& path) { ino = (fuse_ino_t) inode; } else { ino = next_ino++; - ino_map[ino] = {path, 1}; + ino_map.emplace(std::piecewise_construct, std::forward_as_tuple(ino), + std::forward_as_tuple(path, 1)); } path_map[path] = ino; return ino; @@ -100,15 +90,20 @@ remove_inode_by_path(const std::string path) { std::lock_guard lk(ino_mutex); auto it_src = path_map.find(path); if(it_src != path_map.end()) { - if(gkfs::config::fuse::pointertrick) { - delete get_inode(it_src->second); - } else { - ino_map.erase(it_src->second); - } path_map.erase(it_src); } } +static void +remove_inode_by_ino(fuse_ino_t ino) { + std::lock_guard lk(ino_mutex); + if(gkfs::config::fuse::pointertrick) { + delete get_inode(ino); + } else { + ino_map.erase(ino); + } +} + static int fill_fuse_entry_param(const u_data* ud, const std::string path, fuse_entry_param& fep, bool check_path_map = true) { @@ -121,6 +116,7 @@ fill_fuse_entry_param(const u_data* ud, const std::string path, fuse_ino_t ino = 0; bool found = false; + Inode* inode = nullptr; // CRITICAL SECTION { @@ -130,14 +126,20 @@ fill_fuse_entry_param(const u_data* ud, const std::string path, if(it != path_map.end()) { ino = it->second; found = true; + if(gkfs::config::fuse::pointertrick) { + inode = (Inode*) ino; + } else { + auto it_ino = ino_map.find(ino); + if(it_ino != ino_map.end()) { + inode = &it_ino->second; + } + } } } } // Lock releases here if(found) { - auto inode = get_inode(ino); if(inode) { - // this could be critical because get_inode is not locked inode->lookup_count++; } } else { @@ -225,23 +227,22 @@ passthrough_ll_help(void) { static void init_handler(void* userdata, struct fuse_conn_info* conn) { struct u_data* ud = (struct u_data*) userdata; - DEBUG_INFO(ud, - "init handler: requested readahead %i direct_io %i max_read2 %i", - ud->max_readahead, ud->direct_io, ud->max_read2); - DEBUG_INFO(ud, "current values: 
readahead %i direct_io %i max_read2 %i", - conn->max_readahead, conn->max_read, conn->max_write); + LOG(DEBUG, "init handler: requested readahead {} direct_io {} max_read2 {}", + ud->max_readahead, ud->direct_io, ud->max_read2); + LOG(DEBUG, "current values: readahead {} direct_io {} max_read2 {}", + conn->max_readahead, conn->max_read, conn->max_write); #if FUSE_MAJOR_VERSION > 3 || \ (FUSE_MAJOR_VERSION == 3 && FUSE_MINOR_VERSION >= 12) if(ud->writeback) { fuse_set_feature_flag(conn, FUSE_CAP_WRITEBACK_CACHE); - DEBUG_INFO(ud, "init_handler: try to activate writeback"); + LOG(DEBUG, "init_handler: try to activate writeback"); } // TODO copy the other feature flags here #else if(ud->writeback) { conn->want |= FUSE_CAP_WRITEBACK_CACHE; - DEBUG_INFO(ud, "init_handler: try to activate writeback"); + LOG(DEBUG, "init_handler: try to activate writeback"); } // TODO make this optional!!! // default activated if kernel supports it: FUSE_CAP_PARALLEL_DIROPS, @@ -286,21 +287,21 @@ init_handler(void* userdata, struct fuse_conn_info* conn) { static void destroy_handler(void* userdata) { struct u_data* ud = (struct u_data*) userdata; - DEBUG_INFO(ud, "destroy handler"); + LOG(DEBUG, "destroy handler"); // userdata is GekkoFuse* if passed } static void lookup_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { auto* ud = udata(req); - DEBUG_INFO(ud, "lookup handler ino %u", parent); + LOG(DEBUG, "lookup handler ino {}", parent); auto* parent_inode = get_inode(parent); if(!parent_inode) { fuse_reply_err(req, ENOENT); return; } std::string child = get_path(parent_inode, name); - DEBUG_INFO(ud, "lookup %s", child.c_str()); + LOG(DEBUG, "lookup {}", child.c_str()); if(ud->fifo) { auto iit = local_fifos.find(child); @@ -330,7 +331,7 @@ lookup_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { static void getattr_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "getattr handler"); + LOG(DEBUG, 
"getattr handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -340,7 +341,7 @@ getattr_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { struct stat st; int rc = gkfs::syscall::gkfs_stat(inode->path, &st, false); if(rc) { - DEBUG_INFO(ud, "getattr error %u", rc); + LOG(DEBUG, "getattr error {}", rc); fuse_reply_err(req, errno); return; } @@ -351,7 +352,7 @@ static void setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "setattr handler ino %u", ino); + LOG(DEBUG, "setattr handler ino {}", ino); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -362,8 +363,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, off_t new_size = attr->st_size; int res = gkfs::syscall::gkfs_truncate(inode->path, new_size); if(res < 0) { - DEBUG_INFO(ud, "setattr truncate failed on %s", - inode->path.c_str()); + LOG(DEBUG, "setattr truncate failed on {}", inode->path.c_str()); fuse_reply_err(req, EIO); return; } @@ -372,7 +372,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, struct stat st; int rc = gkfs::syscall::gkfs_stat(inode->path, &st, false); if(rc) { - DEBUG_INFO(ud, "getattr error %u", rc); + LOG(DEBUG, "getattr error {}", rc); fuse_reply_err(req, errno); return; } @@ -386,7 +386,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, static void open_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "open handler"); + LOG(DEBUG, "open handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -408,7 +408,7 @@ static void lseek_handler(fuse_req_t req, fuse_ino_t ino, off_t off, int whence, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "lseek handler"); + LOG(DEBUG, "lseek handler"); int lc = 
gkfs::syscall::gkfs_lseek(fi->fh, off, whence); if(lc < 0) { fuse_reply_err(req, errno); @@ -421,7 +421,7 @@ static void read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "read handler"); + LOG(DEBUG, "read handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); if(!inode) { @@ -432,7 +432,7 @@ read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, std::vector buf(size); int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.data(), size, off); if(rc < 0) { - DEBUG_INFO(ud, "read fail"); + LOG(DEBUG, "read fail"); fuse_reply_err(req, errno); return; } @@ -443,7 +443,7 @@ static void write_handler(fuse_req_t req, fuse_ino_t ino, const char* buf, size_t size, off_t off, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "write handler"); + LOG(DEBUG, "write handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); if(!inode) { @@ -453,7 +453,7 @@ write_handler(fuse_req_t req, fuse_ino_t ino, const char* buf, size_t size, } int rc = gkfs::syscall::gkfs_pwrite(fi->fh, buf, size, off); if(rc < 0) { - DEBUG_INFO(ud, "write fail"); + LOG(DEBUG, "write fail"); fuse_reply_err(req, errno); return; } @@ -477,16 +477,15 @@ create_handler(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t mode, int rc = gkfs::syscall::gkfs_create(path, mode); int errno_bu = errno; if(rc == -1) { - fuse_log(FUSE_LOG_DEBUG, - "create failed, here here mode %i flags %i errno %i\n", mode, + LOG(DEBUG, "create failed, here here mode {} flags {} errno {}", mode, fi->flags, errno); } int fd = gkfs::syscall::gkfs_open(path, mode, fi->flags); */ - DEBUG_INFO(ud, "create handler %s", path.c_str()); + LOG(DEBUG, "create handler {}", path.c_str()); int fd = gkfs::syscall::gkfs_open(path, mode, fi->flags | O_CREAT); if(fd < 0) { - DEBUG_INFO(ud, "create -> open failed errno %i", errno); + LOG(DEBUG, "create -> open failed errno {}", errno); 
fuse_reply_err(req, errno); return; } @@ -507,7 +506,7 @@ create_handler(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t mode, static void unlink_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { auto* ud = udata(req); - DEBUG_INFO(ud, "unlink handler"); + LOG(DEBUG, "unlink handler"); auto* parent_inode = get_inode(parent); if(!parent_inode) { fuse_reply_err(req, ENOENT); @@ -538,17 +537,17 @@ unlink_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { static void opendir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "opendir handler"); + LOG(DEBUG, "opendir handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOTDIR); return; } - DEBUG_INFO(ud, "open dir %s", inode->path.c_str()); + LOG(DEBUG, "open dir {}", inode->path.c_str()); const int fd = gkfs::syscall::gkfs_opendir(inode->path); - DEBUG_INFO(ud, "\t with fd %i \n", fd); + LOG(DEBUG, "\t with fd {} ", fd); if(fd < 0) { fuse_reply_err(req, ENOTDIR); @@ -563,7 +562,7 @@ static void readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "readdir handler"); + LOG(DEBUG, "readdir handler"); auto open_dir = CTX->file_map()->get_dir(fi->fh); if(open_dir == nullptr) { @@ -571,7 +570,7 @@ readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } - DEBUG_INFO(ud, "read dir %s", open_dir->path().c_str()); + LOG(DEBUG, "read dir {}", open_dir->path().c_str()); // Allocate a buffer to accumulate entries char* buf = static_cast(malloc(size)); @@ -645,7 +644,7 @@ readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, static void statfs_handler(fuse_req_t req, fuse_ino_t ino) { auto* ud = udata(req); - DEBUG_INFO(ud, "statfs handler ino %lu", ino); + LOG(DEBUG, "statfs handler ino {}", ino); struct statfs stfs{}; int rc = gkfs::syscall::gkfs_statfs(&stfs); if(rc < 0) { @@ -672,7 
+671,7 @@ static void readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "readdirplus handler"); + LOG(DEBUG, "readdirplus handler"); auto open_dir = CTX->file_map()->get_dir(fi->fh); if(open_dir == nullptr) { @@ -680,7 +679,7 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } - DEBUG_INFO(ud, "read dirplus %s", open_dir->path().c_str()); + LOG(DEBUG, "read dirplus {}", open_dir->path().c_str()); // Allocate a buffer to accumulate entries char* buf = static_cast(malloc(size)); @@ -699,7 +698,7 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, std::string child = open_dir->path() == "/" ? "/" + de.name() : open_dir->path() + "/" + de.name(); - DEBUG_INFO(ud, "read dirplus child %s", child.c_str()); + LOG(DEBUG, "read dirplus child {}", child.c_str()); fuse_entry_param e = {}; @@ -777,10 +776,10 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, static void releasedir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "releasedir handler"); + LOG(DEBUG, "releasedir handler"); if(CTX->interception_enabled() && CTX->file_map()->exist(fi->fh)) { - int ret = gkfs::syscall::gkfs_close(fd); // Close GekkoFS internal FD + int ret = + gkfs::syscall::gkfs_close(fi->fh); // Close GekkoFS internal FD fuse_reply_err(req, ret); return; @@ -792,7 +791,7 @@ releasedir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { static void release_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "release handler"); + LOG(DEBUG, "release handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -809,8 +808,7 @@ release_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { /// decrement lookup count static void 
forget_handler(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) { - auto* ud = udata(req); - DEBUG_INFO(ud, "forget handler"); + LOG(DEBUG, "forget handler"); Inode* inode = get_inode(ino); if(!inode) { @@ -818,24 +816,29 @@ forget_handler(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) { return; } - if(inode->lookup_count > nlookup) - inode->lookup_count -= nlookup; - else - inode->lookup_count = 0; + uint64_t current = inode->lookup_count.load(); + while(current >= nlookup) { + if(inode->lookup_count.compare_exchange_weak(current, + current - nlookup)) { + break; + } + } + if(current < nlookup) { + inode->lookup_count.store(0); + } - if(inode->lookup_count == 0) { // && inode.open_count == 0 - remove_inode_by_path(inode->path); - DEBUG_INFO(ud, "reached lookup_count 0"); + if(inode->lookup_count.load() == 0) { // && inode.open_count == 0 + LOG(DEBUG, "reached lookup_count 0 for ino {}", ino); + remove_inode_by_ino(ino); } fuse_reply_none(req); - // fuse_reply_err(req, 0); } static void flush_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "flush handler"); + LOG(DEBUG, "flush handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); if(!inode) { @@ -861,7 +864,7 @@ fsync_handler(fuse_req_t req, fuse_ino_t ino, int datasync, static void access_handler(fuse_req_t req, fuse_ino_t ino, int mask) { auto* ud = udata(req); - DEBUG_INFO(ud, "access handler"); + LOG(DEBUG, "access handler"); if(ud->access && !ud->fifo) { auto* inode = get_inode(ino); if(!inode) { @@ -891,7 +894,7 @@ mkdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name, return; } std::string path = get_path(parent_inode, name); - DEBUG_INFO(ud, "mkdir parent %s name %s", parent_inode->path.c_str(), name); + LOG(DEBUG, "mkdir parent {} name {}", parent_inode->path.c_str(), name); int rc = gkfs::syscall::gkfs_create(path, mode | S_IFDIR); if(rc == -1) { fuse_reply_err(req, errno); @@ -916,7 +919,7 @@ 
rmdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { return; } std::string path = get_path(parent_inode, name); - DEBUG_INFO(ud, "rmdir %s", path.c_str()); + LOG(DEBUG, "rmdir {}", path.c_str()); int rc = gkfs::syscall::gkfs_rmdir(path); if(rc == -1) { fuse_reply_err(req, errno); @@ -987,7 +990,7 @@ symlink_handler(fuse_req_t req, const char* linkname, fuse_ino_t parent, // this shows the link on ls -l e.attr.st_mode = S_IFLNK | 0777; // mark as symlink + full perms e.attr.st_size = strlen(target.c_str()); - // DEBUG_INFO(ud, "stat mode %i, iflink %i", e.attr.st_mode, S_IFLNK); + // LOG(DEBUG, "stat mode {}, iflink {}", e.attr.st_mode, S_IFLNK); fuse_reply_entry(req, &e); } @@ -1142,10 +1145,11 @@ init_gekkofs() { struct stat st; int rc = gkfs::syscall::gkfs_stat(root_path, &st); if(rc < 0) { - fuse_log(FUSE_LOG_ERR, "failed to open root\n"); + LOG(ERROR, "failed to open root"); exit(1); } - root_inode = {root_path, 1}; + root_inode.path = root_path; + root_inode.lookup_count = 1; path_map[root_path] = FUSE_ROOT_ID; std::cout << "root node allocated" << std::endl; } @@ -1295,7 +1299,7 @@ main(int argc, char* argv[]) { break; } } else if(ud.timeout < 0) { - fuse_log(FUSE_LOG_ERR, "timeout is negative (%lf)\n", ud.timeout); + LOG(ERROR, "timeout is negative ({})", ud.timeout); exit(1); } -- GitLab From ad9c5790c3653b70325a0ddf177e5550b510e718 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 10 Mar 2026 11:45:17 +0100 Subject: [PATCH 2/6] more perf opt fuse --- README.md | 4 + include/client/fuse/fuse_client.hpp | 3 + src/client/fuse/fuse_client.cpp | 264 ++++++++++++------ tests/integration/fuse/test_metadata_bench.py | 98 +++++++ tests/integration/fuse/test_stress.py | 45 +++ 5 files changed, 331 insertions(+), 83 deletions(-) create mode 100644 tests/integration/fuse/test_metadata_bench.py create mode 100644 tests/integration/fuse/test_stress.py diff --git a/README.md b/README.md index 3898baf13..4eb0547a0 100644 --- a/README.md +++ b/README.md 
@@ -628,6 +628,10 @@ Client-metrics require the CMake argument `-DGKFS_ENABLE_CLIENT_METRICS=ON` (see - `LIBGKFS_READ_INLINE_PREFETCH` - Prefetch inline data when opening files (default: OFF). - `LIBGKFS_USE_DIRENTS_COMPRESSION` - Enable compression for directory entries (default: OFF). - `LIBGKFS_DIRENTS_BUFF_SIZE` - Buffer size for directory entries (default: 8MB). +- `GKFS_FUSE_ENTRY_TIMEOUT` - Caching timeout for dentry entries in the FUSE client (default: 1.0). +- `GKFS_FUSE_ATTR_TIMEOUT` - Caching timeout for file attributes in the FUSE client (default: 1.0). +- `GKFS_FUSE_NEGATIVE_TIMEOUT` - Caching timeout for negative lookups in the FUSE client (default: 1.0). +- `GKFS_FUSE_WRITEBACK` - Enable writeback cache in the FUSE client (default: OFF). #### Caching ##### Dentry cache diff --git a/include/client/fuse/fuse_client.hpp b/include/client/fuse/fuse_client.hpp index 8fbc0298a..19866c4b0 100644 --- a/include/client/fuse/fuse_client.hpp +++ b/include/client/fuse/fuse_client.hpp @@ -147,6 +147,9 @@ struct u_data { int xattr; char* mountpoint; double timeout; + double entry_timeout; + double attr_timeout; + double negative_timeout; int cache; int timeout_set; }; diff --git a/src/client/fuse/fuse_client.cpp b/src/client/fuse/fuse_client.cpp index 425e09472..1f9406989 100644 --- a/src/client/fuse/fuse_client.cpp +++ b/src/client/fuse/fuse_client.cpp @@ -40,12 +40,26 @@ #include static struct fuse_lowlevel_ops ll_ops; -static std::mutex ino_mutex; +struct InodeShard { + std::mutex mutex; + std::unordered_map ino_map; +}; + +struct PathShard { + std::mutex mutex; + std::unordered_map path_map; +}; + +const int SHARD_COUNT = 32; +static PathShard path_shards[SHARD_COUNT]; +static InodeShard ino_shards[SHARD_COUNT]; + static Inode root_inode; -static std::unordered_map ino_map; -static std::unordered_map path_map; -static std::unordered_map local_fifos; -static fuse_ino_t next_ino = 2; // reserve 1 for root +static std::unordered_map + local_fifos; // TODO shard if 
needed +static std::mutex fifos_mutex; + +static std::atomic next_ino{2}; // reserve 1 for root static const std::string fifo_path = "/tmp/gekkofs_fifos/"; #include @@ -57,17 +71,21 @@ gkfs_statfs(struct statfs* buf); static fuse_ino_t alloc_inode(const std::string& path) { - std::lock_guard lk(ino_mutex); fuse_ino_t ino; if(gkfs::config::fuse::pointertrick) { Inode* inode = new Inode{path, 1}; ino = (fuse_ino_t) inode; } else { ino = next_ino++; - ino_map.emplace(std::piecewise_construct, std::forward_as_tuple(ino), - std::forward_as_tuple(path, 1)); - } - path_map[path] = ino; + auto& i_shard = ino_shards[std::hash{}(ino) % SHARD_COUNT]; + std::lock_guard lk(i_shard.mutex); + i_shard.ino_map.emplace(std::piecewise_construct, + std::forward_as_tuple(ino), + std::forward_as_tuple(path, 1)); + } + auto& p_shard = path_shards[std::hash{}(path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + p_shard.path_map[path] = ino; return ino; } @@ -79,28 +97,40 @@ get_inode(fuse_ino_t ino) { if(gkfs::config::fuse::pointertrick) { return (Inode*) ino; } else { - std::lock_guard lk(ino_mutex); - auto it = ino_map.find(ino); - return it != ino_map.end() ? &it->second : nullptr; + auto& i_shard = ino_shards[std::hash{}(ino) % SHARD_COUNT]; + std::lock_guard lk(i_shard.mutex); + auto it = i_shard.ino_map.find(ino); + return it != i_shard.ino_map.end() ? 
&it->second : nullptr; } } static void remove_inode_by_path(const std::string path) { - std::lock_guard lk(ino_mutex); - auto it_src = path_map.find(path); - if(it_src != path_map.end()) { - path_map.erase(it_src); + auto& p_shard = path_shards[std::hash{}(path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + auto it_src = p_shard.path_map.find(path); + if(it_src != p_shard.path_map.end()) { + p_shard.path_map.erase(it_src); } } static void remove_inode_by_ino(fuse_ino_t ino) { - std::lock_guard lk(ino_mutex); + auto* inode = get_inode(ino); + if(!inode) + return; + + // Remove from path_map if it was still linked + if(!inode->path.empty()) { + remove_inode_by_path(inode->path); + } + if(gkfs::config::fuse::pointertrick) { - delete get_inode(ino); + delete inode; } else { - ino_map.erase(ino); + auto& i_shard = ino_shards[std::hash{}(ino) % SHARD_COUNT]; + std::lock_guard lk(i_shard.mutex); + i_shard.ino_map.erase(ino); } } @@ -120,36 +150,38 @@ fill_fuse_entry_param(const u_data* ud, const std::string path, // CRITICAL SECTION { - std::lock_guard lk(ino_mutex); + auto& p_shard = + path_shards[std::hash{}(path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); if(check_path_map) { - auto it = path_map.find(path); - if(it != path_map.end()) { + auto it = p_shard.path_map.find(path); + if(it != p_shard.path_map.end()) { ino = it->second; found = true; if(gkfs::config::fuse::pointertrick) { inode = (Inode*) ino; } else { - auto it_ino = ino_map.find(ino); - if(it_ino != ino_map.end()) { + auto& i_shard = ino_shards[std::hash{}(ino) % + SHARD_COUNT]; + std::lock_guard i_lk(i_shard.mutex); + auto it_ino = i_shard.ino_map.find(ino); + if(it_ino != i_shard.ino_map.end()) { inode = &it_ino->second; + inode->lookup_count++; } } } } } // Lock releases here - if(found) { - if(inode) { - inode->lookup_count++; - } - } else { + if(!found) { ino = alloc_inode(path); } fep.ino = ino; fep.attr = st; - fep.attr_timeout = ud->timeout; - fep.entry_timeout = ud->timeout; + 
fep.attr_timeout = ud->attr_timeout; + fep.entry_timeout = ud->entry_timeout; return 0; } @@ -235,9 +267,15 @@ init_handler(void* userdata, struct fuse_conn_info* conn) { #if FUSE_MAJOR_VERSION > 3 || \ (FUSE_MAJOR_VERSION == 3 && FUSE_MINOR_VERSION >= 12) if(ud->writeback) { - fuse_set_feature_flag(conn, FUSE_CAP_WRITEBACK_CACHE); + conn->want |= FUSE_CAP_WRITEBACK_CACHE; LOG(DEBUG, "init_handler: try to activate writeback"); } + if(ud->max_background > 0) { + conn->want |= FUSE_CAP_ASYNC_DIO; + } + conn->want |= FUSE_CAP_PARALLEL_DIROPS; + conn->want |= FUSE_CAP_ATOMIC_O_TRUNC; + // TODO copy the other feature flags here #else if(ud->writeback) { @@ -286,7 +324,6 @@ init_handler(void* userdata, struct fuse_conn_info* conn) { static void destroy_handler(void* userdata) { - struct u_data* ud = (struct u_data*) userdata; LOG(DEBUG, "destroy handler"); // userdata is GekkoFuse* if passed } @@ -301,7 +338,7 @@ lookup_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { return; } std::string child = get_path(parent_inode, name); - LOG(DEBUG, "lookup {}", child.c_str()); + LOG(DEBUG, "lookup {}", child); if(ud->fifo) { auto iit = local_fifos.find(child); @@ -363,7 +400,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, off_t new_size = attr->st_size; int res = gkfs::syscall::gkfs_truncate(inode->path, new_size); if(res < 0) { - LOG(DEBUG, "setattr truncate failed on {}", inode->path.c_str()); + LOG(DEBUG, "setattr truncate failed on {}", inode->path); fuse_reply_err(req, EIO); return; } @@ -407,7 +444,6 @@ open_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { static void lseek_handler(fuse_req_t req, fuse_ino_t ino, off_t off, int whence, struct fuse_file_info* fi) { - auto* ud = udata(req); LOG(DEBUG, "lseek handler"); int lc = gkfs::syscall::gkfs_lseek(fi->fh, off, whence); if(lc < 0) { @@ -420,7 +456,6 @@ lseek_handler(fuse_req_t req, fuse_ino_t ino, off_t off, int whence, static void 
read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { - auto* ud = udata(req); LOG(DEBUG, "read handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); @@ -429,20 +464,21 @@ read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } } - std::vector buf(size); - int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.data(), size, off); + + // Allocate buffer without zero-initialization for performance + auto buf = std::make_unique(size); + int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.get(), size, off); if(rc < 0) { LOG(DEBUG, "read fail"); fuse_reply_err(req, errno); return; } - fuse_reply_buf(req, buf.data(), rc); + fuse_reply_buf(req, buf.get(), rc); } static void write_handler(fuse_req_t req, fuse_ino_t ino, const char* buf, size_t size, off_t off, struct fuse_file_info* fi) { - auto* ud = udata(req); LOG(DEBUG, "write handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); @@ -482,7 +518,7 @@ create_handler(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t mode, } int fd = gkfs::syscall::gkfs_open(path, mode, fi->flags); */ - LOG(DEBUG, "create handler {}", path.c_str()); + LOG(DEBUG, "create handler {}", path); int fd = gkfs::syscall::gkfs_open(path, mode, fi->flags | O_CREAT); if(fd < 0) { LOG(DEBUG, "create -> open failed errno {}", errno); @@ -536,14 +572,13 @@ unlink_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { static void opendir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); LOG(DEBUG, "opendir handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOTDIR); return; } - LOG(DEBUG, "open dir {}", inode->path.c_str()); + LOG(DEBUG, "open dir {}", inode->path); const int fd = gkfs::syscall::gkfs_opendir(inode->path); @@ -570,7 +605,7 @@ readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } - LOG(DEBUG, "read dir {}", 
open_dir->path().c_str()); + LOG(DEBUG, "read dir {}", open_dir->path()); // Allocate a buffer to accumulate entries char* buf = static_cast(malloc(size)); @@ -643,7 +678,6 @@ readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, static void statfs_handler(fuse_req_t req, fuse_ino_t ino) { - auto* ud = udata(req); LOG(DEBUG, "statfs handler ino {}", ino); struct statfs stfs{}; int rc = gkfs::syscall::gkfs_statfs(&stfs); @@ -679,7 +713,7 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } - LOG(DEBUG, "read dirplus {}", open_dir->path().c_str()); + LOG(DEBUG, "read dirplus {}", open_dir->path()); // Allocate a buffer to accumulate entries char* buf = static_cast(malloc(size)); @@ -698,7 +732,7 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, std::string child = open_dir->path() == "/" ? "/" + de.name() : open_dir->path() + "/" + de.name(); - LOG(DEBUG, "read dirplus child {}", child.c_str()); + LOG(DEBUG, "read dirplus child {}", child); fuse_entry_param e = {}; @@ -790,7 +824,6 @@ releasedir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { /// releases file descriptor, not connected to lookup_count static void release_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); LOG(DEBUG, "release handler"); auto* inode = get_inode(ino); if(!inode) { @@ -837,7 +870,6 @@ forget_handler(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) { static void flush_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); LOG(DEBUG, "flush handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); @@ -894,7 +926,7 @@ mkdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name, return; } std::string path = get_path(parent_inode, name); - LOG(DEBUG, "mkdir parent {} name {}", parent_inode->path.c_str(), name); + LOG(DEBUG, "mkdir parent {} name {}", parent_inode->path, name); int rc 
= gkfs::syscall::gkfs_create(path, mode | S_IFDIR); if(rc == -1) { fuse_reply_err(req, errno); @@ -912,14 +944,13 @@ mkdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name, static void rmdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { - auto* ud = udata(req); auto* parent_inode = get_inode(parent); if(!parent_inode) { fuse_reply_err(req, ENOENT); return; } std::string path = get_path(parent_inode, name); - LOG(DEBUG, "rmdir {}", path.c_str()); + LOG(DEBUG, "rmdir {}", path); int rc = gkfs::syscall::gkfs_rmdir(path); if(rc == -1) { fuse_reply_err(req, errno); @@ -1042,40 +1073,51 @@ rename_handler(fuse_req_t req, fuse_ino_t old_parent, const char* old_name, return; } - ino_mutex.lock(); - auto it_src = path_map.find(old_path); fuse_ino_t src_ino = 0; - - if(it_src != path_map.end()) { - src_ino = it_src->second; - path_map.erase(it_src); - path_map[new_path] = src_ino; - } else { - src_ino = alloc_inode(new_path); - path_map[new_path] = src_ino; + { + auto& p_shard = + path_shards[std::hash{}(old_path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + auto it_src = p_shard.path_map.find(old_path); + if(it_src != p_shard.path_map.end()) { + src_ino = it_src->second; + p_shard.path_map.erase(it_src); + } } - ino_mutex.unlock(); - auto* inode = get_inode(src_ino); - if(inode) { - inode->path = new_path; - } + if(src_ino == 0) { + src_ino = alloc_inode(new_path); + } else { + auto& p_shard = + path_shards[std::hash{}(new_path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + + fuse_ino_t old_dst_ino = 0; + auto it_dst = p_shard.path_map.find(new_path); + if(it_dst != p_shard.path_map.end()) { + old_dst_ino = it_dst->second; + } - // If destination existed and was overwritten, detach its mapping - if(dst_exists) { - auto it_dst = path_map.find(new_path); - if(it_dst != path_map.end()) { - fuse_ino_t dst_ino = it_dst->second; - path_map.erase(it_dst); + p_shard.path_map[new_path] = src_ino; - // Mark the old dst inode as 
disconnected (no pathname). - // Keep it alive until lookup_count==0 and open_count==0, then free - // in forget(). - auto& dst_rec = ino_map[dst_ino]; - dst_rec.path.clear(); + if(old_dst_ino != 0) { + auto& i_shard = ino_shards[std::hash{}(old_dst_ino) % + SHARD_COUNT]; + std::lock_guard i_lk(i_shard.mutex); + auto it_ino = i_shard.ino_map.find(old_dst_ino); + if(it_ino != i_shard.ino_map.end()) { + it_ino->second.path.clear(); + } } } + auto& i_shard = ino_shards[std::hash{}(src_ino) % SHARD_COUNT]; + std::lock_guard i_lk(i_shard.mutex); + auto it_ino = i_shard.ino_map.find(src_ino); + if(it_ino != i_shard.ino_map.end()) { + it_ino->second.path = new_path; + } + fuse_reply_err(req, 0); } @@ -1149,11 +1191,44 @@ init_gekkofs() { exit(1); } root_inode.path = root_path; - root_inode.lookup_count = 1; - path_map[root_path] = FUSE_ROOT_ID; + auto& p_shard = + path_shards[std::hash{}(root_path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + p_shard.path_map[root_path] = FUSE_ROOT_ID; std::cout << "root node allocated" << std::endl; } +static void +forget_multi_handler(fuse_req_t req, size_t count, + struct fuse_forget_data* forgets) { + LOG(DEBUG, "forget_multi handler count {}", count); + for(size_t i = 0; i < count; i++) { + fuse_ino_t ino = forgets[i].ino; + uint64_t nlookup = forgets[i].nlookup; + + Inode* inode = get_inode(ino); + if(!inode) + continue; + + uint64_t current = inode->lookup_count.load(); + while(current >= nlookup) { + if(inode->lookup_count.compare_exchange_weak(current, + current - nlookup)) { + break; + } + } + if(current < nlookup) { + inode->lookup_count.store(0); + } + + if(inode->lookup_count.load() == 0) { + LOG(DEBUG, "reached lookup_count 0 for ino {} (multi)", ino); + remove_inode_by_ino(ino); + } + } + fuse_reply_none(req); +} + static void init_ll_ops(fuse_lowlevel_ops* ops) { // file @@ -1163,7 +1238,7 @@ init_ll_ops(fuse_lowlevel_ops* ops) { ops->create = create_handler; ops->unlink = unlink_handler; ops->forget = 
forget_handler; - // ops->forget_multi + ops->forget_multi = forget_multi_handler; ops->readlink = readlink_handler; ops->mknod = mknod_handler; ops->symlink = symlink_handler; @@ -1303,6 +1378,29 @@ main(int argc, char* argv[]) { exit(1); } + ud.entry_timeout = ud.timeout; + ud.attr_timeout = ud.timeout; + ud.negative_timeout = ud.timeout; + + if(const char* env_e = std::getenv("GKFS_FUSE_ENTRY_TIMEOUT")) { + ud.entry_timeout = std::stod(env_e); + LOG(INFO, "FUSE entry_timeout set to {}", ud.entry_timeout); + } + if(const char* env_a = std::getenv("GKFS_FUSE_ATTR_TIMEOUT")) { + ud.attr_timeout = std::stod(env_a); + LOG(INFO, "FUSE attr_timeout set to {}", ud.attr_timeout); + } + if(const char* env_n = std::getenv("GKFS_FUSE_NEGATIVE_TIMEOUT")) { + ud.negative_timeout = std::stod(env_n); + LOG(INFO, "FUSE negative_timeout set to {}", ud.negative_timeout); + } + + if(const char* env_w = std::getenv("GKFS_FUSE_WRITEBACK")) { + ud.writeback = + (std::string(env_w) == "ON" || std::string(env_w) == "1"); + LOG(INFO, "FUSE writeback set to {}", ud.writeback); + } + ud.mountpoint = strdup(opts.mountpoint); init_gekkofs(); diff --git a/tests/integration/fuse/test_metadata_bench.py b/tests/integration/fuse/test_metadata_bench.py new file mode 100644 index 000000000..abbf9e940 --- /dev/null +++ b/tests/integration/fuse/test_metadata_bench.py @@ -0,0 +1,98 @@ +import os +import time +import threading +import pytest +import statistics + +def benchmark_metadata_ops(mountdir, num_threads, num_iters): + def worker(tid, results): + thread_dir = os.path.join(mountdir, f"bench_t{tid}") + os.makedirs(thread_dir, exist_ok=True) + + start = time.perf_counter() + for i in range(num_iters): + # Create/Stat/Remove + p = os.path.join(thread_dir, f"f{i}") + with open(p, "w") as f: + f.write("x") + os.stat(p) + os.remove(p) + end = time.perf_counter() + results.append(end - start) + + results = [] + threads = [] + for i in range(num_threads): + t = threading.Thread(target=worker, args=(i, 
results)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + total_time = max(results) + ops_per_sec = (num_threads * num_iters * 3) / total_time + return ops_per_sec + +def benchmark_read_latency(mountdir, num_iters): + p = os.path.join(mountdir, "baseline_file") + with open(p, "w") as f: + f.write("data") + + # Warm up + os.stat(p) + + latencies = [] + for i in range(num_iters): + start = time.perf_counter() + os.stat(p) + end = time.perf_counter() + latencies.append(end - start) + + return statistics.median(latencies) * 1e6 # in microseconds + +def benchmark_write_throughput(mountdir, num_threads, size_mb): + def worker(tid, results): + p = os.path.join(mountdir, f"write_bench_t{tid}") + data = b"x" * 1024 * 1024 # 1MB + + start = time.perf_counter() + with open(p, "wb") as f: + for _ in range(size_mb): + f.write(data) + f.flush() + os.fsync(f.fileno()) + end = time.perf_counter() + results.append(end - start) + + results = [] + threads = [] + for i in range(num_threads): + t = threading.Thread(target=worker, args=(i, results)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + total_time = max(results) + throughput_mb = (num_threads * size_mb) / total_time + return throughput_mb + +@pytest.mark.parametrize("num_threads", [1, 8]) +def test_benchmark_performance(gkfs_daemon, fuse_client, num_threads): + mountdir = fuse_client.mountdir + iters = 100 + + print(f"\n--- Benchmarking with {num_threads} threads ---") + + # Run with current settings + ops = benchmark_metadata_ops(mountdir, num_threads, iters) + print(f"Metadata Throughput: {ops:.2f} ops/sec") + + lat = benchmark_read_latency(mountdir, iters) + print(f"Stat Latency (Median): {lat:.2f} us") + + if num_threads == 1: + tp = benchmark_write_throughput(mountdir, 1, 100) + print(f"Write Throughput (1 thread): {tp:.2f} MB/s") diff --git a/tests/integration/fuse/test_stress.py b/tests/integration/fuse/test_stress.py new file mode 100644 index 000000000..cbf74ea57 --- /dev/null 
+++ b/tests/integration/fuse/test_stress.py @@ -0,0 +1,45 @@ +import os +import threading +import pytest +import sh + +def thread_task(mountdir, thread_id, num_iters): + thread_dir = os.path.join(mountdir, f"thread_{thread_id}") + os.makedirs(thread_dir, exist_ok=True) + + for i in range(num_iters): + file_path = os.path.join(thread_dir, f"file_{i}") + # Create + with open(file_path, "w") as f: + f.write("data") + # Stat + os.stat(file_path) + # Unlink + os.remove(file_path) + + # Subdir create/rmdir + subdir = os.path.join(thread_dir, f"dir_{i}") + os.mkdir(subdir) + os.rmdir(subdir) + +@pytest.mark.parametrize("num_threads", [10]) +@pytest.mark.parametrize("iters_per_thread", [100]) +def test_metadata_stress(gkfs_daemon, fuse_client, num_threads, iters_per_thread): + mountdir = fuse_client.mountdir + threads = [] + + for i in range(num_threads): + t = threading.Thread(target=thread_task, args=(mountdir, i, iters_per_thread)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # Final check of the root directory + assert "thread_0" in os.listdir(mountdir) + + # Cleanup + for i in range(num_threads): + thread_dir = os.path.join(mountdir, f"thread_{i}") + sh.rm("-rf", thread_dir) -- GitLab From a85ba1e67fd29534d3ab8dac7790d1a9be6d1011 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 10 Mar 2026 13:51:56 +0100 Subject: [PATCH 3/6] fix dcache_futures nullptr --- CHANGELOG.md | 4 ++++ README.md | 5 +++-- src/client/gkfs_metadata.cpp | 14 +++++++++++++- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32bcb3b2f..51c44810e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Added FUSE support([!264](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/264)) - fuse_client mounting a virtual path exists, avoiding usage of LD_PRELOAD. 
- Added tests for FUSE support + - Optimized and fixed fuse ([!293](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/293)) + - New optimization options + - Less contention with threading + diff --git a/README.md b/README.md index 4eb0547a0..597b0a043 100644 --- a/README.md +++ b/README.md @@ -283,18 +283,19 @@ The `fuse_client` requires the `LIBGKFS_HOSTS_FILE` environment variable to be s Common launch arguments: - `-f`: Run in foreground. -- `-s`: Single-threaded operation (recommended). +- `-s`: Single-threaded operation. With the recent FUSE optimizations, multi-threaded operation is now the default and recommended for performance. Use `-s` only for debugging. - ``: The directory where GekkoFS will be mounted. FUSE-specific options can be passed via `-o`: - `writeback`: Enable writeback cache. - `direct_io`: Enable direct I/O (can boost performance significantly). +- `max_threads=`: Limit the maximum number of background threads FUSE can spawn to serve requests (supported in FUSE 3.12+). - `timeout=`: Caching timeout (default: 1.0). - `cache={never,auto,always}`: Cache policy. 
Example with options: ```bash -/bin/fuse_client -f -s -o writeback,direct_io,timeout=5.0 +/bin/fuse_client -f -o writeback,direct_io,timeout=5.0,max_threads=32 ``` ## Logging diff --git a/src/client/gkfs_metadata.cpp b/src/client/gkfs_metadata.cpp index 4530885aa..ef0e1ed53 100644 --- a/src/client/gkfs_metadata.cpp +++ b/src/client/gkfs_metadata.cpp @@ -1165,6 +1165,19 @@ gkfs_opendir(const std::string& path) { ret.second->add("..", gkfs::filemap::FileType::directory); for(auto& fut : dcache_futures) { auto res = fut.get(); // Wait for the RPC result + + if(res.first != 0) { + ret.first = res.first; + LOG(ERROR, "{}() RPC failed with error: {}", __func__, + res.first); + continue; + } + if(!res.second) { + LOG(ERROR, "{}() RPC returned null entries vector", __func__); + ret.first = EIO; + continue; + } + auto& open_dir = *res.second; for(auto& dentry : open_dir) { // type returns as unsigned char @@ -1186,7 +1199,6 @@ gkfs_opendir(const std::string& path) { get<3>(dentry)}); cnt++; } - ret.first = res.first; } LOG(DEBUG, "{}() Unpacked dirents for path '{}' counted '{}' entries", __func__, path, cnt); -- GitLab From b7a75e9e7c3736c787200fa7c02ede27704df492 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 10 Mar 2026 18:12:58 +0100 Subject: [PATCH 4/6] sharding optimization --- include/client/open_file_map.hpp | 17 +++++-- src/client/open_file_map.cpp | 79 ++++++++++++++++++++++---------- src/client/preload.cpp | 10 ++-- 3 files changed, 77 insertions(+), 29 deletions(-) diff --git a/include/client/open_file_map.hpp b/include/client/open_file_map.hpp index 15c4245a9..54a2fdb20 100644 --- a/include/client/open_file_map.hpp +++ b/include/client/open_file_map.hpp @@ -138,10 +138,18 @@ private: class OpenFileMap { - private: - std::map> files_; - std::recursive_mutex files_mutex_; + struct MapShard { + std::map> files; + mutable std::recursive_mutex mutex; + }; + + static constexpr size_t num_shards = 32; + std::array shards_; + std::atomic total_files_{0}; + + 
MapShard& + get_shard(int fd); int safe_generate_fd_idx_(); @@ -194,6 +202,9 @@ public: std::vector get_range(unsigned int first, unsigned int last); + + bool + empty() const; }; } // namespace gkfs::filemap diff --git a/src/client/open_file_map.cpp b/src/client/open_file_map.cpp index ebc4d86ca..71a32d113 100644 --- a/src/client/open_file_map.cpp +++ b/src/client/open_file_map.cpp @@ -145,11 +145,22 @@ OpenFile::inline_data_size(size_t size) { OpenFile::inline_data_size_ = size; } +OpenFileMap::MapShard& +OpenFileMap::get_shard(int fd) { + if(fd < 0) { + return shards_[0]; // Default shard for invalid FDs, or we could throw + } + return shards_[fd % num_shards]; +} + shared_ptr OpenFileMap::get(int fd) { - lock_guard lock(files_mutex_); - auto f = files_.find(fd); - if(f == files_.end()) { + if(fd < 0) + return nullptr; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + auto f = shard.files.find(fd); + if(f == shard.files.end()) { return nullptr; } else { return f->second; @@ -167,9 +178,12 @@ OpenFileMap::get_dir(int dirfd) { bool OpenFileMap::exist(const int fd) { - lock_guard lock(files_mutex_); - auto f = files_.find(fd); - return !(f == files_.end()); + if(fd < 0) + return false; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + auto f = shard.files.find(fd); + return !(f == shard.files.end()); } int @@ -224,19 +238,27 @@ OpenFileMap::safe_generate_fd_idx_() { int OpenFileMap::add(std::shared_ptr open_file) { auto fd = safe_generate_fd_idx_(); - lock_guard lock(files_mutex_); - files_[fd] = open_file; + if(fd < 0) + return fd; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + shard.files[fd] = open_file; + total_files_++; return fd; } bool OpenFileMap::remove(const int fd) { - lock_guard lock(files_mutex_); - auto f = files_.find(fd); - if(f == files_.end()) { + if(fd < 0) + return false; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + auto f = shard.files.find(fd); + if(f == shard.files.end()) { 
return false; } - files_.erase(fd); + shard.files.erase(fd); + total_files_--; if(!CTX->protect_fds()) { if(!CTX->range_fd()) { @@ -245,7 +267,7 @@ OpenFileMap::remove(const int fd) { return true; } } - if(fd_validation_needed && files_.empty()) { + if(fd_validation_needed && empty()) { fd_validation_needed = false; LOG(DEBUG, "fd_validation flag reset"); } @@ -254,20 +276,22 @@ OpenFileMap::remove(const int fd) { int OpenFileMap::dup(const int oldfd) { - lock_guard lock(files_mutex_); auto open_file = get(oldfd); if(open_file == nullptr) { errno = EBADF; return -1; } auto newfd = safe_generate_fd_idx_(); - files_.insert(make_pair(newfd, open_file)); + auto& shard = get_shard(newfd); + lock_guard lock(shard.mutex); + // count only real insertions so total_files_/empty() stay accurate + if(shard.files.insert(make_pair(newfd, open_file)).second) + total_files_++; return newfd; } int OpenFileMap::dup2(const int oldfd, const int newfd) { - lock_guard lock(files_mutex_); auto open_file = get(oldfd); if(open_file == nullptr) { errno = EBADF; @@ -283,7 +307,12 @@ OpenFileMap::dup2(const int oldfd, const int newfd) { // by os streams that we do not overwrite if(get_fd_idx() < newfd && newfd != 0 && newfd != 1 && newfd != 2) fd_validation_needed = true; - files_.insert(make_pair(newfd, open_file)); + + auto& shard = get_shard(newfd); + lock_guard lock(shard.mutex); + // count only real insertions so total_files_/empty() stay accurate + if(shard.files.insert(make_pair(newfd, open_file)).second) + total_files_++; return newfd; } @@ -318,15 +347,23 @@ OpenFileMap::get_fd_idx() { std::vector OpenFileMap::get_range(unsigned int first, unsigned int last) { - std::lock_guard lock(files_mutex_); std::vector result; - // files_ is a sorted std::map, so lower_bound gives us an efficient start - auto it = files_.lower_bound(static_cast(first)); - while(it != files_.end() && static_cast(it->first) <= last) { - result.push_back(it->first); - ++it; + for(auto& shard : shards_) { + lock_guard lock(shard.mutex); + auto it = shard.files.lower_bound(static_cast(first)); + while(it != shard.files.end() && + static_cast(it->first) <= last) { + result.push_back(it->first); + ++it; + } }
+ std::sort(result.begin(), result.end()); return result; } +bool +OpenFileMap::empty() const { + return total_files_ == 0; +} + } // namespace gkfs::filemap \ No newline at end of file diff --git a/src/client/preload.cpp b/src/client/preload.cpp index 11a7b4423..c08388ec4 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -385,12 +385,16 @@ init_preload() { #ifdef ENABLE_USER return; #endif + static std::recursive_mutex init_mutex; + std::lock_guard lock(init_mutex); + if(init) { + return; + } // The original errno value will be restored after initialization to not // leak internal error codes auto oerrno = errno; - if(atomic_exchange(&init, 1) == 0) { - pthread_atfork(&at_fork, &at_parent, &at_child); - } + init = true; + pthread_atfork(&at_fork, &at_parent, &at_child); #ifndef BYPASS_SYSCALL CTX->enable_interception(); -- GitLab From 6cf45a3606347845896ccd56116fec10980cf229 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 10 Mar 2026 19:16:59 +0100 Subject: [PATCH 5/6] fix remove chunk --- src/client/rpc/forward_metadata.cpp | 31 +++ src/daemon/handler/srv_metadata.cpp | 12 - .../directories/test_backend_cleanup.py | 243 ++++++++++++++++-- tests/integration/harness/gkfs.py | 58 ++++- 4 files changed, 294 insertions(+), 50 deletions(-) diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 23434fc3e..5baf556ec 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -118,6 +118,7 @@ int forward_remove(const std::string& path, bool rm_dir, int8_t num_copies) { int err = 0; + // Step 1: Remove metadata from the responsible server(s) // We iterate over replicas for(auto copy = 0; copy < num_copies + 1; copy++) { auto endp = CTX->hosts().at( @@ -135,6 +136,36 @@ forward_remove(const std::string& path, bool rm_dir, int8_t num_copies) { err = out.err; } } + + // If metadata removal failed, bail out early + if(err != 0) + return err; + + // Step 2: Broadcast remove_data to ALL 
daemons so that every host can + // clean up whatever chunks it holds for this file. + // NOTE: When using the proxy, the proxy handles this broadcast itself, so + // we skip it here (CTX->use_proxy() is checked by the caller). + // We send remove_data unconditionally for non-directory removes. + // Daemons handle the no-op case safely (empty chunk dir == nothing to do). + if(!rm_dir) { + auto rpc_remove_data = + CTX->rpc_engine()->define(gkfs::rpc::tag::remove_data); + + gkfs::rpc::rpc_rm_node_in_t rm_in; + rm_in.path = path; + rm_in.rm_dir = true; // tells daemon to remove the whole chunk directory + + for(std::size_t i = 0; i < CTX->hosts().size(); i++) { + try { + rpc_remove_data.on(CTX->hosts().at(i)).async(rm_in); + } catch(const std::exception& e) { + LOG(WARNING, + "{}() Failed to send remove_data RPC to host {}: {}", + __func__, i, e.what()); + } + } + } + return err; } diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 3b9132f59..f98798aae 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -270,18 +270,6 @@ rpc_srv_remove_metadata(const tl::request& req, } } -/** - * @brief Serves a request to remove all file data chunks on this daemon. - * @internal - * The handler simply issues the removal of all chunk files on the local file - * system. - * - * All exceptions must be caught here and dealt with accordingly. Any errors are - * placed in the response. - * @endinteral - * @param handle Mercury RPC handle - * @return Mercury error code to Mercury - */ /** * @brief Serves a request to remove all file data chunks on this daemon. 
* @internal diff --git a/tests/integration/directories/test_backend_cleanup.py b/tests/integration/directories/test_backend_cleanup.py index dcb293794..b544cc0c2 100644 --- a/tests/integration/directories/test_backend_cleanup.py +++ b/tests/integration/directories/test_backend_cleanup.py @@ -3,33 +3,93 @@ from pathlib import Path import errno import stat import os +import time import pytest from harness.logger import logger +from harness.gkfs import Daemon + +# --------------------------------------------------------------------------- +# Helper: a thin workspace adapter so that each daemon in a multi-server +# test can have its own rootdir and logdir while sharing everything else +# (mountdir, twd / hosts file, bindirs, libdirs). +# --------------------------------------------------------------------------- +class _DaemonWorkspaceAdapter: + """Proxy workspace that redirects rootdir and logdir to per-daemon + sub-directories while forwarding everything else to the real workspace.""" + + def __init__(self, real_workspace, suffix): + self._ws = real_workspace + # Per-daemon directories + self._rootdir = real_workspace.twd / f"root_{suffix}" + self._logdir = real_workspace.logdir / f"daemon_{suffix}" + self._rootdir.mkdir(parents=True, exist_ok=True) + self._logdir.mkdir(parents=True, exist_ok=True) + + # --- overridden per-daemon paths ---------------------------------------- + @property + def rootdir(self): + return self._rootdir + + @property + def logdir(self): + return self._logdir + + # --- forwarded from real workspace -------------------------------------- + @property + def twd(self): + return self._ws.twd + + @property + def mountdir(self): + return self._ws.mountdir + + @property + def bindirs(self): + return self._ws.bindirs + + @property + def libdirs(self): + return self._ws.libdirs + + @property + def tmpdir(self): + return self._ws.tmpdir + + +def _wait_until(predicate, timeout_s=8.0, poll_s=0.2): + deadline = time.time() + timeout_s + while time.time() 
< deadline: + if predicate(): + return True + time.sleep(poll_s) + return predicate() + + +# --------------------------------------------------------------------------- +# Single-daemon test (existing behaviour) +# --------------------------------------------------------------------------- @pytest.mark.parametrize("client_fixture", ["gkfs_client", "gkfs_clientLibc"]) def test_backend_cleanup(client_fixture, request, gkfs_daemon): gkfs_client = request.getfixturevalue(client_fixture) mountdir = gkfs_daemon.mountdir rootdir = gkfs_daemon.rootdir - + file_path = mountdir / "cleanup_file" dir_path = mountdir / "cleanup_dir" - + # 1. Create a directory ret = gkfs_client.mkdir(dir_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval == 0 - + # 2. Create a file ret = gkfs_client.open(file_path, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval != -1 - + # 3. Write some data to generate chunks ret = gkfs_client.write_validate(file_path, 600000) assert ret.retval == 0 - - # Let's see what's in the backend rootdir - # Usually chunks are in `chunks` directory - # We will just walk the rootdir and count files/directories to see what exists + def get_backend_contents(directory): contents = [] for root, dirs, files in os.walk(directory): @@ -38,32 +98,167 @@ def test_backend_cleanup(client_fixture, request, gkfs_daemon): for f in files: contents.append(os.path.join(root, f)) return contents - + contents_before = get_backend_contents(rootdir) logger.info(f"Backend contents before deletion: {contents_before}") - - # Check that chunks exist somewhere in rootdir - # GekkoFS stores chunks typically in `/chunks/` + chunk_dir = rootdir / "chunks" - - # Ensure properties exist assert chunk_dir.exists() - - # Count chunks in chunk_dir + chunks_before = get_backend_contents(chunk_dir) - assert len(chunks_before) > 0, f"Expected chunks in {chunk_dir}, but found none. 
Total backend: {contents_before}" - + assert len(chunks_before) > 0, \ + f"Expected chunks in {chunk_dir}, but found none. Total backend: {contents_before}" + # 4. Remove file and directory ret = gkfs_client.unlink(file_path) assert ret.retval == 0 - + ret = gkfs_client.rmdir(dir_path) assert ret.retval == 0 - - # 5. Check backend again + + # 5. Check backend again – all chunks must be gone contents_after = get_backend_contents(rootdir) logger.info(f"Backend contents after deletion: {contents_after}") - + + emptied = _wait_until(lambda: len(get_backend_contents(chunk_dir)) == 0) chunks_after = get_backend_contents(chunk_dir) - # The chunk directory corresponding to the file should be deleted or empty - assert len(chunks_after) == 0, f"Expected data directory to be empty, but found: {chunks_after}" + assert emptied, \ + f"Expected data directory to be empty within timeout, but found: {chunks_after}" + + +# --------------------------------------------------------------------------- +# Multi-daemon test – reproduces the bug where chunks are not removed from +# all servers when a file that spans multiple daemons is deleted. +# --------------------------------------------------------------------------- +@pytest.mark.parametrize("client_fixture", ["gkfs_client", "gkfs_clientLibc"]) +def test_backend_cleanup_multi_daemon(client_fixture, request, test_workspace): + """ + Starts NUM_DAEMONS GekkoFS daemons that share a single mountdir and + hosts file. A file large enough to be distributed across all servers is + written and then deleted. After deletion *every* server's chunk + directory must be empty. + + This test reproduces the bug reported in March 2026 where file chunks + remained on workers 1 and 2 after a removal issued from worker 0. 
+ """ + + NUM_DAEMONS = 3 + # Bytes to write: 64 chunks of 512 KiB – enough to distribute across 3 servers + # (GekkoFS default chunk size is 524288 B; 64 chunks = 32 MiB total) + WRITE_BYTES = 64 * 524288 + + interface = request.config.getoption('--interface') + + daemons = [] + adapters = [] + + try: + # ---------------------------------------------------------------- + # Start NUM_DAEMONS daemons, all sharing the same hosts file (twd) + # and mountdir but each with its own rootdir / logdir. + # ---------------------------------------------------------------- + for idx in range(NUM_DAEMONS): + adapter = _DaemonWorkspaceAdapter(test_workspace, idx) + adapters.append(adapter) + + daemon = Daemon(interface, "rocksdb", adapter) + daemon.run() + daemons.append(daemon) + logger.info(f"Daemon {idx} started (rootdir={adapter.rootdir})") + + gkfs_client = request.getfixturevalue(client_fixture) + mountdir = test_workspace.mountdir + + # All chunk dirs – one per daemon + chunk_dirs = [a.rootdir / "chunks" for a in adapters] + + # ---------------------------------------------------------------- + # Create and write a large file so chunks land on multiple servers + # ---------------------------------------------------------------- + file_path = mountdir / "multi_server_file" + + ret = gkfs_client.open( + file_path, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO, + ) + assert ret.retval != -1, "Failed to open file for writing" + + ret = gkfs_client.write_validate(file_path, WRITE_BYTES) + assert ret.retval == 0, "Failed to write data to file" + + # ---------------------------------------------------------------- + # Verify that at least SOME chunks exist across the cluster + # ---------------------------------------------------------------- + def get_chunks(chunk_dir): + if not chunk_dir.exists(): + return [] + result = [] + for root, dirs, files in os.walk(chunk_dir): + result.extend(os.path.join(root, f) for f in files) + return result + +
all_chunks_before = [] + for i, cd in enumerate(chunk_dirs): + c = get_chunks(cd) + logger.info(f"Daemon {i} chunks before delete: {len(c)} chunks in {cd}") + all_chunks_before.extend(c) + + assert len(all_chunks_before) > 0, \ + "No chunks found on any daemon after write – something went wrong" + + # Check that data is distributed (at least 2 daemons should have chunks + # for a sufficiently large file). + daemons_with_chunks = sum(1 for cd in chunk_dirs if len(get_chunks(cd)) > 0) + logger.info(f"{daemons_with_chunks}/{NUM_DAEMONS} daemons have chunks before delete") + assert daemons_with_chunks > 1, ( + f"Expected chunks to be distributed across >1 daemon, " + f"but only {daemons_with_chunks} daemon(s) have data. " + f"The file may be too small to trigger distribution." + ) + + # ---------------------------------------------------------------- + # Delete the file + # ---------------------------------------------------------------- + ret = gkfs_client.unlink(file_path) + assert ret.retval == 0, f"unlink failed with retval={ret.retval}" + + # ---------------------------------------------------------------- + # Bug check: ALL daemons must have zero chunks after deletion. + # Cleanup is asynchronous, so allow a grace period. + # ---------------------------------------------------------------- + failed_daemons = [] + + def all_chunk_dirs_empty(): + nonlocal failed_daemons + failed_daemons = [] + for i, cd in enumerate(chunk_dirs): + remaining = get_chunks(cd) + if remaining: + failed_daemons.append((i, remaining)) + return len(failed_daemons) == 0 + + emptied = _wait_until(all_chunk_dirs_empty) + + if not emptied: + for i, remaining in failed_daemons: + logger.error( + f"Daemon {i} still has {len(remaining)} chunk(s) after delete: " + f"{remaining[:10]}{'...' if len(remaining) > 10 else ''}" + ) + else: + for i, _cd in enumerate(chunk_dirs): + logger.info(f"Daemon {i}: chunk directory is clean after delete ✓") + + assert emptied, ( + f"Bug reproduced! 
{len(failed_daemons)} daemon(s) still have residual chunks " + f"after the file was deleted. Affected daemons: " + f"{[idx for idx, _ in failed_daemons]}" + ) + + finally: + for daemon in reversed(daemons): + try: + daemon.shutdown() + except Exception as e: + logger.warning(f"Error shutting down daemon: {e}") diff --git a/tests/integration/harness/gkfs.py b/tests/integration/harness/gkfs.py index 3de7d7587..947c93626 100644 --- a/tests/integration/harness/gkfs.py +++ b/tests/integration/harness/gkfs.py @@ -204,6 +204,36 @@ def _find_search_paths(additional_paths=None): return paths_to_search + +def _compose_ld_library_path(preferred_dirs, workspace_dirs, inherited_ld_path): + """ + Build LD_LIBRARY_PATH with deterministic priority and de-duplicated entries. + The preferred directories are always placed first. + """ + + ordered_entries = [] + seen = set() + + def add_entry(entry): + if not entry: + return + normalized = str(Path(entry).resolve()) + if normalized in seen: + return + seen.add(normalized) + ordered_entries.append(normalized) + + for d in preferred_dirs: + add_entry(d) + + for d in workspace_dirs: + add_entry(d) + + for d in inherited_ld_path.split(':'): + add_entry(d) + + return ':'.join(ordered_entries) + class FwdDaemonCreator: """ Factory that allows tests to create forwarding daemons in a workspace. 
@@ -588,10 +618,6 @@ class Client: self._env = os.environ.copy() self._proxy = proxy - libdirs = ':'.join( - filter(None, [str(p) for p in self._workspace.libdirs] + - [os.environ.get('LD_LIBRARY_PATH', '')])) - # ensure the client interception library is available: # to avoid running code with potentially installed libraries, # it must be found in one (and only one) of the workspace's bindirs @@ -614,6 +640,12 @@ class Client: self._preload_library = preloads[0] + libdirs = _compose_ld_library_path( + preferred_dirs=[self._preload_library.parent], + workspace_dirs=self._workspace.libdirs, + inherited_ld_path=os.environ.get('LD_LIBRARY_PATH', ''), + ) + self._patched_env = { 'LD_LIBRARY_PATH': libdirs, 'LD_PRELOAD': str(self._preload_library), @@ -707,10 +739,6 @@ class ClientLibc: self._cmd = find_command(gkfs_client_cmd, self._workspace.bindirs) self._env = os.environ.copy() - libdirs = ':'.join( - filter(None, [str(p) for p in self._workspace.libdirs] + - [os.environ.get('LD_LIBRARY_PATH', '')])) - # ensure the client interception library is available: # to avoid running code with potentially installed libraries, # it must be found in one (and only one) of the workspace's bindirs @@ -724,15 +752,17 @@ class ClientLibc: logger.error(f'No client libraries found in the test\'s binary directories:') pytest.exit("Aborted due to initialization error. Check test logs.") - if len(preloads) != 1: - logger.error(f'Multiple client libraries found in the test\'s binary directories:') - for p in preloads: - logger.error(f' {p}') - logger.error(f'Make sure that only one copy of the client library is available.') - pytest.exit("Aborted due to initialization error. Check test logs.") + if len(preloads) > 1: + logger.warning(f'Multiple client libraries found. 
Using the first one: {preloads[0]}') self._preload_library = preloads[0] + libdirs = _compose_ld_library_path( + preferred_dirs=[self._preload_library.parent], + workspace_dirs=self._workspace.libdirs, + inherited_ld_path=os.environ.get('LD_LIBRARY_PATH', ''), + ) + self._patched_env = { 'LD_LIBRARY_PATH': libdirs, 'LD_PRELOAD': str(self._preload_library), -- GitLab From 15631512d757b1e8c2000d9b76cd072c165ab1bc Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 10 Mar 2026 19:18:51 +0100 Subject: [PATCH 6/6] changelog --- CHANGELOG.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 51c44810e..3fe49a59e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,9 +33,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Added tests for FUSE support - Optimized and fixed fuse ([!293](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/293)) - New optimization options - - Less contention with threading - - + - Less contention with threading using sharding + + ### Changed @@ -51,6 +51,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Fix pytorch mmap ([!291](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/291)) - Fix cuda in syscall ([!292](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/292)) - mmap and dangling fd issues + - Fix remove chunk bug ([!294](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/294)) ## [0.9.5] - 2025-08 -- GitLab