Newer
Older
Copyright 2018-2022, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2022, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
This file is part of GekkoFS' POSIX interface.
GekkoFS' POSIX interface is free software: you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GekkoFS' POSIX interface is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with GekkoFS' POSIX interface. If not, see
<https://www.gnu.org/licenses/>.
SPDX-License-Identifier: LGPL-3.0-or-later
*/
#include <client/rpc/forward_metadata.hpp>
#include <client/preload.hpp>
#include <client/logging.hpp>
#include <client/preload_util.hpp>
#include <client/open_dir.hpp>
#include <common/rpc/rpc_util.hpp>
#include <common/rpc/distributor.hpp>
#include <common/rpc/rpc_types.hpp>

#include <cerrno>
#include <cstring>
/*
* This file includes all metadata RPC calls.
* NOTE: No errno is defined here!
*/
/**
* Send an RPC for a create request
* @param path
* @param mode
* @return error code
*/
int
forward_create(const std::string& path, const mode_t mode) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service->post<gkfs::rpc::create>(endp, path, mode)
.get()
.at(0);
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
/**
* Send an RPC for a stat request
* @param path
* @param attr
* @return error code
*/
int
forward_stat(const std::string& path, string& attr) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service->post<gkfs::rpc::stat>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
* Send an RPC for a remove request. This removes metadata and all data chunks
* possibly distributed across many daemons. Optimizations are in place for
* small files (file_size / chunk_size) < number_of_daemons where no broadcast
* to all daemons is used to remove all chunks. Otherwise, a broadcast to all
* daemons is used.
*
* This function only attempts data removal if data exists (determined when
* metadata is removed)
forward_remove(const std::string& path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
int64_t size = 0;
uint32_t mode = 0;
/*
* Send one RPC to metadata destination and remove metadata while retrieving
* size and mode to determine if data needs to be removed too
*/
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service->post<gkfs::rpc::remove_metadata>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
if(out.err())
return out.err();
size = out.size();
mode = out.mode();
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
return EBUSY;
// if the file is not a regular file or its size is 0, data does not need to
// be removed, thus, we exit
if(!(S_ISREG(mode) && (size != 0)))
return 0;
std::vector<hermes::rpc_handle<gkfs::rpc::remove_data>> handles;
if(static_cast<std::size_t>(size / gkfs::config::rpc::chunksize) <
CTX->hosts().size()) {
const auto metadata_host_id =
CTX->distributor()->locate_file_metadata(path);
const auto endp_metadata = CTX->hosts().at(metadata_host_id);
LOG(DEBUG, "Sending RPC to host: {}", endp_metadata.to_string());
gkfs::rpc::remove_data::input in(path);
handles.emplace_back(
ld_network_service->post<gkfs::rpc::remove_data>(
endp_metadata, in));
uint64_t chnk_end = size / gkfs::config::rpc::chunksize;
for(uint64_t chnk_id = chnk_start; chnk_id <= chnk_end; chnk_id++) {
const auto chnk_host_id =
CTX->distributor()->locate_data(path, chnk_id);
if constexpr(gkfs::config::metadata::implicit_data_removal) {
/*
* If the chnk host matches the metadata host the remove
* request has already been sent as part of the metadata
* remove request.
*/
if(chnk_host_id == metadata_host_id)
continue;
}
const auto endp_chnk = CTX->hosts().at(chnk_host_id);
LOG(DEBUG, "Sending RPC to host: {}", endp_chnk.to_string());
ld_network_service->post<gkfs::rpc::remove_data>(
endp_chnk, in));
} catch(const std::exception& ex) {
LOG(ERROR,
"Failed to forward non-blocking rpc request reduced remove requests");
} else { // "Big" files
for(const auto& endp : CTX->hosts()) {
try {
LOG(DEBUG, "Sending RPC to host: {}", endp.to_string());
gkfs::rpc::remove_data::input in(path);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so
// that we can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a
// post(endpoint) returning one result and a
// broadcast(endpoint_set) returning a result_set. When that
// happens we can remove the .at(0) :/
handles.emplace_back(
ld_network_service->post<gkfs::rpc::remove_data>(endp,
in));
} catch(const std::exception& ex) {
// TODO(amiranda): we should cancel all previously posted
// requests here, unfortunately, Hermes does not support it yet
// :/
LOG(ERROR,
"Failed to forward non-blocking rpc request to host: {}",
for(const auto& h : handles) {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
auto out = h.get().at(0);
LOG(ERROR, "received error response: {}", out.err());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
}
* Send an RPC for a decrement file size request. This is for example used
* during a truncate() call.
* @param path
* @param length
* @return error code
*/
int
forward_decr_size(const std::string& path, size_t length) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::decr_size>(endp, path, length)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
/**
* Send an RPC for an update metadentry request.
* NOTE: Currently unused.
* @param path
* @param md
* @param md_flags
* @return error code
*/
int
forward_update_metadentry(
const string& path, const gkfs::metadata::Metadata& md,
const gkfs::metadata::MetadentryUpdateFlags& md_flags) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::update_metadentry>(
endp, path,
(md_flags.link_count ? md.link_count() : 0),
/* mode */ 0,
/* uid */ 0,
/* gid */ 0, (md_flags.size ? md.size() : 0),
(md_flags.blocks ? md.blocks() : 0),
(md_flags.atime ? md.atime() : 0),
(md_flags.mtime ? md.mtime() : 0),
(md_flags.ctime ? md.ctime() : 0),
bool_to_merc_bool(md_flags.link_count),
/* mode_flag */ false,
bool_to_merc_bool(md_flags.size),
bool_to_merc_bool(md_flags.blocks),
bool_to_merc_bool(md_flags.atime),
bool_to_merc_bool(md_flags.mtime),
bool_to_merc_bool(md_flags.ctime))
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
/**
* Send an RPC request for an update to the file size.
* This is called during a write() call or similar
* @param path
* @param size
* @param offset
* @param append_flag
* @return pair<error code, size after update>
*/
pair<int, off64_t>
forward_update_metadentry_size(const string& path, const size_t size,
const off64_t offset, const bool append_flag) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::update_metadentry_size>(
endp, path, size, offset,
bool_to_merc_bool(append_flag))
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
return make_pair(out.err(), 0);
else
return make_pair(0, out.ret_size());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
/**
* Send an RPC request to get the current file size.
* This is called during a lseek() call
* @param path
* @return pair<error code, file size>
*/
pair<int, off64_t>
forward_get_metadentry_size(const std::string& path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out = ld_network_service
->post<gkfs::rpc::get_metadentry_size>(endp, path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
return make_pair(out.err(), 0);
else
return make_pair(0, out.ret_size());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");
* Send an RPC request to receive all entries of a directory.
* @param open_dir
* @return error code
pair<int, shared_ptr<gkfs::filemap::OpenDir>>
forward_get_dirents(const string& path) {
LOG(DEBUG, "{}() enter for path '{}'", __func__, path)
auto const targets = CTX->distributor()->locate_directory_metadata(path);
/* preallocate receiving buffer. The actual size is not known yet.
*
* On C++14 make_unique function also zeroes the newly allocated buffer.
* It turns out that this operation is incredibly slow for such a big
* buffer. Moreover we don't need a zeroed buffer here.
*/
auto large_buffer = std::unique_ptr<char[]>(
new char[gkfs::config::rpc::dirents_buff_size]);
// XXX there is a rounding error here depending on the number of targets...
const std::size_t per_host_buff_size =
gkfs::config::rpc::dirents_buff_size / targets.size();
// expose local buffers for RMA from servers
std::vector<hermes::exposed_memory> exposed_buffers;
exposed_buffers.reserve(targets.size());
for(std::size_t i = 0; i < targets.size(); ++i) {
try {
exposed_buffers.emplace_back(ld_network_service->expose(
std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
large_buffer.get() + (i * per_host_buff_size),
per_host_buff_size}},
} catch(const std::exception& ex) {
LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'",
__func__, ex.what());
return make_pair(EBUSY, nullptr);
auto err = 0;
// send RPCs
std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents>> handles;
for(std::size_t i = 0; i < targets.size(); ++i) {
// Setup rpc input parameters for each host
auto endp = CTX->hosts().at(targets[i]);
gkfs::rpc::get_dirents::input in(path, exposed_buffers[i]);
LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__, targets[i]);
handles.emplace_back(
ld_network_service->post<gkfs::rpc::get_dirents>(endp, in));
} catch(const std::exception& ex) {
LOG(ERROR,
"{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
__func__, path, targets[i], ex.what());
err = EBUSY;
break; // we need to gather responses from already sent RPCS
"{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
__func__, path, targets.size(), per_host_buff_size);
auto send_error = err != 0;
auto open_dir = make_shared<gkfs::filemap::OpenDir>(path);
for(std::size_t i = 0; i < handles.size(); ++i) {
try {
// XXX We might need a timeout here to not wait forever for an
// output that never comes?
out = handles[i].get().at(0);
// skip processing dirent data if there was an error during send
// In this case all responses are gathered but their contents
// skipped
if(send_error)
continue;
if(out.err() != 0) {
LOG(ERROR,
"{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
__func__, targets[i], strerror(out.err()), path);
err = out.err();
// We need to gather all responses before exiting
continue;
} catch(const std::exception& ex) {
LOG(ERROR,
"{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
__func__, path, targets[i], ex.what());
err = EBUSY;
// We need to gather all responses before exiting
continue;
// each server wrote information to its pre-defined region in
// large_buffer, recover it by computing the base_address for each
// particular server and adding the appropriate offsets
assert(exposed_buffers[i].count() == 1);
void* base_ptr = exposed_buffers[i].begin()->data();
bool* bool_ptr = reinterpret_cast<bool*>(base_ptr);
char* names_ptr = reinterpret_cast<char*>(base_ptr) +
(out.dirents_size() * sizeof(bool));
for(std::size_t j = 0; j < out.dirents_size(); j++) {
gkfs::filemap::FileType ftype =
(*bool_ptr) ? gkfs::filemap::FileType::directory
: gkfs::filemap::FileType::regular;
// Check that we are not outside the recv_buff for this specific
// host
assert((names_ptr - reinterpret_cast<char*>(base_ptr)) > 0);
assert(static_cast<unsigned long int>(
names_ptr - reinterpret_cast<char*>(base_ptr)) <
per_host_buff_size);
open_dir->add(name, ftype);
return make_pair(err, open_dir);
Ramon Nou
committed
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
/**
 * Send an RPC request to receive all entries of a directory from one server.
 *
 * The queried server writes its reply through RMA into a client-side buffer
 * of gkfs::config::rpc::dirents_buff_size bytes. The whole buffer is used for
 * that single server; the caller (e.g., the "gfind" application) has to match
 * this size on its side. With n being the number of entries reported in the
 * RPC output, the buffer is decoded with the following layout:
 *
 *   [n x bool is_dir][n x size_t size][n x time_t ctime][n x name + '\0']
 *
 * This reuses the structure of forward_get_dirents(). As only one server is
 * contacted, the asynchronous part could be simplified further.
 *
 * @param path directory whose entries are requested
 * @param server index of the server to query within the target list returned
 *               by the distributor's locate_directory_metadata()
 * @return pair<error code, entries>. Each entry is a tuple of
 *         <name, is-directory flag, size, ctime>. The error code is 0 on
 *         success, EINVAL if `server` is not a valid target index, EBUSY if
 *         exposing the buffer, sending the RPC or receiving its output
 *         failed, EIO if the reply does not fit the receive buffer, or the
 *         error reported by the server. When the error code is non-zero the
 *         entry list is empty or incomplete and must not be relied upon.
 */
pair<int, vector<tuple<const std::string, bool, size_t, time_t>>>
forward_get_dirents_single(const string& path, int server) {

    LOG(DEBUG, "{}() enter for path '{}'", __func__, path);

    vector<tuple<const std::string, bool, size_t, time_t>> output;

    auto const targets = CTX->distributor()->locate_directory_metadata(path);

    // `server` is used as an index into `targets`: reject anything that
    // would read outside of the vector (including negative values) before
    // allocating the receive buffer.
    if(server < 0 || static_cast<std::size_t>(server) >= targets.size()) {
        LOG(ERROR,
            "{}() Invalid server index '{}' for path '{}'. Number of targets '{}'",
            __func__, server, path, targets.size());
        return make_pair(EINVAL, output);
    }
    const auto target_idx = static_cast<std::size_t>(server);

    /* preallocate receiving buffer. The actual size is not known yet.
     *
     * On C++14 make_unique function also zeroes the newly allocated buffer.
     * It turns out that this operation is incredibly slow for such a big
     * buffer. Moreover we don't need a zeroed buffer here.
     */
    auto large_buffer = std::unique_ptr<char[]>(
            new char[gkfs::config::rpc::dirents_buff_size]);

    // We use the full size per server...
    const std::size_t per_host_buff_size = gkfs::config::rpc::dirents_buff_size;

    // expose local buffers for RMA from servers
    std::vector<hermes::exposed_memory> exposed_buffers;
    exposed_buffers.reserve(1);
    try {
        exposed_buffers.emplace_back(ld_network_service->expose(
                std::vector<hermes::mutable_buffer>{hermes::mutable_buffer{
                        large_buffer.get(), per_host_buff_size}},
                hermes::access_mode::write_only));
    } catch(const std::exception& ex) {
        LOG(ERROR, "{}() Failed to expose buffers for RMA. err '{}'", __func__,
            ex.what());
        return make_pair(EBUSY, output);
    }

    // send RPC
    std::vector<hermes::rpc_handle<gkfs::rpc::get_dirents_extended>> handles;
    auto endp = CTX->hosts().at(targets[target_idx]);
    gkfs::rpc::get_dirents_extended::input in(path, exposed_buffers[0]);

    try {
        LOG(DEBUG, "{}() Sending RPC to host: '{}'", __func__,
            targets[target_idx]);
        handles.emplace_back(
                ld_network_service->post<gkfs::rpc::get_dirents_extended>(endp,
                                                                          in));
    } catch(const std::exception& ex) {
        LOG(ERROR,
            "{}() Unable to send non-blocking get_dirents() on {} [peer: {}] err '{}'",
            __func__, path, targets[target_idx], ex.what());
        // Nothing was posted, so there is no handle to wait on: bail out
        // instead of touching the (empty) handle vector below.
        return make_pair(EBUSY, output);
    }

    LOG(DEBUG,
        "{}() path '{}' send rpc_srv_get_dirents() rpc to '{}' targets. per_host_buff_size '{}' Waiting on reply next and deserialize",
        __func__, path, targets.size(), per_host_buff_size);

    // wait for RPC response
    gkfs::rpc::get_dirents_extended::output out;
    try {
        // XXX We might need a timeout here to not wait forever for an
        // output that never comes?
        out = handles[0].get().at(0);
    } catch(const std::exception& ex) {
        LOG(ERROR,
            "{}() Failed to get rpc output.. [path: {}, target host: {}] err '{}'",
            __func__, path, targets[target_idx], ex.what());
        return make_pair(EBUSY, output);
    }

    const int rpc_err = out.err();
    if(rpc_err != 0) {
        LOG(ERROR,
            "{}() Failed to retrieve dir entries from host '{}'. Error '{}', path '{}'",
            __func__, targets[target_idx], strerror(rpc_err), path);
        // the buffer content is meaningless after a failed request
        return make_pair(rpc_err, output);
    }

    // Decode the reply. All offsets are computed in bytes on a char pointer
    // so that they are not scaled by sizeof(size_t) or sizeof(time_t).
    const char* const buff_begin =
            static_cast<char*>(exposed_buffers[0].begin()->data());
    const char* const buff_end = buff_begin + per_host_buff_size;

    const std::size_t n_entries = out.dirents_size();
    constexpr std::size_t fixed_entry_size =
            sizeof(bool) + sizeof(size_t) + sizeof(time_t);

    // The fixed-size arrays alone must fit into the receive buffer,
    // otherwise the reported entry count cannot be trusted.
    if(n_entries > per_host_buff_size / fixed_entry_size) {
        LOG(ERROR,
            "{}() Reply from host '{}' reports '{}' entries which exceed the receive buffer. path '{}'",
            __func__, targets[target_idx], n_entries, path);
        return make_pair(EIO, output);
    }

    const char* const flags_base = buff_begin;
    const char* const sizes_base = flags_base + n_entries * sizeof(bool);
    const char* const ctimes_base = sizes_base + n_entries * sizeof(size_t);
    const char* names_ptr = ctimes_base + n_entries * sizeof(time_t);

    output.reserve(n_entries);
    for(std::size_t j = 0; j < n_entries; ++j) {
        const bool is_dir = flags_base[j * sizeof(bool)] != 0;

        // The size and ctime arrays start at offsets that are in general
        // not aligned for their types, so copy the values out bytewise
        // instead of dereferencing a casted pointer.
        size_t entry_size = 0;
        std::memcpy(&entry_size, sizes_base + j * sizeof(size_t),
                    sizeof(size_t));
        time_t entry_ctime = 0;
        std::memcpy(&entry_ctime, ctimes_base + j * sizeof(time_t),
                    sizeof(time_t));

        // Look for the name terminator within the buffer only
        const auto* const name_end = static_cast<const char*>(
                std::memchr(names_ptr, '\0',
                            static_cast<std::size_t>(buff_end - names_ptr)));
        if(name_end == nullptr) {
            LOG(ERROR,
                "{}() Reply from host '{}' is truncated at entry '{}' of '{}'. path '{}'",
                __func__, targets[target_idx], j, n_entries, path);
            return make_pair(EIO, output);
        }

        output.emplace_back(std::string(names_ptr, name_end), is_dir,
                            entry_size, entry_ctime);
        // number of characters in entry + \0 terminator
        names_ptr = name_end + 1;
    }

    return make_pair(0, std::move(output));
}
/**
* Send an RPC request to create a symlink.
* @param path
* @param target_path
* @return error code
*/
int
forward_mk_symlink(const std::string& path, const std::string& target_path) {
auto endp = CTX->hosts().at(CTX->distributor()->locate_file_metadata(path));
try {
LOG(DEBUG, "Sending RPC ...");
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we
// can retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service
->post<gkfs::rpc::mk_symlink>(endp, path, target_path)
.get()
.at(0);
LOG(DEBUG, "Got response success: {}", out.err());
} catch(const std::exception& ex) {
LOG(ERROR, "while getting rpc output");