Commit 669e3cc0 authored by Ahmad Tarraf's avatar Ahmad Tarraf
Browse files

feat(fuse): add client-side metrics via MessagePack/ZeroMQ

  - Add GKFS_ENABLE_CLIENT_METRICS to fuse_client and gkfs_user_lib targets
  - Instrument read/write handlers with add_event() (skip on error to avoid heap corruption)
  - Call init_metrics() on startup and flush on shutdown
  - Same env vars as intercept client: LIBGKFS_ENABLE_METRICS, LIBGKFS_METRICS_IP_PORT, etc.
parent f4006c8d
Loading
Loading
Loading
Loading
Loading
+9 −2
Original line number Diff line number Diff line
@@ -133,11 +133,19 @@ target_include_directories(fuse_client
    PRIVATE
    ${FUSE3_INCLUDE_DIRS}
    )
if (GKFS_ENABLE_CLIENT_METRICS)
    target_link_libraries(fuse_client PRIVATE msgpack_util)
    target_compile_definitions(fuse_client PUBLIC GKFS_ENABLE_CLIENT_METRICS)
endif ()
endif()

if (GKFS_BUILD_USER_LIB)
target_compile_definitions(gkfs_user_lib PUBLIC BYPASS_SYSCALL ENABLE_USER)
target_link_options(gkfs_user_lib PRIVATE -z noexecstack)
if (GKFS_ENABLE_CLIENT_METRICS)
    target_link_libraries(gkfs_user_lib PRIVATE msgpack_util)
    target_compile_definitions(gkfs_user_lib PUBLIC GKFS_ENABLE_CLIENT_METRICS)
endif ()
endif()

if (GKFS_BUILD_LIBC_INTERCEPTION)
@@ -161,7 +169,7 @@ target_link_libraries(
    Syscall_intercept::Syscall_intercept
    Microsoft.GSL::GSL
)
# Enable MSGPack metrics for intercept only
# Enable MSGPack metrics 
if (GKFS_ENABLE_CLIENT_METRICS)
    target_link_libraries(
        gkfs_intercept
@@ -193,7 +201,6 @@ target_link_libraries(
    Threads::Threads
    Microsoft.GSL::GSL
)
# Enable MSGPack metrics for intercept only
if (GKFS_ENABLE_CLIENT_METRICS)
    target_link_libraries(
        gkfs_libc_intercept
+33 −0
Original line number Diff line number Diff line
@@ -38,6 +38,9 @@
*/

#include <client/fuse/fuse_client.hpp>
#ifdef GKFS_ENABLE_CLIENT_METRICS
#include <common/msgpack_util.hpp>
#endif

static struct fuse_lowlevel_ops ll_ops;
struct InodeShard {
@@ -467,7 +470,14 @@ read_handler(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,

    // Allocate buffer without zero-initialization for performance
    auto buf = std::make_unique<char[]>(size);
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.get(), size, off);
    if(rc > 0)
        CTX->read_metrics()->add_event(rc, start_t);
#else
    int rc = gkfs::syscall::gkfs_pread(fi->fh, buf.get(), size, off);
#endif
    if(rc < 0) {
        LOG(DEBUG, "read fail");
        fuse_reply_err(req, errno);
@@ -487,7 +497,14 @@ write_handler(fuse_req_t req, fuse_ino_t ino, const char* buf, size_t size,
            return;
        }
    }
#ifdef GKFS_ENABLE_CLIENT_METRICS
    auto start_t = std::chrono::high_resolution_clock::now();
    int rc = gkfs::syscall::gkfs_pwrite(fi->fh, buf, size, off);
    if(rc > 0)
        CTX->write_metrics()->add_event(rc, start_t);
#else
    int rc = gkfs::syscall::gkfs_pwrite(fi->fh, buf, size, off);
#endif
    if(rc < 0) {
        LOG(DEBUG, "write fail");
        fuse_reply_err(req, errno);
@@ -1405,6 +1422,12 @@ main(int argc, char* argv[]) {
    ud.mountpoint = strdup(opts.mountpoint);
    init_gekkofs();

#ifdef GKFS_ENABLE_CLIENT_METRICS
    if(!CTX->init_metrics()) {
        fprintf(stderr, "FUSE client failed to initialize client metrics. Exit.\n");
        return 1;
    }
#endif

    se = fuse_session_new(&args, &ll_ops, sizeof(ll_ops), &ud);
    if(se == nullptr) {
@@ -1451,5 +1474,15 @@ main(int argc, char* argv[]) {
    }

    fuse_session_unmount(se);

#ifdef GKFS_ENABLE_CLIENT_METRICS
    LOG(INFO, "Flushing final metrics...");
    CTX->write_metrics()->flush_msgpack();
    CTX->read_metrics()->flush_msgpack();
    LOG(INFO, "Metrics flushed. Total flush operations: {}",
        CTX->write_metrics()->flush_count());
    CTX->destroy_metrics();
#endif

    return ret < 0 ? 1 : 0;
}