Newer
Older
/*
Copyright 2018-2019, Barcelona Supercomputing Center (BSC), Spain
Copyright 2015-2019, Johannes Gutenberg Universitaet Mainz, Germany
This software was partially supported by the
EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).
This software was partially supported by the
ADA-FS project under the SPPEXA project funded by the DFG.
SPDX-License-Identifier: MIT
*/
#include <global/configure.hpp>
#include <client/rpc/ld_rpc_metadentry.hpp>
#include "client/preload.hpp"
#include "client/preload_util.hpp"
#include "client/open_dir.hpp"
Marc Vef
committed
#include <global/rpc/rpc_utils.hpp>
#include <global/rpc/distributor.hpp>
#include <global/rpc/rpc_types.hpp>
#include <client/rpc/hg_rpcs.hpp>
/**
 * Forwards an RPC through margo, bounded by RPC_TIMEOUT.
 *
 * @param handle    handle of the RPC to forward
 * @param in_struct pointer to the RPC's input struct
 * @return the status reported by margo_forward_timed()
 */
static inline hg_return_t
margo_forward_timed_wrap(const hg_handle_t& handle, void* in_struct) {
    const auto timeout = RPC_TIMEOUT;
    return margo_forward_timed(handle, in_struct, timeout);
}
/**
 * Asks the daemon responsible for `path`'s metadata to create a node.
 *
 * @param path file system path of the node to create
 * @param mode mode sent along with the create request
 * @return the error code reported by the daemon (0 on success), or -1 with
 *         errno set to EBUSY if the RPC could not be completed
 */
int mk_node(const std::string& path, const mode_t mode) {
    auto endp = CTX->hosts2().at(
            CTX->distributor()->locate_file_metadata(path));

    try {
        CTX->log()->debug("{}() Sending RPC ...", __func__);
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
        // retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        auto out =
            ld_network_service->post<gkfs::rpc::create>(endp, path, mode).get().at(0);

        const auto err = out.err();
        CTX->log()->debug("{}() Got response success: {}", __func__, err);
        return err;

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        return -1;
    }
}
/**
 * Fetches the serialized metadata entry of `path` from its responsible daemon.
 *
 * @param path file system path to look up
 * @param attr receives the daemon's `db_val` on success; untouched otherwise
 * @return 0 on success; -1 with errno set to the daemon's error code, or to
 *         EBUSY if the RPC could not be completed
 */
int stat(const std::string& path, std::string& attr) {
    auto endp = CTX->hosts2().at(
            CTX->distributor()->locate_file_metadata(path));

    try {
        CTX->log()->debug("{}() Sending RPC ...", __func__);
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
        // retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        auto out =
            ld_network_service->post<gkfs::rpc::stat>(endp, path).get().at(0);

        CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

        if(out.err() != 0) {
            errno = out.err();
            return -1;
        }

        attr = out.db_val();
        return 0;

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        // Previously missing: falling off the end of a non-void function is UB.
        return -1;
    }
}
/**
 * Sends a decr_size RPC for `path` to the daemon responsible for its metadata.
 *
 * @param path   file system path whose stored size is updated
 * @param length size value sent with the request
 * @return 0 on success; -1 with errno set to the daemon's error code, or to
 *         EBUSY if the RPC could not be completed
 */
