Program Listing for File srv_metadata.cpp
/*
Copyright 2018-2024, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2024, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS.
GekkoFS is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
GekkoFS is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with GekkoFS. If not, see <https://www.gnu.org/licenses/>.
SPDX-License-Identifier: GPL-3.0-or-later
*/
#include <daemon/handler/rpc_defs.hpp>
#include <daemon/handler/rpc_util.hpp>
#include <daemon/backend/metadata/db.hpp>
#include <daemon/backend/data/chunk_storage.hpp>
#include <daemon/ops/metadentry.hpp>
#include <common/rpc/rpc_types.hpp>
#include <common/statistics/stats.hpp>
using namespace std;
namespace {
hg_return_t
rpc_srv_create(hg_handle_t handle) {
rpc_mk_node_in_t in;
rpc_err_out_t out;
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug("{}() Got RPC with path '{}'", __func__,
in.path);
gkfs::metadata::Metadata md(in.mode);
try {
// create metadentry
gkfs::metadata::create(in.path, md);
out.err = 0;
} catch(const gkfs::metadata::ExistsException& e) {
out.err = EEXIST;
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Failed to create metadentry: '{}'",
__func__, e.what());
out.err = -1;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
if(GKFS_DATA->enable_stats()) {
GKFS_DATA->stats()->add_value_iops(
gkfs::utils::Stats::IopsOp::iops_create);
}
return HG_SUCCESS;
}
hg_return_t
rpc_srv_stat(hg_handle_t handle) {
rpc_path_only_in_t in{};
rpc_stat_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug("{}() path: '{}'", __func__, in.path);
std::string val;
try {
// get the metadata
val = gkfs::metadata::get_str(in.path);
out.db_val = val.c_str();
out.err = 0;
GKFS_DATA->spdlogger()->debug("{}() Sending output mode '{}'", __func__,
out.db_val);
} catch(const gkfs::metadata::NotFoundException& e) {
GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
in.path);
out.err = ENOENT;
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to get metadentry from DB: '{}'", __func__,
e.what());
out.err = EBUSY;
}
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
if(GKFS_DATA->enable_stats()) {
GKFS_DATA->stats()->add_value_iops(
gkfs::utils::Stats::IopsOp::iops_stats);
}
return HG_SUCCESS;
}
hg_return_t
rpc_srv_decr_size(hg_handle_t handle) {
rpc_trunc_in_t in{};
rpc_err_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
throw runtime_error("Failed to retrieve input from handle");
}
GKFS_DATA->spdlogger()->debug("{}() path: '{}', length: '{}'", __func__,
in.path, in.length);
try {
GKFS_DATA->mdb()->decrease_size(in.path, in.length);
out.err = 0;
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Failed to decrease size: '{}'",
__func__, e.what());
out.err = EIO;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
throw runtime_error("Failed to respond");
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
hg_return_t
rpc_srv_remove_metadata(hg_handle_t handle) {
rpc_rm_node_in_t in{};
rpc_rm_metadata_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug("{}() Got remove metadata RPC with path '{}'",
__func__, in.path);
// Remove metadentry if exists on the node
try {
auto md = gkfs::metadata::get(in.path);
gkfs::metadata::remove(in.path);
out.err = 0;
out.mode = md.mode();
out.size = md.size();
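// When implicit data removal is enabled at compile time, the chunk data of
// regular, non-empty files is destroyed together with the metadata entry.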
if constexpr(gkfs::config::metadata::implicit_data_removal) {
if(S_ISREG(md.mode()) && (md.size() != 0))
GKFS_DATA->storage()->destroy_chunk_space(in.path);
}
} catch(const gkfs::metadata::DBException& e) {
GKFS_DATA->spdlogger()->error("{}(): path '{}' message '{}'", __func__,
in.path, e.what());
out.err = EIO;
} catch(const gkfs::data::ChunkStorageException& e) {
GKFS_DATA->spdlogger()->error(
"{}(): path '{}' errcode '{}' message '{}'", __func__, in.path,
e.code().value(), e.what());
out.err = e.code().value();
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__,
in.path, e.what());
out.err = EBUSY;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
if(GKFS_DATA->enable_stats()) {
GKFS_DATA->stats()->add_value_iops(
gkfs::utils::Stats::IopsOp::iops_remove);
}
return HG_SUCCESS;
}
hg_return_t
rpc_srv_remove_data(hg_handle_t handle) {
rpc_rm_node_in_t in{};
rpc_err_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug("{}() Got remove data RPC with path '{}'",
__func__, in.path);
// Remove all chunks for that file
try {
GKFS_DATA->storage()->destroy_chunk_space(in.path);
out.err = 0;
} catch(const gkfs::data::ChunkStorageException& e) {
GKFS_DATA->spdlogger()->error(
"{}(): path '{}' errcode '{}' message '{}'", __func__, in.path,
e.code().value(), e.what());
out.err = e.code().value();
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() path '{}' message '{}'", __func__,
in.path, e.what());
out.err = EBUSY;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
hg_return_t
rpc_srv_update_metadentry(hg_handle_t handle) {
// Note: Currently this handler is not called by the client.
rpc_update_metadentry_in_t in{};
rpc_err_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug(
"{}() Got update metadentry RPC with path '{}'", __func__, in.path);
// do update
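// Only the fields whose corresponding flag is set in the request are copied
// onto the stored metadata entry before it is written back.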
try {
gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
if(in.block_flag == HG_TRUE)
md.blocks(in.blocks);
if(in.nlink_flag == HG_TRUE)
md.link_count(in.nlink);
if(in.size_flag == HG_TRUE)
md.size(in.size);
if(in.atime_flag == HG_TRUE)
md.atime(in.atime);
if(in.mtime_flag == HG_TRUE)
md.mtime(in.mtime);
if(in.ctime_flag == HG_TRUE)
md.ctime(in.ctime);
gkfs::metadata::update(in.path, md);
out.err = 0;
} catch(const std::exception& e) {
// TODO handle NotFoundException
GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
out.err = 1;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
hg_return_t
rpc_srv_update_metadentry_size(hg_handle_t handle) {
rpc_update_metadentry_size_in_t in{};
rpc_update_metadentry_size_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug(
"{}() path: '{}', size: '{}', offset: '{}', append: '{}'", __func__,
in.path, in.size, in.offset, in.append);
try {
out.ret_offset = gkfs::metadata::update_size(
in.path, in.size, in.offset, (in.append == HG_TRUE));
out.err = 0;
} catch(const gkfs::metadata::NotFoundException& e) {
GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
in.path);
out.err = ENOENT;
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to update metadentry size on DB: '{}'", __func__,
e.what());
out.err = EBUSY;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
hg_return_t
rpc_srv_get_metadentry_size(hg_handle_t handle) {
rpc_path_only_in_t in{};
rpc_get_metadentry_size_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS)
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
assert(ret == HG_SUCCESS);
GKFS_DATA->spdlogger()->debug(
"{}() Got get metadentry size RPC with path '{}'", __func__,
in.path);
// get size
try {
out.ret_size = gkfs::metadata::get_size(in.path);
out.err = 0;
} catch(const gkfs::metadata::NotFoundException& e) {
GKFS_DATA->spdlogger()->debug("{}() Entry not found: '{}'", __func__,
in.path);
out.err = ENOENT;
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to get metadentry size from DB: '{}'", __func__,
e.what());
out.err = EBUSY;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
hg_return_t
rpc_srv_get_dirents(hg_handle_t handle) {
rpc_get_dirents_in_t in{};
rpc_get_dirents_out_t out{};
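// The reply defaults to an error so that early-return paths report a failure
// unless the corresponding field is explicitly overwritten below.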
out.err = EIO;
out.dirents_size = 0;
hg_bulk_t bulk_handle = nullptr;
// Get input parameters
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Could not get RPC input data with err '{}'", __func__,
ret);
out.err = EBUSY;
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
// Retrieve size of source buffer
auto hgi = margo_get_info(handle);
auto mid = margo_hg_info_get_instance(hgi);
auto bulk_size = margo_bulk_get_size(in.bulk_handle);
GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ",
__func__, in.path, bulk_size);
// Get directory entries from local DB
vector<pair<string, bool>> entries{};
try {
entries = gkfs::metadata::get_dirents(in.path);
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
__func__, e.what());
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
GKFS_DATA->spdlogger()->trace(
"{}() path '{}' Read database with '{}' entries", __func__, in.path,
entries.size());
if(entries.empty()) {
out.err = 0;
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
// Calculate total output size
// TODO OPTIMIZATION: this can be calculated inside db_get_dirents
size_t tot_names_size = 0;
for(auto const& e : entries) {
tot_names_size += e.first.size();
}
// tot_names_size (# characters in entry) + # entries * (bool size + char
// size for \0 character)
size_t out_size =
tot_names_size + entries.size() * (sizeof(bool) + sizeof(char));
if(bulk_size < out_size) {
// Source buffer is smaller than total output size
GKFS_DATA->spdlogger()->error(
"{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
__func__, bulk_size, out_size);
out.err = ENOBUFS;
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
void* bulk_buf; // buffer for bulk transfer
// create bulk handle and allocate memory for a buffer of out_size bytes
ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY,
&bulk_handle);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
__func__);
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
// access the internally allocated memory buffer and put it into bulk_buf
uint32_t actual_count; // number of segments. we use one here because we
// push the whole buffer at once
ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1,
&bulk_buf, &out_size, &actual_count);
if(ret != HG_SUCCESS || actual_count != 1) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to access allocated buffer from bulk handle",
__func__);
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
GKFS_DATA->spdlogger()->trace(
"{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'",
__func__, in.path, entries.size(), out_size, out_size);
// Serialize output data on local buffer
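// Buffer layout pushed to the client: entries.size() bools (one flag per
// entry) followed by the entry names as consecutive '\0'-terminated strings.
// The client is expected to parse the buffer in the same order.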
auto out_buff_ptr = static_cast<char*>(bulk_buf);
auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
auto names_ptr = out_buff_ptr + entries.size();
for(auto const& e : entries) {
if(e.first.empty()) {
GKFS_DATA->spdlogger()->warn(
"{}() Entry in readdir() empty. If this shows up, something else is very wrong.",
__func__);
}
*bool_ptr = e.second;
bool_ptr++;
const auto name = e.first.c_str();
::strcpy(names_ptr, name);
// number of characters + \0 terminator
names_ptr += e.first.size() + 1;
}
GKFS_DATA->spdlogger()->trace(
"{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer",
__func__, in.path, entries.size(), out_size);
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, out_size);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'",
__func__, entries.size(), in.path, bulk_size, out_size);
out.err = EBUSY;
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
out.dirents_size = entries.size();
out.err = 0;
GKFS_DATA->spdlogger()->debug(
"{}() Sending output response err '{}' dirents_size '{}'. DONE",
__func__, out.err, out.dirents_size);
if(GKFS_DATA->enable_stats()) {
GKFS_DATA->stats()->add_value_iops(
gkfs::utils::Stats::IopsOp::iops_dirent);
}
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
/* Sends the name, size, and ctime of every entry in a specific directory.
* Used to accelerate find.
* Mimics get_dirents(), but returns a tuple per entry.
*/
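// Each directory entry is returned as a (name, flag, size, ctime) tuple; the
// handler below serializes these fields into separate, consecutive arrays.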
hg_return_t
rpc_srv_get_dirents_extended(hg_handle_t handle) {
rpc_get_dirents_in_t in{};
rpc_get_dirents_out_t out{};
out.err = EIO;
out.dirents_size = 0;
hg_bulk_t bulk_handle = nullptr;
// Get input parameters
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Could not get RPC input data with err '{}'", __func__,
ret);
out.err = EBUSY;
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
// Retrieve size of source buffer
auto hgi = margo_get_info(handle);
auto mid = margo_hg_info_get_instance(hgi);
auto bulk_size = margo_bulk_get_size(in.bulk_handle);
GKFS_DATA->spdlogger()->debug("{}() Got RPC: path '{}' bulk_size '{}' ",
__func__, in.path, bulk_size);
// Get directory entries from local DB
vector<tuple<string, bool, size_t, time_t>> entries{};
try {
entries = gkfs::metadata::get_dirents_extended(in.path);
} catch(const std::exception& e) {
GKFS_DATA->spdlogger()->error("{}() Error during get_dirents(): '{}'",
__func__, e.what());
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
GKFS_DATA->spdlogger()->trace(
"{}() path '{}' Read database with '{}' entries", __func__, in.path,
entries.size());
if(entries.empty()) {
out.err = 0;
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
// Calculate total output size
// TODO OPTIMIZATION: this can be calculated inside db_get_dirents
size_t tot_names_size = 0;
for(auto const& e : entries) {
tot_names_size += (get<0>(e)).size();
}
// tot_names_size (# characters in entries) + # entries * (bool size + char
// size for the '\0' terminator + size_t for the entry size + time_t for ctime)
size_t out_size =
tot_names_size + entries.size() * (sizeof(bool) + sizeof(char) +
sizeof(size_t) + sizeof(time_t));
if(bulk_size < out_size) {
// Source buffer is smaller than total output size
GKFS_DATA->spdlogger()->error(
"{}() Entries do not fit source buffer. bulk_size '{}' < out_size '{}' must be satisfied!",
__func__, bulk_size, out_size);
out.err = ENOBUFS;
return gkfs::rpc::cleanup_respond(&handle, &in, &out);
}
void* bulk_buf; // buffer for bulk transfer
// create bulk handle and allocate memory for a buffer of out_size bytes
ret = margo_bulk_create(mid, 1, nullptr, &out_size, HG_BULK_READ_ONLY,
&bulk_handle);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to create bulk handle",
__func__);
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
// access the internally allocated memory buffer and put it into bulk_buf
uint32_t actual_count; // number of segments. we use one here because we
// push the whole buffer at once
ret = margo_bulk_access(bulk_handle, 0, out_size, HG_BULK_READ_ONLY, 1,
&bulk_buf, &out_size, &actual_count);
if(ret != HG_SUCCESS || actual_count != 1) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to access allocated buffer from bulk handle",
__func__);
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
GKFS_DATA->spdlogger()->trace(
"{}() path '{}' entries '{}' out_size '{}'. Set up local read only bulk handle and allocated buffer with size '{}'",
__func__, in.path, entries.size(), out_size, out_size);
// Serialize output data on local buffer
// The parentheses are important: without them the pointer arithmetic would
// advance in size_t or time_t units instead of char units
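// Buffer layout pushed to the client: all bools first, then all size_t sizes,
// then all time_t ctimes, and finally the '\0'-terminated names. The client is
// expected to parse the buffer with the same field order and widths.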
auto out_buff_ptr = static_cast<char*>(bulk_buf);
auto bool_ptr = reinterpret_cast<bool*>(out_buff_ptr);
auto size_ptr = reinterpret_cast<size_t*>((out_buff_ptr) +
(entries.size() * sizeof(bool)));
auto ctime_ptr = reinterpret_cast<time_t*>(
(out_buff_ptr) +
(entries.size() * (sizeof(bool) + sizeof(size_t))));
auto names_ptr =
out_buff_ptr +
(entries.size() * (sizeof(bool) + sizeof(size_t) + sizeof(time_t)));
for(auto const& e : entries) {
if((get<0>(e)).empty()) {
GKFS_DATA->spdlogger()->warn(
"{}() Entry in readdir() empty. If this shows up, something else is very wrong.",
__func__);
}
*bool_ptr = (get<1>(e));
bool_ptr++;
*size_ptr = (get<2>(e));
size_ptr++;
*ctime_ptr = (get<3>(e));
ctime_ptr++;
const auto name = (get<0>(e)).c_str();
::strcpy(names_ptr, name);
// number of characters + \0 terminator
names_ptr += ((get<0>(e)).size() + 1);
}
GKFS_DATA->spdlogger()->trace(
"{}() path '{}' entries '{}' out_size '{}'. Copied data to bulk_buffer. NEXT bulk_transfer",
__func__, in.path, entries.size(), out_size);
ret = margo_bulk_transfer(mid, HG_BULK_PUSH, hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, out_size);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to push '{}' dirents on path '{}' to client with bulk size '{}' and out_size '{}'",
__func__, entries.size(), in.path, bulk_size, out_size);
out.err = EBUSY;
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
out.dirents_size = entries.size();
out.err = 0;
GKFS_DATA->spdlogger()->debug(
"{}() Sending output response err '{}' dirents_size '{}'. DONE",
__func__, out.err, out.dirents_size);
return gkfs::rpc::cleanup_respond(&handle, &in, &out, &bulk_handle);
}
#if defined(HAS_SYMLINKS) || defined(HAS_RENAME)
hg_return_t
rpc_srv_mk_symlink(hg_handle_t handle) {
rpc_mk_symlink_in_t in{};
rpc_err_out_t out{};
auto ret = margo_get_input(handle, &in);
if(ret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error(
"{}() Failed to retrieve input from handle", __func__);
}
GKFS_DATA->spdlogger()->debug(
"{}() Got RPC with path '{}' and target path '{}'", __func__,
in.path, in.target_path);
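// With HAS_RENAME enabled this handler is also reused for rename bookkeeping:
// a blocks() value of -1 seemingly marks a renamed entry, whose target_path
// then stores the inverse rename path (old -> new) rather than a symlink
// target.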
// do update
try {
gkfs::metadata::Metadata md = gkfs::metadata::get(in.path);
#ifdef HAS_RENAME
if(md.blocks() == -1) {
// We need to fill the rename path as this is an inverse path
// old -> new
md.rename_path(in.target_path);
} else {
#endif // HAS_RENAME
md.target_path(in.target_path);
#ifdef HAS_RENAME
}
#endif // HAS_RENAME
GKFS_DATA->spdlogger()->debug(
"{}() Updating path '{}' with metadata '{}'", __func__, in.path,
md.serialize());
gkfs::metadata::update(in.path, md);
out.err = 0;
} catch(const std::exception& e) {
// TODO handle NotFoundException
GKFS_DATA->spdlogger()->error("{}() Failed to update entry", __func__);
out.err = 1;
}
GKFS_DATA->spdlogger()->debug("{}() Sending output err '{}'", __func__,
out.err);
auto hret = margo_respond(handle, &out);
if(hret != HG_SUCCESS) {
GKFS_DATA->spdlogger()->error("{}() Failed to respond", __func__);
}
// Destroy handle when finished
margo_free_input(handle, &in);
margo_destroy(handle);
return HG_SUCCESS;
}
#endif // HAS_SYMLINKS || HAS_RENAME
} // namespace
DEFINE_MARGO_RPC_HANDLER(rpc_srv_create)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_stat)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_decr_size)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove_metadata)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_remove_data)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_update_metadentry_size)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_metadentry_size)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_get_dirents_extended)
#if defined(HAS_SYMLINKS) || defined(HAS_RENAME)
DEFINE_MARGO_RPC_HANDLER(rpc_srv_mk_symlink)
#endif // HAS_SYMLINKS || HAS_RENAME