Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/*************************************************************************
* Copyright (C) 2017-2019 Barcelona Supercomputing Center *
* Centro Nacional de Supercomputacion *
* All rights reserved. *
* *
* This file is part of NORNS, a service that allows other programs to *
* start, track and manage asynchronous transfers of data resources *
* between different storage backends. *
* *
* See AUTHORS file in the top level directory for information regarding *
* developers and contributors. *
* *
* This software was developed as part of the EC H2020 funded project *
* NEXTGenIO (Project ID: 671951). *
* www.nextgenio.eu *
* *
* Permission is hereby granted, free of charge, to any person obtaining *
* a copy of this software and associated documentation files (the *
* "Software"), to deal in the Software without restriction, including *
* without limitation the rights to use, copy, modify, merge, publish, *
* distribute, sublicense, and/or sell copies of the Software, and to *
* permit persons to whom the Software is furnished to do so, subject to *
* the following conditions: *
* *
* The above copyright notice and this permission notice shall be *
* included in all copies or substantial portions of the Software. *
* *
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, *
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF *
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND *
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS *
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN *
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN *
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE *
* SOFTWARE. *
*************************************************************************/
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstdlib>
#include <exception>
#include <dirent.h>
#include <fstream>
#include <fcntl.h>
#ifdef __LOGGER_ENABLE_DEBUG__
#include <sys/prctl.h>
#endif
#include <boost/optional.hpp>
#include <boost/optional/optional_io.hpp>
#include "hermes.hpp"
#include "rpcs.hpp"
m_is_paused(false),
m_settings(std::make_shared<config::settings>()) {}
/*
* --- Daemonize structure ---
* Check if this is already a daemon
* Fork off praent process
* Obtain new process group
* Close all descriptors
* Handle standard IO
* Change file mode mask
* Change the current working directory
* Check if daemon already exists
* Manage signals
*/
pid_t pid, sid;
/* Check if this is already a daemon */
if(getppid() == 1) {
// We need to destroy the global logger before calling fork. Otherwise the
// logger will not function properly since its internal thread will not
// be duplicated by fork(). Furthermore, if we don't destroy pre-fork()
// and attempt to replace it post-fork(), the logger destructor will attempt
// to join the (now invalid) thread and end up blocking forever. To avoid
// this (and since we want to be able to output messages from all
// processes), we destroy it now and recreate it post-fork() both in the
// parent process and in the child.
logger::destroy_global_logger();
/* Fork off the parent process */
pid = fork();
// re-initialize logging facilities (post-fork)
init_logger();
if(pid < 0) {
LOGGER_ERRNO("Failed to create child process");
exit(EXIT_FAILURE);
}
/* Parent returns to caller */
if(pid != 0) {
return pid;
}
/* Become a session and process group leader with no controlling tty */
if((sid = setsid()) < 0) {
/* Log failure */
LOGGER_ERRNO("Failed to disassociate controlling tty");
exit(EXIT_FAILURE);
}
/* Handle standard IO: discard data to/from stdin, stdout and stderr */
int dev_null;
if((dev_null = open("/dev/null", O_RDWR)) == -1) {
LOGGER_ERRNO("Failed to open \"/dev/null\"");
exit(EXIT_FAILURE);
}
if(dup2(dev_null, STDIN_FILENO) == -1) {
LOGGER_ERRNO("Failed to dup \"/dev/null\" onto stdin");
exit(EXIT_FAILURE);
}
if(dup2(dev_null, STDOUT_FILENO) == -1) {
LOGGER_ERRNO("Failed to dup \"/dev/null\" onto stdout");
exit(EXIT_FAILURE);
}
if(dup2(dev_null, STDERR_FILENO) == -1) {
LOGGER_ERRNO("Failed to dup \"/dev/null\" onto stderr");
exit(EXIT_FAILURE);
}
/* Change the file mode creation mask */
umask(0);
/* ensure the process does not keep a directory in use,
* avoid relative paths beyond this point! */
if(chdir("/") < 0) {
LOGGER_ERRNO("Failed to change working directory to root directory");
exit(EXIT_FAILURE);
}
/* Check if daemon already exists:
* First instance of the daemon will lock the file so that other
* instances understand that an instance is already running.
*/
if((pfd = open(m_settings->pidfile().c_str(), O_RDWR | O_CREAT, 0640)) == -1) {
LOGGER_ERRNO("Failed to create daemon lock file");
exit(EXIT_FAILURE);
}
if(lockf(pfd, F_TLOCK, 0) < 0) {
LOGGER_ERRNO("Failed to acquire lock on pidfile");
LOGGER_ERROR("Another instance of this daemon may already be running");
exit(EXIT_FAILURE);
}
/* record pid in lockfile */
std::string pidstr(std::to_string(getpid()));
if(write(pfd, pidstr.c_str(), pidstr.length()) !=
static_cast<ssize_t>(pidstr.length())) {
LOGGER_ERRNO("Failed to write pidfile");
exit(EXIT_FAILURE);
close(pfd);
close(dev_null);
/* Manage signals */
signal(SIGCHLD, SIG_IGN); /* ignore child */
signal(SIGTSTP, SIG_IGN); /* ignore tty signals */
signal(SIGTTOU, SIG_IGN);
signal(SIGTTIN, SIG_IGN);
// signal(SIGHUP, signal_handler); /* catch hangup signal */
// signal(SIGTERM, signal_handler); /* catch kill signal */
urd_error urd::validate_iotask_args(iotask_type type,
const resource_info_ptr& src_rinfo,
const resource_info_ptr& dst_rinfo) const {
if(type != iotask_type::copy &&
type != iotask_type::move &&
type != iotask_type::remove) {
return urd_error::bad_args;
if(src_rinfo->is_remote() && dst_rinfo->is_remote()) {
return urd_error::not_supported;
// // dst_resource cannot be a memory region
// if(dst_rinfo->type() == data::resource_type::memory_region) {
// return urd_error::not_supported;
// }
return urd_error::success;
///////////////////////////////////////////////////////////////////////////////
// handlers for user requests
///////////////////////////////////////////////////////////////////////////////
response_ptr
urd::iotask_create_handler(const request_ptr base_request) {
// downcast the generic request to the concrete implementation
auto request =
utils::static_unique_ptr_cast<api::iotask_create_request>(
std::move(base_request));
const auto type = request->get<0>();
const auto src_rinfo = request->get<1>();
const auto dst_rinfo = request->get<2>().get_value_or(nullptr);
std::vector<std::string> nsids;
std::vector<bool> remotes;
std::vector<std::shared_ptr<storage::backend>> backend_ptrs;
std::vector<std::shared_ptr<data::resource_info>> rinfo_ptrs;
boost::optional<iotask_id> tid;
boost::optional<auth::credentials> auth;
boost::optional<io::generic_task> t;
urd_error rv = urd_error::success;
if(m_is_paused) {
rv = urd_error::accept_paused;
goto log_and_return;
}
auth = request->credentials();
LOGGER_CRITICAL("Request without credentials");
rv = urd_error::snafu; // TODO: invalid_credentials? eaccess? eperm?
goto log_and_return;
}
// if(src_rinfo->is_remote()) {
// rv = urd_error::not_supported;
// goto log_and_return;
// }
for(const auto& rinfo : {src_rinfo, dst_rinfo}) {
if(rinfo) {
nsids.push_back(rinfo->nsid());
remotes.push_back(rinfo->is_remote());
rinfo_ptrs.emplace_back(rinfo);
#ifdef __LOGGER_ENABLE_DEBUG__
LOGGER_DEBUG("Request metadata:");
LOGGER_DEBUG(" rinfos: {} {}", src_rinfo, dst_rinfo);
LOGGER_DEBUG(" nsids: {");
for(std::size_t i=0; i<nsids.size(); ++i) {
LOGGER_DEBUG(" \"{}\"", nsids[i]);
}
LOGGER_DEBUG(" }");
LOGGER_DEBUG(" remotes: {");
for(std::size_t i=0; i<remotes.size(); ++i) {
LOGGER_DEBUG(" {}", remotes[i]);
}
LOGGER_DEBUG(" }");
#endif
{
bool all_found = false;
boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
std::tie(all_found, backend_ptrs) = m_namespace_mgr->find(nsids, remotes);
if(!all_found) {
rv = urd_error::no_such_namespace;
#ifdef __LOGGER_ENABLE_DEBUG__
for(std::size_t i=0; i<nsids.size(); ++i) {
LOGGER_DEBUG("nsid: {}, remote?: {}, bptr: {}", nsids[i], remotes[i], backend_ptrs[i]);
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
//FIXME: use appropriate args for each task rather than a vector of nullptrs
switch(type) {
case iotask_type::move:
case iotask_type::copy:
std::tie(rv, t) =
m_task_mgr->create_local_initiated_task(type, *auth, backend_ptrs, rinfo_ptrs);
break;
case iotask_type::remove:
std::tie(rv, t) =
m_task_mgr->create_local_initiated_task(type, *auth, backend_ptrs, rinfo_ptrs);
break;
case iotask_type::noop:
std::tie(rv, t) =
m_task_mgr->create_local_initiated_task(type, *auth, backend_ptrs, rinfo_ptrs);
break;
default:
rv = urd_error::bad_args;
goto log_and_return;
}
if(rv == urd_error::success) {
tid = t->id();
// enqueue task so that it's eventually run by a worker thread
m_task_mgr->enqueue_task(std::move(*t));
}
// std::tie(rv, tid) = m_task_mgr->create_task(type, *auth, backend_ptrs, rinfo_ptrs);
resp = std::make_unique<api::iotask_create_response>(tid.get_value_or(0));
resp->set_error_code(rv);
LOGGER_INFO("IOTASK_CREATE({}) = {}", request->to_string(), resp->to_string());
response_ptr
urd::iotask_status_handler(const request_ptr base_request) const {
auto resp = std::make_unique<api::iotask_status_response>();
// downcast the generic request to the concrete implementation
auto request =
utils::static_unique_ptr_cast<api::iotask_status_request>(
std::move(base_request));
auto task_info_ptr = m_task_mgr->find(request->get<0>());
resp->set_error_code(urd_error::success);
// stats provides a thread-safe view of a task status
// (locking is done internally)
resp->set<0>(task_info_ptr->stats());
if(task_info_ptr->status() == io::task_status::finished ||
task_info_ptr->status() == io::task_status::finished_with_error) {
m_task_mgr->erase(request->get<0>());
}
}
else {
resp->set_error_code(urd_error::no_such_task);
LOGGER_INFO("IOTASK_STATUS({}) = {}",
request->to_string(), resp->to_string());
return std::move(resp);
///////////////////////////////////////////////////////////////////////////////
// handlers for control requests
///////////////////////////////////////////////////////////////////////////////
response_ptr urd::ping_handler(const request_ptr /*base_request*/) {
response_ptr resp = std::make_unique<api::ping_response>();
resp->set_error_code(urd_error::success);
LOGGER_INFO("PING_REQUEST() = {}", resp->to_string());
return resp;
}
response_ptr urd::job_register_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::job_register_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::job_register_request>(std::move(base_request));
uint32_t jobid = request->get<0>();
auto hosts = request->get<1>();
{
boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
if(m_jobs.find(jobid) != m_jobs.end()) {
resp->set_error_code(urd_error::job_exists);
}
else {
m_jobs.emplace(jobid, std::make_shared<job>(jobid, hosts));
resp->set_error_code(urd_error::success);
LOGGER_INFO("REGISTER_JOB({}) = {}", request->to_string(), resp->to_string());
response_ptr urd::job_update_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::job_update_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::job_update_request>(std::move(base_request));
uint32_t jobid = request->get<0>();
auto hosts = request->get<1>();
{
boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
const auto& it = m_jobs.find(jobid);
resp->set_error_code(urd_error::no_such_job);
}
else {
it->second->update(hosts);
resp->set_error_code(urd_error::success);
LOGGER_INFO("UPDATE_JOB({}) = {}", request->to_string(), resp->to_string());
return resp;
}
response_ptr urd::job_remove_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::job_unregister_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::job_unregister_request>(std::move(base_request));
uint32_t jobid = request->get<0>();
{
boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
const auto& it = m_jobs.find(jobid);
resp->set_error_code(urd_error::no_such_job);
}
else {
m_jobs.erase(it);
resp->set_error_code(urd_error::success);
LOGGER_INFO("UNREGISTER_JOB({}) = {}", request->to_string(), resp->to_string());
response_ptr urd::process_add_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::process_register_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::process_register_request>(std::move(base_request));
//pid_t uid = request->get<1>();
gid_t gid = request->get<2>();
pid_t pid = request->get<3>();
boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
const auto& it = m_jobs.find(jobid);
if(it == m_jobs.end()) {
resp->set_error_code(urd_error::no_such_job);
goto log_and_return;
}
resp->set_error_code(urd_error::success);
LOGGER_INFO("ADD_PROCESS({}) = {}", request->to_string(), resp->to_string());
response_ptr urd::process_remove_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::process_unregister_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::process_unregister_request>(std::move(base_request));
uint32_t jobid = request->get<0>();
//pid_t uid = request->get<1>();
gid_t gid = request->get<2>();
pid_t pid = request->get<3>();
boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
const auto& it = m_jobs.find(jobid);
if(it == m_jobs.end()) {
resp->set_error_code(urd_error::no_such_job);
goto log_and_return;
}
if(it->second->find_and_remove_process(pid, gid)) {
resp->set_error_code(urd_error::success);
resp->set_error_code(urd_error::no_such_process);
LOGGER_INFO("REMOVE_PROCESS({}) = {}", request->to_string(), resp->to_string());
urd_error urd::create_namespace(const config::namespace_def& nsdef) {
const auto type = storage::backend_factory::get().
get_type(nsdef.alias());
if(type == backend_type::unknown) {
return urd_error::bad_args;
}
return create_namespace(nsdef.nsid(), type, nsdef.track(),
nsdef.mountpoint(), nsdef.capacity());
}
urd_error urd::create_namespace(const std::string& nsid, backend_type type,
bool track, const bfs::path& mount,
uint32_t quota) {
boost::unique_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
if(m_namespace_mgr->contains(nsid)) {
return urd_error::namespace_exists;
}
if(auto bptr = storage::backend_factory::create_from(
type, nsid, track, mount, quota)) {
m_namespace_mgr->add(nsid, bptr);
return urd_error::success;
}
return urd_error::bad_args;
}
response_ptr urd::namespace_register_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::backend_register_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::backend_register_request>(std::move(base_request));
std::string nsid = request->get<0>();
backend_type type = request->get<1>();
bool track = request->get<2>();
std::string mount = request->get<3>();
int32_t quota = request->get<4>();
resp->set_error_code(create_namespace(nsid, type, track, mount, quota));
LOGGER_INFO("REGISTER_NAMESPACE({}) = {}", request->to_string(), resp->to_string());
response_ptr urd::namespace_update_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::backend_update_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::backend_update_request>(std::move(base_request));
resp->set_error_code(urd_error::success);
LOGGER_INFO("UPDATE_NAMESPACE({}) = {}", request->to_string(), resp->to_string());
response_ptr urd::namespace_remove_handler(const request_ptr base_request) {
response_ptr resp = std::make_unique<api::backend_unregister_response>();
// downcast the generic request to the concrete implementation
auto request = utils::static_unique_ptr_cast<api::backend_unregister_request>(std::move(base_request));
std::string nsid = request->get<0>();
boost::unique_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
if(m_namespace_mgr->remove(nsid)) {
resp->set_error_code(urd_error::success);
resp->set_error_code(urd_error::no_such_namespace);
LOGGER_INFO("UNREGISTER_NAMESPACE({}) = {}", request->to_string(), resp->to_string());
response_ptr urd::global_status_handler(const request_ptr /*base_request*/) {
auto resp = std::make_unique<api::global_status_response>();
resp->set_error_code(urd_error::success);
resp->set<0>(m_task_mgr->global_stats());
LOGGER_INFO("GLOBAL_STATUS() = {}", resp->to_string());
response_ptr
urd::command_handler(const request_ptr base_request) {
// downcast the generic request to the concrete implementation
auto request =
utils::static_unique_ptr_cast<api::command_request>(
std::move(base_request));
auto resp = std::make_unique<api::command_response>();
resp->set_error_code(urd_error::success);
switch(request->get<0>()) {
case command_type::ping:
break; // nothing special to do here
case command_type::pause_listen:
pause_listening();
break;
case command_type::resume_listen:
resume_listening();
break;
case command_type::shutdown:
LOGGER_WARN("Shutdown requested!");
pause_listening();
const auto rv = check_shutdown();
resp->set_error_code(rv);
if(rv != urd_error::success) {
resume_listening();
case command_type::unknown:
resp->set_error_code(urd_error::bad_args);
break;
}
LOGGER_INFO("COMMAND({}) = {}", request->to_string(), resp->to_string());
return std::move(resp);
}
response_ptr urd::unknown_request_handler(const request_ptr /*base_request*/) {
response_ptr resp = std::make_unique<api::bad_request_response>();
resp->set_error_code(urd_error::bad_request);
LOGGER_INFO("UNKNOWN_REQUEST() = {}", resp->to_string());
// N.B. This function is called by the progress thread internal to
// m_network_service rather than by the main execution thread
urd::push_resource_handler(hermes::request<rpc::push_resource>&& req) {
const auto args = req.args();
LOGGER_WARN("incoming rpc::push_resource(from: \"{}@{}:{}\", to: \"{}:{}\")",
args.in_address(),
args.in_resource_name(),
urd_error rv = urd_error::success;
boost::optional<io::generic_task> t;
std::shared_ptr<storage::backend> src_backend;
boost::optional<std::shared_ptr<storage::backend>> dst_backend;
auto dst_rtype = static_cast<data::resource_type>(args.out_resource_type());
auth::credentials auth; //XXX fake credentials for now
const auto create_rinfo =
[&](const data::resource_type& rtype) ->
std::shared_ptr<data::resource_info> {
switch(rtype) {
case data::resource_type::remote_resource:
return std::make_shared<data::remote_resource_info>(
args.in_address(), args.in_nsid(),
args.in_is_collection(), args.in_resource_name(),
args.in_buffers());
case data::resource_type::local_posix_path:
case data::resource_type::shared_posix_path:
return std::make_shared<data::local_path_info>(
args.out_nsid(), args.out_resource_name());
default:
rv = urd_error::not_supported;
return {};
}
};
if(m_is_paused) {
rv = urd_error::accept_paused;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
static_cast<uint32_t>(io::task_status::finished_with_error),
static_cast<uint32_t>(rv),
auto src_rinfo = create_rinfo(data::resource_type::remote_resource);
auto dst_rinfo = create_rinfo(dst_rtype);
//TODO: check src_rinfo and dst_rinfo
// TODO: actually retrieve and validate credentials, etc
src_backend =
std::make_shared<storage::detail::remote_backend>(args.in_nsid());
{
boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
dst_backend = m_namespace_mgr->find(args.out_nsid());
}
if(!dst_backend) {
rv = urd_error::no_such_namespace;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
static_cast<uint32_t>(io::task_status::finished_with_error),
static_cast<int32_t>(rv),
return;
}
LOGGER_DEBUG("nsid: {}, bptr: {}", args.in_nsid(), dst_backend);
auto ctx =
std::make_shared<hermes::request<rpc::push_resource>>(std::move(req));
std::tie(rv, t) =
m_task_mgr->create_remote_initiated_task(
iotask_type::remote_transfer, auth,
ctx, src_backend, src_rinfo,
*dst_backend, dst_rinfo);
if(rv == urd_error::success) {
// run the task and check that it started correctly
auto req = std::move(*ctx);
if(t->info()->status() == io::task_status::finished_with_error) {
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
static_cast<uint32_t>(io::task_status::finished_with_error),
static_cast<int32_t>(t->info()->task_error()),
static_cast<int32_t>(t->info()->sys_error().value()),
0);
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
void
urd::pull_resource_handler(hermes::request<rpc::pull_resource>&& req) {
const auto args = req.args();
LOGGER_WARN("incoming rpc::push_request(from: \"{}:{}\", to: \"{}@{}:{}\")",
args.in_nsid(),
args.in_resource_name(),
args.out_address(),
args.out_nsid(),
args.out_resource_name());
urd_error rv = urd_error::success;
boost::optional<io::generic_task> tsk;
boost::optional<std::shared_ptr<storage::backend>> src_backend;
std::shared_ptr<storage::backend> dst_backend;
/// XXX this information should be retrievable from the backend
auto src_rtype = static_cast<data::resource_type>(args.in_resource_type());
auth::credentials auth; //XXX fake credentials for now
const auto create_rinfo =
[&](const data::resource_type& rtype) ->
std::shared_ptr<data::resource_info> {
switch(rtype) {
case data::resource_type::remote_resource:
return std::make_shared<data::remote_resource_info>(
args.out_address(), args.out_nsid(), false,
args.out_resource_name(), args.out_buffers());
case data::resource_type::local_posix_path:
case data::resource_type::shared_posix_path:
return std::make_shared<data::local_path_info>(
args.in_nsid(), args.in_resource_name());
default:
rv = urd_error::not_supported;
return {};
}
};
if(m_is_paused) {
rv = urd_error::accept_paused;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
static_cast<uint32_t>(io::task_status::finished_with_error),
static_cast<uint32_t>(rv),
0,
0);
return;
}
auto src_rinfo = create_rinfo(src_rtype);
auto dst_rinfo = create_rinfo(data::resource_type::remote_resource);
//TODO: check src_rinfo and dst_rinfo
// TODO: actually retrieve and validate credentials, etc
{
boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
src_backend = m_namespace_mgr->find(args.in_nsid());
}
if(!src_backend) {
rv = urd_error::no_such_namespace;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
static_cast<uint32_t>(io::task_status::finished_with_error),
static_cast<int32_t>(rv),
0,
0);
return;
}
LOGGER_DEBUG("nsid: {}, bptr: {}", args.in_nsid(), src_backend);
dst_backend =
std::make_shared<storage::detail::remote_backend>(args.out_nsid());
auto ctx =
std::make_shared<hermes::request<rpc::pull_resource>>(std::move(req));
std::tie(rv, tsk) =
m_task_mgr->create_remote_initiated_task(
iotask_type::remote_transfer, auth,
ctx, *src_backend, src_rinfo,
dst_backend, dst_rinfo);
if(rv == urd_error::success) {
// run the task and check that it started correctly
(*tsk)();
auto req = std::move(*ctx);
if(tsk->info()->status() == io::task_status::finished_with_error) {
rv = tsk->info()->task_error();
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
static_cast<uint32_t>(io::task_status::finished_with_error),
static_cast<int32_t>(tsk->info()->task_error()),
static_cast<int32_t>(tsk->info()->sys_error().value()),
0);
}
}
}
urd::stat_resource_handler(hermes::request<rpc::stat_resource>&& req) {
const auto args = req.args();
LOGGER_WARN("incoming rpc::stat_resource(\"{}:{}\")",
args.nsid(),
args.resource_name());
urd_error rv = urd_error::success;
boost::optional<std::shared_ptr<storage::backend>> dst_backend;
{
boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
dst_backend = m_namespace_mgr->find(args.nsid());
}
if(!dst_backend) {
rv = urd_error::no_such_namespace;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
false,
//XXX ENOENT should not be required:
// the transfer() interface should be
// richer rather than returning only a
// std::error_code
ENOENT,
0);
return;
}
auto rtype = static_cast<data::resource_type>(args.resource_type());
const auto create_rinfo =
[&](const data::resource_type& rtype) ->
std::shared_ptr<data::resource_info> {
switch(rtype) {
case data::resource_type::local_posix_path:
case data::resource_type::shared_posix_path:
return std::make_shared<data::local_path_info>(
args.nsid(), args.resource_name());
default:
return {};
}
};
auto rinfo = create_rinfo(rtype);
if(!rinfo) {
LOGGER_ERROR("Failed to access resource {}", rinfo->to_string());
rv = urd_error::not_supported;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),
static_cast<uint32_t>(rv),
//XXX EOPNOTSUPP should not be required:
// the transfer() interface should be
// richer rather than returning only a
// std::error_code
EOPNOTSUPP,
false,
0);
return;
}
std::error_code ec;
auto rsrc = (*dst_backend)->get_resource(rinfo, ec);
if(ec) {
LOGGER_ERROR("Failed to access resource {}", rinfo->to_string());
rv = urd_error::no_such_resource;
LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
m_network_service->respond(std::move(req),