Skip to content
Snippets Groups Projects
Commit 310b469d authored by David Auer's avatar David Auer
Browse files

Transmit hosts data on relocation start RPC

parent 4b860ce1
No related branches found
No related tags found
No related merge requests found
......@@ -97,6 +97,10 @@ MERCURY_GEN_PROC(rpc_config_out_t,
(hg_uint32_t)(gid)))
MERCURY_GEN_PROC(rpc_relocation_start_in_t,
((hg_uint64_t)(host_id))((hg_uint64_t)(host_size)))
MERCURY_GEN_PROC(rpc_chunk_stat_in_t, ((hg_int32_t)(dummy)))
MERCURY_GEN_PROC(rpc_chunk_stat_out_t,
......
......@@ -120,7 +120,8 @@ register_server_rpcs(margo_instance_id mid) {
MARGO_REGISTER(mid, gkfs::rpc::tag::get_chunk_stat, rpc_chunk_stat_in_t,
rpc_chunk_stat_out_t, rpc_srv_get_chunk_stat);
if(gkfs::config::dynamic_placement) {
MARGO_REGISTER(mid, gkfs::rpc::tag::relocation_start, void, void,
MARGO_REGISTER(mid, gkfs::rpc::tag::relocation_start,
rpc_relocation_start_in_t, rpc_err_out_t,
rpc_srv_relocation_start);
}
}
......
......@@ -14,8 +14,10 @@
#include <daemon/daemon.hpp>
#include <daemon/handler/rpc_defs.hpp>
#include <daemon/handler/rpc_util.hpp>
#include <daemon/backend/metadata/db.hpp>
#include <global/rpc/distributor.hpp>
#include <global/rpc/rpc_types.hpp>
#include <iostream>
......@@ -60,24 +62,35 @@ rpc_srv_get_fs_config(hg_handle_t handle) {
hg_return_t
rpc_srv_relocation_start(hg_handle_t handle) {
cout << "TODO relocation start received\n";
auto hret = margo_respond(handle, nullptr);
rpc_relocation_start_in_t in{};
rpc_err_out_t out{};
// out.err = EIO;
// Getting some information from margo
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Could not get RPC input data with err {}", __func__, ret);
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
// GKFS_DATA->mdb()->print_all();
auto const host_id = in.host_id;
auto const host_size = in.host_size;
gkfs::rpc::SimpleHashDistributor distributor(host_id, host_size);
// TODO make use of distributor
cout << fmt::format("Got host_id = {} and host_size = {}\n", host_id,
host_size);
auto metadata_dump = GKFS_DATA->mdb()->get_all();
for(auto metadatum : metadata_dump) {
cout <<
}
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to respond to relocation start", __func__);
cout << "Checking " << metadatum.first << " Val: " << metadatum.second
<< "\n";
}
margo_destroy(handle);
return HG_SUCCESS;
out.err = 0; // TODO
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
} // namespace
......
......@@ -27,6 +27,7 @@
// #include <config.hpp>
#include <global/global_defs.hpp>
#include <global/hostsfile_util.hpp>
#include <global/rpc/rpc_types.hpp>
#include <fmt/format.h>
......@@ -78,14 +79,20 @@ test_d2d_rpc() {
assert(mid);
hg_id_t relocation_start_id =
MARGO_REGISTER(mid, gkfs::rpc::tag::relocation_start, void, void,
rpc_srv_relocation_start);
hg_id_t relocation_start_id = MARGO_REGISTER(
mid, gkfs::rpc::tag::relocation_start, rpc_relocation_start_in_t,
rpc_err_out_t, rpc_srv_relocation_start);
uint64_t host_counter = 0;
for(auto host : hosts) {
cout << fmt::format("Host {} bla {}\n", host.first, host.second);
hg_addr_t host_addr;
// TODO
rpc_relocation_start_in_t in{};
rpc_err_out_t out{};
in.host_id = host_counter; // TODO is this consistent with client?
in.host_size = hosts.size();
auto ret = margo_addr_lookup(mid, host.second.c_str(), &host_addr);
assert(ret == HG_SUCCESS);
......@@ -94,8 +101,18 @@ test_d2d_rpc() {
ret = margo_create(mid, host_addr, relocation_start_id, &handle);
assert(ret == HG_SUCCESS);
ret = margo_forward(handle, nullptr); // blocking
ret = margo_forward(handle, &in); // blocking
assert(ret == HG_SUCCESS);
ret = margo_get_output(handle, &out);
assert(ret == HG_SUCCESS);
// TODO process output
ret = margo_free_output(handle, &out);
assert(ret == HG_SUCCESS);
host_counter++;
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment