Verified Commit 5dfc9820 authored by Alberto Miranda ♨️

write_data RPC now uses Hermes instead of Margo

parent f0e87d93
+191 −0
@@ -1276,6 +1276,197 @@ struct mk_symlink {

#endif // HAS_SYMLINKS

//==============================================================================
// definitions for write_data
struct write_data {

    // forward declarations of public input/output types for this RPC
    class input;
    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = write_data;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = rpc_write_data_in_t;
    using mercury_output_type = rpc_data_out_t;

    // RPC public identifier
    constexpr static const uint64_t public_id = 3716481024;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = public_id;

    // RPC name
    constexpr static const auto name = hg_tag::write_data;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb = 
        HG_GEN_PROC_NAME(rpc_write_data_in_t);

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb = 
        HG_GEN_PROC_NAME(rpc_data_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input(const std::string& path, 
              int64_t offset,
              uint64_t host_id,
              uint64_t host_size,
              uint64_t chunk_n,
              uint64_t chunk_start,
              uint64_t chunk_end,
              uint64_t total_chunk_size,
              const hermes::exposed_memory& buffers) :
            m_path(path),
            m_offset(offset),
            m_host_id(host_id),
            m_host_size(host_size),
            m_chunk_n(chunk_n),
            m_chunk_start(chunk_start),
            m_chunk_end(chunk_end),
            m_total_chunk_size(total_chunk_size),
            m_buffers(buffers) { }

        input(input&& rhs) = default;
        input(const input& other) = default;
        input& operator=(input&& rhs) = default;
        input& operator=(const input& other) = default;

        std::string
        path() const {
            return m_path;
        }

        int64_t
        offset() const {
            return m_offset;
        }

        uint64_t
        host_id() const {
            return m_host_id;
        }

        uint64_t
        host_size() const {
            return m_host_size;
        }

        uint64_t
        chunk_n() const {
            return m_chunk_n;
        }

        uint64_t
        chunk_start() const {
            return m_chunk_start;
        }

        uint64_t
        chunk_end() const {
            return m_chunk_end;
        }

        uint64_t
        total_chunk_size() const {
            return m_total_chunk_size;
        }

        hermes::exposed_memory
        buffers() const {
            return m_buffers;
        }

        explicit
        input(const rpc_write_data_in_t& other) :
            m_path(other.path),
            m_offset(other.offset),
            m_host_id(other.host_id),
            m_host_size(other.host_size),
            m_chunk_n(other.chunk_n),
            m_chunk_start(other.chunk_start),
            m_chunk_end(other.chunk_end),
            m_total_chunk_size(other.total_chunk_size),
            m_buffers(other.bulk_handle) { }

        explicit
        operator rpc_write_data_in_t() {
            return {
                m_path.c_str(), 
                m_offset,
                m_host_id,
                m_host_size,
                m_chunk_n,
                m_chunk_start,
                m_chunk_end,
                m_total_chunk_size,
                hg_bulk_t(m_buffers)
            };
        }

    private:
        std::string m_path;
        int64_t m_offset;
        uint64_t m_host_id;
        uint64_t m_host_size;
        uint64_t m_chunk_n;
        uint64_t m_chunk_start;
        uint64_t m_chunk_end;
        uint64_t m_total_chunk_size;
        hermes::exposed_memory m_buffers;
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() :
            m_err(),
            m_io_size() {}

        output(int32_t err, size_t io_size) :
            m_err(err),
            m_io_size(io_size) {}

        output(output&& rhs) = default;
        output(const output& other) = default;
        output& operator=(output&& rhs) = default;
        output& operator=(const output& other) = default;

        explicit 
        output(const rpc_data_out_t& out) {
            m_err = out.err;
            m_io_size = out.io_size;
        }

        int32_t
        err() const {
            return m_err;
        }

        int64_t
        io_size() const {
            return m_io_size;
        }

    private:
        int32_t m_err;
        size_t m_io_size;
    };
};

} // namespace rpc
} // namespace gkfs
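
For context, the descriptor above is everything Hermes needs to drive this RPC from the client side; the rewritten write() path further below follows exactly this pattern. A minimal sketch (not part of this commit; `service`, `endpoint`, `buf` and `len` are hypothetical placeholders, and error handling is omitted):

    // expose the local buffer for RMA so the daemon can pull data from it
    std::vector<hermes::mutable_buffer> bufseq{
        hermes::mutable_buffer{buf, len},
    };
    hermes::exposed_memory exposed =
        service.expose(bufseq, hermes::access_mode::read_only);

    // build the typed input; argument order follows input's constructor
    gkfs::rpc::write_data::input in(
        "/some/path", /*offset=*/0, /*host_id=*/0, /*host_size=*/1,
        /*chunk_n=*/1, /*chunk_start=*/0, /*chunk_end=*/0,
        /*total_chunk_size=*/len, exposed);

    // post() issues the request without blocking; get() waits for the reply
    auto handle = service.post<gkfs::rpc::write_data>(endpoint, in);
    auto out = handle.get().at(0);
    // out.err() and out.io_size() carry the daemon's status and written bytes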

+2 −0
@@ -203,6 +203,8 @@ bool init_hermes_client(const std::string& transport_prefix) {
    rpc_mk_symlink_id = gkfs::rpc::mk_symlink::public_id;
#endif // HAS_SYMLINKS

    rpc_write_data_id = gkfs::rpc::write_data::public_id;

    return true;
}
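
Presumably the legacy rpc_write_data_id global is kept (and now fed from the descriptor's public_id) so that any code still keyed on the raw hg_id_t keeps working while the remaining RPCs are migrated off Margo one at a time.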

+1 −0
@@ -34,6 +34,7 @@ register_user_request_types() {
    (void) registered_requests().add<gkfs::rpc::mk_symlink>();
#endif // HAS_SYMLINKS

    (void) registered_requests().add<gkfs::rpc::write_data>();

}
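
Since the registration above and the client-side assignment in init_hermes_client() both read the id straight from the descriptor, the two sides cannot drift apart; the descriptor itself also ties mercury_id to public_id. A compile-time check along these lines (hypothetical, not part of this commit) would document that invariant:

    // both constants come from the write_data descriptor itself
    static_assert(gkfs::rpc::write_data::mercury_id ==
                      gkfs::rpc::write_data::public_id,
                  "write_data must be registered under its public id");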

+128 −81
@@ -17,6 +17,7 @@
#include "global/rpc/rpc_types.hpp"
#include <global/rpc/distributor.hpp>
#include <global/chunk_calc_util.hpp>
#include <client/rpc/hg_rpcs.hpp>

#include <unordered_set>

@@ -32,122 +33,168 @@ using namespace std;
 /**
  * Sends an RPC request to a specific node to pull all chunks that belong to it
  */
-ssize_t write(const string& path, const void* buf, const bool append_flag, const off64_t in_offset,
-                       const size_t write_size, const int64_t updated_metadentry_size) {
+ssize_t write(const string& path, const void* buf, const bool append_flag,
+              const off64_t in_offset, const size_t write_size,
+              const int64_t updated_metadentry_size) {
 
     assert(write_size > 0);
-    // Calculate chunkid boundaries and numbers so that daemons know in which interval to look for chunks
-    off64_t offset = in_offset;
-    if (append_flag)
-        offset = updated_metadentry_size - write_size;
+
+    // Calculate chunkid boundaries and numbers so that daemons know in
+    // which interval to look for chunks
+    off64_t offset = append_flag ?
+                        (updated_metadentry_size - write_size) :
+                        in_offset;
 
     auto chnk_start = chnk_id_for_offset(offset, CHUNKSIZE);
     auto chnk_end = chnk_id_for_offset((offset + write_size) - 1, CHUNKSIZE);
 
-    // Collect all chunk ids within count that have the same destination so that those are send in one rpc bulk transfer
-    map<uint64_t, vector<uint64_t>> target_chnks{};
-    // contains the target ids, used to access the target_chnks map. First idx is chunk with potential offset
-    vector<uint64_t> targets{};
+    // Collect all chunk ids within count that have the same destination so
+    // that they are sent in one rpc bulk transfer
+    std::map<uint64_t, std::vector<uint64_t>> target_chnks{};
+    // contains the target ids, used to access the target_chnks map.
+    // First idx is chunk with potential offset
+    std::vector<uint64_t> targets{};
 
     // targets for the first and last chunk as they need special treatment
     uint64_t chnk_start_target = 0;
     uint64_t chnk_end_target = 0;
 
     for (uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
         auto target = CTX->distributor()->locate_data(path, chnk_id);
 
         if (target_chnks.count(target) == 0) {
-            target_chnks.insert(make_pair(target, vector<uint64_t>{chnk_id}));
+            target_chnks.insert(
+                    std::make_pair(target, std::vector<uint64_t>{chnk_id}));
             targets.push_back(target);
-        } else
+        } else {
             target_chnks[target].push_back(chnk_id);
+        }
 
         // set first and last chnk targets
-        if (chnk_id == chnk_start)
+        if (chnk_id == chnk_start) {
             chnk_start_target = target;
-        if (chnk_id == chnk_end)
+        }
+
+        if (chnk_id == chnk_end) {
             chnk_end_target = target;
+        }
     }
 
-    // some helper variables for async RPC
-    auto target_n = targets.size();
-    vector<hg_handle_t> rpc_handles(target_n);
-    vector<margo_request> rpc_waiters(target_n);
-    vector<rpc_write_data_in_t> rpc_in(target_n);
-    // register local target buffer for bulk access for margo instance
-    auto bulk_buf = const_cast<void*>(buf);
-    hg_bulk_t rpc_bulk_handle = nullptr;
-    auto size = make_shared<size_t>(write_size);
-    auto ret = margo_bulk_create(ld_margo_rpc_id, 1, &bulk_buf, size.get(), HG_BULK_READ_ONLY, &rpc_bulk_handle);
-    if (ret != HG_SUCCESS) {
-        CTX->log()->error("{}() Failed to create rpc bulk handle", __func__);
-        errno = EBUSY;
-        return -1;
-    }
+    std::vector<hermes::mutable_buffer> bufseq{
+        hermes::mutable_buffer{const_cast<void*>(buf), write_size},
+    };
+
+    hermes::exposed_memory local_buffers;
+
+    try {
+        local_buffers =
+            ld_network_service->expose(bufseq, hermes::access_mode::read_only);
+
+    } catch (const std::exception& ex) {
+        CTX->log()->error("{}() Failed to expose buffers for RMA", __func__);
+        errno = EBUSY;
+        return -1;
+    }
+
+    std::vector<hermes::rpc_handle<gkfs::rpc::write_data>> handles;
 
     // Issue non-blocking RPC requests and wait for the result later
-    for (uint64_t i = 0; i < target_n; i++) {
-        auto target = targets[i];
-        auto total_chunk_size = target_chnks[target].size() * CHUNKSIZE; // total chunk_size for target
-        if (target == chnk_start_target) // receiver of first chunk must subtract the offset from first chunk
-            total_chunk_size -= chnk_lpad(offset, CHUNKSIZE);
-        if (target == chnk_end_target) // receiver of last chunk must subtract
-            total_chunk_size -= chnk_rpad(offset + write_size, CHUNKSIZE);
-        // Fill RPC input
-        rpc_in[i].path = path.c_str();
-        rpc_in[i].host_id = target;
-        rpc_in[i].host_size = CTX->hosts().size();
-        rpc_in[i].offset = chnk_lpad(offset, CHUNKSIZE);// first offset in targets is the chunk with a potential offset
-        rpc_in[i].chunk_n = target_chnks[target].size(); // number of chunks handled by that destination
-        rpc_in[i].chunk_start = chnk_start; // chunk start id of this write
-        rpc_in[i].chunk_end = chnk_end; // chunk end id of this write
-        rpc_in[i].total_chunk_size = total_chunk_size; // total size to write
-        rpc_in[i].bulk_handle = rpc_bulk_handle;
-        margo_create_wrap_helper(rpc_write_data_id, target, rpc_handles[i]);
-        // Send RPC
-        CTX->log()->trace("{}() host: {}, path: {}, chunks: {}, size: {}, offset: {}", __func__,
-                           target, path, rpc_in[i].chunk_n, total_chunk_size, rpc_in[i].offset);
-        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
-        if (ret != HG_SUCCESS) {
-            CTX->log()->error("{}() Unable to send non-blocking rpc for path {} and recipient {}", __func__, path,
-                              target);
-            errno = EBUSY;
-            for (uint64_t j = 0; j < i + 1; j++) {
-                margo_destroy(rpc_handles[j]);
-            }
-            // free bulk handles for buffer
-            margo_bulk_free(rpc_bulk_handle);
-            return -1;
-        }
-    }
+    for (const auto& target : targets) {
+
+        // total chunk_size for target
+        auto total_chunk_size = target_chnks[target].size() * CHUNKSIZE;
+
+        // receiver of first chunk must subtract the offset from first chunk
+        if (target == chnk_start_target) {
+            total_chunk_size -= chnk_lpad(offset, CHUNKSIZE);
+        }
+
+        // receiver of last chunk must subtract the trailing padding
+        if (target == chnk_end_target) {
+            total_chunk_size -= chnk_rpad(offset + write_size, CHUNKSIZE);
+        }
+
+        auto endp = CTX->hosts2().at(target);
+
+        try {
+
+            CTX->log()->debug("{}() Sending RPC ...", __func__);
+
+            gkfs::rpc::write_data::input in(
+                path,
+                // first offset in targets is the chunk with
+                // a potential offset
+                chnk_lpad(offset, CHUNKSIZE),
+                target,
+                CTX->hosts2().size(),
+                // number of chunks handled by that destination
+                target_chnks[target].size(),
+                // chunk start id of this write
+                chnk_start,
+                // chunk end id of this write
+                chnk_end,
+                // total size to write
+                total_chunk_size,
+                local_buffers);
+
+            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that
+            // we can retry for RPC_TRIES (see old commits with margo)
+            // TODO(amiranda): hermes will eventually provide a post(endpoint)
+            // returning one result and a broadcast(endpoint_set) returning a
+            // result_set. When that happens we can remove the .at(0) :/
+            handles.emplace_back(
+                ld_network_service->post<gkfs::rpc::write_data>(endp, in));
+
+            CTX->log()->trace("{}() host: {}, path: {}, chunks: {}, size: {}, "
+                              "offset: {}", __func__,
+                              target, path, in.chunk_n(),
+                              total_chunk_size, in.offset());
+
+        } catch (const std::exception& ex) {
+            CTX->log()->error("{}() Unable to send non-blocking rpc for "
+                              "path {} and recipient {}", __func__, path,
+                              target);
+            errno = EBUSY;
+            return -1;
+        }
+    }
 
-    // Wait for RPC responses and then get response and add it to out_size which is the written size
-    // All potential outputs are served to free resources regardless of errors, although an errorcode is set.
-    ssize_t out_size = 0;
+    // Wait for RPC responses and then get response and add it to out_size
+    // which is the written size. All potential outputs are served to free
+    // resources regardless of errors, although an errorcode is set.
     bool error = false;
-    for (unsigned int i = 0; i < target_n; i++) {
-        // XXX We might need a timeout here to not wait forever for an output that never comes?
-        ret = margo_wait(rpc_waiters[i]);
-        if (ret != HG_SUCCESS) {
-            CTX->log()->error("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path,
-                              targets[i]);
-            error = true;
-            errno = EBUSY;
-        }
-        // decode response
-        rpc_data_out_t out{};
-        ret = margo_get_output(rpc_handles[i], &out);
-        if (ret != HG_SUCCESS) {
-            CTX->log()->error("{}() Failed to get rpc output for path {} recipient {}", __func__, path, targets[i]);
-            error = true;
-            errno = EIO;
-        }
-        if (out.err != 0) {
-            CTX->log()->error("{}() Daemon reported error: {}", __func__, out.err);
-            error = true;
-            errno = out.err;
-        }
-        out_size += static_cast<size_t>(out.io_size);
-        margo_free_output(rpc_handles[i], &out);
-        margo_destroy(rpc_handles[i]);
-    }
-    // free bulk handles for buffer
-    margo_bulk_free(rpc_bulk_handle);
-    return (error) ? -1 : out_size;
+    ssize_t out_size = 0;
+    std::size_t idx = 0;
+
+    for (const auto& h : handles) {
+        try {
+            // XXX We might need a timeout here to not wait forever for an
+            // output that never comes?
+            auto out = h.get().at(0);
+
+            if (out.err() != 0) {
+                CTX->log()->error("{}() Daemon reported error: {}",
+                                  __func__, out.err());
+                error = true;
+                errno = out.err();
+            }
+
+            out_size += static_cast<size_t>(out.io_size());
+
+        } catch (const std::exception& ex) {
+            CTX->log()->error("{}() Failed to get rpc output for path {} "
+                              "recipient {}", __func__, path, targets[idx]);
+            error = true;
+            errno = EIO;
+        }
+
+        ++idx;
+    }
+
+    return error ? -1 : out_size;
 }
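
Worth noting in the rewrite above: the Margo version had to release resources by hand on every path (margo_destroy() per handle, margo_free_output() per response, margo_bulk_free() for the bulk handle), while the Hermes version simply returns early on error. The assumption is that hermes::rpc_handle and hermes::exposed_memory are RAII types that release their underlying Mercury resources when they go out of scope, which is what makes the bare `return -1` in the catch blocks safe.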

/**