Program Listing for File srv_metadata.cpp
↰ Return to documentation for file (src/proxy/rpc/srv_metadata.cpp)
/*
Copyright 2018-2025, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2025, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This software was partially supported by
the European Union’s Horizon 2020 JTI-EuroHPC research and
innovation programme, by the project ADMIRE (Project ID: 956748,
admire-eurohpc.eu)
This project was partially promoted by the Ministry for Digital Transformation
and the Civil Service, within the framework of the Recovery,
Transformation and Resilience Plan - Funded by the European Union
-NextGenerationEU.
SPDX-License-Identifier: MIT
*/
#include <proxy/proxy.hpp>
#include <proxy/rpc/rpc_defs.hpp>
#include <proxy/rpc/forward_metadata.hpp>
#include <proxy/rpc/rpc_util.hpp>
#include <common/rpc/rpc_types.hpp>
/**
 * @brief Proxy-side handler for the create RPC.
 *
 * Decodes the client request, forwards the path and mode via
 * gkfs::rpc::forward_create() and replies with the resulting error code.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_create(hg_handle_t handle) {
    rpc_mk_node_in_t client_in{};
    rpc_err_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() Got RPC with path '{}'", __func__,
                             client_in.path);

    client_out.err = gkfs::rpc::forward_create(client_in.path, client_in.mode);

    PROXY_DATA->log()->debug("{}() Sending output err '{}'", __func__,
                             client_out.err);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_create)
/**
 * @brief Proxy-side handler for the stat RPC.
 *
 * Decodes the client request, forwards the path via
 * gkfs::rpc::forward_stat() and replies with the returned error code and
 * value string.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_stat(hg_handle_t handle) {
    rpc_path_only_in_t client_in{};
    rpc_stat_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() Got RPC with path '{}'", __func__,
                             client_in.path);

    // `out` must stay alive until the response has been sent: db_val only
    // borrows the character buffer owned by out.second.
    auto out = gkfs::rpc::forward_stat(client_in.path);
    client_out.err = out.first;
    client_out.db_val = out.second.c_str();

    PROXY_DATA->log()->debug("{}() Sending output err '{}'", __func__,
                             client_out.err);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_stat)
/**
 * @brief Proxy-side handler for the remove RPC.
 *
 * Decodes the client request, forwards the path and the rm_dir flag via
 * gkfs::rpc::forward_remove() and replies with the resulting error code.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_remove(hg_handle_t handle) {
    rpc_rm_node_in_t client_in{};
    rpc_err_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() Got RPC with path '{}'", __func__,
                             client_in.path);

    client_out.err =
            gkfs::rpc::forward_remove(client_in.path, client_in.rm_dir);

    PROXY_DATA->log()->debug("{}() Sending output err '{}'", __func__,
                             client_out.err);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_remove)
/**
 * @brief Proxy-side handler for the decrease-size (truncate) RPC.
 *
 * Decodes the client request, forwards the path and length via
 * gkfs::rpc::forward_decr_size() and replies with the resulting error code.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_decr_size(hg_handle_t handle) {
    rpc_trunc_in_t client_in{};
    rpc_err_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() Got RPC with path '{}' length '{}'",
                             __func__, client_in.path, client_in.length);

    client_out.err =
            gkfs::rpc::forward_decr_size(client_in.path, client_in.length);

    PROXY_DATA->log()->debug("{}() Sending output err '{}'", __func__,
                             client_out.err);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_decr_size)
/**
 * @brief Proxy-side handler for the get-metadentry-size RPC.
 *
 * Decodes the client request, forwards the path via
 * gkfs::rpc::forward_get_metadentry_size() and replies with the error code
 * and size it returned. If forwarding throws, the client receives EBUSY.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_get_metadentry_size(hg_handle_t handle) {
    rpc_path_only_in_t client_in{};
    rpc_get_metadentry_size_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() path: '{}'", __func__, client_in.path);

    try {
        auto [err, ret_size] =
                gkfs::rpc::forward_get_metadentry_size(client_in.path);
        // Propagate the forwarded error code instead of unconditionally
        // reporting success.
        client_out.err = err;
        client_out.ret_size = ret_size;
    } catch(const std::exception& e) {
        PROXY_DATA->log()->error("{}() Failed to get metadentry size RPC: '{}'",
                                 __func__, e.what());
        client_out.err = EBUSY;
    }

    PROXY_DATA->log()->debug("{}() Sending output err '{}' ret_size '{}'",
                             __func__, client_out.err, client_out.ret_size);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_get_metadentry_size)
/**
 * @brief Proxy-side handler for the update-metadentry-size RPC.
 *
 * Decodes the client request, forwards path, size, offset and append flag
 * via gkfs::rpc::forward_update_metadentry_size() and replies with the error
 * code and offset it returned. If forwarding throws, the client receives
 * EBUSY.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_update_metadentry_size(hg_handle_t handle) {
    rpc_update_metadentry_size_in_t client_in{};
    rpc_update_metadentry_size_out_t client_out{};

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug(
            "{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__,
            client_in.path, client_in.size, client_in.offset, client_in.append);

    try {
        auto [err, ret_offset] = gkfs::rpc::forward_update_metadentry_size(
                client_in.path, client_in.size, client_in.offset,
                client_in.append);
        // Propagate the forwarded error code instead of unconditionally
        // reporting success.
        client_out.err = err;
        client_out.ret_offset = ret_offset;
    } catch(const std::exception& e) {
        PROXY_DATA->log()->error(
                "{}() Failed to update metadentry size RPC: '{}'", __func__,
                e.what());
        client_out.err = EBUSY;
    }

    PROXY_DATA->log()->debug("{}() Sending output err '{}' ret_offset '{}'",
                             __func__, client_out.err, client_out.ret_offset);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_update_metadentry_size)
/**
 * @brief Proxy-side handler for the extended get-dirents RPC.
 *
 * Steps:
 *  1. Decode the client request (path, target server, client bulk handle).
 *  2. Create a local bulk region as large as the client's bulk region.
 *  3. Forward the request via gkfs::rpc::forward_get_dirents_single(),
 *     passing the local buffer as destination.
 *  4. Push the local buffer into the client's bulk region.
 *  5. Reply with the forwarded error code and dirents size.
 *
 * Every failure before the final reply sends a non-zero error code.
 *
 * @param handle handle of the incoming RPC
 * @return the value returned by gkfs::rpc::cleanup_respond()
 */
