Commit 528f2813 authored by Marc Vef's avatar Marc Vef
Browse files

ibverbs and MPI broken interaction with ZMQ: Fix

parent ef865b0f
Loading
Loading
Loading
Loading
Loading
+3 −2
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@
#include <thread>
#include <mutex>
#include <atomic>
#include <memory>

#include <zmq.hpp>
#include <condition_variable>
@@ -90,8 +91,8 @@ private:

    client_metric_flush_type flush_type_{client_metric_flush_type::file};
    int flush_interval_{};
    zmq::context_t zmq_flush_context_;
    zmq::socket_t zmq_flush_socket_;
    std::unique_ptr<zmq::context_t> zmq_flush_context_ = nullptr;
    std::unique_ptr<zmq::socket_t> zmq_flush_socket_ = nullptr;
    std::string flush_path_{};
    int flush_count_{0};

+8 −7
Original line number Diff line number Diff line
@@ -65,8 +65,9 @@ ClientMetrics::ClientMetrics(client_metric_io_type io_type,
        flush_path_ = gkfs::config::client_metrics::flush_path;
    } else {
        flush_type_ = client_metric_flush_type::socket;
        zmq_flush_context_ = zmq::context_t(1);
        zmq_flush_socket_ = zmq::socket_t(zmq_flush_context_, ZMQ_PUSH);
        zmq_flush_context_ = std::make_unique<zmq::context_t>(1);
        zmq_flush_socket_ =
                std::make_unique<zmq::socket_t>(*zmq_flush_context_, ZMQ_PUSH);
    }
    flush_thread_running_ = true;
    flush_thread_ = std::thread(&ClientMetrics::flush_loop, this);
@@ -78,8 +79,8 @@ ClientMetrics::~ClientMetrics() {
    if(flush_thread_.joinable())
        flush_thread_.join();
    if(flush_type_ == client_metric_flush_type::socket) {
        zmq_flush_socket_.close();
        zmq_flush_context_.close();
        zmq_flush_socket_->close();
        zmq_flush_context_->close();
    }
}

@@ -140,7 +141,7 @@ ClientMetrics::flush_msgpack() {
        // copy data from serialized msgpack to zmq message
        memcpy(message.data(), data.data(), data.size());
        // non-blocking zmq send
        if(zmq_flush_socket_.send(message, zmq::send_flags::none) == -1) {
        if(zmq_flush_socket_->send(message, zmq::send_flags::none) == -1) {
            std::cerr << "Failed to send zmq message" << std::endl;
        }
    }
@@ -173,12 +174,12 @@ ClientMetrics::disable() {
void
ClientMetrics::zmq_connect(const string& ip_port) {
    auto address = "tcp://" + ip_port;
    zmq_flush_socket_.connect(address);
    zmq_flush_socket_->connect(address);
}

bool
ClientMetrics::zmq_is_connected() {
    return zmq_flush_socket_.handle() != nullptr;
    return zmq_flush_socket_->handle() != nullptr;
}

const string&