/*
Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS' POSIX interface.
GekkoFS' POSIX interface is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GekkoFS' POSIX interface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GekkoFS' POSIX interface. If not, see
.
SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
namespace gkfs::rpc {
/*
 * This file includes all metadata RPC calls.
 * NOTE: errno is never set here! Every function reports failures through its
 * return value instead.
 */
/**
* Send an RPC for a create request
* @param path
* @param mode
* @return error code
*/
int
forward_create(const std::string& path, const mode_t mode) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service->post(endp, path, mode)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
return out.err() ? out.err() : 0;
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
}
/**
* Send an RPC for a stat request
* @param path
* @param attr
* @return error code
*/
int
forward_stat(const std::string& path, string& attr) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service->post(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return out.err();
attr = out.db_val();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
return 0;
}
/**
* Send an RPC for a remove request. This removes metadata and all data chunks
* possible distributed across many daemons. Optimizations are in place for
* small files (file_size / chunk_size) < number_of_daemons where no broadcast
* to all daemons is used to remove all chunks. Otherwise, a broadcast to all
* daemons is used.
*
* This function only attempts data removal if data exists (determined when
* metadata is removed)
* @param path
* @return error code
*/
int
forward_remove(const std::string& path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
int64_t size = 0;
uint32_t mode = 0;
/*
* Send one RPC to metadata destination and remove metadata while retrieving
* size and mode to determine if data needs to removed too
*/
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service->post(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return out.err();
size = out.size();
mode = out.mode();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
// if file is not a regular file and it's size is 0, data does not need to
// be removed, thus, we exit
if(!(S_ISREG(mode) && (size != 0)))
return 0;
std::vector> handles;
// Small files
if(static_cast(size / gkfs::config::rpc::chunksize) <
CTX->hosts().size()) {
const auto metadata_host_id =
CTX->distributor()->locate_file_metadata(path);
const auto endp_metadata = CTX->hosts().at(metadata_host_id);
try {
LOG(DEBUG, "Sending RPC to host: {}", endp_metadata.to_string());
gkfs::rpc::remove_data::input in(path);
handles.emplace_back(
ld_network_service->post(
endp_metadata, in));
uint64_t chnk_start = 0;
uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
const auto chnk_host_id =
CTX->distributor()->locate_data(path, chnk_id);
if constexpr(gkfs::config::metadata::implicit_data_removal) {
/*
* If the chnk host matches the metadata host the remove
* request as already been sent as part of the metadata
* remove request.
*/
if(chnk_host_id == metadata_host_id)
continue;
}
const auto endp_chnk = CTX->hosts().at(chnk_host_id);
LOG(DEBUG, "Sending RPC to host: {}", endp_chnk.to_string());
handles.emplace_back(
ld_network_service->post(
endp_chnk, in));
}
} catch(const std::exception& ex) {
LOG(ERROR,
"Failed to forward non-blocking rpc request reduced remove requests");
return EBUSY;
}
} else { // "Big" files
for(const auto& endp : CTX->hosts()) {
try {
LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
gkfs::rpc::remove_data::input in(path);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post(endp,
in));
} catch(const std::exception& ex) {
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG(ERROR,
"Failed to forward non-blocking rpc request to host: {}",
endp.to_string());
return EBUSY;
}
}
}
// wait for RPC responses
auto err = 0;
for(const auto& h : handles) {
try {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
auto out = h.get().at(0);
if(out.err() != 0) {
LOG(ERROR, "received error response: {}", out.err());
err = out.err();
}
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
err = EBUSY;
}
}
return err;
}
/**
* Send an RPC for a decrement file size request. This is for example used
* during a truncate() call.
* @param path
* @param length
* @return error code
*/
int
forward_decr_size(const std::string& path, size_t length) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post(endp, path, length)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
return out.err() ? out.err() : 0;
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
}
/**
 * Send an RPC for an update metadentry request.
 * NOTE: Currently unused.
 *
 * Only the fields selected in `md_flags` are taken from `md`; every other
 * field is sent as 0 together with a cleared flag. Mode, uid and gid are
 * never transmitted: they are always sent as 0 and the mode flag is always
 * false.
 * @param path path of the entry to update
 * @param md metadata object providing the new values
 * @param md_flags selects which fields of `md` are part of the update
 * @return 0 on success, the error code carried by the RPC response otherwise,
 * or EBUSY if sending the RPC or fetching its output threw an exception
 */
