Verified Commit 0235d3f0 authored by Marc Vef's avatar Marc Vef
Browse files

Periodic flushing of client metrics added

New env variable: `LIBGKFS_METRICS_FLUSH_INTERVAL` (defaults to 5 seconds)
parent 96c4a14b
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@ library_options:
	#should the prediction module try to predict aggregations? (requires predict_read_traces = true)
	predict_request_aggregation = false ;

	#should the prediction module create simplified traces with information (the metrics) it obtained from the real traces?
	#should the prediction module create simplified traces with information (the client_metrics) it obtained from the real traces?
	predict_write_simplified_traces = false;

	#the tolerance for arrival times difference when checking if two predicted requests are the same (in %)
+2 −1
Original line number Diff line number Diff line
@@ -52,7 +52,8 @@ static constexpr auto HOSTS_FILE = ADD_PREFIX("HOSTS_FILE");
static constexpr auto FORWARDING_MAP_FILE = ADD_PREFIX("FORWARDING_MAP_FILE");
#ifdef GKFS_ENABLE_CLIENT_METRICS
static constexpr auto ENABLE_METRICS = ADD_PREFIX("ENABLE_METRICS");
static constexpr auto METRICS_INTERVAL = ADD_PREFIX("METRICS_INTERVAL");
static constexpr auto METRICS_FLUSH_INTERVAL =
        ADD_PREFIX("METRICS_FLUSH_INTERVAL");
static constexpr auto METRICS_PATH = ADD_PREFIX("METRICS_PATH");
static constexpr auto METRICS_IP_PORT = ADD_PREFIX("METRICS_IP_PORT");
#endif
+26 −8
Original line number Diff line number Diff line
@@ -36,8 +36,11 @@
#include <csignal>
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>

#include <zmq.hpp>
#include <condition_variable>

namespace gkfs::messagepack {

@@ -76,25 +79,31 @@ public:
    };

private:
    msgpack_data msgpack_data_{};
    bool metrics_enabled_{false};

    zmq::context_t zmq_context_;
    zmq::socket_t zmq_socket_;
    msgpack_data msgpack_data_{};

    //    std::mutex mtx_{};
    //    std::thread thread_{};
    std::mutex data_mtx_{};
    std::thread flush_thread_{};
    std::condition_variable flush_thread_cv_{};
    std::mutex flush_thread_cv_mutex_{};
    std::atomic<bool> flush_thread_running_{false};

    client_metric_flush_type flush_type_{client_metric_flush_type::file};
    int flush_interval_{};
    std::unique_ptr<zmq::context_t> zmq_flush_context_ = nullptr;
    std::unique_ptr<zmq::socket_t> zmq_flush_socket_ = nullptr;
    std::string flush_path_{};
    int flush_count_{0};

    bool is_enabled_{false};
    std::string path_{};

public:
    ClientMetrics() = default;

    explicit ClientMetrics(client_metric_io_type io_type,
                           client_metric_flush_type flush_type =
                                   client_metric_flush_type::file);
                                   client_metric_flush_type::file,
                           int flush_interval = 5);

    ~ClientMetrics();

@@ -102,9 +111,15 @@ public:
    add_event(size_t size,
              std::chrono::time_point<std::chrono::system_clock> start);

    void
    reset_metrics();

    void
    flush_msgpack();

    void
    flush_loop();

    void
    enable();

@@ -122,6 +137,9 @@ public:

    void
    path(const std::string& path, const std::string prefix = "");

    int
    flush_count() const;
};

} // namespace gkfs::messagepack
+4 −3
Original line number Diff line number Diff line
@@ -42,12 +42,13 @@ constexpr auto hostfile_path = "./gkfs_hosts.txt";
// We do not default this, ENV variable always required.
constexpr auto forwarding_file_path = "";

namespace metrics {
namespace client_metrics {
// Default directory where client metrics are stored. Can be set via
// LIBGKFS_METRICS_PATH. Filename consists of starting time, pid, and hostname
// Note: when LIBGKFS_METRICS_IP is given, ZeroMQ is used instead
constexpr auto client_metrics_path = "/tmp/gkfs_client_metrics";
} // namespace metrics
constexpr auto flush_path = "/tmp/gkfs_client_metrics";
constexpr auto flush_interval = 5; // in seconds
} // namespace client_metrics

namespace io {
/*
+3 −2
Original line number Diff line number Diff line
@@ -318,10 +318,11 @@ destroy_preload() {
        destroy_forwarding_mapper();
    }
#ifdef GKFS_ENABLE_CLIENT_METRICS
    LOG(INFO, "Flushing metrics...");
    LOG(INFO, "Flushing final metrics...");
    CTX->write_metrics().flush_msgpack();
    CTX->read_metrics().flush_msgpack();
    LOG(INFO, "Metrics flushed.");
    LOG(INFO, "Metrics flushed. Total flush operations: {}",
        CTX->write_metrics().flush_count());
#endif
    CTX->clear_hosts();
    LOG(DEBUG, "Peer information deleted");
Loading