Commit adb93af4 authored by Ramon Nou's avatar Ramon Nou
Browse files

async writes

parent 01a86b08
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -7,6 +7,9 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### New
  - Client-side asynchronous write cache with async flushing ([!306](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/306))
    - Added client-side asynchronous write caching for data writes to improve IO500 performance.
    - Introduced new environment variable: `LIBGKFS_ASYNC_WRITE`.
  - Metadata batching ([!305](https://storage.bsc.es/gitlab/hpc/gekkofs/-/merge_requests/305))
    - Added client-side metadata batching for file/node creation to reduce metadata RPC bottlenecks.
    - Introduced new environment variables: `LIBGKFS_METADATA_BATCH` and `LIBGKFS_METADATA_BATCH_THRESHOLD`.
+6 −0
Original line number Diff line number Diff line
@@ -703,6 +703,7 @@ Client-metrics require the CMake argument `-DGKFS_ENABLE_CLIENT_METRICS=ON` (see
- `LIBGKFS_READ_INLINE_PREFETCH` - Prefetch inline data when opening files (default: OFF).
- `LIBGKFS_USE_DIRENTS_COMPRESSION` - Enable compression for directory entries (default: OFF).
- `LIBGKFS_DIRENTS_BUFF_SIZE` - Buffer size for directory entries (default: 8MB).
- `LIBGKFS_ASYNC_WRITE` - Enable client-side asynchronous write cache (default: OFF).
- `GKFS_FUSE_ENTRY_TIMEOUT` - Caching timeout for dentry entries in the FUSE client (default: 1.0).
- `GKFS_FUSE_ATTR_TIMEOUT` - Caching timeout for file attributes in the FUSE client (default: 1.0).
- `GKFS_FUSE_NEGATIVE_TIMEOUT` - Caching timeout for negative lookups in the FUSE client (default: 1.0).
@@ -746,6 +747,11 @@ Remaining buffered creation requests are automatically flushed when the applicat
- `LIBGKFS_METADATA_BATCH=ON` - Enable client-side metadata batching for file creation (default: OFF).
- `LIBGKFS_METADATA_BATCH_THRESHOLD` - Set the number of file creation operations per host after which the batch is flushed (default: 64).

##### Asynchronous write cache
During write/pwrite operations, when the asynchronous write cache is enabled, the data writes are enqueued on the client side and return immediately. A background worker thread handles sending the writes asynchronously to the daemons. The pending writes are flushed and waited on during `close()`, `fsync()`, or client process teardown.

- `LIBGKFS_ASYNC_WRITE=ON` - Enable client-side asynchronous write caching for data writes (default: OFF).

### Daemon
#### Core
- `GKFS_DAEMON_CREATE_CHECK_PARENTS` - Enable checking parent directory for existence before creating children.
+1 −0
Original line number Diff line number Diff line
@@ -98,6 +98,7 @@ static constexpr auto ENABLE_FORK = ADD_PREFIX("ENABLE_FORK");
static constexpr auto METADATA_BATCH = ADD_PREFIX("METADATA_BATCH");
static constexpr auto METADATA_BATCH_THRESHOLD =
        ADD_PREFIX("METADATA_BATCH_THRESHOLD");
static constexpr auto ASYNC_WRITE = ADD_PREFIX("ASYNC_WRITE");

} // namespace gkfs::env

+45 −0
Original line number Diff line number Diff line
@@ -42,6 +42,10 @@

#include <map>
#include <unordered_map>
#include <queue>
#include <future>
#include <thread>
#include <condition_variable>
#include <thallium.hpp>
#include <memory>
#include <vector>
@@ -52,6 +56,17 @@

#include <bitset>

namespace gkfs::preload {
struct WriteTask {
    std::string path;
    std::vector<char> buf;
    off64_t offset;
    size_t count;
    int8_t num_replicas;
    std::shared_ptr<std::promise<std::pair<int, long>>> promise;
};
} // namespace gkfs::preload

/* Forward declarations */
namespace gkfs {
namespace filemap {
@@ -163,6 +178,14 @@ private:
            metadata_batch_buffer_;
    mutable std::mutex metadata_batch_mutex_;

    bool use_async_write_{false};
    std::queue<WriteTask> async_write_queue_;
    std::vector<std::future<std::pair<int, long>>> async_write_futures_;
    std::mutex async_write_mutex_;
    std::condition_variable async_write_cv_;
    std::thread async_write_thread_;
    bool async_write_stop_{false};


public:
    static PreloadContext*
@@ -404,6 +427,28 @@ public:
    void
    add_metadata_batch_entry(uint64_t host_id, const std::string& path,
                             mode_t mode);

    bool
    use_async_write() const;

    void
    use_async_write(bool use_async_write);

    void
    start_async_write_thread();

    void
    stop_async_write_thread();

    void
    async_write_worker();

    void
    enqueue_async_write(const std::string& path, const void* buf,
                        off64_t offset, size_t count, int8_t num_copies);

    void
    wait_async_writes();
};

} // namespace preload
+19 −0
Original line number Diff line number Diff line
@@ -277,6 +277,17 @@ gkfs_do_write(gkfs::filemap::OpenFile& file, const char* buf, size_t count,
        }
    }

    if(CTX->use_async_write()) {
        CTX->enqueue_async_write(*path, buf, offset, count, 0);
        if(num_replicas > 0) {
            CTX->enqueue_async_write(*path, buf, offset, count, num_replicas);
        }
        if(update_pos) {
            file.pos(offset + count);
        }
        return count;
    }

    pair<int, long> ret_write;
    if(gkfs::config::proxy::fwd_io && CTX->use_proxy() &&
       count > gkfs::config::proxy::fwd_io_count_threshold) {
@@ -592,6 +603,9 @@ gkfs_do_read(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
ssize_t
gkfs_read_ws(const gkfs::filemap::OpenFile& file, char* buf, size_t count,
             off64_t offset) {
    if(CTX->use_async_write()) {
        CTX->wait_async_writes();
    }
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    auto read = gkfs_do_read(file, buf, count, offset);
@@ -724,6 +738,11 @@ gkfs_fsync(unsigned int fd) {
        errno = EBADF;
        return -1;
    }

    if(CTX->use_async_write()) {
        CTX->wait_async_writes();
    }

    // flush write size cache to be server consistent
    if(CTX->use_write_size_cache()) {
        auto err = CTX->write_size_cache()->flush(file->path(), true).first;
Loading