Commit 91679852 authored by Marc Vef's avatar Marc Vef
Browse files

Adding metadata redistribution receiving function

parent 0190d1cf
Loading
Loading
Loading
Loading
+32 −26
Original line number Diff line number Diff line
@@ -28,7 +28,9 @@
*/
#include <daemon/daemon.hpp>
#include <daemon/handler/rpc_defs.hpp>
#include <daemon/handler/rpc_util.hpp>
#include <daemon/malleability/malleable_manager.hpp>
#include <daemon/backend/metadata/db.hpp>

#include <common/rpc/rpc_types.hpp>

@@ -46,10 +48,11 @@ rpc_srv_expand_start(hg_handle_t handle) {
    rpc_err_out_t out;

    auto ret = margo_get_input(handle, &in);
    if(ret != HG_SUCCESS)
    if(ret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to retrieve input from handle", __func__);
    assert(ret == HG_SUCCESS);
        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
    }
    GKFS_DATA->spdlogger()->debug(
            "{}() Got RPC with old conf '{}' new conf '{}'", __func__,
            in.old_server_conf, in.new_server_conf);
@@ -68,14 +71,7 @@ rpc_srv_expand_start(hg_handle_t handle) {

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  out.err);
    auto hret = margo_respond(handle, &out);
    if(hret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
    }
    // Destroy handle when finished
    margo_free_input(handle, &in);
    margo_destroy(handle);
    return HG_SUCCESS;
    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}

hg_return_t
@@ -93,13 +89,7 @@ rpc_srv_expand_status(hg_handle_t handle) {
    }
    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  out.err);
    auto hret = margo_respond(handle, &out);
    if(hret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
    }
    // Destroy handle when finished
    margo_destroy(handle);
    return HG_SUCCESS;
    return gkfs::rpc::cleanup_respond(&handle, &out);
}

hg_return_t
@@ -117,19 +107,35 @@ rpc_srv_expand_finalize(hg_handle_t handle) {

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  out.err);
    auto hret = margo_respond(handle, &out);
    if(hret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
    }

    // Destroy handle when finished
    margo_destroy(handle);
    return HG_SUCCESS;
    return gkfs::rpc::cleanup_respond(&handle, &out);
}

hg_return_t
rpc_srv_migrate_metadata(hg_handle_t handle) {
    return HG_SUCCESS;
    rpc_migrate_metadata_in_t in{};
    rpc_err_out_t out{};

    auto ret = margo_get_input(handle, &in);
    if(ret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to retrieve input from handle", __func__);
        return gkfs::rpc::cleanup_respond(&handle, &in, &out);
    }
    GKFS_DATA->spdlogger()->debug("{}() Got RPC with key '{}' value '{}'",
                                  __func__, in.key, in.value);
    try {
        // create metadentry
        GKFS_DATA->mdb()->put(in.key, in.value);
        out.err = 0;
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error("{}() Failed to create KV entry: '{}'",
                                      __func__, e.what());
        out.err = -1;
    }

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  out.err);
    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}

hg_return_t