Verified Commit 55da3cba authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

remove RPC now uses Hermes instead of Margo

parent 3a17e07a
Loading
Loading
Loading
Loading
+100 −0
Original line number Diff line number Diff line
@@ -440,6 +440,106 @@ struct stat {
    };
};

//==============================================================================
// definitions for remove
// Description of the `remove` RPC: the compile-time traits the RPC engine
// consumes, plus the C++ wrappers for its input (path of the node to remove)
// and its output (error code reported by the remote side).
struct remove {

    // forward declarations of public input/output types for this RPC
    class input;
    class output;

    // traits used so that the engine knows what to do with the RPC
    using self_type = remove;
    using handle_type = hermes::rpc_handle<self_type>;
    using input_type = input;
    using output_type = output;
    using mercury_input_type = rpc_rm_node_in_t;
    using mercury_output_type = rpc_err_out_t;

    // RPC public identifier
    constexpr static const uint64_t public_id = 2549415936;

    // RPC internal Mercury identifier
    constexpr static const hg_id_t mercury_id = public_id;

    // RPC name
    constexpr static const auto name = hg_tag::remove;

    // requires response?
    constexpr static const auto requires_response = true;

    // Mercury callback to serialize input arguments
    constexpr static const auto mercury_in_proc_cb = 
        HG_GEN_PROC_NAME(rpc_rm_node_in_t);

    // Mercury callback to serialize output arguments
    constexpr static const auto mercury_out_proc_cb = 
        HG_GEN_PROC_NAME(rpc_err_out_t);

    // Input arguments of the RPC: owns a copy of the path to remove.
    class input {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        // Intentionally implicit so that an input can be built directly
        // from a path string.
        input(const std::string& path) :
            m_path(path) { }

        input(input&& rhs) = default;
        input(const input& other) = default;
        input& operator=(input&& rhs) = default;
        input& operator=(const input& other) = default;

        // Returns a copy of the stored path.
        std::string
        path() const {
            return m_path;
        }

        // Builds the input from its Mercury representation, copying the
        // string. A null `other.path` is mapped to an empty path, since
        // constructing a std::string from a null pointer is undefined
        // behaviour.
        explicit
        input(const rpc_rm_node_in_t& other) :
            m_path(other.path ? other.path : "") { }

        // Converts to the Mercury representation. The result only borrows
        // the character buffer of `m_path`: it must not outlive this object
        // nor be used after this object is modified.
        explicit
        operator rpc_rm_node_in_t() const {
            return {m_path.c_str()};
        }

    private:
        std::string m_path;
    };

    // Output of the RPC: the error code carried by the response.
    class output {

        template <typename ExecutionContext>
        friend hg_return_t hermes::detail::post_to_mercury(ExecutionContext*);

    public:
        // Value-initializes the error code (i.e. zero).
        output() :
            m_err() {}

        output(int32_t err) :
            m_err(err) {}

        output(output&& rhs) = default;
        output(const output& other) = default;
        output& operator=(output&& rhs) = default;
        output& operator=(const output& other) = default;

        // Builds the output from its Mercury representation.
        explicit 
        output(const rpc_err_out_t& out) :
            m_err(out.err) {}

        // Returns the error code carried by the response.
        int32_t
        err() const {
            return m_err;
        }

    private:
        int32_t m_err;
    };
};

} // namespace rpc
} // namespace gkfs

+1 −0
Original line number Diff line number Diff line
@@ -193,6 +193,7 @@ bool init_hermes_client(const std::string& transport_prefix) {
    rpc_config_id = gkfs::rpc::fs_config::public_id;
    rpc_mk_node_id = gkfs::rpc::create::public_id;
    rpc_stat_id = gkfs::rpc::stat::public_id;
    rpc_rm_node_id = gkfs::rpc::remove::public_id;

    return true;
}
+1 −0
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ register_user_request_types() {
    (void) registered_requests().add<gkfs::rpc::fs_config>();
    (void) registered_requests().add<gkfs::rpc::create>();
    (void) registered_requests().add<gkfs::rpc::stat>();
    (void) registered_requests().add<gkfs::rpc::remove>();
}

}} // namespace hermes::detail
+58 −57
Original line number Diff line number Diff line
@@ -137,70 +137,71 @@ int decr_size(const std::string& path, size_t length) {
}

