Verified Commit 378887d8 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

get_dirents RPC now uses Hermes instead of Margo

parent ced527c4
Loading
Loading
Loading
Loading
+123 −0
Original line number Diff line number Diff line
@@ -1794,6 +1794,129 @@ struct trunc_data {
    };
};

//==============================================================================
// definitions for get_dirents
struct get_dirents {

    // forward declarations of public input/output types for this RPC
    class input;
    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = get_dirents;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = rpc_get_dirents_in_t;
    using mercury_output_type = rpc_get_dirents_out_t;

    // RPC public identifier
    // (N.B: we reuse the same IDs assigned by Margo so that the daemon 
    // understands Hermes RPCs)
    constexpr static const uint64_t public_id = 4121034752;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = public_id;

    // RPC name
    constexpr static const auto name = hg_tag::get_dirents;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb = 
        HG_GEN_PROC_NAME(rpc_get_dirents_in_t);

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb = 
        HG_GEN_PROC_NAME(rpc_get_dirents_out_t);

    class input {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        input(const std::string& path, 
              const hermes::exposed_memory& buffers) :
            m_path(path),
            m_buffers(buffers) { }

        input(input&& rhs) = default;
        input(const input& other) = default;
        input& operator=(input&& rhs) = default;
        input& operator=(const input& other) = default;

        std::string
        path() const {
            return m_path;
        }

        hermes::exposed_memory
        buffers() const {
            return m_buffers;
        }

        explicit
        input(const rpc_get_dirents_in_t& other) :
            m_path(other.path),
            m_buffers(other.bulk_handle) { }

        explicit
        operator rpc_get_dirents_in_t() {
            return {
                m_path.c_str(), 
                hg_bulk_t(m_buffers)
            };
        }

    private:
        std::string m_path;
        hermes::exposed_memory m_buffers;
    };

    class output {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        output() :
            m_err(),
            m_dirents_size() {}

        output(int32_t err, size_t dirents_size) :
            m_err(err),
            m_dirents_size(dirents_size) {}

        output(output&& rhs) = default;
        output(const output& other) = default;
        output& operator=(output&& rhs) = default;
        output& operator=(const output& other) = default;

        explicit 
        output(const rpc_get_dirents_out_t& out) {
            m_err = out.err;
            m_dirents_size = out.dirents_size;
        }

        int32_t
        err() const {
            return m_err;
        }

        int64_t
        dirents_size() const {
            return m_dirents_size;
        }

    private:
        int32_t m_err;
        size_t m_dirents_size;
    };
};

} // namespace rpc
} // namespace gkfs

+1 −0
Original line number Diff line number Diff line
@@ -206,6 +206,7 @@ bool init_hermes_client(const std::string& transport_prefix) {
    rpc_write_data_id = gkfs::rpc::write_data::public_id;
    rpc_read_data_id = gkfs::rpc::read_data::public_id;
    rpc_trunc_data_id = gkfs::rpc::trunc_data::public_id;
    rpc_get_dirents_id = gkfs::rpc::get_dirents::public_id;

    return true;
}
+1 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@ register_user_request_types() {
    (void) registered_requests().add<gkfs::rpc::write_data>();
    (void) registered_requests().add<gkfs::rpc::read_data>();
    (void) registered_requests().add<gkfs::rpc::trunc_data>();
    (void) registered_requests().add<gkfs::rpc::get_dirents>();

}