int
forward_update_metadentry(
        const std::string& path, const gkfs::metadata::Metadata& md,
        const gkfs::metadata::MetadentryUpdateFlags& md_flags) {

    auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));

    try {
        LOG(DEBUG, "Sending RPC ...");
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
        // can retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        // NOTE(review): request type assumed to be
        // gkfs::rpc::update_metadentry -- confirm
        auto out = ld_network_service
                           ->post<gkfs::rpc::update_metadentry>(
                                   endp, path,
                                   (md_flags.link_count ? md.link_count() : 0),
                                   /* mode */ 0,
                                   /* uid */ 0,
                                   /* gid */ 0, (md_flags.size ? md.size() : 0),
                                   (md_flags.blocks ? md.blocks() : 0),
                                   (md_flags.atime ? md.atime() : 0),
                                   (md_flags.mtime ? md.mtime() : 0),
                                   (md_flags.ctime ? md.ctime() : 0),
                                   bool_to_merc_bool(md_flags.link_count),
                                   /* mode_flag */ false,
                                   bool_to_merc_bool(md_flags.size),
                                   bool_to_merc_bool(md_flags.blocks),
                                   bool_to_merc_bool(md_flags.atime),
                                   bool_to_merc_bool(md_flags.mtime),
                                   bool_to_merc_bool(md_flags.ctime))
                           .get()
                           .at(0);
        LOG(DEBUG, "Got response success: {}", out.err());

        // err() is already 0 on success, so it can be handed back directly
        return out.err();
    } catch(const std::exception& ex) {
        LOG(ERROR, "while getting rpc output");
        return EBUSY;
    }
}
/**
* Send an RPC request for an update to the file size.
* This is called during a write() call or similar
* @param path
* @param size
* @param offset
* @param append_flag
* @return pair
*/
pair
forward_update_metadentry_size(const string& path, const size_t size,
const off64_t offset, const bool append_flag) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post(
endp, path, size, offset,
bool_to_merc_bool(append_flag))
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return make_pair(out.err(), 0);
else
return make_pair(0, out.ret_size());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return make_pair(EBUSY, 0);
}
}
/**
* Send an RPC request to get the current file size.
* This is called during a lseek() call
* @param path
* @return pair
*/
pair
forward_get_metadentry_size(const std::string& path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return make_pair(out.err(), 0);
else
return make_pair(0, out.ret_size());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return make_pair(EBUSY, 0);
}
}
/**
* Send an RPC request to receive all entries of a directory.
* @param open_dir
* @return error code
*/
pair>
forward_get_dirents(const string& path) {
LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
auto const targets = CTX->distributor()->locate_directory_metadata(path);
/* preallocate receiving buffer. The actual size is not known yet.
*
* On C++14 make_unique function also zeroes the newly allocated buffer.
* It turns out that this operation is increadibly slow for such a big
* buffer. Moreover we don't need a zeroed buffer here.
*/
auto large_buffer = std::unique_ptr(
new char[gkfs::config::rpc::dirents_buff_size]);
// XXX there is a rounding error here depending on the number of targets...
const std::size_t per_host_buff_size =
gkfs::config::rpc::dirents_buff_size / targets.size();
// expose local buffers for RMA from servers
std::vector exposed_buffers;
exposed_buffers.reserve(targets.size());
for(std::size_t i = 0; i < targets.size(); ++i) {
try {
exposed_buffers.emplace_back(ld_network_service->expose(
std::vector{hermes::mutable_buffer{
large_buffer.get() + (i * per_host_buff_size),
per_host_buff_size}},
hermes::access_mode::write_only));
} catch(const std::exception& ex) {
LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'",
__func__, ex.what());
return make_pair(EBUSY, nullptr);
}
}
auto err = 0;
// send RPCs
std::vector> handles;
for(std::size_t i = 0; i < targets.size(); ++i) {
// Setup rpc input parameters for each host
auto endp = CTX->hosts().at(targets[i]);
gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]);
try {
LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
handles.emplace_back(
ld_network_service->post(endp, in));
} catch(const std::exception& ex) {
LOG(ERROR,
"{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
__func__, path, targets[i], ex.what());
err = EBUSY;
break; // we need to gather responses from already sent RPCS
}
}
LOG(DEBUG,
"{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
__func__, path, targets.size(), per_host_buff_size);
auto send_error = err != 0;
auto open_dir = make_shared(path);
// wait for RPC responses
for(std::size_t i = 0; i < handles.size(); ++i) {
gkfs::rpc::get_dirents::output out;
try {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
out = handles[i].get().at(0);
// skip processing dirent data if there was an error during send
// In this case all responses are gathered but their contents
// skipped
if(send_error)
continue;
if(out.err() != 0) {
LOG(ERROR,
"{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
__func__, targets[i], strerror(out.err()), path);
err = out.err();
// We need to gather all responses before exiting
continue;
}
} catch(const std::exception& ex) {
LOG(ERROR,
"{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
__func__, path, targets[i], ex.what());
err = EBUSY;
// We need to gather all responses before exiting
continue;
}
// each server wrote information to its pre-defined region in
// large_buffer, recover it by computing the base_address for each
// particular server and adding the appropriate offsets
assert(exposed_buffers[i].count() == 1);
void* base_ptr = exposed_buffers[i].begin()->data();
bool* bool_ptr = reinterpret_cast(base_ptr);
char* names_ptr = reinterpret_cast(base_ptr) +
(out.dirents_size() * sizeof(bool));
for(std::size_t j = 0; j < out.dirents_size(); j++) {
gkfs::filemap::FileType ftype =
(*bool_ptr) ? gkfs::filemap::FileType::directory
: gkfs::filemap::FileType::regular;
bool_ptr++;
// Check that we are not outside the recv_buff for this specific
// host
assert((names_ptr - reinterpret_cast(base_ptr)) > 0);
assert(static_cast(
names_ptr - reinterpret_cast(base_ptr)) <
per_host_buff_size);
auto name = std::string(names_ptr);
// number of characters in entry + \0 terminator
names_ptr += name.size() + 1;
open_dir->add(name, ftype);
}
}
return make_pair(err, open_dir);
}
/**
* Send an RPC request to receive all entries of a directory in a server.
* @param path
* @param server
* @return error code
* Returns a tuple with path-isdir-size and ctime
* We still use dirents_buff_size, but we need to match in the client side, as
* the buffer is provided by the "gfind" applications However, as we only ask
* for a server the size should be enought for most of the scenarios. We are
* reusing the forward_get_dirents code. As we only need a server, we could
* simplify the code removing the asynchronous part.
*/
pair>>
forward_get_dirents_single(const string& path, int server) {
LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
auto const targets = CTX->distributor()->locate_directory_metadata(path);
/* preallocate receiving buffer. The actual size is not known yet.
*
* On C++14 make_unique function also zeroes the newly allocated buffer.
* It turns out that this operation is increadibly slow for such a big
* buffer. Moreover we don't need a zeroed buffer here.
*/
auto large_buffer = std::unique_ptr(
new char[gkfs::config::rpc::dirents_buff_size]);
// We use the full size per server...
const std::size_t per_host_buff_size = gkfs::config::rpc::dirents_buff_size;
vector> output;
// expose local buffers for RMA from servers
std::vector exposed_buffers;
exposed_buffers.reserve(1);
std::size_t i = server;
try {
exposed_buffers.emplace_back(ld_network_service->expose(
std::vector{hermes::mutable_buffer{
large_buffer.get(), per_host_buff_size}},
hermes::access_mode::write_only));
} catch(const std::exception& ex) {
LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__,
ex.what());
return make_pair(EBUSY, output);
}
auto err = 0;
// send RPCs
std::vector> handles;
auto endp = CTX->hosts().at(targets[i]);
gkfs::rpc::get_dirents_extended::input in(path, exposed_buffers[0]);
try {
LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
handles.emplace_back(
ld_network_service->post(endp,
in));
} catch(const std::exception& ex) {
LOG(ERROR,
"{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
__func__, path, targets[i], ex.what());
err = EBUSY;
}
LOG(DEBUG,
"{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
__func__, path, targets.size(), per_host_buff_size);
// wait for RPC responses
gkfs::rpc::get_dirents_extended::output out;
try {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
out = handles[0].get().at(0);
// skip processing dirent data if there was an error during send
// In this case all responses are gathered but their contents skipped
if(out.err() != 0) {
LOG(ERROR,
"{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
__func__, targets[0], strerror(out.err()), path);
err = out.err();
// We need to gather all responses before exiting
}
} catch(const std::exception& ex) {
LOG(ERROR,
"{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
__func__, path, targets[0], ex.what());
err = EBUSY;
// We need to gather all responses before exiting
}
// The parenthesis is extremely important if not the cast will add as a
// size_t or a time_t and not as a char
auto out_buff_ptr = static_cast(exposed_buffers[0].begin()->data());
auto bool_ptr = reinterpret_cast(out_buff_ptr);
auto size_ptr = reinterpret_cast(
(out_buff_ptr) + (out.dirents_size() * sizeof(bool)));
auto ctime_ptr = reinterpret_cast(
(out_buff_ptr) +
(out.dirents_size() * (sizeof(bool) + sizeof(size_t))));
auto names_ptr =
out_buff_ptr + (out.dirents_size() *
(sizeof(bool) + sizeof(size_t) + sizeof(time_t)));
for(std::size_t j = 0; j < out.dirents_size(); j++) {
bool ftype = (*bool_ptr);
bool_ptr++;
size_t size = *size_ptr;
size_ptr++;
time_t ctime = *ctime_ptr;
ctime_ptr++;
auto name = std::string(names_ptr);
// number of characters in entry + \0 terminator
names_ptr += name.size() + 1;
output.emplace_back(std::forward_as_tuple(name, ftype, size, ctime));
}
return make_pair(err, output);
}
#ifdef HAS_SYMLINKS
/**
* Send an RPC request to create a symlink.
* @param path
* @param target_path
* @return error code
*/
int
forward_mk_symlink(const std::string& path, const std::string& target_path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service
->post(endp, path, target_path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
return out.err() ? out.err() : 0;
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
}
}
#endif
} // namespace gkfs::rpc