int decr_size(const std::string& path, size_t length) {
    auto endp = CTX->hosts2().at(
            CTX->distributor()->locate_file_metadata(path));

    try {
        CTX->log()->debug("{}() Sending RPC ...", __func__);
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
        // retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        auto out =
            ld_network_service->post<gkfs::rpc::decr_size>(
                    endp, path, length).get().at(0);

        CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

        if(out.err() != 0) {
            errno = out.err();
            return -1;
        }

        return 0;

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        return -1;
    }
}
int rm_node(const std::string& path, const bool remove_metadentry_only) {
// if only the metadentry should be removed, send one rpc to the
// metadentry's responsible node to remove the metadata
// else, send an rpc to all hosts and thus broadcast chunk_removal.
if(remove_metadentry_only) {
auto endp = CTX->hosts2().at(
CTX->distributor()->locate_file_metadata(path));
try {
CTX->log()->debug("{}() Sending RPC ...", __func__);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
// retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service->post<gkfs::rpc::remove>(endp, path).get().at(0);
CTX->log()->debug("{}() Got response success: {}", __func__, out.err());
if(out.err() != 0) {
errno = out.err();
return -1;
}
return 0;
} catch(const std::exception& ex) {
CTX->log()->error("{}() while getting rpc output", __func__);
std::vector<hermes::rpc_handle<gkfs::rpc::remove>> handles;
hermes::endpoint_set endps;
std::copy(CTX->hosts2().begin(),
CTX->hosts2().end(),
std::back_inserter(endps));
try {
auto output_set =
ld_network_service->broadcast<gkfs::rpc::remove>(endps, path).get();
// Wait for RPC responses and then get response
for (const auto& out : output_set) {
CTX->log()->debug("{}() Got response success: {}", __func__, out.err());
if(out.err() != 0) {
errno = out.err();
return -1;
}
return 0;
} catch(const std::exception& ex) {
CTX->log()->error("{}() while getting rpc output", __func__);
errno = EBUSY;
return -1;
/**
 * Sends selected metadata fields of `md` for `path` to its responsible daemon.
 *
 * For link_count, size, blocks, atime, mtime and ctime, the value from `md` is
 * sent only if the matching flag in `md_flags` is set (0 otherwise), and the
 * flag itself is forwarded. Mode, uid and gid are always sent as 0, and
 * mode_flag is always false.
 *
 * @param path     file system path whose metadata entry is updated
 * @param md       source of the new field values
 * @param md_flags selects which fields are updated
 * @return 0 on success; -1 with errno set to the daemon's error code, or to
 *         EBUSY if the RPC could not be completed
 */
int update_metadentry(const std::string& path, const Metadata& md, const MetadentryUpdateFlags& md_flags) {
    auto endp = CTX->hosts2().at(
            CTX->distributor()->locate_file_metadata(path));

    try {
        CTX->log()->debug("{}() Sending RPC ...", __func__);
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
        // retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        auto out =
            ld_network_service->post<gkfs::rpc::update_metadentry>(
                    endp,
                    path,
                    (md_flags.link_count ? md.link_count() : 0),
                    /* mode */ 0,
                    /* uid */ 0,
                    /* gid */ 0,
                    (md_flags.size ? md.size() : 0),
                    (md_flags.blocks ? md.blocks() : 0),
                    (md_flags.atime ? md.atime() : 0),
                    (md_flags.mtime ? md.mtime() : 0),
                    (md_flags.ctime ? md.ctime() : 0),
                    bool_to_merc_bool(md_flags.link_count),
                    /* mode_flag */ false,
                    bool_to_merc_bool(md_flags.size),
                    bool_to_merc_bool(md_flags.blocks),
                    bool_to_merc_bool(md_flags.atime),
                    bool_to_merc_bool(md_flags.mtime),
                    bool_to_merc_bool(md_flags.ctime)).get().at(0);

        CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

        if(out.err() != 0) {
            errno = out.err();
            return -1;
        }

        return 0;

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        // Previously missing: falling off the end of a non-void function is UB.
        return -1;
    }
}
int update_metadentry_size(const string& path, const size_t size, const off64_t offset, const bool append_flag,
auto endp = CTX->hosts2().at(
CTX->distributor()->locate_file_metadata(path));
CTX->log()->debug("{}() Sending RPC ...", __func__);
// TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
// retry for RPC_TRIES (see old commits with margo)
// TODO(amiranda): hermes will eventually provide a post(endpoint)
// returning one result and a broadcast(endpoint_set) returning a
// result_set. When that happens we can remove the .at(0) :/
auto out =
ld_network_service->post<gkfs::rpc::update_metadentry_size>(
endp, path, size, offset,
bool_to_merc_bool(append_flag)).get().at(0);
CTX->log()->debug("{}() Got response success: {}", __func__, out.err());
if(out.err() != 0) {
errno = out.err();
return -1;
}
ret_size = out.ret_size();
return out.err();
return 0;
} catch(const std::exception& ex) {
CTX->log()->error("{}() while getting rpc output", __func__);
errno = EBUSY;
ret_size = 0;
return EUNKNOWN;
}
/**
 * Queries the daemon responsible for `path`'s metadata for its stored size.
 *
 * @param path     file system path to query
 * @param ret_size receives the daemon's `ret_size`; set to 0 if the RPC could
 *                 not be completed
 * @return the daemon's error code (0 on success; errno is NOT set for it), or
 *         EUNKNOWN with errno set to EBUSY if the RPC could not be completed
 */
int get_metadentry_size(const std::string& path, off64_t& ret_size) {
    auto endp = CTX->hosts2().at(
            CTX->distributor()->locate_file_metadata(path));

    try {
        CTX->log()->debug("{}() Sending RPC ...", __func__);
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
        // retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        auto out =
            ld_network_service->post<gkfs::rpc::get_metadentry_size>(
                    endp, path).get().at(0);

        CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

        ret_size = out.ret_size();
        return out.err();

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        ret_size = 0;
        return EUNKNOWN;
    }
}
/**
 * Fetches the entries of directory `open_dir.path()` and adds them to
 * `open_dir`.
 *
 * How the request is sent:
 * - One non-blocking get_dirents RPC goes to every target host returned by
 *   locate_directory_metadata().
 * - A single receive buffer of RPC_DIRENTS_BUFF_SIZE bytes is split evenly
 *   between the hosts.
 * - Each host's slice is exposed to it through a write-only bulk handle.
 *
 * A host's reply is laid out as `dirents_size` bools (true = directory,
 * false = regular file), followed by the same number of NUL-terminated names.
 *
 * NOTE(review): the per-host vectors are indexed by target_host, which
 * assumes the targets are exactly 0..host_size-1 -- confirm with the distributor.
 *
 * @param open_dir directory object that receives the entries
 * @throws std::runtime_error if there are no target hosts, if creating,
 *         forwarding or completing an RPC fails, or if a host reports an error
 */
void get_dirents(OpenDir& open_dir){
    CTX->log()->trace("{}() called", __func__);
    auto const root_dir = open_dir.path();
    auto const targets = CTX->distributor()->locate_directory_metadata(root_dir);
    auto const host_size = targets.size();

    // The buffer is split per host below; an empty target set would divide by zero.
    if (host_size == 0) {
        throw std::runtime_error(fmt::format(
                "No target hosts to retrieve dir entries from. [root dir: {}]", root_dir));
    }

    std::vector<hg_handle_t> rpc_handles(host_size);
    std::vector<margo_request> rpc_waiters(host_size);
    std::vector<rpc_get_dirents_in_t> rpc_in(host_size);
    std::vector<char*> recv_buffers(host_size);

    /* Preallocate the receive buffer. The actual reply size is not known yet.
     *
     * std::make_unique<char[]> (C++14) zeroes the newly allocated buffer, which
     * is very slow for a buffer this big, and a zeroed buffer is not needed here.
     */
    auto recv_buff = std::unique_ptr<char[]>(new char[RPC_DIRENTS_BUFF_SIZE]);
    const unsigned long int per_host_buff_size = RPC_DIRENTS_BUFF_SIZE / host_size;

    hg_return_t hg_ret;

    for(const auto& target_host: targets){
        CTX->log()->trace("{}() target_host: {}", __func__, target_host);

        // Set up the RPC input parameters for this host
        rpc_in[target_host].path = root_dir.c_str();
        recv_buffers[target_host] = recv_buff.get() + (target_host * per_host_buff_size);

        hg_ret = margo_bulk_create(
                ld_margo_rpc_id, 1,
                reinterpret_cast<void**>(&recv_buffers[target_host]),
                &per_host_buff_size,
                HG_BULK_WRITE_ONLY, &(rpc_in[target_host].bulk_handle));
        if(hg_ret != HG_SUCCESS){
            throw std::runtime_error("Failed to create margo bulk handle");
        }

        hg_ret = margo_create_wrap_helper(rpc_get_dirents_id, target_host, rpc_handles[target_host]);
        if (hg_ret != HG_SUCCESS) {
            // Previously the exception was constructed but never thrown, so the
            // failure was silently ignored. Release this host's bulk handle first.
            margo_bulk_free(rpc_in[target_host].bulk_handle);
            throw std::runtime_error("Failed to create margo handle");
        }

        // Send RPC
        CTX->log()->trace("{}() Sending RPC to host: {}", __func__, target_host);
        hg_ret = margo_iforward(rpc_handles[target_host],
                                &rpc_in[target_host],
                                &rpc_waiters[target_host]);
        if (hg_ret != HG_SUCCESS) {
            CTX->log()->error("{}() Unable to send non-blocking get_dirents on {} to recipient {}", __func__, root_dir, target_host);
            for (uint64_t i = 0; i <= target_host; i++) {
                margo_bulk_free(rpc_in[i].bulk_handle);
                margo_destroy(rpc_handles[i]);
            }
            throw std::runtime_error("Failed to forward non-blocking rpc request");
        }
    }

    for(std::size_t target_host = 0; target_host < host_size; target_host++){
        hg_ret = margo_wait(rpc_waiters[target_host]);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error(fmt::format("Failed while waiting for rpc completion. [root dir: {}, target host: {}]", root_dir, target_host));
        }
        rpc_get_dirents_out_t out{};
        hg_ret = margo_get_output(rpc_handles[target_host], &out);
        if (hg_ret != HG_SUCCESS) {
            throw std::runtime_error(fmt::format("Failed to get rpc output.. [path: {}, target host: {}]", root_dir, target_host));
        }

        if (out.err) {
            CTX->log()->error("{}() Failed to retrieve dir entries from host: {}", __func__, target_host);
            throw std::runtime_error(fmt::format("Failed to retrieve dir entries from host '{}'. "
                                                 "Error '{}', path '{}'", target_host, strerror(out.err), root_dir));
        }

        // Decode the reply: `dirents_size` type flags, then the NUL-terminated names.
        bool* bool_ptr = reinterpret_cast<bool*>(recv_buffers[target_host]);
        char* names_ptr = recv_buffers[target_host] + (out.dirents_size * sizeof(bool));

        for(unsigned int i = 0; i < out.dirents_size; i++){
            FileType ftype = (*bool_ptr)? FileType::directory : FileType::regular;
            bool_ptr++;

            // Check that we are not outside the recv_buff for this specific host
            assert((names_ptr - recv_buffers[target_host]) > 0);
            assert(static_cast<unsigned long int>(names_ptr - recv_buffers[target_host]) < per_host_buff_size);

            auto name = std::string(names_ptr);
            names_ptr += name.size() + 1;

            open_dir.add(name, ftype);
        }

        margo_free_output(rpc_handles[target_host], &out);
        margo_bulk_free(rpc_in[target_host].bulk_handle);
        margo_destroy(rpc_handles[target_host]);
    }
}
#ifdef HAS_SYMLINKS

/**
 * Asks the daemon responsible for `path`'s metadata to create a symlink
 * entry at `path` pointing to `target_path`.
 *
 * @param path        file system path of the symlink to create
 * @param target_path link target sent with the request
 * @return 0 on success; -1 with errno set to the daemon's error code, or to
 *         EBUSY if the RPC could not be completed
 */
int mk_symlink(const std::string& path, const std::string& target_path) {
    auto endp = CTX->hosts2().at(
            CTX->distributor()->locate_file_metadata(path));

    try {
        CTX->log()->debug("{}() Sending RPC ...", __func__);
        // TODO(amiranda): add a post() with RPC_TIMEOUT to hermes so that we can
        // retry for RPC_TRIES (see old commits with margo)
        // TODO(amiranda): hermes will eventually provide a post(endpoint)
        // returning one result and a broadcast(endpoint_set) returning a
        // result_set. When that happens we can remove the .at(0) :/
        auto out =
            ld_network_service->post<gkfs::rpc::mk_symlink>(
                    endp, path, target_path).get().at(0);

        CTX->log()->debug("{}() Got response success: {}", __func__, out.err());

        if(out.err() != 0) {
            errno = out.err();
            return -1;
        }

        return 0;

    } catch(const std::exception& ex) {
        CTX->log()->error("{}() while getting rpc output", __func__);
        errno = EBUSY;
        return -1;
    }
}

#endif
} //end namespace rpc_send