diff --git a/CHANGELOG.md b/CHANGELOG.md index 32bcb3b2f557dae4edf79dd493990cad570aecb3..51c44810e538f929be51e9da1d3a025b31280688 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Added FUSE support([!264](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/264)) - fuse_client mounting a virtual path exists, avoiding usage of LD_PRELOAD. - Added tests for FUSE support + - Optimized and fixed fuse ([!293](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/293)) + - New optimization options + - Less contention with threading + diff --git a/README.md b/README.md index 3898baf13bf7beb8207e349a0e941dd144088676..597b0a043cf1eb93ba1614fac70a8ec9d40d1ce8 100644 --- a/README.md +++ b/README.md @@ -283,18 +283,19 @@ The `fuse_client` requires the `LIBGKFS_HOSTS_FILE` environment variable to be s Common launch arguments: - `-f`: Run in foreground. -- `-s`: Single-threaded operation (recommended). +- `-s`: Single-threaded operation. With the recent FUSE optimizations, multi-threaded operation is now the default and recommended for performance. Use `-s` only for debugging. - ``: The directory where GekkoFS will be mounted. FUSE-specific options can be passed via `-o`: - `writeback`: Enable writeback cache. - `direct_io`: Enable direct I/O (can boost performance significantly). +- `max_threads=`: Limit the maximum number of background threads FUSE can spawn to serve requests (supported in FUSE 3.12+). - `timeout=`: Caching timeout (default: 1.0). - `cache={never,auto,always}`: Cache policy. Example with options: ```bash -/bin/fuse_client -f -s -o writeback,direct_io,timeout=5.0 +/bin/fuse_client -f -o writeback,direct_io,timeout=5.0,max_threads=32 ``` ## Logging @@ -628,6 +629,10 @@ Client-metrics require the CMake argument `-DGKFS_ENABLE_CLIENT_METRICS=ON` (see - `LIBGKFS_READ_INLINE_PREFETCH` - Prefetch inline data when opening files (default: OFF). - `LIBGKFS_USE_DIRENTS_COMPRESSION` - Enable compression for directory entries (default: OFF). - `LIBGKFS_DIRENTS_BUFF_SIZE` - Buffer size for directory entries (default: 8MB). +- `GKFS_FUSE_ENTRY_TIMEOUT` - Caching timeout for dentry entries in the FUSE client (default: 1.0). +- `GKFS_FUSE_ATTR_TIMEOUT` - Caching timeout for file attributes in the FUSE client (default: 1.0). +- `GKFS_FUSE_NEGATIVE_TIMEOUT` - Caching timeout for negative lookups in the FUSE client (default: 1.0). +- `GKFS_FUSE_WRITEBACK` - Enable writeback cache in the FUSE client (default: OFF). #### Caching ##### Dentry cache diff --git a/include/client/fuse/fuse_client.hpp b/include/client/fuse/fuse_client.hpp index a9e9db0223e91cc1b7c7adb6134944bf5a56971f..19866c4b0da6d4327fe535bd657190094de9287b 100644 --- a/include/client/fuse/fuse_client.hpp +++ b/include/client/fuse/fuse_client.hpp @@ -72,6 +72,7 @@ extern "C" { #include #include #include +#include #ifdef __FreeBSD__ #include @@ -111,7 +112,18 @@ struct _uintptr_to_must_hold_fuse_ino_t_dummy_struct { struct Inode { std::string path; - uint64_t lookup_count; + std::atomic lookup_count; + + Inode() : lookup_count(0) {} + Inode(const std::string& p, uint64_t lc) : path(p), lookup_count(lc) {} + Inode(const Inode& other) : path(other.path), lookup_count(other.lookup_count.load()) {} + Inode& operator=(const Inode& other) { + if (this != &other) { + path = other.path; + lookup_count.store(other.lookup_count.load()); + } + return *this; + } }; enum { @@ -135,6 +147,9 @@ struct u_data { int xattr; char* mountpoint; double timeout; + double entry_timeout; + double attr_timeout; + double negative_timeout; int cache; int timeout_set; }; diff --git a/src/client/fuse/fuse_client.cpp b/src/client/fuse/fuse_client.cpp index 6470233290a1d274e2600a163371f516ad882c2b..1f9406989960083f690a43547a40fe7f7f1e62d0 100644 --- a/src/client/fuse/fuse_client.cpp +++ b/src/client/fuse/fuse_client.cpp @@ -40,24 +40,27 @@ #include static struct fuse_lowlevel_ops ll_ops; -static std::mutex ino_mutex; +struct InodeShard { + std::mutex mutex; + std::unordered_map ino_map; +}; + +struct PathShard { + std::mutex mutex; + std::unordered_map path_map; +}; + +const int SHARD_COUNT = 32; +static PathShard path_shards[SHARD_COUNT]; +static InodeShard ino_shards[SHARD_COUNT]; + static Inode root_inode; -static std::unordered_map ino_map; -static std::unordered_map path_map; -static std::unordered_map local_fifos; -static fuse_ino_t next_ino = 2; // reserve 1 for root -static const std::string fifo_path = "/tmp/gekkofs_fifos/"; +static std::unordered_map + local_fifos; // TODO shard if needed +static std::mutex fifos_mutex; -#ifdef GKFS_DEBUG_BUILD -#define DEBUG_INFO(ud, fmt, ...) \ - do { \ - if((ud) && (ud)->debug) { \ - fuse_log(FUSE_LOG_DEBUG, "[DEBUG] " fmt "\n", ##__VA_ARGS__); \ - } \ - } while(0) -#else -#define DEBUG_INFO(...) /* No debug output */ -#endif +static std::atomic next_ino{2}; // reserve 1 for root +static const std::string fifo_path = "/tmp/gekkofs_fifos/"; #include @@ -68,16 +71,21 @@ gkfs_statfs(struct statfs* buf); static fuse_ino_t alloc_inode(const std::string& path) { - std::lock_guard lk(ino_mutex); fuse_ino_t ino; if(gkfs::config::fuse::pointertrick) { Inode* inode = new Inode{path, 1}; ino = (fuse_ino_t) inode; } else { ino = next_ino++; - ino_map[ino] = {path, 1}; - } - path_map[path] = ino; + auto& i_shard = ino_shards[std::hash{}(ino) % SHARD_COUNT]; + std::lock_guard lk(i_shard.mutex); + i_shard.ino_map.emplace(std::piecewise_construct, + std::forward_as_tuple(ino), + std::forward_as_tuple(path, 1)); + } + auto& p_shard = path_shards[std::hash{}(path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + p_shard.path_map[path] = ino; return ino; } @@ -89,23 +97,40 @@ get_inode(fuse_ino_t ino) { if(gkfs::config::fuse::pointertrick) { return (Inode*) ino; } else { - std::lock_guard lk(ino_mutex); - auto it = ino_map.find(ino); - return it != ino_map.end() ? &it->second : nullptr; + auto& i_shard = ino_shards[std::hash{}(ino) % SHARD_COUNT]; + std::lock_guard lk(i_shard.mutex); + auto it = i_shard.ino_map.find(ino); + return it != i_shard.ino_map.end() ? &it->second : nullptr; } } static void remove_inode_by_path(const std::string path) { - std::lock_guard lk(ino_mutex); - auto it_src = path_map.find(path); - if(it_src != path_map.end()) { - if(gkfs::config::fuse::pointertrick) { - delete get_inode(it_src->second); - } else { - ino_map.erase(it_src->second); - } - path_map.erase(it_src); + auto& p_shard = path_shards[std::hash{}(path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + auto it_src = p_shard.path_map.find(path); + if(it_src != p_shard.path_map.end()) { + p_shard.path_map.erase(it_src); + } +} + +static void +remove_inode_by_ino(fuse_ino_t ino) { + auto* inode = get_inode(ino); + if(!inode) + return; + + // Remove from path_map if it was still linked + if(!inode->path.empty()) { + remove_inode_by_path(inode->path); + } + + if(gkfs::config::fuse::pointertrick) { + delete inode; + } else { + auto& i_shard = ino_shards[std::hash{}(ino) % SHARD_COUNT]; + std::lock_guard lk(i_shard.mutex); + i_shard.ino_map.erase(ino); } } @@ -121,33 +146,42 @@ fill_fuse_entry_param(const u_data* ud, const std::string path, fuse_ino_t ino = 0; bool found = false; + Inode* inode = nullptr; // CRITICAL SECTION { - std::lock_guard lk(ino_mutex); + auto& p_shard = + path_shards[std::hash{}(path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); if(check_path_map) { - auto it = path_map.find(path); - if(it != path_map.end()) { + auto it = p_shard.path_map.find(path); + if(it != p_shard.path_map.end()) { ino = it->second; found = true; + if(gkfs::config::fuse::pointertrick) { + inode = (Inode*) ino; + } else { + auto& i_shard = ino_shards[std::hash{}(ino) % + SHARD_COUNT]; + std::lock_guard i_lk(i_shard.mutex); + auto it_ino = i_shard.ino_map.find(ino); + if(it_ino != i_shard.ino_map.end()) { + inode = &it_ino->second; + inode->lookup_count++; + } + } } } } // Lock releases here - if(found) { - auto inode = get_inode(ino); - if(inode) { - // this could be critical because get_inode is not locked - inode->lookup_count++; - } - } else { + if(!found) { ino = alloc_inode(path); } fep.ino = ino; fep.attr = st; - fep.attr_timeout = ud->timeout; - fep.entry_timeout = ud->timeout; + fep.attr_timeout = ud->attr_timeout; + fep.entry_timeout = ud->entry_timeout; return 0; } @@ -225,23 +259,28 @@ passthrough_ll_help(void) { static void init_handler(void* userdata, struct fuse_conn_info* conn) { struct u_data* ud = (struct u_data*) userdata; - DEBUG_INFO(ud, - "init handler: requested readahead %i direct_io %i max_read2 %i", - ud->max_readahead, ud->direct_io, ud->max_read2); - DEBUG_INFO(ud, "current values: readahead %i direct_io %i max_read2 %i", - conn->max_readahead, conn->max_read, conn->max_write); + LOG(DEBUG, "init handler: requested readahead {} direct_io {} max_read2 {}", + ud->max_readahead, ud->direct_io, ud->max_read2); + LOG(DEBUG, "current values: readahead {} direct_io {} max_read2 {}", + conn->max_readahead, conn->max_read, conn->max_write); #if FUSE_MAJOR_VERSION > 3 || \ (FUSE_MAJOR_VERSION == 3 && FUSE_MINOR_VERSION >= 12) if(ud->writeback) { - fuse_set_feature_flag(conn, FUSE_CAP_WRITEBACK_CACHE); - DEBUG_INFO(ud, "init_handler: try to activate writeback"); + conn->want |= FUSE_CAP_WRITEBACK_CACHE; + LOG(DEBUG, "init_handler: try to activate writeback"); } + if(ud->max_background > 0) { + conn->want |= FUSE_CAP_ASYNC_DIO; + } + conn->want |= FUSE_CAP_PARALLEL_DIROPS; + conn->want |= FUSE_CAP_ATOMIC_O_TRUNC; + // TODO copy the other feature flags here #else if(ud->writeback) { conn->want |= FUSE_CAP_WRITEBACK_CACHE; - DEBUG_INFO(ud, "init_handler: try to activate writeback"); + LOG(DEBUG, "init_handler: try to activate writeback"); } // TODO make this optional!!! // default activated if kernel supports it: FUSE_CAP_PARALLEL_DIROPS, @@ -285,22 +324,21 @@ init_handler(void* userdata, struct fuse_conn_info* conn) { static void destroy_handler(void* userdata) { - struct u_data* ud = (struct u_data*) userdata; - DEBUG_INFO(ud, "destroy handler"); + LOG(DEBUG, "destroy handler"); // userdata is GekkoFuse* if passed } static void lookup_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { auto* ud = udata(req); - DEBUG_INFO(ud, "lookup handler ino %u", parent); + LOG(DEBUG, "lookup handler ino {}", parent); auto* parent_inode = get_inode(parent); if(!parent_inode) { fuse_reply_err(req, ENOENT); return; } std::string child = get_path(parent_inode, name); - DEBUG_INFO(ud, "lookup %s", child.c_str()); + LOG(DEBUG, "lookup {}", child); if(ud->fifo) { auto iit = local_fifos.find(child); @@ -330,7 +368,7 @@ lookup_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { static void getattr_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "getattr handler"); + LOG(DEBUG, "getattr handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -340,7 +378,7 @@ getattr_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { struct stat st; int rc = gkfs::syscall::gkfs_stat(inode->path, &st, false); if(rc) { - DEBUG_INFO(ud, "getattr error %u", rc); + LOG(DEBUG, "getattr error {}", rc); fuse_reply_err(req, errno); return; } @@ -351,7 +389,7 @@ static void setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "setattr handler ino %u", ino); + LOG(DEBUG, "setattr handler ino {}", ino); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -362,8 +400,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, off_t new_size = attr->st_size; int res = gkfs::syscall::gkfs_truncate(inode->path, new_size); if(res < 0) { - DEBUG_INFO(ud, "setattr truncate failed on %s", - inode->path.c_str()); + LOG(DEBUG, "setattr truncate failed on {}", inode->path); fuse_reply_err(req, EIO); return; } @@ -372,7 +409,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, struct stat st; int rc = gkfs::syscall::gkfs_stat(inode->path, &st, false); if(rc) { - DEBUG_INFO(ud, "getattr error %u", rc); + LOG(DEBUG, "getattr error {}", rc); fuse_reply_err(req, errno); return; } @@ -386,7 +423,7 @@ setattr_handler(fuse_req_t req, fuse_ino_t ino, struct stat* attr, int to_set, static void open_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "open handler"); + LOG(DEBUG, "open handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -407,8 +444,7 @@ open_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { static void lseek_handler(fuse_req_t req, fuse_ino_t ino, off_t off, int whence, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "lseek handler"); + LOG(DEBUG, "lseek handler"); int lc = gkfs::syscall::gkfs_lseek(fi->fh, off, whence); if(lc < 0) { fuse_reply_err(req, errno); @@ -420,8 +456,7 @@ lseek_handler(fuse_req_t req, fuse_ino_t ino, off_t off, int whence, static void read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "read handler"); + LOG(DEBUG, "read handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); if(!inode) { @@ -429,21 +464,22 @@ read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } } - std::vector buf(size); - int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.data(), size, off); + + // Allocate buffer without zero-initialization for performance + auto buf = std::make_unique(size); + int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.get(), size, off); if(rc < 0) { - DEBUG_INFO(ud, "read fail"); + LOG(DEBUG, "read fail"); fuse_reply_err(req, errno); return; } - fuse_reply_buf(req, buf.data(), rc); + fuse_reply_buf(req, buf.get(), rc); } static void write_handler(fuse_req_t req, fuse_ino_t ino, const char* buf, size_t size, off_t off, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "write handler"); + LOG(DEBUG, "write handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); if(!inode) { @@ -453,7 +489,7 @@ write_handler(fuse_req_t req, fuse_ino_t ino, const char* buf, size_t size, } int rc = gkfs::syscall::gkfs_pwrite(fi->fh, buf, size, off); if(rc < 0) { - DEBUG_INFO(ud, "write fail"); + LOG(DEBUG, "write fail"); fuse_reply_err(req, errno); return; } @@ -477,16 +513,15 @@ create_handler(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t mode, int rc = gkfs::syscall::gkfs_create(path, mode); int errno_bu = errno; if(rc == -1) { - fuse_log(FUSE_LOG_DEBUG, - "create failed, here here mode %i flags %i errno %i\n", mode, + LOG(DEBUG, "create failed, here here mode {} flags {} errno {}", mode, fi->flags, errno); } int fd = gkfs::syscall::gkfs_open(path, mode, fi->flags); */ - DEBUG_INFO(ud, "create handler %s", path.c_str()); + LOG(DEBUG, "create handler {}", path); int fd = gkfs::syscall::gkfs_open(path, mode, fi->flags | O_CREAT); if(fd < 0) { - DEBUG_INFO(ud, "create -> open failed errno %i", errno); + LOG(DEBUG, "create -> open failed errno {}", errno); fuse_reply_err(req, errno); return; } @@ -507,7 +542,7 @@ create_handler(fuse_req_t req, fuse_ino_t parent, const char* name, mode_t mode, static void unlink_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { auto* ud = udata(req); - DEBUG_INFO(ud, "unlink handler"); + LOG(DEBUG, "unlink handler"); auto* parent_inode = get_inode(parent); if(!parent_inode) { fuse_reply_err(req, ENOENT); @@ -537,18 +572,17 @@ unlink_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { static void opendir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "opendir handler"); + LOG(DEBUG, "opendir handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOTDIR); return; } - DEBUG_INFO(ud, "open dir %s", inode->path.c_str()); + LOG(DEBUG, "open dir {}", inode->path); const int fd = gkfs::syscall::gkfs_opendir(inode->path); - DEBUG_INFO(ud, "\t with fd %i \n", fd); + LOG(DEBUG, "\t with fd {} ", fd); if(fd < 0) { fuse_reply_err(req, ENOTDIR); @@ -563,7 +597,7 @@ static void readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "readdir handler"); + LOG(DEBUG, "readdir handler"); auto open_dir = CTX->file_map()->get_dir(fi->fh); if(open_dir == nullptr) { @@ -571,7 +605,7 @@ readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } - DEBUG_INFO(ud, "read dir %s", open_dir->path().c_str()); + LOG(DEBUG, "read dir {}", open_dir->path()); // Allocate a buffer to accumulate entries char* buf = static_cast(malloc(size)); @@ -644,8 +678,7 @@ readdir_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, static void statfs_handler(fuse_req_t req, fuse_ino_t ino) { - auto* ud = udata(req); - DEBUG_INFO(ud, "statfs handler ino %lu", ino); + LOG(DEBUG, "statfs handler ino {}", ino); struct statfs stfs{}; int rc = gkfs::syscall::gkfs_statfs(&stfs); if(rc < 0) { @@ -672,7 +705,7 @@ static void readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { auto* ud = udata(req); - DEBUG_INFO(ud, "readdirplus handler"); + LOG(DEBUG, "readdirplus handler"); auto open_dir = CTX->file_map()->get_dir(fi->fh); if(open_dir == nullptr) { @@ -680,7 +713,7 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, return; } - DEBUG_INFO(ud, "read dirplus %s", open_dir->path().c_str()); + LOG(DEBUG, "read dirplus {}", open_dir->path()); // Allocate a buffer to accumulate entries char* buf = static_cast(malloc(size)); @@ -699,7 +732,7 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, std::string child = open_dir->path() == "/" ? "/" + de.name() : open_dir->path() + "/" + de.name(); - DEBUG_INFO(ud, "read dirplus child %s", child.c_str()); + LOG(DEBUG, "read dirplus child {}", child); fuse_entry_param e = {}; @@ -777,10 +810,10 @@ readdirplus_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, static void releasedir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "releasedir handler"); + LOG(DEBUG, "releasedir handler"); if(CTX->interception_enabled() && CTX->file_map()->exist(fi->fh)) { - int ret = gkfs::syscall::gkfs_close(fd); // Close GekkoFS internal FD + int ret = + gkfs::syscall::gkfs_close(fi->fh); // Close GekkoFS internal FD fuse_reply_err(req, ret); return; @@ -791,8 +824,7 @@ releasedir_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { /// releases file descriptor, not connected to lookup_count static void release_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "release handler"); + LOG(DEBUG, "release handler"); auto* inode = get_inode(ino); if(!inode) { fuse_reply_err(req, ENOENT); @@ -809,8 +841,7 @@ release_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { /// decrement lookup count static void forget_handler(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) { - auto* ud = udata(req); - DEBUG_INFO(ud, "forget handler"); + LOG(DEBUG, "forget handler"); Inode* inode = get_inode(ino); if(!inode) { @@ -818,24 +849,28 @@ forget_handler(fuse_req_t req, fuse_ino_t ino, uint64_t nlookup) { return; } - if(inode->lookup_count > nlookup) - inode->lookup_count -= nlookup; - else - inode->lookup_count = 0; + uint64_t current = inode->lookup_count.load(); + while(current >= nlookup) { + if(inode->lookup_count.compare_exchange_weak(current, + current - nlookup)) { + break; + } + } + if(current < nlookup) { + inode->lookup_count.store(0); + } - if(inode->lookup_count == 0) { // && inode.open_count == 0 - remove_inode_by_path(inode->path); - DEBUG_INFO(ud, "reached lookup_count 0"); + if(inode->lookup_count.load() == 0) { // && inode.open_count == 0 + LOG(DEBUG, "reached lookup_count 0 for ino {}", ino); + remove_inode_by_ino(ino); } fuse_reply_none(req); - // fuse_reply_err(req, 0); } static void flush_handler(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - auto* ud = udata(req); - DEBUG_INFO(ud, "flush handler"); + LOG(DEBUG, "flush handler"); if(gkfs::config::fuse::check_inode) { auto* inode = get_inode(ino); if(!inode) { @@ -861,7 +896,7 @@ fsync_handler(fuse_req_t req, fuse_ino_t ino, int datasync, static void access_handler(fuse_req_t req, fuse_ino_t ino, int mask) { auto* ud = udata(req); - DEBUG_INFO(ud, "access handler"); + LOG(DEBUG, "access handler"); if(ud->access && !ud->fifo) { auto* inode = get_inode(ino); if(!inode) { @@ -891,7 +926,7 @@ mkdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name, return; } std::string path = get_path(parent_inode, name); - DEBUG_INFO(ud, "mkdir parent %s name %s", parent_inode->path.c_str(), name); + LOG(DEBUG, "mkdir parent {} name {}", parent_inode->path, name); int rc = gkfs::syscall::gkfs_create(path, mode | S_IFDIR); if(rc == -1) { fuse_reply_err(req, errno); @@ -909,14 +944,13 @@ mkdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name, static void rmdir_handler(fuse_req_t req, fuse_ino_t parent, const char* name) { - auto* ud = udata(req); auto* parent_inode = get_inode(parent); if(!parent_inode) { fuse_reply_err(req, ENOENT); return; } std::string path = get_path(parent_inode, name); - DEBUG_INFO(ud, "rmdir %s", path.c_str()); + LOG(DEBUG, "rmdir {}", path); int rc = gkfs::syscall::gkfs_rmdir(path); if(rc == -1) { fuse_reply_err(req, errno); @@ -987,7 +1021,7 @@ symlink_handler(fuse_req_t req, const char* linkname, fuse_ino_t parent, // this shows the link on ls -l e.attr.st_mode = S_IFLNK | 0777; // mark as symlink + full perms e.attr.st_size = strlen(target.c_str()); - // DEBUG_INFO(ud, "stat mode %i, iflink %i", e.attr.st_mode, S_IFLNK); + // LOG(DEBUG, "stat mode {}, iflink {}", e.attr.st_mode, S_IFLNK); fuse_reply_entry(req, &e); } @@ -1039,40 +1073,51 @@ rename_handler(fuse_req_t req, fuse_ino_t old_parent, const char* old_name, return; } - ino_mutex.lock(); - auto it_src = path_map.find(old_path); fuse_ino_t src_ino = 0; - - if(it_src != path_map.end()) { - src_ino = it_src->second; - path_map.erase(it_src); - path_map[new_path] = src_ino; - } else { - src_ino = alloc_inode(new_path); - path_map[new_path] = src_ino; + { + auto& p_shard = + path_shards[std::hash{}(old_path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + auto it_src = p_shard.path_map.find(old_path); + if(it_src != p_shard.path_map.end()) { + src_ino = it_src->second; + p_shard.path_map.erase(it_src); + } } - ino_mutex.unlock(); - auto* inode = get_inode(src_ino); - if(inode) { - inode->path = new_path; - } + if(src_ino == 0) { + src_ino = alloc_inode(new_path); + } else { + auto& p_shard = + path_shards[std::hash{}(new_path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + + fuse_ino_t old_dst_ino = 0; + auto it_dst = p_shard.path_map.find(new_path); + if(it_dst != p_shard.path_map.end()) { + old_dst_ino = it_dst->second; + } - // If destination existed and was overwritten, detach its mapping - if(dst_exists) { - auto it_dst = path_map.find(new_path); - if(it_dst != path_map.end()) { - fuse_ino_t dst_ino = it_dst->second; - path_map.erase(it_dst); + p_shard.path_map[new_path] = src_ino; - // Mark the old dst inode as disconnected (no pathname). - // Keep it alive until lookup_count==0 and open_count==0, then free - // in forget(). - auto& dst_rec = ino_map[dst_ino]; - dst_rec.path.clear(); + if(old_dst_ino != 0) { + auto& i_shard = ino_shards[std::hash{}(old_dst_ino) % + SHARD_COUNT]; + std::lock_guard i_lk(i_shard.mutex); + auto it_ino = i_shard.ino_map.find(old_dst_ino); + if(it_ino != i_shard.ino_map.end()) { + it_ino->second.path.clear(); + } } } + auto& i_shard = ino_shards[std::hash{}(src_ino) % SHARD_COUNT]; + std::lock_guard i_lk(i_shard.mutex); + auto it_ino = i_shard.ino_map.find(src_ino); + if(it_ino != i_shard.ino_map.end()) { + it_ino->second.path = new_path; + } + fuse_reply_err(req, 0); } @@ -1142,14 +1187,48 @@ init_gekkofs() { struct stat st; int rc = gkfs::syscall::gkfs_stat(root_path, &st); if(rc < 0) { - fuse_log(FUSE_LOG_ERR, "failed to open root\n"); + LOG(ERROR, "failed to open root"); exit(1); } - root_inode = {root_path, 1}; - path_map[root_path] = FUSE_ROOT_ID; + root_inode.path = root_path; + auto& p_shard = + path_shards[std::hash{}(root_path) % SHARD_COUNT]; + std::lock_guard lk(p_shard.mutex); + p_shard.path_map[root_path] = FUSE_ROOT_ID; std::cout << "root node allocated" << std::endl; } +static void +forget_multi_handler(fuse_req_t req, size_t count, + struct fuse_forget_data* forgets) { + LOG(DEBUG, "forget_multi handler count {}", count); + for(size_t i = 0; i < count; i++) { + fuse_ino_t ino = forgets[i].ino; + uint64_t nlookup = forgets[i].nlookup; + + Inode* inode = get_inode(ino); + if(!inode) + continue; + + uint64_t current = inode->lookup_count.load(); + while(current >= nlookup) { + if(inode->lookup_count.compare_exchange_weak(current, + current - nlookup)) { + break; + } + } + if(current < nlookup) { + inode->lookup_count.store(0); + } + + if(inode->lookup_count.load() == 0) { + LOG(DEBUG, "reached lookup_count 0 for ino {} (multi)", ino); + remove_inode_by_ino(ino); + } + } + fuse_reply_none(req); +} + static void init_ll_ops(fuse_lowlevel_ops* ops) { // file @@ -1159,7 +1238,7 @@ init_ll_ops(fuse_lowlevel_ops* ops) { ops->create = create_handler; ops->unlink = unlink_handler; ops->forget = forget_handler; - // ops->forget_multi + ops->forget_multi = forget_multi_handler; ops->readlink = readlink_handler; ops->mknod = mknod_handler; ops->symlink = symlink_handler; @@ -1295,10 +1374,33 @@ main(int argc, char* argv[]) { break; } } else if(ud.timeout < 0) { - fuse_log(FUSE_LOG_ERR, "timeout is negative (%lf)\n", ud.timeout); + LOG(ERROR, "timeout is negative ({})", ud.timeout); exit(1); } + ud.entry_timeout = ud.timeout; + ud.attr_timeout = ud.timeout; + ud.negative_timeout = ud.timeout; + + if(const char* env_e = std::getenv("GKFS_FUSE_ENTRY_TIMEOUT")) { + ud.entry_timeout = std::stod(env_e); + LOG(INFO, "FUSE entry_timeout set to {}", ud.entry_timeout); + } + if(const char* env_a = std::getenv("GKFS_FUSE_ATTR_TIMEOUT")) { + ud.attr_timeout = std::stod(env_a); + LOG(INFO, "FUSE attr_timeout set to {}", ud.attr_timeout); + } + if(const char* env_n = std::getenv("GKFS_FUSE_NEGATIVE_TIMEOUT")) { + ud.negative_timeout = std::stod(env_n); + LOG(INFO, "FUSE negative_timeout set to {}", ud.negative_timeout); + } + + if(const char* env_w = std::getenv("GKFS_FUSE_WRITEBACK")) { + ud.writeback = + (std::string(env_w) == "ON" || std::string(env_w) == "1"); + LOG(INFO, "FUSE writeback set to {}", ud.writeback); + } + ud.mountpoint = strdup(opts.mountpoint); init_gekkofs(); diff --git a/src/client/gkfs_metadata.cpp b/src/client/gkfs_metadata.cpp index 4530885aaae2777b85fb5050458588713fcffd93..ef0e1ed53a3a366d4280ceca649ecdb16a336cf1 100644 --- a/src/client/gkfs_metadata.cpp +++ b/src/client/gkfs_metadata.cpp @@ -1165,6 +1165,19 @@ gkfs_opendir(const std::string& path) { ret.second->add("..", gkfs::filemap::FileType::directory); for(auto& fut : dcache_futures) { auto res = fut.get(); // Wait for the RPC result + + if(res.first != 0) { + ret.first = res.first; + LOG(ERROR, "{}() RPC failed with error: {}", __func__, + res.first); + continue; + } + if(!res.second) { + LOG(ERROR, "{}() RPC returned null entries vector", __func__); + ret.first = EIO; + continue; + } + auto& open_dir = *res.second; for(auto& dentry : open_dir) { // type returns as unsigned char @@ -1186,7 +1199,6 @@ gkfs_opendir(const std::string& path) { get<3>(dentry)}); cnt++; } - ret.first = res.first; } LOG(DEBUG, "{}() Unpacked dirents for path '{}' counted '{}' entries", __func__, path, cnt); diff --git a/tests/integration/fuse/test_metadata_bench.py b/tests/integration/fuse/test_metadata_bench.py new file mode 100644 index 0000000000000000000000000000000000000000..abbf9e94023e299a3d31a50f8449898d33a2d364 --- /dev/null +++ b/tests/integration/fuse/test_metadata_bench.py @@ -0,0 +1,98 @@ +import os +import time +import threading +import pytest +import statistics + +def benchmark_metadata_ops(mountdir, num_threads, num_iters): + def worker(tid, results): + thread_dir = os.path.join(mountdir, f"bench_t{tid}") + os.makedirs(thread_dir, exist_ok=True) + + start = time.perf_counter() + for i in range(num_iters): + # Create/Stat/Remove + p = os.path.join(thread_dir, f"f{i}") + with open(p, "w") as f: + f.write("x") + os.stat(p) + os.remove(p) + end = time.perf_counter() + results.append(end - start) + + results = [] + threads = [] + for i in range(num_threads): + t = threading.Thread(target=worker, args=(i, results)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + total_time = max(results) + ops_per_sec = (num_threads * num_iters * 3) / total_time + return ops_per_sec + +def benchmark_read_latency(mountdir, num_iters): + p = os.path.join(mountdir, "baseline_file") + with open(p, "w") as f: + f.write("data") + + # Warm up + os.stat(p) + + latencies = [] + for i in range(num_iters): + start = time.perf_counter() + os.stat(p) + end = time.perf_counter() + latencies.append(end - start) + + return statistics.median(latencies) * 1e6 # in microseconds + +def benchmark_write_throughput(mountdir, num_threads, size_mb): + def worker(tid, results): + p = os.path.join(mountdir, f"write_bench_t{tid}") + data = b"x" * 1024 * 1024 # 1MB + + start = time.perf_counter() + with open(p, "wb") as f: + for _ in range(size_mb): + f.write(data) + f.flush() + os.fsync(f.fileno()) + end = time.perf_counter() + results.append(end - start) + + results = [] + threads = [] + for i in range(num_threads): + t = threading.Thread(target=worker, args=(i, results)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + total_time = max(results) + throughput_mb = (num_threads * size_mb) / total_time + return throughput_mb + +@pytest.mark.parametrize("num_threads", [1, 8]) +def test_benchmark_performance(gkfs_daemon, fuse_client, num_threads): + mountdir = fuse_client.mountdir + iters = 100 + + print(f"\n--- Benchmarking with {num_threads} threads ---") + + # Run with current settings + ops = benchmark_metadata_ops(mountdir, num_threads, iters) + print(f"Metadata Throughput: {ops:.2f} ops/sec") + + lat = benchmark_read_latency(mountdir, iters) + print(f"Stat Latency (Median): {lat:.2f} us") + + if num_threads == 1: + tp = benchmark_write_throughput(mountdir, 1, 100) + print(f"Write Throughput (1 thread): {tp:.2f} MB/s") diff --git a/tests/integration/fuse/test_stress.py b/tests/integration/fuse/test_stress.py new file mode 100644 index 0000000000000000000000000000000000000000..cbf74ea57e371009669455ff7f67bc81706eeb9c --- /dev/null +++ b/tests/integration/fuse/test_stress.py @@ -0,0 +1,45 @@ +import os +import threading +import pytest +import sh + +def thread_task(mountdir, thread_id, num_iters): + thread_dir = os.path.join(mountdir, f"thread_{thread_id}") + os.makedirs(thread_dir, exist_ok=True) + + for i in range(num_iters): + file_path = os.path.join(thread_dir, f"file_{i}") + # Create + with open(file_path, "w") as f: + f.write("data") + # Stat + os.stat(file_path) + # Unlink + os.remove(file_path) + + # Subdir create/rmdir + subdir = os.path.join(thread_dir, f"dir_{i}") + os.mkdir(subdir) + os.rmdir(subdir) + +@pytest.mark.parametrize("num_threads", [10]) +@pytest.mark.parametrize("iters_per_thread", [100]) +def test_metadata_stress(gkfs_daemon, fuse_client, num_threads, iters_per_thread): + mountdir = fuse_client.mountdir + threads = [] + + for i in range(num_threads): + t = threading.Thread(target=thread_task, args=(mountdir, i, iters_per_thread)) + threads.append(t) + t.start() + + for t in threads: + t.join() + + # Final check of the root directory + assert "thread_0" in os.listdir(mountdir) + + # Cleanup + for i in range(num_threads): + thread_dir = os.path.join(mountdir, f"thread_{i}") + sh.rm("-rf", thread_dir)