+87 −63
Original line number Diff line number Diff line
@@ -318,14 +318,11 @@ int get_metadentry_size(const std::string& path, off64_t& ret_size) {
 * Sends an RPC request to a specific node to push all chunks that belong to him
 */
void get_dirents(OpenDir& open_dir){

    CTX->log()->trace("{}() called", __func__);
    auto const root_dir = open_dir.path();
    auto const targets = CTX->distributor()->locate_directory_metadata(root_dir);
    auto const host_size = targets.size();
    std::vector<hg_handle_t> rpc_handles(host_size);
    std::vector<margo_request> rpc_waiters(host_size);
    std::vector<rpc_get_dirents_in_t> rpc_in(host_size);
    std::vector<char*> recv_buffers(host_size);
    auto const targets = 
        CTX->distributor()->locate_directory_metadata(root_dir);

    /* preallocate receiving buffer. The actual size is not known yet.
     *
@@ -333,83 +330,110 @@ void get_dirents(OpenDir& open_dir){
     * It turns out that this operation is increadibly slow for such a big
     * buffer. Moreover we don't need a zeroed buffer here.
     */
    auto recv_buff = std::unique_ptr<char[]>(new char[RPC_DIRENTS_BUFF_SIZE]);
    const unsigned long int per_host_buff_size = RPC_DIRENTS_BUFF_SIZE / host_size;
    auto large_buffer = 
        std::unique_ptr<char[]>(new char[RPC_DIRENTS_BUFF_SIZE]);

    //XXX there is a rounding error here depending on the number of targets...
    const std::size_t per_host_buff_size = 
        RPC_DIRENTS_BUFF_SIZE / targets.size();

    // expose local buffers for RMA from servers
    std::vector<hermes::exposed_memory> exposed_buffers;
    exposed_buffers.reserve(targets.size());

    for(std::size_t i = 0; i < targets.size(); ++i) {
        try {
            exposed_buffers.emplace_back(
                ld_network_service->expose(
                    std::vector<hermes::mutable_buffer>{
                        hermes::mutable_buffer{
                            large_buffer.get() + (i * per_host_buff_size),
                            per_host_buff_size
                        }
                    },
                    hermes::access_mode::write_only));
        } catch (const std::exception& ex) {
            throw std::runtime_error("Failed to expose buffers for RMA");
        }
    }

    // send RPCs
    std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents>> handles;

    hg_return_t hg_ret;
    for(std::size_t i = 0; i < targets.size(); ++i) {

    for(const auto& target_host: targets){
        CTX->log()->trace("{}() target_host: {}", __func__, targets[i]);

        CTX->log()->trace("{}() target_host: {}", __func__, target_host);
        // Setup rpc input parameters for each host
        rpc_in[target_host].path = root_dir.c_str();
        recv_buffers[target_host] = recv_buff.get() + (target_host * per_host_buff_size);
        auto endp = CTX->hosts2().at(targets[i]);

        hg_ret = margo_bulk_create(
                    ld_margo_rpc_id, 1,
                    reinterpret_cast<void**>(&recv_buffers[target_host]),
                    &per_host_buff_size,
                    HG_BULK_WRITE_ONLY, &(rpc_in[target_host].bulk_handle));
        if(hg_ret != HG_SUCCESS){
            throw std::runtime_error("Failed to create margo bulk handle");
        }
        gkfs::rpc::get_dirents::input in(root_dir, exposed_buffers[i]);

        hg_ret = margo_create_wrap_helper(rpc_get_dirents_id, target_host, rpc_handles[target_host]);
        if (hg_ret != HG_SUCCESS) {
            std::runtime_error("Failed to create margo handle");
        }
        // Send RPC
        CTX->log()->trace("{}() Sending RPC to host: {}", __func__, target_host);
        hg_ret = margo_iforward(rpc_handles[target_host],
                             &rpc_in[target_host],
                             &rpc_waiters[target_host]);
        if (hg_ret != HG_SUCCESS) {
            CTX->log()->error("{}() Unable to send non-blocking get_dirents on {} to recipient {}", __func__, root_dir, target_host);
            for (uint64_t i = 0; i <= target_host; i++) {
                margo_bulk_free(rpc_in[i].bulk_handle);
                margo_destroy(rpc_handles[i]);
            }
            throw std::runtime_error("Failed to forward non-blocking rpc request");
        try {

            CTX->log()->trace("{}() Sending RPC to host: {}",
                              __func__, targets[i]);
            handles.emplace_back(
                ld_network_service->post<gkfs::rpc::get_dirents>(endp, in));
        } catch(const std::exception& ex) {
            CTX->log()->error("{}() Unable to send non-blocking get_dirents "
                              "on {} to recipient {}", 
                              __func__, root_dir, targets[i]);
            throw std::runtime_error("Failed to post non-blocking RPC request");
        }
    }

    for(unsigned int target_host = 0; target_host < host_size; target_host++){
        hg_ret = margo_wait(rpc_waiters[target_host]);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error(fmt::format("Failed while waiting for rpc completion. [root dir: {}, target host: {}]", root_dir, target_host));
    // wait for RPC responses
    for(std::size_t i = 0; i < handles.size(); ++i) {

        gkfs::rpc::get_dirents::output out;

        try {
            // XXX We might need a timeout here to not wait forever for an
            // output that never comes?
            out = handles[i].get().at(0);

            if(out.err() != 0) {
                throw std::runtime_error(
                        fmt::format("Failed to retrieve dir entries from "
                                    "host '{}'. Error '{}', path '{}'", 
                                    targets[i], strerror(out.err()), root_dir));
            }
        rpc_get_dirents_out_t out{};
        hg_ret = margo_get_output(rpc_handles[target_host], &out);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error(fmt::format("Failed to get rpc output.. [path: {}, target host: {}]", root_dir, target_host));
        } catch(const std::exception& ex) {
            throw std::runtime_error(
                    fmt::format("Failed to get rpc output.. [path: {}, "
                                "target host: {}]", root_dir, targets[i]));
        }

        if (out.err) {
            CTX->log()->error("{}() Sending RPC to host: {}", __func__, target_host);
            throw std::runtime_error(fmt::format("Failed to retrieve dir entries from host '{}'. "
                                                 "Error '{}', path '{}'", target_host, strerror(out.err), root_dir));
        }
        bool* bool_ptr = reinterpret_cast<bool*>(recv_buffers[target_host]);
        char* names_ptr = recv_buffers[target_host] + (out.dirents_size * sizeof(bool));
        // each server wrote information to its pre-defined region in 
        // large_buffer, recover it by computing the base_address for each
        // particular server and adding the appropriate offsets
        assert(exposed_buffers[i].count() == 1);
        void* base_ptr = exposed_buffers[i].begin()->data();

        bool* bool_ptr = reinterpret_cast<bool*>(base_ptr);
        char* names_ptr = reinterpret_cast<char*>(base_ptr) + 
                            (out.dirents_size() * sizeof(bool));

        for(unsigned int i = 0; i < out.dirents_size; i++){
        for(std::size_t j = 0; j < out.dirents_size(); j++) {

            FileType ftype = (*bool_ptr)? FileType::directory : FileType::regular;
            FileType ftype = (*bool_ptr) ? 
                                FileType::directory : 
                                FileType::regular;
            bool_ptr++;

            // Check that we are not outside the recv_buff for this specific host
            assert((names_ptr - recv_buffers[target_host]) > 0);
            assert(static_cast<unsigned long int>(names_ptr - recv_buffers[target_host]) < per_host_buff_size);
            assert((names_ptr - reinterpret_cast<char*>(base_ptr)) > 0);
            assert(
                static_cast<unsigned long int>(
                    names_ptr - reinterpret_cast<char*>(base_ptr)) < 
                per_host_buff_size);

            auto name = std::string(names_ptr);
            names_ptr += name.size() + 1;

            open_dir.add(name, ftype);
        }

        margo_free_output(rpc_handles[target_host], &out);
        margo_bulk_free(rpc_in[target_host].bulk_handle);
        margo_destroy(rpc_handles[target_host]);
    }
}