diff --git a/CHANGELOG.md b/CHANGELOG.md index 51c44810e538f929be51e9da1d3a025b31280688..3fe49a59e307d16625908e70eba06256ee5ad822 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,9 +33,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Added tests for FUSE support - Optimized and fixed fuse ([!293](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/293)) - New optimization options - - Less contention with threading - - + - Less contention with threading using sharding + + ### Changed @@ -51,6 +51,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - Fix pytorch mmap ([!291](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/291)) - Fix cuda in syscall ([!292](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/292)) - mmap and dangling fd issues + - Fix remove chunk bug ([!294](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/294)) ## [0.9.5] - 2025-08 diff --git a/include/client/open_file_map.hpp b/include/client/open_file_map.hpp index 15c4245a9c8504c67abae9522f1598b0e3790571..54a2fdb20c8e933a4647a9bb5060736235b51023 100644 --- a/include/client/open_file_map.hpp +++ b/include/client/open_file_map.hpp @@ -138,10 +138,18 @@ private: class OpenFileMap { - private: - std::map> files_; - std::recursive_mutex files_mutex_; + struct MapShard { + std::map> files; + mutable std::recursive_mutex mutex; + }; + + static constexpr size_t num_shards = 32; + std::array shards_; + std::atomic total_files_{0}; + + MapShard& + get_shard(int fd); int safe_generate_fd_idx_(); @@ -194,6 +202,9 @@ public: std::vector get_range(unsigned int first, unsigned int last); + + bool + empty() const; }; } // namespace gkfs::filemap diff --git a/src/client/open_file_map.cpp b/src/client/open_file_map.cpp index ebc4d86ca78191eb8f056b0ea0455f27e90b2f8b..71a32d1136cdbd689c628bc71c6c467178726b61 100644 --- a/src/client/open_file_map.cpp +++ b/src/client/open_file_map.cpp @@ -145,11 +145,22 @@ OpenFile::inline_data_size(size_t 
size) { OpenFile::inline_data_size_ = size; } +OpenFileMap::MapShard& +OpenFileMap::get_shard(int fd) { + if(fd < 0) { + return shards_[0]; // Default shard for invalid FDs, or we could throw + } + return shards_[fd % num_shards]; +} + shared_ptr OpenFileMap::get(int fd) { - lock_guard lock(files_mutex_); - auto f = files_.find(fd); - if(f == files_.end()) { + if(fd < 0) + return nullptr; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + auto f = shard.files.find(fd); + if(f == shard.files.end()) { return nullptr; } else { return f->second; @@ -167,9 +178,12 @@ OpenFileMap::get_dir(int dirfd) { bool OpenFileMap::exist(const int fd) { - lock_guard lock(files_mutex_); - auto f = files_.find(fd); - return !(f == files_.end()); + if(fd < 0) + return false; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + auto f = shard.files.find(fd); + return !(f == shard.files.end()); } int @@ -224,19 +238,27 @@ OpenFileMap::safe_generate_fd_idx_() { int OpenFileMap::add(std::shared_ptr open_file) { auto fd = safe_generate_fd_idx_(); - lock_guard lock(files_mutex_); - files_[fd] = open_file; + if(fd < 0) + return fd; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + shard.files[fd] = open_file; + total_files_++; return fd; } bool OpenFileMap::remove(const int fd) { - lock_guard lock(files_mutex_); - auto f = files_.find(fd); - if(f == files_.end()) { + if(fd < 0) + return false; + auto& shard = get_shard(fd); + lock_guard lock(shard.mutex); + auto f = shard.files.find(fd); + if(f == shard.files.end()) { return false; } - files_.erase(fd); + shard.files.erase(fd); + total_files_--; if(!CTX->protect_fds()) { if(!CTX->range_fd()) { @@ -245,7 +267,7 @@ OpenFileMap::remove(const int fd) { return true; } } - if(fd_validation_needed && files_.empty()) { + if(fd_validation_needed && empty()) { fd_validation_needed = false; LOG(DEBUG, "fd_validation flag reset"); } @@ -254,20 +276,24 @@ OpenFileMap::remove(const int fd) { int 
OpenFileMap::dup(const int oldfd) { - lock_guard lock(files_mutex_); auto open_file = get(oldfd); if(open_file == nullptr) { errno = EBADF; return -1; } auto newfd = safe_generate_fd_idx_(); - files_.insert(make_pair(newfd, open_file)); + if(newfd < 0) + return newfd; + auto& shard = get_shard(newfd); + lock_guard lock(shard.mutex); + // Count the new entry; remove() decrements total_files_ for every fd. + if(shard.files.insert(make_pair(newfd, open_file)).second) + total_files_++; return newfd; } int OpenFileMap::dup2(const int oldfd, const int newfd) { - lock_guard lock(files_mutex_); auto open_file = get(oldfd); if(open_file == nullptr) { errno = EBADF; @@ -283,7 +309,12 @@ OpenFileMap::dup2(const int oldfd, const int newfd) { // by os streams that we do not overwrite if(get_fd_idx() < newfd && newfd != 0 && newfd != 1 && newfd != 2) fd_validation_needed = true; - files_.insert(make_pair(newfd, open_file)); + + auto& shard = get_shard(newfd); + lock_guard lock(shard.mutex); + // Count the new entry; remove() decrements total_files_ for every fd. + if(shard.files.insert(make_pair(newfd, open_file)).second) + total_files_++; return newfd; } @@ -318,15 +349,23 @@ OpenFileMap::get_fd_idx() { std::vector OpenFileMap::get_range(unsigned int first, unsigned int last) { - std::lock_guard lock(files_mutex_); std::vector result; - // files_ is a sorted std::map, so lower_bound gives us an efficient start - auto it = files_.lower_bound(static_cast(first)); - while(it != files_.end() && static_cast(it->first) <= last) { - result.push_back(it->first); - ++it; + for(auto& shard : shards_) { + lock_guard lock(shard.mutex); + auto it = shard.files.lower_bound(static_cast(first)); + while(it != shard.files.end() && + static_cast(it->first) <= last) { + result.push_back(it->first); + ++it; + } } + std::sort(result.begin(), result.end()); return result; } +bool +OpenFileMap::empty() const { + return total_files_ == 0; +} + } // namespace gkfs::filemap \ No newline at end of file diff --git a/src/client/preload.cpp b/src/client/preload.cpp index 11a7b4423d3fc5b473b50ff24ccc0527570eeff6..c08388ec41b7fd1ebe74be8a08f43b02a5f11243 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -385,12 +385,16 
@@ init_preload() { #ifdef ENABLE_USER return; #endif + static std::recursive_mutex init_mutex; + std::lock_guard lock(init_mutex); + if(init) { + return; + } // The original errno value will be restored after initialization to not // leak internal error codes auto oerrno = errno; - if(atomic_exchange(&init, 1) == 0) { - pthread_atfork(&at_fork, &at_parent, &at_child); - } + init = true; + pthread_atfork(&at_fork, &at_parent, &at_child); #ifndef BYPASS_SYSCALL CTX->enable_interception(); diff --git a/src/client/rpc/forward_metadata.cpp b/src/client/rpc/forward_metadata.cpp index 23434fc3e68e1f9f700e30241aeadf75e8c77961..5baf556ec26116e5325d96fcb1839bcf901da70b 100644 --- a/src/client/rpc/forward_metadata.cpp +++ b/src/client/rpc/forward_metadata.cpp @@ -118,6 +118,7 @@ int forward_remove(const std::string& path, bool rm_dir, int8_t num_copies) { int err = 0; + // Step 1: Remove metadata from the responsible server(s) // We iterate over replicas for(auto copy = 0; copy < num_copies + 1; copy++) { auto endp = CTX->hosts().at( @@ -135,6 +136,36 @@ forward_remove(const std::string& path, bool rm_dir, int8_t num_copies) { err = out.err; } } + + // If metadata removal failed, bail out early + if(err != 0) + return err; + + // Step 2: Broadcast remove_data to ALL daemons so that every host can + // clean up whatever chunks it holds for this file. + // NOTE: When using the proxy, the proxy handles this broadcast itself, so + // we skip it here (CTX->use_proxy() is checked by the caller). + // We send remove_data unconditionally for non-directory removes. + // Daemons handle the no-op case safely (empty chunk dir == nothing to do). 
+ if(!rm_dir) { + auto rpc_remove_data = + CTX->rpc_engine()->define(gkfs::rpc::tag::remove_data); + + gkfs::rpc::rpc_rm_node_in_t rm_in; + rm_in.path = path; + rm_in.rm_dir = true; // tells daemon to remove the whole chunk directory + + for(std::size_t i = 0; i < CTX->hosts().size(); i++) { + try { + rpc_remove_data.on(CTX->hosts().at(i)).async(rm_in); + } catch(const std::exception& e) { + LOG(WARNING, + "{}() Failed to send remove_data RPC to host {}: {}", + __func__, i, e.what()); + } + } + } + return err; } diff --git a/src/daemon/handler/srv_metadata.cpp b/src/daemon/handler/srv_metadata.cpp index 3b9132f591e9a132ab5117acb8c13cff812cb31a..f98798aae75582de4ee8d4b8c9a8a3010a781b2e 100644 --- a/src/daemon/handler/srv_metadata.cpp +++ b/src/daemon/handler/srv_metadata.cpp @@ -270,18 +270,6 @@ rpc_srv_remove_metadata(const tl::request& req, } } -/** - * @brief Serves a request to remove all file data chunks on this daemon. - * @internal - * The handler simply issues the removal of all chunk files on the local file - * system. - * - * All exceptions must be caught here and dealt with accordingly. Any errors are - * placed in the response. - * @endinteral - * @param handle Mercury RPC handle - * @return Mercury error code to Mercury - */ /** * @brief Serves a request to remove all file data chunks on this daemon. 
* @internal diff --git a/tests/integration/directories/test_backend_cleanup.py b/tests/integration/directories/test_backend_cleanup.py index dcb293794a6e7339d05f74d85ebe789b0a8fb13f..b544cc0c2692d5b3f6a8a625452ed353173f7405 100644 --- a/tests/integration/directories/test_backend_cleanup.py +++ b/tests/integration/directories/test_backend_cleanup.py @@ -3,33 +3,93 @@ from pathlib import Path import errno import stat import os +import time import pytest from harness.logger import logger +from harness.gkfs import Daemon + +# --------------------------------------------------------------------------- +# Helper: a thin workspace adapter so that each daemon in a multi-server +# test can have its own rootdir and logdir while sharing everything else +# (mountdir, twd / hosts file, bindirs, libdirs). +# --------------------------------------------------------------------------- +class _DaemonWorkspaceAdapter: + """Proxy workspace that redirects rootdir and logdir to per-daemon + sub-directories while forwarding everything else to the real workspace.""" + + def __init__(self, real_workspace, suffix): + self._ws = real_workspace + # Per-daemon directories + self._rootdir = real_workspace.twd / f"root_{suffix}" + self._logdir = real_workspace.logdir / f"daemon_{suffix}" + self._rootdir.mkdir(parents=True, exist_ok=True) + self._logdir.mkdir(parents=True, exist_ok=True) + + # --- overridden per-daemon paths ---------------------------------------- + @property + def rootdir(self): + return self._rootdir + + @property + def logdir(self): + return self._logdir + + # --- forwarded from real workspace -------------------------------------- + @property + def twd(self): + return self._ws.twd + + @property + def mountdir(self): + return self._ws.mountdir + + @property + def bindirs(self): + return self._ws.bindirs + + @property + def libdirs(self): + return self._ws.libdirs + + @property + def tmpdir(self): + return self._ws.tmpdir + + +def _wait_until(predicate, timeout_s=8.0, 
poll_s=0.2): + deadline = time.time() + timeout_s + while time.time() < deadline: + if predicate(): + return True + time.sleep(poll_s) + return predicate() + + +# --------------------------------------------------------------------------- +# Single-daemon test (existing behaviour) +# --------------------------------------------------------------------------- @pytest.mark.parametrize("client_fixture", ["gkfs_client", "gkfs_clientLibc"]) def test_backend_cleanup(client_fixture, request, gkfs_daemon): gkfs_client = request.getfixturevalue(client_fixture) mountdir = gkfs_daemon.mountdir rootdir = gkfs_daemon.rootdir - + file_path = mountdir / "cleanup_file" dir_path = mountdir / "cleanup_dir" - + # 1. Create a directory ret = gkfs_client.mkdir(dir_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval == 0 - + # 2. Create a file ret = gkfs_client.open(file_path, os.O_CREAT | os.O_WRONLY, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) assert ret.retval != -1 - + # 3. Write some data to generate chunks ret = gkfs_client.write_validate(file_path, 600000) assert ret.retval == 0 - - # Let's see what's in the backend rootdir - # Usually chunks are in `chunks` directory - # We will just walk the rootdir and count files/directories to see what exists + def get_backend_contents(directory): contents = [] for root, dirs, files in os.walk(directory): @@ -38,32 +98,167 @@ def test_backend_cleanup(client_fixture, request, gkfs_daemon): for f in files: contents.append(os.path.join(root, f)) return contents - + contents_before = get_backend_contents(rootdir) logger.info(f"Backend contents before deletion: {contents_before}") - - # Check that chunks exist somewhere in rootdir - # GekkoFS stores chunks typically in `/chunks/` + chunk_dir = rootdir / "chunks" - - # Ensure properties exist assert chunk_dir.exists() - - # Count chunks in chunk_dir + chunks_before = get_backend_contents(chunk_dir) - assert len(chunks_before) > 0, f"Expected chunks in {chunk_dir}, but found none. 
Total backend: {contents_before}" - + assert len(chunks_before) > 0, \ + f"Expected chunks in {chunk_dir}, but found none. Total backend: {contents_before}" + # 4. Remove file and directory ret = gkfs_client.unlink(file_path) assert ret.retval == 0 - + ret = gkfs_client.rmdir(dir_path) assert ret.retval == 0 - - # 5. Check backend again + + # 5. Check backend again – all chunks must be gone contents_after = get_backend_contents(rootdir) logger.info(f"Backend contents after deletion: {contents_after}") - + + emptied = _wait_until(lambda: len(get_backend_contents(chunk_dir)) == 0) chunks_after = get_backend_contents(chunk_dir) - # The chunk directory corresponding to the file should be deleted or empty - assert len(chunks_after) == 0, f"Expected data directory to be empty, but found: {chunks_after}" + assert emptied, \ + f"Expected data directory to be empty within timeout, but found: {chunks_after}" + + +# --------------------------------------------------------------------------- +# Multi-daemon test – reproduces the bug where chunks are not removed from +# all servers when a file that spans multiple daemons is deleted. +# --------------------------------------------------------------------------- +@pytest.mark.parametrize("client_fixture", ["gkfs_client", "gkfs_clientLibc"]) +def test_backend_cleanup_multi_daemon(client_fixture, request, test_workspace): + """ + Starts NUM_DAEMONS GekkoFS daemons that share a single mountdir and + hosts file. A file large enough to be distributed across all servers is + written and then deleted. After deletion *every* server's chunk + directory must be empty. + + This test reproduces the bug reported in March 2026 where file chunks + remained on workers 1 and 2 after a removal issued from worker 0. 
+ """ + + NUM_DAEMONS = 3 + # Bytes to write: 64 chunks of 512 KiB – enough to distribute across 3 servers + # (GekkoFS default chunk size is 524288 B; 64 chunks = 32 MiB total) + WRITE_BYTES = 64 * 524288 + + interface = request.config.getoption('--interface') + + daemons = [] + adapters = [] + + try: + # ---------------------------------------------------------------- + # Start NUM_DAEMONS daemons, all sharing the same hosts file (twd) + # and mountdir but each with its own rootdir / logdir. + # ---------------------------------------------------------------- + for idx in range(NUM_DAEMONS): + adapter = _DaemonWorkspaceAdapter(test_workspace, idx) + adapters.append(adapter) + + daemon = Daemon(interface, "rocksdb", adapter) + daemon.run() + daemons.append(daemon) + logger.info(f"Daemon {idx} started (rootdir={adapter.rootdir})") + + gkfs_client = request.getfixturevalue(client_fixture) + mountdir = test_workspace.mountdir + + # All chunk dirs – one per daemon + chunk_dirs = [a.rootdir / "chunks" for a in adapters] + + # ---------------------------------------------------------------- + # Create and write a large file so chunks land on multiple servers + # ---------------------------------------------------------------- + file_path = mountdir / "multi_server_file" + + ret = gkfs_client.open( + file_path, + os.O_CREAT | os.O_WRONLY, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO, + ) + assert ret.retval != -1, "Failed to open file for writing" + + ret = gkfs_client.write_validate(file_path, WRITE_BYTES) + assert ret.retval == 0, "Failed to write data to file" + + # ---------------------------------------------------------------- + # Verify that at least SOME chunks exist across the cluster + # ---------------------------------------------------------------- + def get_chunks(chunk_dir): + if not chunk_dir.exists(): + return [] + result = [] + for root, dirs, files in os.walk(chunk_dir): + result.extend(os.path.join(root, f) for f in files) + return result + + 
all_chunks_before = [] + for i, cd in enumerate(chunk_dirs): + c = get_chunks(cd) + logger.info(f"Daemon {i} chunks before delete: {len(c)} chunks in {cd}") + all_chunks_before.extend(c) + + assert len(all_chunks_before) > 0, \ + "No chunks found on any daemon after write – something went wrong" + + # Check that data is distributed (at least 2 daemons should have chunks + # for a sufficiently large file). + daemons_with_chunks = sum(1 for cd in chunk_dirs if len(get_chunks(cd)) > 0) + logger.info(f"{daemons_with_chunks}/{NUM_DAEMONS} daemons have chunks before delete") + assert daemons_with_chunks > 1, ( + f"Expected chunks to be distributed across >1 daemon, " + f"but only {daemons_with_chunks} daemon(s) have data. " + f"The file may be too small to trigger distribution." + ) + + # ---------------------------------------------------------------- + # Delete the file + # ---------------------------------------------------------------- + ret = gkfs_client.unlink(file_path) + assert ret.retval == 0, f"unlink failed with retval={ret.retval}" + + # ---------------------------------------------------------------- + # Bug check: ALL daemons must have zero chunks after deletion. + # Cleanup is asynchronous, so allow a grace period. + # ---------------------------------------------------------------- + failed_daemons = [] + + def all_chunk_dirs_empty(): + nonlocal failed_daemons + failed_daemons = [] + for i, cd in enumerate(chunk_dirs): + remaining = get_chunks(cd) + if remaining: + failed_daemons.append((i, remaining)) + return len(failed_daemons) == 0 + + emptied = _wait_until(all_chunk_dirs_empty) + + if not emptied: + for i, remaining in failed_daemons: + logger.error( + f"Daemon {i} still has {len(remaining)} chunk(s) after delete: " + f"{remaining[:10]}{'...' if len(remaining) > 10 else ''}" + ) + else: + for i, _cd in enumerate(chunk_dirs): + logger.info(f"Daemon {i}: chunk directory is clean after delete ✓") + + assert emptied, ( + f"Bug reproduced! 
{len(failed_daemons)} daemon(s) still have residual chunks " + f"after the file was deleted. Affected daemons: " + f"{[idx for idx, _ in failed_daemons]}" + ) + + finally: + for daemon in reversed(daemons): + try: + daemon.shutdown() + except Exception as e: + logger.warning(f"Error shutting down daemon: {e}") diff --git a/tests/integration/harness/gkfs.py b/tests/integration/harness/gkfs.py index 3de7d75871818b38770c83be7115b20dae9802ee..947c93626f978cb1e7cbffad042f6c62ad085d4c 100644 --- a/tests/integration/harness/gkfs.py +++ b/tests/integration/harness/gkfs.py @@ -204,6 +204,36 @@ def _find_search_paths(additional_paths=None): return paths_to_search + +def _compose_ld_library_path(preferred_dirs, workspace_dirs, inherited_ld_path): + """ + Build LD_LIBRARY_PATH with deterministic priority and de-duplicated entries. + The preferred directories are always placed first. + """ + + ordered_entries = [] + seen = set() + + def add_entry(entry): + if not entry: + return + normalized = str(Path(entry).resolve()) + if normalized in seen: + return + seen.add(normalized) + ordered_entries.append(normalized) + + for d in preferred_dirs: + add_entry(d) + + for d in workspace_dirs: + add_entry(d) + + for d in inherited_ld_path.split(':'): + add_entry(d) + + return ':'.join(ordered_entries) + class FwdDaemonCreator: """ Factory that allows tests to create forwarding daemons in a workspace. 
@@ -588,10 +618,6 @@ class Client: self._env = os.environ.copy() self._proxy = proxy - libdirs = ':'.join( - filter(None, [str(p) for p in self._workspace.libdirs] + - [os.environ.get('LD_LIBRARY_PATH', '')])) - # ensure the client interception library is available: # to avoid running code with potentially installed libraries, # it must be found in one (and only one) of the workspace's bindirs @@ -614,6 +640,12 @@ class Client: self._preload_library = preloads[0] + libdirs = _compose_ld_library_path( + preferred_dirs=[self._preload_library.parent], + workspace_dirs=self._workspace.libdirs, + inherited_ld_path=os.environ.get('LD_LIBRARY_PATH', ''), + ) + self._patched_env = { 'LD_LIBRARY_PATH': libdirs, 'LD_PRELOAD': str(self._preload_library), @@ -707,10 +739,6 @@ class ClientLibc: self._cmd = find_command(gkfs_client_cmd, self._workspace.bindirs) self._env = os.environ.copy() - libdirs = ':'.join( - filter(None, [str(p) for p in self._workspace.libdirs] + - [os.environ.get('LD_LIBRARY_PATH', '')])) - # ensure the client interception library is available: # to avoid running code with potentially installed libraries, # it must be found in one (and only one) of the workspace's bindirs @@ -724,15 +752,17 @@ class ClientLibc: logger.error(f'No client libraries found in the test\'s binary directories:') pytest.exit("Aborted due to initialization error. Check test logs.") - if len(preloads) != 1: - logger.error(f'Multiple client libraries found in the test\'s binary directories:') - for p in preloads: - logger.error(f' {p}') - logger.error(f'Make sure that only one copy of the client library is available.') - pytest.exit("Aborted due to initialization error. Check test logs.") + if len(preloads) > 1: + logger.warning(f'Multiple client libraries found. 
Using the first one: {preloads[0]}') self._preload_library = preloads[0] + libdirs = _compose_ld_library_path( + preferred_dirs=[self._preload_library.parent], + workspace_dirs=self._workspace.libdirs, + inherited_ld_path=os.environ.get('LD_LIBRARY_PATH', ''), + ) + self._patched_env = { 'LD_LIBRARY_PATH': libdirs, 'LD_PRELOAD': str(self._preload_library),