Commit b31d656a authored by David Auer's avatar David Auer
Browse files

Transmit hosts data on relocation start RPC

parent 04577716
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -100,6 +100,10 @@ MERCURY_GEN_PROC(rpc_config_out_t,
                         (hg_uint32_t)(gid)))


MERCURY_GEN_PROC(rpc_relocation_start_in_t,
                 ((hg_uint64_t)(host_id))((hg_uint64_t)(host_size)))


MERCURY_GEN_PROC(rpc_chunk_stat_in_t, ((hg_int32_t)(dummy)))

MERCURY_GEN_PROC(rpc_chunk_stat_out_t,
+2 −1
Original line number Diff line number Diff line
@@ -125,7 +125,8 @@ register_server_rpcs(margo_instance_id mid) {
    MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
                   rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat);
    if(gkfs::config::dynamic_placement) {
        MARGO_REGISTER(mid, gkfs::rpc::tag::relocation_start, void, void,
        MARGO_REGISTER(mid, gkfs::rpc::tag::relocation_start,
                       rpc_relocation_start_in_t, rpc_err_out_t,
                       rpc_srv_relocation_start);
    }
}
+24 −11
Original line number Diff line number Diff line
@@ -14,8 +14,10 @@

#include <daemon/daemon.hpp>
#include <daemon/handler/rpc_defs.hpp>
#include <daemon/handler/rpc_util.hpp>
#include <daemon/backend/metadata/db.hpp>

#include <global/rpc/distributor.hpp>
#include <global/rpc/rpc_types.hpp>
#include <iostream>

@@ -60,24 +62,35 @@ rpc_srv_get_fs_config(hg_handle_t handle) {
hg_return_t
rpc_srv_relocation_start(hg_handle_t handle) {
    cout << "TODO relocation start received\n";
    auto hret = margo_respond(handle, nullptr);

    rpc_relocation_start_in_t in{};
    rpc_err_out_t out{};
    // out.err = EIO;
    // Getting some information from margo
    auto ret = margo_get_input(handle, &in);
    if(ret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Could not get RPC input data with err {}", __func__, ret);
        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
    }
    // GKFS_DATA->mdb()->print_all();


    auto const host_id = in.host_id;
    auto const host_size = in.host_size;
    gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size);
    // TODO make use of distributor
    cout << fmt::format("Got host_id = {} and host_size = {}\n", host_id,
                        host_size);

    auto metadata_dump = GKFS_DATA->mdb()->get_all();

    for(auto metadatum : metadata_dump) {
        cout << 
    }
    
    if(hret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to respond to relocation start", __func__);
        cout << "Checking " << metadatum.first << " Val: " << metadatum.second
             << "\n";
    }

    margo_destroy(handle);

    return HG_SUCCESS;
    out.err = 0; // TODO
    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}

} // namespace
+21 −4
Original line number Diff line number Diff line
@@ -27,6 +27,7 @@
// #include <config.hpp>
#include <global/global_defs.hpp>
#include <global/hostsfile_util.hpp>
#include <global/rpc/rpc_types.hpp>


#include <fmt/format.h>
@@ -78,14 +79,20 @@ test_d2d_rpc() {
    assert(mid);


    hg_id_t relocation_start_id =
            MARGO_REGISTER(mid, gkfs::rpc::tag::relocation_start, void, void,
                           rpc_srv_relocation_start);
    hg_id_t relocation_start_id = MARGO_REGISTER(
            mid, gkfs::rpc::tag::relocation_start, rpc_relocation_start_in_t,
            rpc_err_out_t, rpc_srv_relocation_start);


    uint64_t host_counter = 0;
    for(auto host : hosts) {
        cout << fmt::format("Host {} bla {}\n", host.first, host.second);
        hg_addr_t host_addr;
        // TODO
        rpc_relocation_start_in_t in{};
        rpc_err_out_t out{};
        in.host_id = host_counter; // TODO is this consistent with client?
        in.host_size = hosts.size();
        auto ret = margo_addr_lookup(mid, host.second.c_str(), &host_addr);
        assert(ret == HG_SUCCESS);

@@ -94,8 +101,18 @@ test_d2d_rpc() {
        ret = margo_create(mid, host_addr, relocation_start_id, &handle);
        assert(ret == HG_SUCCESS);

        ret = margo_forward(handle, nullptr); // blocking
        ret = margo_forward(handle, &in); // blocking
        assert(ret == HG_SUCCESS);

        ret = margo_get_output(handle, &out);
        assert(ret == HG_SUCCESS);

        // TODO process output

        ret = margo_free_output(handle, &out);
        assert(ret == HG_SUCCESS);

        host_counter++;
    }
}