int rm_node(const std::string& path, const bool remove_metadentry_only) {
    hg_return_t ret;
    int err = 0; // assume we succeed
    // if metadentry should only removed only, send only 1 rpc to remove the metadata
    // else send an rpc to all hosts and thus broadcast chunk_removal.
    auto rpc_target_size = remove_metadentry_only ? static_cast<uint64_t>(1) : CTX->hosts().size();

    CTX->log()->debug("{}() Creating Mercury handles for all nodes ...", __func__);
    vector<hg_handle_t> rpc_handles(rpc_target_size);
    vector<margo_request> rpc_waiters(rpc_target_size);
    vector<rpc_rm_node_in_t> rpc_in(rpc_target_size);
    // Send rpc to all nodes as all of them can have chunks for this path
    for (size_t i = 0; i < rpc_target_size; i++) {
        // fill in
        rpc_in[i].path = path.c_str();
        // create handle
        // if only the metadentry needs to removed send one rpc to metadentry's responsible node
        if (remove_metadentry_only)
            ret = margo_create_wrap(rpc_rm_node_id, path, rpc_handles[i]);
        else
            ret = margo_create_wrap_helper(rpc_rm_node_id, i, rpc_handles[i]);
        if (ret != HG_SUCCESS) {
            CTX->log()->warn("{}() Unable to create Mercury handle", __func__);
            // We use continue here to remove at least some data
            // XXX In the future we can discuss RPC retrying. This should be a function to be used in general
            errno = EBUSY;
            err = -1;
        }
        // send async rpc
        ret = margo_iforward(rpc_handles[i], &rpc_in[i], &rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            CTX->log()->warn("{}() Unable to create Mercury handle", __func__);

    // if only the metadentry should be removed, send one rpc to the
    // metadentry's responsible node to remove the metadata
    // else, send an rpc to all hosts and thus broadcast chunk_removal.
    if(remove_metadentry_only) {

        auto idx = CTX->distributor()->locate_file_metadata(path);
        auto endp = CTX->hosts2().at(idx);

        try {

            CTX->log()->debug("{}() Sending RPC ...", __func__);
            // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
            // retry for RPC_TRIES (see old commits with margo)
            // TODO(amiranda): hermes will eventually provide a post(endpoint) 
            // returning one result and a broadcast(endpoint_set) returning a 
            // result_set. When that happens we can remove the .at(0) :/
            auto out = 
                ld_network_service->post<gkfs::rpc::remove>(endp, path).get().at(0);

            CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

            assert(out.err() == 0);

        } catch(const std::exception& ex) {
            CTX->log()->error("{}() while getting rpc output", __func__);
            errno = EBUSY;
            err = -1;
            return -1;
        }

        return 0;
    }


    std::size_t rpc_target_size2 = CTX->hosts2().size();
    std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles;

    hermes::endpoint_set endps;

    std::copy(CTX->hosts2().begin(), 
              CTX->hosts2().end(), 
              std::back_inserter(endps));

    try {

        auto output_set = 
            ld_network_service->broadcast<gkfs::rpc::remove>(endps, path).get();

        // Wait for RPC responses and then get response
    for (size_t i = 0; i < rpc_target_size; i++) {
        // XXX We might need a timeout here to not wait forever for an output that never comes?
        ret = margo_wait(rpc_waiters[i]);
        if (ret != HG_SUCCESS) {
            CTX->log()->warn("{}() Unable to wait for margo_request handle for path {} recipient {}", __func__, path, i);
            errno = EBUSY;
            err = -1;
        for (const auto& out : output_set) {
            CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

            if(out.err() != 0) {
                errno = out.err();
                return -1;
            }
        rpc_err_out_t out{};
        ret = margo_get_output(rpc_handles[i], &out);
        if (ret == HG_SUCCESS) {
            CTX->log()->debug("{}() Got response success: {}", __func__, out.err);
            if (err != 0) {
                errno = out.err;
                err = -1;
        }
        } else {
            // something is wrong
            errno = EBUSY;
            err = -1;

        return 0;

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        return -1;
    }
        /* clean up resources consumed by this rpc */
        margo_free_output(rpc_handles[i], &out);
        margo_destroy(rpc_handles[i]);
    }
    return err;
}