static hg_return_t
proxy_rpc_srv_get_dirents_extended(hg_handle_t handle) {
    rpc_proxy_get_dirents_in_t client_in{};
    rpc_get_dirents_out_t client_out{};
    hg_bulk_t bulk_handle = nullptr;

    auto ret = margo_get_input(handle, &client_in);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to retrieve input from handle",
                                 __func__);
        // client_out is value-initialized; without an explicit error the
        // client would be told (err == 0) that an unprocessed request
        // succeeded.
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }
    PROXY_DATA->log()->debug("{}() path: '{}', server: '{}'", __func__,
                             client_in.path, client_in.server);

    auto hgi = margo_get_info(handle);
    auto mid = margo_hg_handle_get_instance(handle);
    auto bulk_size = margo_bulk_get_size(client_in.bulk_handle);
    PROXY_DATA->log()->debug("{}() Got RPC with path '{}' bulk_size '{}'",
                             __func__, client_in.path, bulk_size);

    /*
     * Set up buffer for push from daemon
     */
    void* bulk_buf = nullptr; // buffer for bulk transfer
    // create bulk handle and allocate memory for the buffer, sized after the
    // client's bulk region
    ret = margo_bulk_create(mid, 1, nullptr, &bulk_size, HG_BULK_READWRITE,
                            &bulk_handle);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error("{}() Failed to create bulk handle", __func__);
        // report the failure instead of the default err == 0
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out);
    }

    // access the internally allocated memory buffer
    uint32_t actual_count = 0; // number of segments. we use one here because
                               // we pull the whole buffer at once
    ret = margo_bulk_access(bulk_handle, 0, bulk_size, HG_BULK_READWRITE, 1,
                            &bulk_buf, &bulk_size, &actual_count);
    if(ret != HG_SUCCESS || actual_count != 1) {
        PROXY_DATA->log()->error(
                "{}() Failed to access allocated buffer from bulk handle",
                __func__);
        // report the failure instead of the default err == 0
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }

    // Forward request to daemon, handing over bulk_buf as the destination
    // buffer into which the daemon pushes the data
    auto daemon_out = gkfs::rpc::forward_get_dirents_single(
            client_in.path, client_in.server, bulk_buf, bulk_size);
    if(daemon_out.first != 0) {
        PROXY_DATA->log()->error(
                "{}() Failure when forwarding to daemon with err '{}'",
                __func__, daemon_out.first);
        client_out.err = daemon_out.first;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }

    // Push data to client here if no error was reported by the daemon
    ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr,
                              client_in.bulk_handle, 0, bulk_handle, 0,
                              bulk_size);
    if(ret != HG_SUCCESS) {
        PROXY_DATA->log()->error(
                "{}() Failed to push data to client for path '{}' with size '{}'",
                __func__, client_in.path, bulk_size);
        client_out.err = EBUSY;
        return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                          &bulk_handle);
    }

    client_out.err = daemon_out.first;
    client_out.dirents_size = daemon_out.second;
    PROXY_DATA->log()->debug("{}() Sending output err '{}' dirents_size '{}'",
                             __func__, client_out.err, client_out.dirents_size);
    return gkfs::rpc::cleanup_respond(&handle, &client_in, &client_out,
                                      &bulk_handle);
}
DEFINE_MARGO_RPC_HANDLER(proxy_rpc_srv_get_dirents_extended)