From 5c9d7216c2c7563f5654cc13da8be09a725ff3fa Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 12 Jun 2026 14:29:13 +0200 Subject: [PATCH 1/2] async writes --- include/client/env.hpp | 1 + include/client/preload_context.hpp | 45 ++++++++++ src/client/gkfs_data.cpp | 19 ++++ src/client/gkfs_metadata.cpp | 4 + src/client/preload.cpp | 17 ++++ src/client/preload_context.cpp | 136 ++++++++++++++++++++++++++++- 6 files changed, 221 insertions(+), 1 deletion(-) diff --git a/include/client/env.hpp b/include/client/env.hpp index 5da3df9df..e7b261dd7 100644 --- a/include/client/env.hpp +++ b/include/client/env.hpp @@ -98,6 +98,7 @@ static constexpr auto ENABLE_FORK = ADD_PREFIX("ENABLE_FORK"); static constexpr auto METADATA_BATCH = ADD_PREFIX("METADATA_BATCH"); static constexpr auto METADATA_BATCH_THRESHOLD = ADD_PREFIX("METADATA_BATCH_THRESHOLD"); +static constexpr auto ASYNC_WRITE = ADD_PREFIX("ASYNC_WRITE"); } // namespace gkfs::env diff --git a/include/client/preload_context.hpp b/include/client/preload_context.hpp index 844421f70..946bdc80f 100644 --- a/include/client/preload_context.hpp +++ b/include/client/preload_context.hpp @@ -42,6 +42,10 @@ #include #include +#include +#include +#include +#include #include #include #include @@ -52,6 +56,17 @@ #include +namespace gkfs::preload { +struct WriteTask { + std::string path; + std::vector buf; + off64_t offset; + size_t count; + int8_t num_replicas; + std::shared_ptr>> promise; +}; +} // namespace gkfs::preload + /* Forward declarations */ namespace gkfs { namespace filemap { @@ -163,6 +178,14 @@ private: metadata_batch_buffer_; mutable std::mutex metadata_batch_mutex_; + bool use_async_write_{false}; + std::queue async_write_queue_; + std::vector>> async_write_futures_; + std::mutex async_write_mutex_; + std::condition_variable async_write_cv_; + std::thread async_write_thread_; + bool async_write_stop_{false}; + public: static PreloadContext* @@ -404,6 +427,28 @@ public: void add_metadata_batch_entry(uint64_t host_id, const std::string& path, mode_t mode); + + bool + use_async_write() const; + + void + use_async_write(bool use_async_write); + + void + start_async_write_thread(); + + void + stop_async_write_thread(); + + void + async_write_worker(); + + void + enqueue_async_write(const std::string& path, const void* buf, + off64_t offset, size_t count, int8_t num_copies); + + void + wait_async_writes(); }; } // namespace preload diff --git a/src/client/gkfs_data.cpp b/src/client/gkfs_data.cpp index 5632dcc0e..7dc8292a7 100644 --- a/src/client/gkfs_data.cpp +++ b/src/client/gkfs_data.cpp @@ -277,6 +277,17 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count, } } + if(CTX->use_async_write()) { + CTX->enqueue_async_write(*path, buf, offset, count, 0); + if(num_replicas > 0) { + CTX->enqueue_async_write(*path, buf, offset, count, num_replicas); + } + if(update_pos) { + file.pos(offset + count); + } + return count; + } + pair ret_write; if(gkfs::config::proxy::fwd_io && CTX->use_proxy() && count > gkfs::config::proxy::fwd_io_count_threshold) { @@ -592,6 +603,9 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count, ssize_t gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count, off64_t offset) { + if(CTX->use_async_write()) { + CTX->wait_async_writes(); + } #ifdef GKFS_ENABLE_CLIENT_METRICS auto start_t = std::chrono::high_resolution_clock::now(); auto read = gkfs_do_read(file, buf, count, offset); @@ -724,6 +738,11 @@ gkfs_fsync(unsigned int fd) { errno = EBADF; return -1; } + + if(CTX->use_async_write()) { + CTX->wait_async_writes(); + } + // flush write size cache to be server consistent if(CTX->use_write_size_cache()) { auto err = CTX->write_size_cache()->flush(file->path(), true).first; diff --git a/src/client/gkfs_metadata.cpp b/src/client/gkfs_metadata.cpp index 0dcd0ac74..3287c6365 100644 --- a/src/client/gkfs_metadata.cpp +++ b/src/client/gkfs_metadata.cpp @@ -1910,6 +1910,10 @@ gkfs_close(unsigned int fd) { if(file) { const auto path = file->path(); + if(CTX->use_async_write()) { + CTX->wait_async_writes(); + } + if(file->get_flag(gkfs::filemap::OpenFile_flags::creation_pending)) { gkfs_create(path, file->mode()); file->set_flag(gkfs::filemap::OpenFile_flags::creation_pending, diff --git a/src/client/preload.cpp b/src/client/preload.cpp index 4fdbe4262..e9bcea7bc 100644 --- a/src/client/preload.cpp +++ b/src/client/preload.cpp @@ -362,6 +362,17 @@ init_environment() { LOG(INFO, "Metadata batching disabled."); } + auto use_async_write = + gkfs::env::get_var(gkfs::env::ASYNC_WRITE, "OFF") == "ON"; + if(use_async_write) { + CTX->use_async_write(true); + CTX->start_async_write_thread(); + LOG(INFO, "Client-side async write cache enabled."); + } else { + CTX->use_async_write(false); + LOG(INFO, "Client-side async write cache disabled."); + } + LOG(INFO, "Environment initialization successful."); } @@ -530,6 +541,12 @@ destroy_preload() { LOG(INFO, "Flushing final metadata batches..."); CTX->flush_metadata_batches(); } + + if(CTX->use_async_write()) { + LOG(INFO, "Flushing final async writes..."); + CTX->wait_async_writes(); + CTX->stop_async_write_thread(); + } auto forwarding_map_file = gkfs::env::get_var( gkfs::env::FORWARDING_MAP_FILE, gkfs::config::forwarding_file_path); if(!forwarding_map_file.empty()) { diff --git a/src/client/preload_context.cpp b/src/client/preload_context.cpp index 5c27fa31a..9e90b5c56 100644 --- a/src/client/preload_context.cpp +++ b/src/client/preload_context.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include @@ -108,7 +109,9 @@ PreloadContext::PreloadContext() // Destructor set here to allow unique_ptr of forward declared classes in the // header. T must be complete at the point of deletion. -PreloadContext::~PreloadContext() = default; +PreloadContext::~PreloadContext() { + stop_async_write_thread(); +} void PreloadContext::init_logging() { @@ -887,5 +890,136 @@ PreloadContext::add_metadata_batch_entry(uint64_t host_id, } } +bool +PreloadContext::use_async_write() const { + return use_async_write_; +} + +void +PreloadContext::use_async_write(bool use_async_write) { + use_async_write_ = use_async_write; +} + +void +PreloadContext::start_async_write_thread() { + if(use_async_write_) { + std::lock_guard lock(async_write_mutex_); + if(!async_write_thread_.joinable()) { + async_write_stop_ = false; + async_write_thread_ = + std::thread(&PreloadContext::async_write_worker, this); + LOG(INFO, "Started client-side async write worker thread."); + } + } +} + +void +PreloadContext::stop_async_write_thread() { + { + std::lock_guard lock(async_write_mutex_); + if(async_write_thread_.joinable()) { + async_write_stop_ = true; + async_write_cv_.notify_all(); + } else { + return; + } + } + if(async_write_thread_.joinable()) { + async_write_thread_.join(); + LOG(INFO, "Stopped client-side async write worker thread."); + } +} + +void +PreloadContext::async_write_worker() { + while(true) { + WriteTask task; + { + std::unique_lock lock(async_write_mutex_); + async_write_cv_.wait(lock, [this]() { + return async_write_stop_ || !async_write_queue_.empty(); + }); + + if(async_write_stop_ && async_write_queue_.empty()) { + break; + } + + task = std::move(async_write_queue_.front()); + async_write_queue_.pop(); + async_write_cv_.notify_all(); + } + + std::pair res; + try { + res = gkfs::rpc::forward_write(task.path, task.buf.data(), + task.offset, task.count, + task.num_replicas); + } catch(const std::exception& e) { + LOG(ERROR, "async_write_worker error: {}", e.what()); + res = std::make_pair(EBUSY, 0); + } + + task.promise->set_value(res); + } +} + +void +PreloadContext::enqueue_async_write(const std::string& path, const void* buf, + off64_t offset, size_t count, + int8_t num_copies) { + auto pr = std::make_shared>>(); + auto fut = pr->get_future(); + + WriteTask task; + task.path = path; + task.buf.resize(count); + std::memcpy(task.buf.data(), buf, count); + task.offset = offset; + task.count = count; + task.num_replicas = num_copies; + task.promise = pr; + + { + std::unique_lock lock(async_write_mutex_); + async_write_cv_.wait(lock, [this]() { + return async_write_stop_ || async_write_queue_.size() < 1024; + }); + + if(async_write_stop_) { + pr->set_value(std::make_pair(EINVAL, 0)); + return; + } + + async_write_queue_.push(std::move(task)); + async_write_futures_.push_back(std::move(fut)); + async_write_cv_.notify_one(); + } +} + +void +PreloadContext::wait_async_writes() { + std::vector>> futures; + { + std::lock_guard lock(async_write_mutex_); + futures = std::move(async_write_futures_); + async_write_futures_.clear(); + } + + int last_err = 0; + for(auto& fut : futures) { + if(fut.valid()) { + auto res = fut.get(); + if(res.first != 0) { + last_err = res.first; + } + } + } + + if(last_err != 0) { + LOG(ERROR, "{}() one or more async writes failed: {}", __func__, + last_err); + } +} + } // namespace preload } // namespace gkfs -- GitLab From c908c524377b9c05187040f205578e86659b5022 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 12 Jun 2026 15:04:23 +0200 Subject: [PATCH 2/2] changelog and readme --- CHANGELOG.md | 3 + README.md | 6 ++ tests/integration/data/test_async_write.py | 102 +++++++++++++++++++++ 3 files changed, 111 insertions(+) create mode 100644 tests/integration/data/test_async_write.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 1264486e9..021f95cc7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] ### New + - Client-side asynchronous write cache with async flushing ([!306](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/306)) + - Added client-side asynchronous write caching for data writes to improve IO500 performance. + - Introduced new environment variable: `LIBGKFS_ASYNC_WRITE`. - Metadata batching ([!305](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/305)) - Added client-side metadata batching for file/node creation to reduce metadata RPC bottlenecks. - Introduced new environment variables: `LIBGKFS_METADATA_BATCH` and `LIBGKFS_METADATA_BATCH_THRESHOLD`. diff --git a/README.md b/README.md index 93e0c29ae..816d78775 100644 --- a/README.md +++ b/README.md @@ -703,6 +703,7 @@ Client-metrics require the CMake argument `-DGKFS_ENABLE_CLIENT_METRICS=ON` (see - `LIBGKFS_READ_INLINE_PREFETCH` - Prefetch inline data when opening files (default: OFF). - `LIBGKFS_USE_DIRENTS_COMPRESSION` - Enable compression for directory entries (default: OFF). - `LIBGKFS_DIRENTS_BUFF_SIZE` - Buffer size for directory entries (default: 8MB). +- `LIBGKFS_ASYNC_WRITE` - Enable client-side asynchronous write cache (default: OFF). - `GKFS_FUSE_ENTRY_TIMEOUT` - Caching timeout for dentry entries in the FUSE client (default: 1.0). - `GKFS_FUSE_ATTR_TIMEOUT` - Caching timeout for file attributes in the FUSE client (default: 1.0). - `GKFS_FUSE_NEGATIVE_TIMEOUT` - Caching timeout for negative lookups in the FUSE client (default: 1.0). @@ -746,6 +747,11 @@ Remaining buffered creation requests are automatically flushed when the applicat - `LIBGKFS_METADATA_BATCH=ON` - Enable client-side metadata batching for file creation (default: OFF). - `LIBGKFS_METADATA_BATCH_THRESHOLD` - Set the number of file creation operations per host after which the batch is flushed (default: 64). +##### Asynchronous write cache +During write/pwrite operations, when the asynchronous write cache is enabled, the data writes are enqueued on the client side and return immediately. A background worker thread handles sending the writes asynchronously to the daemons. The pending writes are flushed and waited on during `close()`, `fsync()`, or client process teardown. + +- `LIBGKFS_ASYNC_WRITE=ON` - Enable client-side asynchronous write caching for data writes (default: OFF). + ### Daemon #### Core - `GKFS_DAEMON_CREATE_CHECK_PARENTS` - Enable checking parent directory for existence before creating children. diff --git a/tests/integration/data/test_async_write.py b/tests/integration/data/test_async_write.py new file mode 100644 index 000000000..1cd212fc6 --- /dev/null +++ b/tests/integration/data/test_async_write.py @@ -0,0 +1,102 @@ +################################################################################ +# Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +import harness +from pathlib import Path +import errno +import stat +import os +import sys +import pytest +from harness.logger import logger + +def test_async_write(gkfs_daemon, gkfs_client): + """Test data write/read using the client-side async write cache feature""" + topdir = gkfs_daemon.mountdir / "top" + file_a = topdir / "file_a" + file_b = topdir / "file_b" + + # Create top directory + ret = gkfs_client.mkdir( + topdir, + stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + assert ret.retval == 0 + + # Environment variables to enable async write + async_env = { + "LIBGKFS_ASYNC_WRITE": "ON", + "LIBGKFS_LOG": "all", + } + + # Open file_a with O_CREAT so it exists for write_validate + ret = gkfs_client.open(file_a, os.O_CREAT, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO, env=async_env) + assert ret.retval != -1 + + # Verify write and read validation for different sizes using async write + logger.info("Testing small size (1 byte)") + ret = gkfs_client.write_validate(file_a, 1, env=async_env) + assert ret.retval == 0 + + logger.info("Testing medium size (256 bytes)") + ret = gkfs_client.write_validate(file_a, 256, env=async_env) + assert ret.retval == 0 + + logger.info("Testing chunk-boundary size (512 KB)") + ret = gkfs_client.write_validate(file_a, 524288, env=async_env) + assert ret.retval == 0 + + logger.info("Testing large size (2 MB) exceeding multiple chunks") + ret = gkfs_client.write_validate(file_a, 2097153, env=async_env) + assert ret.retval == 0 + + # Let's perform a write, fsync, and read back check using normal client calls + logger.info("Testing multi-step file writes, fsync, and close") + + # 1. Create file_b + ret = gkfs_client.open(file_b, os.O_CREAT, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO, env=async_env) + assert ret.retval != -1 + + # First pwrite + buf1 = b"Hello, async GekkoFS! " + ret = gkfs_client.pwrite(file_b, buf1, len(buf1), 0, env=async_env) + assert ret.retval == len(buf1) + + # Second pwrite (at offset len(buf1)) + buf2 = b"This is a client-side cache test." + ret = gkfs_client.pwrite(file_b, buf2, len(buf2), len(buf1), env=async_env) + assert ret.retval == len(buf2) + + # Verify size in metadata is updated (it should update size dynamically) + ret = gkfs_client.stat(file_b, env=async_env) + assert ret.retval == 0 + assert ret.statbuf.st_size == len(buf1) + len(buf2) + + # Read back and verify the written content + ret = gkfs_client.pread(file_b, len(buf1) + len(buf2), 0, env=async_env) + assert ret.retval == len(buf1) + len(buf2) + assert ret.buf == buf1 + buf2 -- GitLab