Verified Commit ced527c4 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

trunc_data RPC now uses Hermes instead of Margo

parent 094fb3b8
Loading
Loading
Loading
Loading
+114 −0
Original line number Diff line number Diff line
@@ -1680,6 +1680,120 @@ struct read_data {
    };
};

//==============================================================================
// definitions for trunc_data
struct trunc_data {

    // forward declarations of public input/output types for this RPC
    class input;
    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = trunc_data;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = rpc_trunc_in_t;
    using mercury_output_type = rpc_err_out_t;

    // RPC public identifier
    // (N.B: we reuse the same IDs assigned by Margo so that the daemon 
    // understands Hermes RPCs)
    constexpr static const uint64_t public_id = 1850933248;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = public_id;

    // RPC name
    constexpr static const auto name = hg_tag::trunc_data;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb = 
        HG_GEN_PROC_NAME(rpc_trunc_in_t);

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb = 
        HG_GEN_PROC_NAME(rpc_err_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input(const std::string& path, 
              uint64_t length) :
            m_path(path),
            m_length(length) { }

        input(input&& rhs) = default;
        input(const input& other) = default;
        input& operator=(input&& rhs) = default;
        input& operator=(const input& other) = default;

        std::string
        path() const {
            return m_path;
        }

        uint64_t
        length() const {
            return m_length;
        }

        explicit
        input(const rpc_trunc_in_t& other) :
            m_path(other.path),
            m_length(other.length) { }

        explicit
        operator rpc_trunc_in_t() {
            return {
                m_path.c_str(), 
                m_length,
            };
        }

    private:
        std::string m_path;
        uint64_t m_length;
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() :
            m_err() {}

        output(int32_t err) :
            m_err(err) {}

        output(output&& rhs) = default;
        output(const output& other) = default;
        output& operator=(output&& rhs) = default;
        output& operator=(const output& other) = default;

        explicit 
        output(const rpc_err_out_t& out) {
            m_err = out.err;
        }

        int32_t
        err() const {
            return m_err;
        }

    private:
        int32_t m_err;
    };
};

} // namespace rpc
} // namespace gkfs

+1 −0
Original line number Diff line number Diff line
@@ -205,6 +205,7 @@ bool init_hermes_client(const std::string& transport_prefix) {

    rpc_write_data_id = gkfs::rpc::write_data::public_id;
    rpc_read_data_id = gkfs::rpc::read_data::public_id;
    rpc_trunc_data_id = gkfs::rpc::trunc_data::public_id;

    return true;
}
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ register_user_request_types() {

    (void) registered_requests().add<gkfs::rpc::write_data>();
    (void) registered_requests().add<gkfs::rpc::read_data>();
    (void) registered_requests().add<gkfs::rpc::trunc_data>();

}

+45 −56
Original line number Diff line number Diff line
@@ -365,84 +365,73 @@ ssize_t read(const string& path, void* buf, const off64_t offset, const size_t r
}

int trunc_data(const std::string& path, size_t current_size, size_t new_size) {
    assert(current_size > new_size);
    hg_return_t ret;
    rpc_trunc_in_t in;
    in.path = path.c_str();
    in.length = new_size;

    assert(current_size > new_size);
    bool error = false;

    // Find out which data server needs to delete chunks in order to contact only them
    // Find out which data servers need to delete data chunks in order to 
    // contact only them
    const unsigned int chunk_start = chnk_id_for_offset(new_size, CHUNKSIZE);
    const unsigned int chunk_end = chnk_id_for_offset(current_size - new_size - 1, CHUNKSIZE);
    const unsigned int chunk_end = 
        chnk_id_for_offset(current_size - new_size - 1, CHUNKSIZE);

    std::unordered_set<unsigned int> hosts;
    for(unsigned int chunk_id = chunk_start; chunk_id <= chunk_end; ++chunk_id) {
        hosts.insert(CTX->distributor()->locate_data(path, chunk_id));
    }

    std::vector<hg_handle_t> rpc_handles(hosts.size());
    std::vector<margo_request> rpc_waiters(hosts.size());
    unsigned int req_num = 0;
    std::vector<hermes::rpc_handle<gkfs::rpc::trunc_data>> handles;

    for (const auto& host: hosts) {
        ret = margo_create_wrap_helper(rpc_trunc_data_id, host, rpc_handles[req_num]);
        if (ret != HG_SUCCESS) {
            CTX->log()->error("{}() Unable to create Mercury handle for host: ", __func__, host);
            break;
        }

        // send async rpc
        ret = margo_iforward(rpc_handles[req_num], &in, &rpc_waiters[req_num]);
        if (ret != HG_SUCCESS) {
            CTX->log()->error("{}() Failed to send request to host: {}", __func__, host);
            break;
        }
        ++req_num;
    }
        auto endp = CTX->hosts2().at(host);

    if(req_num < hosts.size()) {
        // An error occurred. Cleanup and return
        CTX->log()->error("{}() Error -> sent only some requests {}/{}. Cancelling request...", __func__, req_num, hosts.size());
        for(unsigned int i = 0; i < req_num; ++i) {
            margo_destroy(rpc_handles[i]);
        }
        try {
            CTX->log()->debug("{}() Sending RPC ...", __func__);

            gkfs::rpc::trunc_data::input in(path, new_size);

            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
            // we can retry for RPC_TRIES (see old commits with margo)
            // TODO(amiranda): hermes will eventually provide a post(endpoint) 
            // returning one result and a broadcast(endpoint_set) returning a 
            // result_set. When that happens we can remove the .at(0) :/
            handles.emplace_back(
                ld_network_service->post<gkfs::rpc::trunc_data>(endp, in));

        } catch (const std::exception& ex) {
            // TODO(amiranda): we should cancel all previously posted requests 
            // here, unfortunately, Hermes does not support it yet :/
            CTX->log()->error("{}() Failed to send request to host: {}", 
                              __func__, host);
            errno = EIO;
            return -1;
        }

    assert(req_num == hosts.size());
    }

    // Wait for RPC responses and then get response
    rpc_err_out_t out{};
    for (unsigned int i = 0; i < hosts.size(); ++i) {
        ret = margo_wait(rpc_waiters[i]);
        if (ret == HG_SUCCESS) {
            ret = margo_get_output(rpc_handles[i], &out);
            if (ret == HG_SUCCESS) {
                if(out.err){
                    CTX->log()->error("{}() received error response: {}", __func__, out.err);
    for(const auto& h : handles) {

        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            auto out = h.get().at(0);

            if(out.err() != 0) {
                CTX->log()->error("{}() received error response: {}", 
                        __func__, out.err());
                error = true;
                errno = EIO;
            }
            } else {
                // Get output failed
        } catch(const std::exception& ex) {
            CTX->log()->error("{}() while getting rpc output", __func__);
            error = true;
            errno = EIO;
        }
        } else {
            // Wait failed
            CTX->log()->error("{}() Failed while waiting for response", __func__);
            error = true;
    }

        /* clean up resources consumed by this rpc */
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }

    if(error) {
        errno = EIO;
        return -1;
    }
    return 0;
    return error ? -1 : 0;
}

ChunkStat chunk_stat() {