Verified Commit b223f6eb authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

chunk_stat RPC now uses Hermes instead of Margo

parent 378887d8
Loading
Loading
Loading
Loading
+120 −0
Original line number Diff line number Diff line
@@ -1917,6 +1917,126 @@ struct get_dirents {
    };
};

//==============================================================================
// definitions for chunk_stat
struct chunk_stat {

    // forward declarations of public input/output types for this RPC
    class input;
    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = chunk_stat;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = rpc_chunk_stat_in_t;
    using mercury_output_type = rpc_chunk_stat_out_t;

    // RPC public identifier
    // (N.B: we reuse the same IDs assigned by Margo so that the daemon 
    // understands Hermes RPCs)
    constexpr static const uint64_t public_id = 532742144;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = public_id;

    // RPC name
    constexpr static const auto name = hg_tag::chunk_stat;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb = 
        HG_GEN_PROC_NAME(rpc_chunk_stat_in_t);

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb = 
        HG_GEN_PROC_NAME(rpc_chunk_stat_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input(int32_t dummy) :
            m_dummy(dummy) { }

        input(input&& rhs) = default;
        input(const input& other) = default;
        input& operator=(input&& rhs) = default;
        input& operator=(const input& other) = default;

        int32_t
        dummy() const {
            return m_dummy;
        }

        explicit
        input(const rpc_chunk_stat_in_t& other) :
            m_dummy(other.dummy) { }

        explicit
        operator rpc_chunk_stat_in_t() {
            return { m_dummy };
        }

    private:
        int32_t m_dummy;
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() :
            m_chunk_size(),
            m_chunk_total(),
            m_chunk_free() {}

        output(uint64_t chunk_size, uint64_t chunk_total, uint64_t chunk_free) :
            m_chunk_size(chunk_size),
            m_chunk_total(chunk_total),
            m_chunk_free(chunk_free) {}

        output(output&& rhs) = default;
        output(const output& other) = default;
        output& operator=(output&& rhs) = default;
        output& operator=(const output& other) = default;

        explicit 
        output(const rpc_chunk_stat_out_t& out) {
            m_chunk_size = out.chunk_size;
            m_chunk_total = out.chunk_total;
            m_chunk_free = out.chunk_free;
        }

        uint64_t
        chunk_size() const {
            return m_chunk_size;
        }

        uint64_t
        chunk_total() const {
            return m_chunk_total;
        }

        uint64_t
        chunk_free() const {
            return m_chunk_free;
        }

    private:
        uint64_t m_chunk_size;
        uint64_t m_chunk_total;
        uint64_t m_chunk_free;
    };
};

} // namespace rpc
} // namespace gkfs

+1 −0
Original line number Diff line number Diff line
@@ -207,6 +207,7 @@ bool init_hermes_client(const std::string& transport_prefix) {
    rpc_read_data_id = gkfs::rpc::read_data::public_id;
    rpc_trunc_data_id = gkfs::rpc::trunc_data::public_id;
    rpc_get_dirents_id = gkfs::rpc::get_dirents::public_id;
    rpc_chunk_stat_id = gkfs::rpc::chunk_stat::public_id;

    return true;
}
+1 −0
Original line number Diff line number Diff line
@@ -38,6 +38,7 @@ register_user_request_types() {
    (void) registered_requests().add<gkfs::rpc::read_data>();
    (void) registered_requests().add<gkfs::rpc::trunc_data>();
    (void) registered_requests().add<gkfs::rpc::get_dirents>();
    (void) registered_requests().add<gkfs::rpc::chunk_stat>();

}

+38 −37
Original line number Diff line number Diff line
@@ -435,56 +435,57 @@ int trunc_data(const std::string& path, size_t current_size, size_t new_size) {
}

ChunkStat chunk_stat() {

    CTX->log()->trace("{}()", __func__);
    rpc_chunk_stat_in_t in;

    auto const host_size =  CTX->hosts().size();
    std::vector<hg_handle_t> rpc_handles(host_size);
    std::vector<margo_request> rpc_waiters(host_size);

    hg_return_t hg_ret;

    for (unsigned int target_host = 0; target_host < host_size; ++target_host) {
        //Setup rpc input parameters for each host
        hg_ret = margo_create_wrap_helper(rpc_chunk_stat_id, target_host,
                                          rpc_handles[target_host]);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error("Failed to create margo handle");
        }
        // Send RPC
        CTX->log()->trace("{}() Sending RPC to host: {}", __func__, target_host);
        hg_ret = margo_iforward(rpc_handles[target_host],
                                &in,
                                &rpc_waiters[target_host]);
        if (hg_ret != HG_SUCCESS) {
            CTX->log()->error("{}() Unable to send non-blocking chunk_stat to recipient {}", __func__, target_host);
            for (unsigned int i = 0; i <= target_host; i++) {
                margo_destroy(rpc_handles[i]);
            }

    std::vector<hermes::rpc_handle<gkfs::rpc::chunk_stat>> handles;

    for (const auto& endp : CTX->hosts2()) {
        try {
            CTX->log()->trace("{}() Sending RPC to host: {}", 
                              __func__, endp.to_string());

            gkfs::rpc::chunk_stat::input in(0);

            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
            // we can retry for RPC_TRIES (see old commits with margo)
            // TODO(amiranda): hermes will eventually provide a post(endpoint) 
            // returning one result and a broadcast(endpoint_set) returning a 
            // result_set. When that happens we can remove the .at(0) :/
            handles.emplace_back(
                ld_network_service->post<gkfs::rpc::chunk_stat>(endp, in));

        } catch (const std::exception& ex) {
            // TODO(amiranda): we should cancel all previously posted requests 
            // here, unfortunately, Hermes does not support it yet :/
            CTX->log()->error("{}() Failed to send request to host: {}", 
                              __func__, endp.to_string());
            throw std::runtime_error("Failed to forward non-blocking rpc request");
        }
    }

    unsigned long chunk_size = CHUNKSIZE;
    unsigned long chunk_total = 0;
    unsigned long chunk_free = 0;

    for (unsigned int target_host = 0; target_host < host_size; ++target_host) {
        hg_ret = margo_wait(rpc_waiters[target_host]);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error(fmt::format("Failed while waiting for rpc completion. target host: {}", target_host));
        }
        rpc_chunk_stat_out_t out{};
        hg_ret = margo_get_output(rpc_handles[target_host], &out);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error(fmt::format("Failed to get rpc output for target host: {}", target_host));
        }
    // wait for RPC responses
    for(std::size_t i = 0; i < handles.size(); ++i) {

        assert(out.chunk_size == chunk_size);
        chunk_total += out.chunk_total;
        chunk_free  += out.chunk_free;
        gkfs::rpc::chunk_stat::output out;

        margo_free_output(rpc_handles[target_host], &out);
        margo_destroy(rpc_handles[target_host]);
        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            out = handles[i].get().at(0);

            assert(out.chunk_size() == chunk_size);
            chunk_total += out.chunk_total();
            chunk_free  += out.chunk_free();

        } catch(const std::exception& ex) {
            throw std::runtime_error(
                fmt::format("Failed to get rpc output for target host: {}]", i));
        }
    }

    return {chunk_size, chunk_total, chunk_free};