Program Listing for File msgpack_util.hpp

Return to documentation for file (include/common/msgpack_util.hpp)

/*
  Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain
  Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany

  This software was partially supported by the
  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).

  This software was partially supported by the
  ADA-FS project under the SPPEXA project funded by the DFG.

  This software was partially supported by the
  European Union’s Horizon 2020 JTI-EuroHPC research and
  innovation programme, by the project ADMIRE (Project ID: 956748,
  admire-eurohpc.eu)

  This project was partially promoted by the Ministry for Digital Transformation
  and the Civil Service, within the framework of the Recovery,
  Transformation and Resilience Plan - Funded by the European Union
  -NextGenerationEU.

  This file is part of GekkoFS.

  GekkoFS is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  GekkoFS is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with GekkoFS.  If not, see <https://www.gnu.org/licenses/>.

  SPDX-License-Identifier: GPL-3.0-or-later
*/

#ifndef GKFS_COMMON_MSGPACK_HPP
#define GKFS_COMMON_MSGPACK_HPP

#include <msgpack/msgpack.hpp>
#include <string>
#include <vector>
#include <chrono>
#include <csignal>
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>

#include <zmq.hpp>
#include <condition_variable>

namespace gkfs::messagepack {

/// I/O direction tag; passed as the `io_type` argument of the
/// ClientMetrics constructor.
enum class client_metric_io_type {
    write,
    read,
};
/// Flush target selector; passed as the `flush_type` argument of the
/// ClientMetrics constructor, where it defaults to `file`.
enum class client_metric_flush_type {
    file,
    socket,
};

/*
 * Client-side metrics container that serializes its payload with MessagePack.
 *
 * Only the interface is declared in this header; all non-template member
 * functions are defined out of line. Behavioral notes below that cannot be
 * derived from the declarations are explicitly marked as assumptions.
 *
 * Instances are neither copyable nor movable: the class owns a std::thread,
 * two std::mutex objects and a std::condition_variable.
 */
class ClientMetrics {

public:
    /*
     * MessagePack data structure for client metrics. Includes only what is
     * actually sent
     */
    struct msgpack_data {
        // Every scalar member carries a default member initializer so that a
        // default-initialized instance (`msgpack_data d;`) never hands
        // indeterminate values to the packer.
        uint32_t flush_t_{};
        std::string hostname_;
        int pid_{};
        std::string io_type_;
        std::vector<uint32_t> start_t_{};
        std::vector<uint32_t> end_t_{};
        std::vector<uint32_t> req_size_{};
        uint32_t total_bytes_{};
        int total_iops_{0};

        /*
         * Hands all fields to the given packer object in a fixed order.
         *
         * The order differs from the declaration order above: total_iops_ is
         * passed before total_bytes_. Do not reorder the arguments without
         * updating whatever decodes the resulting messages.
         */
        template <class T>
        void
        pack(T& packer) {
            packer(flush_t_, hostname_, pid_, io_type_, start_t_, end_t_,
                   req_size_, total_iops_, total_bytes_);
        }

        /*
         * Serializes this structure via msgpack::pack and returns the
         * resulting byte buffer.
         */
        std::vector<uint8_t>
        pack_msgpack() {
            return msgpack::pack(*this);
        }
    };

private:
    bool metrics_enabled_{false};

    // Payload that gets serialized; see msgpack_data.
    msgpack_data msgpack_data_{};

    // Initialization time used to compute relative timestamps
    std::chrono::time_point<std::chrono::system_clock> init_t_;

    // NOTE(review): presumably guards msgpack_data_ against concurrent
    // access from the flush thread -- confirm in the implementation.
    std::mutex data_mtx_{};
    std::thread flush_thread_{};
    std::condition_variable flush_thread_cv_{};
    std::mutex flush_thread_cv_mutex_{};
    std::atomic<bool> flush_thread_running_{false};

    client_metric_flush_type flush_type_{client_metric_flush_type::file};
    // NOTE(review): unit is not visible in this header (presumably seconds).
    int flush_interval_{};
    std::unique_ptr<zmq::context_t> zmq_flush_context_ = nullptr;
    std::unique_ptr<zmq::socket_t> zmq_flush_socket_ = nullptr;
    std::string flush_path_{};
    int flush_count_{0};


public:
    ClientMetrics() = default;

    /*
     * @param io_type        I/O direction tag for this instance.
     * @param flush_type     Flush target selector; defaults to `file`.
     * @param flush_interval Flush interval; defaults to 5 (unit not visible
     *                       here, presumably seconds -- TODO confirm).
     */
    explicit ClientMetrics(client_metric_io_type io_type,
                           client_metric_flush_type flush_type =
                                   client_metric_flush_type::file,
                           int flush_interval = 5);

    ~ClientMetrics();

    // Copying was already implicitly deleted because of the std::mutex and
    // std::thread members; spelling it out documents the intent. With a
    // user-declared destructor and copy operations, no move operations are
    // generated either.
    ClientMetrics(const ClientMetrics&) = delete;
    ClientMetrics&
    operator=(const ClientMetrics&) = delete;

    /*
     * Records one event.
     *
     * @param size  Size associated with the event.
     * @param start Time point at which the event started.
     */
    void
    add_event(size_t size,
              std::chrono::time_point<std::chrono::system_clock> start);

    void
    reset_metrics();

    void
    flush_msgpack();

    // NOTE(review): presumably the body executed by flush_thread_ -- confirm.
    void
    flush_loop();

    void
    enable();

    void
    disable();

    /*
     * @param ip_port Endpoint description handed to the ZeroMQ connection
     *                logic (exact format defined by the implementation).
     */
    void
    zmq_connect(const std::string& ip_port);

    bool
    zmq_is_connected();

    [[nodiscard]] const std::string&
    path() const;

    // The signature is kept exactly as declared originally (prefix taken by
    // value) so that it keeps matching the out-of-line definition.
    void
    path(const std::string& path, const std::string prefix = "");

    [[nodiscard]] int
    flush_count() const;
};

} // namespace gkfs::messagepack

#endif // GKFS_COMMON_MSGPACK_HPP