Commit 72fe4064 authored by David Auer's avatar David Auer
Browse files

Create shuffle_start rpc

parent a5413e71
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -46,6 +46,8 @@ DECLARE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink)

#endif

DECLARE_MARGO_RPC_HANDLER(rpc_srv_shuffle_start)


// data
DECLARE_MARGO_RPC_HANDLER(rpc_srv_remove_data)
+1 −0
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ constexpr auto get_dirents_extended = "rpc_srv_get_dirents_extended";
#ifdef HAS_SYMLINKS
constexpr auto mk_symlink = "rpc_srv_mk_symlink";
#endif
constexpr auto shuffle_start = "rpc_srv_shuffle_start";
constexpr auto write = "rpc_srv_write_data";
constexpr auto read = "rpc_srv_read_data";
constexpr auto truncate = "rpc_srv_trunc_data";
+30 −0
Original line number Diff line number Diff line
@@ -123,6 +123,8 @@ register_server_rpcs(margo_instance_id mid) {
                   rpc_srv_truncate);
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
                   rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat);
    MARGO_REGISTER(mid, gkfs::rpc::tag::shuffle_start, void, void,
                   rpc_srv_shuffle_start);
}

void
@@ -463,6 +465,34 @@ parse_input(const po::variables_map& vm) {
    }
}

void test_d2d_rpc() {
    auto hosts = gkfs::util::read_hosts_file();

    cout << "number of hosts: " << hosts.size() << "\n";
    // TODO need to pull correct addr_str
    auto mid = margo_init("ofi+sockets", MARGO_CLIENT_MODE, 0, 0);
    assert(mid);


    hg_id_t shuffle_start_id = MARGO_REGISTER(mid, gkfs::rpc::tag::shuffle_start, void, void, rpc_srv_shuffle_start);


    for (auto host : hosts) {
        cout << fmt::format("Host {} bla {}\n", host.first, host.second);
        hg_addr_t host_addr;
        auto ret = margo_addr_lookup(mid, host.second.c_str(), &host_addr);
        assert(ret == HG_SUCCESS);

        // let's do this sequential first
        hg_handle_t handle;
        ret = margo_create(mid, host_addr, shuffle_start_id, &handle);
        assert(ret == HG_SUCCESS);

        ret = margo_forward(handle, nullptr); // blocking
        assert(ret == HG_SUCCESS);
    }
}

int
main(int argc, const char* argv[]) {

+15 −0
Original line number Diff line number Diff line
@@ -16,6 +16,7 @@
#include <daemon/handler/rpc_defs.hpp>

#include <global/rpc/rpc_types.hpp>
#include <iostream>

using namespace std;

@@ -55,6 +56,20 @@ rpc_srv_get_fs_config(hg_handle_t handle) {
    return HG_SUCCESS;
}

hg_return_t rpc_srv_shuffle_start(hg_handle_t handle) {
    cout << "TODO shuffle start received\n";
    auto hret = margo_respond(handle, nullptr);
    if (hret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error("{}() Failed to respond to client to serve file system configurations",
                                      __func__);
    }

    margo_destroy(handle);
    return HG_SUCCESS;
}

} // namespace

DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_fs_config)

DEFINE_MARGO_RPC_HANDLER(rpc_srv_shuffle_start)