// Program listing for file srv_malleability.cpp
// (src/daemon/handler/srv_malleability.cpp)
/*
Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This software was partially supported by
the European Union’s Horizon 2020 JTI-EuroHPC research and
innovation programme, by the project ADMIRE (Project ID: 956748,
admire-eurohpc.eu)
This project was partially promoted by the Ministry for Digital Transformation
and the Civil Service, within the framework of the Recovery,
Transformation and Resilience Plan - Funded by the European Union
-NextGenerationEU.
This file is part of GekkoFS' POSIX interface.
GekkoFS' POSIX interface is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GekkoFS' POSIX interface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GekkoFS' POSIX interface. If not, see
<https://www.gnu.org/licenses/>.
SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include <daemon/daemon.hpp>
#include <daemon/handler/rpc_defs.hpp>
#include <daemon/handler/rpc_util.hpp>
#include <daemon/malleability/malleable_manager.hpp>
#include <daemon/backend/metadata/db.hpp>
#include <common/rpc/rpc_types.hpp>
extern "C" {
#include <unistd.h>
}
using namespace std;
namespace {
/**
 * @brief Serves the "expand start" RPC.
 *
 * Decodes the request, enables maintenance mode on the daemon and asks the
 * malleable manager to start the expansion with the old and new server
 * configurations taken from the request.
 *
 * The response's `err` field is 0 on success and -1 when the request could
 * not be decoded or when starting the expansion threw a std::exception.
 *
 * @param handle Mercury handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
hg_return_t
rpc_srv_expand_start(hg_handle_t handle) {
    rpc_expand_start_in_t in{};
    rpc_err_out_t out{};

    auto ret = margo_get_input(handle, &in);
    if(ret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to retrieve input from handle", __func__);
        // Report the failure explicitly instead of sending an indeterminate
        // value. `in` was never populated, so respond through the overload
        // that takes no input argument.
        out.err = -1;
        return gkfs::rpc::cleanup_respond(&handle, &out);
    }
    GKFS_DATA->spdlogger()->debug(
            "{}() Got RPC with old conf '{}' new conf '{}'", __func__,
            in.old_server_conf, in.new_server_conf);

    // Tracks whether *this* request switched maintenance mode on, so that a
    // failure only rolls back what this request changed.
    bool maintenance_enabled_here = false;
    try {
        // if maintenance mode is already set, error is thrown -- not allowed
        GKFS_DATA->maintenance_mode(true);
        maintenance_enabled_here = true;
        GKFS_DATA->malleable_manager()->expand_start(in.old_server_conf,
                                                     in.new_server_conf);
        out.err = 0;
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error("{}() Failed to start expansion: '{}' ",
                                      __func__, e.what());
        // Do not clear maintenance mode when enabling it was what failed:
        // in that case it belongs to someone else (e.g. an expansion that is
        // already in progress) and must stay set.
        if(maintenance_enabled_here) {
            GKFS_DATA->maintenance_mode(false);
        }
        out.err = -1;
    }

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  out.err);
    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
/**
 * @brief Serves the "expand status" RPC.
 *
 * The response's `err` field is 1 while redistribution is running, 0 when it
 * is not, and -1 if querying the state threw a std::exception.
 *
 * @param handle Mercury handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
hg_return_t
rpc_srv_expand_status(hg_handle_t handle) {
    rpc_err_out_t reply;

    GKFS_DATA->spdlogger()->debug("{}() Got RPC ", __func__);

    try {
        // Map the running flag onto the numeric status the caller expects.
        if(GKFS_DATA->redist_running()) {
            reply.err = 1;
        } else {
            reply.err = 0;
        }
    } catch(const std::exception& ex) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to check status for expansion: '{}'", __func__,
                ex.what());
        reply.err = -1;
    }

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  reply.err);
    return gkfs::rpc::cleanup_respond(&handle, &reply);
}
/**
 * @brief Serves the "expand finalize" RPC by switching maintenance mode off.
 *
 * The response's `err` field is 0 on success and -1 if disabling maintenance
 * mode threw a std::exception.
 *
 * @param handle Mercury handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
hg_return_t
rpc_srv_expand_finalize(hg_handle_t handle) {
    rpc_err_out_t reply;

    GKFS_DATA->spdlogger()->debug("{}() Got RPC ", __func__);

    try {
        GKFS_DATA->maintenance_mode(false);
        reply.err = 0;
    } catch(const std::exception& ex) {
        GKFS_DATA->spdlogger()->error("{}() Failed to finalize expansion: '{}'",
                                      __func__, ex.what());
        reply.err = -1;
    }

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  reply.err);
    return gkfs::rpc::cleanup_respond(&handle, &reply);
}
/**
 * @brief Serves the "migrate metadata" RPC.
 *
 * Decodes a key/value pair from the request and stores it in the daemon's
 * metadata backend via `mdb()->put()`.
 *
 * The response's `err` field is 0 on success and -1 when the request could
 * not be decoded or when storing the entry threw a std::exception.
 *
 * @param handle Mercury handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
hg_return_t
rpc_srv_migrate_metadata(hg_handle_t handle) {
    rpc_migrate_metadata_in_t in{};
    rpc_err_out_t out{};

    auto ret = margo_get_input(handle, &in);
    if(ret != HG_SUCCESS) {
        GKFS_DATA->spdlogger()->error(
                "{}() Failed to retrieve input from handle", __func__);
        // A value-initialized `out` would carry err == 0 and tell the caller
        // the entry was migrated; report the failure instead. `in` was never
        // populated, so respond through the overload that takes no input
        // argument.
        out.err = -1;
        return gkfs::rpc::cleanup_respond(&handle, &out);
    }
    GKFS_DATA->spdlogger()->debug("{}() Got RPC with key '{}' value '{}'",
                                  __func__, in.key, in.value);

    try {
        // create metadentry
        GKFS_DATA->mdb()->put(in.key, in.value);
        out.err = 0;
    } catch(const std::exception& e) {
        GKFS_DATA->spdlogger()->error("{}() Failed to create KV entry: '{}'",
                                      __func__, e.what());
        out.err = -1;
    }

    GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
                                  out.err);
    return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
} // namespace
// Invoke DEFINE_MARGO_RPC_HANDLER once for each handler defined in the
// anonymous namespace above.
// NOTE(review): the macro is not defined in this file; presumably it comes
// from Margo and emits the wrapper symbols that get registered as RPC
// handlers -- confirm against the Margo headers and rpc_defs.hpp.
DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_start)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_status)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_expand_finalize)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_migrate_metadata)