urd.cpp 54.1 KiB
Newer Older
Alberto Miranda's avatar
Alberto Miranda committed
/************************************************************************* 
 * Copyright (C) 2017-2019 Barcelona Supercomputing Center               *
 *                         Centro Nacional de Supercomputacion           *
 * All rights reserved.                                                  *
 *                                                                       *
 * This file is part of NORNS, a service that allows other programs to   *
 * start, track and manage asynchronous transfers of data resources      *
 * between different storage backends.                                   *
 *                                                                       *
 * See AUTHORS file in the top level directory for information regarding *
 * developers and contributors.                                          *
 *                                                                       *
 * This software was developed as part of the EC H2020 funded project    *
 * NEXTGenIO (Project ID: 671951).                                       *
 *     www.nextgenio.eu                                                  *
 *                                                                       *
 * Permission is hereby granted, free of charge, to any person obtaining *
 * a copy of this software and associated documentation files (the       *
 * "Software"), to deal in the Software without restriction, including   *
 * without limitation the rights to use, copy, modify, merge, publish,   *
 * distribute, sublicense, and/or sell copies of the Software, and to    *
 * permit persons to whom the Software is furnished to do so, subject to *
 * the following conditions:                                             *
 *                                                                       *
 * The above copyright notice and this permission notice shall be        *
 * included in all copies or substantial portions of the Software.       *
 *                                                                       *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       *
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    *
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND                 *
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS   *
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN    *
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN     *
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE      *
 * SOFTWARE.                                                             *
 *************************************************************************/

Arnau Bago Castro's avatar
Arnau Bago Castro committed
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cstdlib>
#include <exception>
#include <dirent.h>
#include <fstream>
#include <fcntl.h>
#include <list>
Arnau Bago Castro's avatar
Arnau Bago Castro committed
#include <signal.h>
#include <string.h>
#include <ctime>
#ifdef __LOGGER_ENABLE_DEBUG__
#include <sys/prctl.h>
#endif

Alberto Miranda's avatar
Alberto Miranda committed
#include <boost/optional.hpp>
#include <boost/optional/optional_io.hpp>
#include <boost/atomic.hpp>
#include <functional>

#include "common.hpp"
#include "api.hpp"
#include "backends.hpp"
Alberto Miranda's avatar
Alberto Miranda committed
#include "logger.hpp"
#include "job.hpp"
#include "resources.hpp"
#include "io.hpp"
#include "namespaces.hpp"
#include "fmt.hpp"
#include "hermes.hpp"
#include "rpcs.hpp"
#include "context.hpp"
#include "urd.hpp"
    m_settings(std::make_shared<config::settings>()) {}
pid_t urd::daemonize() {
    /*
     * --- Daemonize structure ---
     *  Check if this is already a daemon
     *  Fork off parent process
     *  Obtain new process group
     *  Close all descriptors
     *  Handle standard IO
     *  Change file mode mask
     *  Change the current working directory
     *  Check if daemon already exists
     *  Manage signals
     */

    pid_t pid, sid;

    /* Check if this is already a daemon */ 
    if(getppid() == 1) {
    // We need to destroy the global logger before calling fork. Otherwise the
    // logger will not function properly since its internal thread will not
    // be duplicated by fork(). Furthermore, if we don't destroy pre-fork()
    // and attempt to replace it post-fork(), the logger destructor will attempt
    // to join the (now invalid) thread and end up blocking forever. To avoid
    // this (and since we want to be able to output messages from all 
    // processes), we destroy it now and recreate it post-fork() both in the
    // parent process and in the child.
    logger::destroy_global_logger();

    /* Fork off the parent process */
    pid = fork();

    // re-initialize logging facilities (post-fork)
    init_logger();

    if(pid < 0) {
        LOGGER_ERRNO("Failed to create child process");
    /* Parent returns to caller */
    /* Become a session and process group leader with no controlling tty */
    if((sid = setsid()) < 0) {
        /* Log failure */
        LOGGER_ERRNO("Failed to disassociate controlling tty");
    /* Handle standard IO: discard data to/from stdin, stdout and stderr */
    int dev_null;
    if((dev_null = open("/dev/null", O_RDWR)) == -1) {
        LOGGER_ERRNO("Failed to open \"/dev/null\"");
    if(dup2(dev_null, STDIN_FILENO) == -1) {
        LOGGER_ERRNO("Failed to dup \"/dev/null\" onto stdin");
        exit(EXIT_FAILURE);
    }

    if(dup2(dev_null, STDOUT_FILENO) == -1) {
        LOGGER_ERRNO("Failed to dup \"/dev/null\" onto stdout");
        exit(EXIT_FAILURE);
    }

    if(dup2(dev_null, STDERR_FILENO) == -1) {
        LOGGER_ERRNO("Failed to dup \"/dev/null\" onto stderr");
    /* Change the file mode creation mask */
    umask(0);
    /* ensure the process does not keep a directory in use,
     * avoid relative paths beyond this point! */
    if(chdir("/") < 0) {
        LOGGER_ERRNO("Failed to change working directory to root directory");
        exit(EXIT_FAILURE);
    }
    
    /* Check if daemon already exists:
     * First instance of the daemon will lock the file so that other
     * instances understand that an instance is already running. 
     */
    if((pfd = open(m_settings->pidfile().c_str(), O_RDWR | O_CREAT, 0640)) == -1) {
        LOGGER_ERRNO("Failed to create daemon lock file");
    if(lockf(pfd, F_TLOCK, 0) < 0) {
        LOGGER_ERRNO("Failed to acquire lock on pidfile");
        LOGGER_ERROR("Another instance of this daemon may already be running");
        exit(EXIT_FAILURE);
    }

    /* record pid in lockfile */
    std::string pidstr(std::to_string(getpid()));
    if(write(pfd, pidstr.c_str(), pidstr.length()) != 
            static_cast<ssize_t>(pidstr.length())) {
        LOGGER_ERRNO("Failed to write pidfile");
        exit(EXIT_FAILURE);
    close(pfd);
    close(dev_null);

    /* Manage signals */
    signal(SIGCHLD, SIG_IGN); /* ignore child */
    signal(SIGTSTP, SIG_IGN); /* ignore tty signals */
    signal(SIGTTOU, SIG_IGN);
    signal(SIGTTIN, SIG_IGN);
//  signal(SIGHUP, signal_handler); /* catch hangup signal */
//  signal(SIGTERM, signal_handler); /* catch kill signal */
// Sanity-check the arguments of an I/O task request.
//
// Visible checks:
//  - `type` must be one of copy, move or remove; anything else yields
//    urd_error::bad_args.
//  - a transfer where both `src_rinfo` and `dst_rinfo` report is_remote()
//    yields urd_error::not_supported.
// Otherwise urd_error::success is returned.
//
// NOTE(review): both resource-info pointers are dereferenced without a null
// check in the lines shown here -- confirm that callers never pass nullptr
// (e.g. for remove tasks that have no destination).
// NOTE(review): the closing braces of the `if` blocks and of the function are
// not visible in this listing -- verify against the upstream file.
urd_error urd::validate_iotask_args(iotask_type type, 
                                    const resource_info_ptr& src_rinfo,
                                    const resource_info_ptr& dst_rinfo) const {
    if(type != iotask_type::copy && 
       type != iotask_type::move &&
       type != iotask_type::remove) {
        return urd_error::bad_args;
    if(src_rinfo->is_remote() && dst_rinfo->is_remote()) {
        return urd_error::not_supported;
//    // dst_resource cannot be a memory region
//    if(dst_rinfo->type() == data::resource_type::memory_region) {
//        return urd_error::not_supported;
//    }
    return urd_error::success;

///////////////////////////////////////////////////////////////////////////////
//                 handlers for user requests
///////////////////////////////////////////////////////////////////////////////

response_ptr 
urd::iotask_create_handler(const request_ptr base_request) {

    // downcast the generic request to the concrete implementation
    auto request = 
        utils::static_unique_ptr_cast<api::iotask_create_request>(
                std::move(base_request));
    const auto type = request->get<0>();
    const auto src_rinfo = request->get<1>();
    const auto dst_rinfo = request->get<2>().get_value_or(nullptr);
    std::vector<std::string> nsids;
    std::vector<bool> remotes;
    std::vector<std::shared_ptr<storage::backend>> backend_ptrs;
    std::vector<std::shared_ptr<data::resource_info>> rinfo_ptrs;
    boost::optional<iotask_id> tid;
    boost::optional<auth::credentials> auth;
    boost::optional<io::generic_task> t;
    response_ptr resp;
    urd_error rv = urd_error::success;
    if(m_is_paused) {
        rv = urd_error::accept_paused;
        goto log_and_return;
    }

    auth = request->credentials();
    if(!auth) {
        LOGGER_CRITICAL("Request without credentials");
        rv = urd_error::snafu; // TODO: invalid_credentials? eaccess? eperm?
        goto log_and_return;
    }

//    if(src_rinfo->is_remote()) {
//        rv = urd_error::not_supported;
//        goto log_and_return;
//    }
    for(const auto& rinfo : {src_rinfo, dst_rinfo}) {
        if(rinfo) {
            nsids.push_back(rinfo->nsid());
            remotes.push_back(rinfo->is_remote());
            rinfo_ptrs.emplace_back(rinfo);
#ifdef __LOGGER_ENABLE_DEBUG__
    LOGGER_DEBUG("Request metadata:");
    LOGGER_DEBUG("  rinfos: {} {}", src_rinfo, dst_rinfo);
    LOGGER_DEBUG("  nsids: {");
    for(std::size_t i=0; i<nsids.size(); ++i) {
        LOGGER_DEBUG("    \"{}\"", nsids[i]);
    }
    LOGGER_DEBUG("  }");
    LOGGER_DEBUG("  remotes: {");
    for(std::size_t i=0; i<remotes.size(); ++i) {
        LOGGER_DEBUG("    {}", remotes[i]);
    }
    LOGGER_DEBUG("  }");
#endif
    {
        bool all_found = false;
        boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
        std::tie(all_found, backend_ptrs) = m_namespace_mgr->find(nsids, remotes);
        if(!all_found) {
            rv = urd_error::no_such_namespace;
            goto log_and_return;
#ifdef __LOGGER_ENABLE_DEBUG__
    for(std::size_t i=0; i<nsids.size(); ++i) {
        LOGGER_DEBUG("nsid: {}, remote?: {}, bptr: {}", nsids[i], remotes[i], backend_ptrs[i]);

    //FIXME: use appropriate args for each task rather than a vector of nullptrs
    switch(type) {
        case iotask_type::move:
        case iotask_type::copy:
            std::tie(rv, t) = 
                m_task_mgr->create_local_initiated_task(type, *auth, backend_ptrs, rinfo_ptrs);
            break;
        case iotask_type::remove:
            std::tie(rv, t) =
                m_task_mgr->create_local_initiated_task(type, *auth, backend_ptrs, rinfo_ptrs);
            break;
        case iotask_type::noop:
            std::tie(rv, t) = 
                m_task_mgr->create_local_initiated_task(type, *auth, backend_ptrs, rinfo_ptrs);
            break;
        default:
            rv = urd_error::bad_args;
            goto log_and_return;
    }

    if(rv == urd_error::success) {
        tid = t->id();

        // enqueue task so that it's eventually run by a worker thread
        m_task_mgr->enqueue_task(std::move(*t));
    }

//    std::tie(rv, tid) = m_task_mgr->create_task(type, *auth, backend_ptrs, rinfo_ptrs);
    resp = std::make_unique<api::iotask_create_response>(tid.get_value_or(0));
    resp->set_error_code(rv);
    LOGGER_INFO("IOTASK_CREATE({}) = {}", request->to_string(), resp->to_string());
// Handler for IOTASK_STATUS user requests.
//
// Looks up the task whose id is carried in field 0 of the request in
// m_task_mgr. When found, the response is marked urd_error::success and
// filled with the task's stats(); if the task status is `finished` or
// `finished_with_error`, the task is then erased from the task manager.
// When not found, the response carries urd_error::no_such_task.
// The outcome is logged at INFO level.
//
// NOTE(review): the closing brace of the `else` branch, the `return` and the
// end of the function are not visible in this listing -- verify against the
// upstream file.
response_ptr 
urd::iotask_status_handler(const request_ptr base_request) const {
    auto resp = std::make_unique<api::iotask_status_response>();

    // downcast the generic request to the concrete implementation
    auto request = 
        utils::static_unique_ptr_cast<api::iotask_status_request>(
                std::move(base_request));
    auto task_info_ptr = m_task_mgr->find(request->get<0>());
    if(task_info_ptr) {
        resp->set_error_code(urd_error::success);

        // stats provides a thread-safe view of a task status 
        // (locking is done internally)
        resp->set<0>(task_info_ptr->stats());

        // the task's status is queried again here (separately from stats())
        // to decide whether its bookkeeping entry can be dropped
        if(task_info_ptr->status() == io::task_status::finished ||
           task_info_ptr->status() == io::task_status::finished_with_error) {
            m_task_mgr->erase(request->get<0>());
        }

    }
    else {
        resp->set_error_code(urd_error::no_such_task);
    LOGGER_INFO("IOTASK_STATUS({}) = {}", 
                request->to_string(), resp->to_string());
///////////////////////////////////////////////////////////////////////////////
//                 handlers for control requests
///////////////////////////////////////////////////////////////////////////////

// Handler for PING control requests: the request payload is ignored and a
// ping response flagged with urd_error::success is always returned. The
// reply is logged at INFO level before being handed back to the caller.
response_ptr urd::ping_handler(const request_ptr /*base_request*/) {

    response_ptr reply = std::make_unique<api::ping_response>();
    reply->set_error_code(urd_error::success);

    LOGGER_INFO("PING_REQUEST() = {}", reply->to_string());

    return reply;
}

// Handler for REGISTER_JOB control requests.
//
// Reads the job id (field 0) and the host list (field 1) from the request
// and, while holding an exclusive lock on m_jobs_mutex, either reports
// urd_error::job_exists if the id is already present in m_jobs or inserts a
// new `job` built from (jobid, hosts) and reports urd_error::success.
// The outcome is logged at INFO level.
//
// NOTE(review): the closing braces of the `else` branch and of the locked
// scope are not visible in this listing -- verify against the upstream file.
response_ptr urd::job_register_handler(const request_ptr base_request) {
    response_ptr resp = std::make_unique<api::job_register_response>();
    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::job_register_request>(std::move(base_request));
    uint32_t jobid = request->get<0>();
    auto hosts = request->get<1>();
    {
        boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
        if(m_jobs.find(jobid) != m_jobs.end()) {
            resp->set_error_code(urd_error::job_exists);
        }
        else {
            m_jobs.emplace(jobid, std::make_shared<job>(jobid, hosts));
            resp->set_error_code(urd_error::success);
    LOGGER_INFO("REGISTER_JOB({}) = {}", request->to_string(), resp->to_string());
Alberto Miranda's avatar
Alberto Miranda committed
    return resp;
// Handler for UPDATE_JOB control requests.
//
// Reads the job id (field 0) and the host list (field 1) from the request
// and, while holding an exclusive lock on m_jobs_mutex, looks the job up in
// m_jobs. An unknown id yields urd_error::no_such_job; otherwise the job's
// update(hosts) is invoked and urd_error::success is reported.
// The outcome is logged at INFO level.
//
// NOTE(review): the closing braces of the `else` branch and locked scope, the
// `return` and the end of the function are not visible in this listing --
// verify against the upstream file.
response_ptr urd::job_update_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::job_update_response>();
    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::job_update_request>(std::move(base_request));
    uint32_t jobid = request->get<0>();
    auto hosts = request->get<1>();
    {
        boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
        const auto& it = m_jobs.find(jobid);
        if(it == m_jobs.end()) {
            resp->set_error_code(urd_error::no_such_job);
        }
        else {
            it->second->update(hosts);
            resp->set_error_code(urd_error::success);
    LOGGER_INFO("UPDATE_JOB({}) = {}", request->to_string(), resp->to_string());
// Handler for UNREGISTER_JOB control requests.
//
// Reads the job id (field 0) from the request and, while holding an
// exclusive lock on m_jobs_mutex, erases the matching entry from m_jobs.
// An unknown id yields urd_error::no_such_job; a successful erase yields
// urd_error::success. The outcome is logged at INFO level.
//
// NOTE(review): the closing braces of the `else` branch and locked scope, the
// `return` and the end of the function are not visible in this listing --
// verify against the upstream file.
response_ptr urd::job_remove_handler(const request_ptr base_request) {
    response_ptr resp = std::make_unique<api::job_unregister_response>();
    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::job_unregister_request>(std::move(base_request));

    uint32_t jobid = request->get<0>();
    {
        boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);
        const auto& it = m_jobs.find(jobid);
        if(it == m_jobs.end()) {
            resp->set_error_code(urd_error::no_such_job);
        }
        else {
            m_jobs.erase(it);
            resp->set_error_code(urd_error::success);
    LOGGER_INFO("UNREGISTER_JOB({}) = {}", request->to_string(), resp->to_string());
// Handler for ADD_PROCESS control requests.
//
// Reads the job id (field 0), gid (field 2) and pid (field 3) from the
// request (field 1 is currently unused, see the commented-out line), takes
// an exclusive lock on m_jobs_mutex and looks the job up in m_jobs. An
// unknown id yields urd_error::no_such_job; otherwise the (pid, gid) pair is
// added to the job via add_process() and urd_error::success is reported.
// The outcome is logged at INFO level.
//
// NOTE(review): in the lines shown here nothing separates the `no_such_job`
// branch from the `it->second->add_process(...)` call, which would
// dereference the end iterator. Lines (closing brace / early exit, final
// `return`) appear to be missing from this listing -- verify against the
// upstream file.
response_ptr urd::process_add_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::process_register_response>();
    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::process_register_request>(std::move(base_request));
    uint32_t jobid = request->get<0>();
    //pid_t uid = request->get<1>();
    gid_t gid = request->get<2>();
    pid_t pid = request->get<3>();

    boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);

    const auto& it = m_jobs.find(jobid);

    if(it == m_jobs.end()) {
        resp->set_error_code(urd_error::no_such_job);
    it->second->add_process(pid, gid);
    resp->set_error_code(urd_error::success);
    LOGGER_INFO("ADD_PROCESS({}) = {}", request->to_string(), resp->to_string());
// Handler for REMOVE_PROCESS control requests.
//
// Reads the job id (field 0), gid (field 2) and pid (field 3) from the
// request (field 1 is currently unused, see the commented-out line), takes
// an exclusive lock on m_jobs_mutex and looks the job up in m_jobs. An
// unknown id yields urd_error::no_such_job. Otherwise
// find_and_remove_process(pid, gid) is called on the job; the visible code
// sets both urd_error::success and urd_error::no_such_process after it.
// The outcome is logged at INFO level.
//
// NOTE(review): the `else` separating the success / no_such_process
// assignments, the exit from the `no_such_job` branch and the final `return`
// are not visible in this listing -- verify against the upstream file.
response_ptr urd::process_remove_handler(const request_ptr base_request) {
    response_ptr resp = std::make_unique<api::process_unregister_response>();
    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::process_unregister_request>(std::move(base_request));

    uint32_t jobid = request->get<0>();
    //pid_t uid = request->get<1>();
    gid_t gid = request->get<2>();
    pid_t pid = request->get<3>();

    boost::unique_lock<boost::shared_mutex> lock(m_jobs_mutex);

    const auto& it = m_jobs.find(jobid);

    if(it == m_jobs.end()) {
        resp->set_error_code(urd_error::no_such_job);
    if(it->second->find_and_remove_process(pid, gid)) {
        resp->set_error_code(urd_error::success);
        resp->set_error_code(urd_error::no_such_process);
    LOGGER_INFO("REMOVE_PROCESS({}) = {}", request->to_string(), resp->to_string());
// Create a namespace from a configuration-file definition.
//
// The backend alias found in `nsdef` is translated to a backend_type through
// the backend factory. A recognised alias is forwarded, together with the
// rest of the definition (nsid, tracking flag, mountpoint and capacity), to
// the overload that performs the actual registration, and that overload's
// result is returned. An alias that maps to backend_type::unknown is
// rejected with urd_error::bad_args.
urd_error urd::create_namespace(const config::namespace_def& nsdef) {

    const auto backend_kind =
        storage::backend_factory::get().get_type(nsdef.alias());

    if(backend_kind != backend_type::unknown) {
        return create_namespace(nsdef.nsid(), backend_kind, nsdef.track(),
                                nsdef.mountpoint(), nsdef.capacity());
    }

    return urd_error::bad_args;
}

// Register a new namespace `nsid` backed by a storage backend of `type`.
//
// The whole operation runs under an exclusive lock on m_namespace_mgr_mutex.
// Returns:
//  - urd_error::namespace_exists if the namespace manager already contains
//    `nsid`;
//  - urd_error::bad_args if the backend factory yields an empty backend for
//    (type, nsid, track, mount, quota);
//  - urd_error::success once the new backend has been added to the
//    namespace manager.
urd_error urd::create_namespace(const std::string& nsid, backend_type type,
                                bool track, const bfs::path& mount, 
                                uint32_t quota) {

    boost::unique_lock<boost::shared_mutex> guard(m_namespace_mgr_mutex);

    if(m_namespace_mgr->contains(nsid)) {
        return urd_error::namespace_exists;
    }

    auto new_backend = 
        storage::backend_factory::create_from(type, nsid, track, mount, quota);

    // the factory signals failure with an empty backend
    if(!new_backend) {
        return urd_error::bad_args;
    }

    m_namespace_mgr->add(nsid, new_backend);
    return urd_error::success;
}

// Handler for REGISTER_NAMESPACE control requests.
//
// Unpacks the namespace id (field 0), backend type (field 1), tracking flag
// (field 2), mount point (field 3) and quota (field 4) from the request,
// delegates the registration to create_namespace() and stores its result as
// the response's error code. The outcome is logged at INFO level.
//
// NOTE(review): `quota` is read as int32_t here while create_namespace()
// declares its quota parameter as uint32_t, so a negative value would be
// implicitly converted to a large unsigned one -- confirm the intended type.
// NOTE(review): the `return` and closing brace are not visible in this
// listing -- verify against the upstream file.
response_ptr urd::namespace_register_handler(const request_ptr base_request) {
    response_ptr resp = std::make_unique<api::backend_register_response>();
    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::backend_register_request>(std::move(base_request));
    std::string nsid = request->get<0>();
    backend_type type = request->get<1>();
    bool track = request->get<2>();
    std::string mount = request->get<3>();
    int32_t quota = request->get<4>();
    resp->set_error_code(create_namespace(nsid, type, track, mount, quota));
    LOGGER_INFO("REGISTER_NAMESPACE({}) = {}", request->to_string(), resp->to_string());
/* XXX not supported yet
response_ptr urd::namespace_update_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::backend_update_response>();

    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::backend_update_request>(std::move(base_request));
    resp->set_error_code(urd_error::success);
    LOGGER_INFO("UPDATE_NAMESPACE({}) = {}", request->to_string(), resp->to_string());
// Handler for UNREGISTER_NAMESPACE control requests.
//
// Reads the namespace id (field 0) from the request and, holding an
// exclusive lock on m_namespace_mgr_mutex, asks the namespace manager to
// remove it. The visible code sets urd_error::success and
// urd_error::no_such_namespace depending on the result of remove().
// The outcome is logged at INFO level.
//
// NOTE(review): the scope braces around the lock, the `else` between the two
// set_error_code() calls, the `return` and the end of the function are not
// visible in this listing -- verify against the upstream file.
response_ptr urd::namespace_remove_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::backend_unregister_response>();

    // downcast the generic request to the concrete implementation
    auto request = utils::static_unique_ptr_cast<api::backend_unregister_request>(std::move(base_request));
    std::string nsid = request->get<0>();
        boost::unique_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
        if(m_namespace_mgr->remove(nsid)) {
            resp->set_error_code(urd_error::success);
            resp->set_error_code(urd_error::no_such_namespace);
    LOGGER_INFO("UNREGISTER_NAMESPACE({}) = {}", request->to_string(), resp->to_string());
// Handler for GLOBAL_STATUS control requests.
//
// Ignores the request payload and always answers urd_error::success, with
// field 0 of the response set to the task manager's global_stats().
// The reply is logged at INFO level.
//
// NOTE(review): the function's closing brace is not visible in this listing
// -- verify against the upstream file.
response_ptr urd::global_status_handler(const request_ptr /*base_request*/) {
    auto resp = std::make_unique<api::global_status_response>();

    resp->set_error_code(urd_error::success);
    resp->set<0>(m_task_mgr->global_stats());

    LOGGER_INFO("GLOBAL_STATUS() = {}", resp->to_string());
    // `resp` is a pointer to the concrete response type; it is moved into the
    // generic response_ptr return value
    return std::move(resp);
// Handler for COMMAND control requests.
//
// The command type is taken from field 0 of the request. The response
// starts out as urd_error::success and is adjusted per command:
//  - ping:          nothing to do
//  - pause_listen:  calls pause_listening()
//  - resume_listen: calls resume_listening()
//  - shutdown:      logs a warning and reports the result of check_shutdown()
//  - unknown:       reports urd_error::bad_args
// The outcome is logged at INFO level and the response is returned.
//
// NOTE(review): as shown here the pause_listen, resume_listen and shutdown
// cases have no `break`, the shutdown case declares `rv` without an enclosing
// scope and its trailing `if` is never closed. Lines appear to be missing
// from this listing -- verify against the upstream file before relying on
// the control flow below.
response_ptr
urd::command_handler(const request_ptr base_request) {

    // downcast the generic request to the concrete implementation
    auto request = 
        utils::static_unique_ptr_cast<api::command_request>(
                std::move(base_request));
    auto resp = std::make_unique<api::command_response>();
    resp->set_error_code(urd_error::success);

    switch(request->get<0>()) {
        case command_type::ping:
            break; // nothing special to do here
        case command_type::pause_listen:
            pause_listening();
        case command_type::resume_listen:
            resume_listening();
        case command_type::shutdown:
            LOGGER_WARN("Shutdown requested!");
            const auto rv = check_shutdown();
            resp->set_error_code(rv);
            if(rv != urd_error::success) {
        case command_type::unknown:
            resp->set_error_code(urd_error::bad_args);
            break;
    }

    LOGGER_INFO("COMMAND({}) = {}", request->to_string(), resp->to_string());
    return std::move(resp);
}


// Fallback handler for requests that could not be recognised.
//
// Ignores the request payload, builds a bad_request_response flagged with
// urd_error::bad_request and logs it at INFO level.
//
// NOTE(review): the `return` and closing brace are not visible in this
// listing -- verify against the upstream file.
response_ptr urd::unknown_request_handler(const request_ptr /*base_request*/) {
    response_ptr resp = std::make_unique<api::bad_request_response>();

    resp->set_error_code(urd_error::bad_request);

    LOGGER_INFO("UNKNOWN_REQUEST() = {}", resp->to_string());
// N.B. This function is called by the progress thread internal to 
// m_network_service rather than by the main execution thread
urd::push_resource_handler(hermes::request<rpc::push_resource>&& req) {

    const auto args = req.args();

    LOGGER_WARN("incoming rpc::push_resource(from: \"{}@{}:{}\", to: \"{}:{}\")", 
                args.in_nsid(), 
                args.in_address(),
                args.in_resource_name(),
                args.out_nsid(),
                args.out_resource_name());
    urd_error rv = urd_error::success;
    boost::optional<io::generic_task> t;
    std::shared_ptr<storage::backend> src_backend;
    boost::optional<std::shared_ptr<storage::backend>> dst_backend;
    auto dst_rtype = static_cast<data::resource_type>(args.out_resource_type());
    auth::credentials auth; //XXX fake credentials for now

    const auto create_rinfo = 
        [&](const data::resource_type& rtype) -> 
            std::shared_ptr<data::resource_info> {

        switch(rtype) {
            case data::resource_type::remote_resource:
                return std::make_shared<data::remote_resource_info>(
                        args.in_address(), args.in_nsid(), 
                        args.in_is_collection(), args.in_resource_name(), 
                        args.in_buffers());
            case data::resource_type::local_posix_path:
            case data::resource_type::shared_posix_path:
                return std::make_shared<data::local_path_info>(
                        args.out_nsid(), args.out_resource_name());
            default:
                rv = urd_error::not_supported;
                return {};
        }
    };

    if(m_is_paused) {
        rv = urd_error::accept_paused;
        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                    static_cast<uint32_t>(io::task_status::finished_with_error),
                    static_cast<uint32_t>(rv),
    auto src_rinfo = create_rinfo(data::resource_type::remote_resource);
    auto dst_rinfo = create_rinfo(dst_rtype);

    //TODO: check src_rinfo and dst_rinfo


    // TODO: actually retrieve and validate credentials, etc


    src_backend = 
        std::make_shared<storage::detail::remote_backend>(args.in_nsid());

    {
        boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
        dst_backend = m_namespace_mgr->find(args.out_nsid());
    }

    if(!dst_backend) {
        rv = urd_error::no_such_namespace;
        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                static_cast<uint32_t>(io::task_status::finished_with_error),
                static_cast<int32_t>(rv),
        return;
    }

    LOGGER_DEBUG("nsid: {}, bptr: {}", args.in_nsid(), dst_backend);

    auto ctx =
        std::make_shared<hermes::request<rpc::push_resource>>(std::move(req));

    std::tie(rv, t) =
        m_task_mgr->create_remote_initiated_task(
                iotask_type::remote_transfer, auth, 
                ctx, src_backend, src_rinfo, 
                *dst_backend, dst_rinfo);

    if(rv == urd_error::success) {
        // run the task and check that it started correctly
        auto req = std::move(*ctx);

        if(t->info()->status() == io::task_status::finished_with_error) {
            rv = t->info()->task_error();
            LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
            m_network_service->respond(std::move(req), 
                    static_cast<uint32_t>(io::task_status::finished_with_error),
                    static_cast<int32_t>(t->info()->task_error()),
                    static_cast<int32_t>(t->info()->sys_error().value()),
                    0);
// Handler for incoming rpc::pull_resource requests.
//
// The source of the transfer is the resource (in_nsid, in_resource_name),
// whose namespace is looked up in the local namespace manager; the
// destination (out_address, out_nsid, out_resource_name) is wrapped in a
// remote backend. A remote-initiated `remote_transfer` task is created for
// the pair and executed synchronously in this function.
//
// Error replies are sent through m_network_service->respond() with status
// io::task_status::finished_with_error in the following cases:
//  - the daemon is paused                        -> urd_error::accept_paused
//  - the source/destination resource type is
//    not supported (no resource info built)     -> urd_error::not_supported
//  - `in_nsid` is not a registered namespace     -> urd_error::no_such_namespace
//  - the task finished with an error             -> the task's own error codes
// No reply is sent from here when task creation fails or when the task does
// not end in `finished_with_error`.
//
// NOTE(review): presumably the task itself replies on success through the
// shared request context -- confirm in the task implementation.
void
urd::pull_resource_handler(hermes::request<rpc::pull_resource>&& req) {

    const auto args = req.args();

    LOGGER_WARN("incoming rpc::pull_resource(from: \"{}:{}\", to: \"{}@{}:{}\")", 
                args.in_nsid(), 
                args.in_resource_name(),
                args.out_address(),
                args.out_nsid(), 
                args.out_resource_name());

    urd_error rv = urd_error::success;
    boost::optional<io::generic_task> tsk;
    boost::optional<std::shared_ptr<storage::backend>> src_backend;
    std::shared_ptr<storage::backend> dst_backend;

    /// XXX this information should be retrievable from the backend
    auto src_rtype = static_cast<data::resource_type>(args.in_resource_type());

    auth::credentials auth; //XXX fake credentials for now

    // Build the resource descriptor matching `rtype` out of the RPC
    // arguments: remote resources describe the transfer's output side, POSIX
    // paths describe its input side. Unsupported types yield an empty pointer
    // and set `rv` to urd_error::not_supported.
    const auto create_rinfo = 
        [&](const data::resource_type& rtype) -> 
            std::shared_ptr<data::resource_info> {

        switch(rtype) {
            case data::resource_type::remote_resource:
                return std::make_shared<data::remote_resource_info>(
                        args.out_address(), args.out_nsid(), false, 
                        args.out_resource_name(), args.out_buffers());
            case data::resource_type::local_posix_path:
            case data::resource_type::shared_posix_path:
                return std::make_shared<data::local_path_info>(
                        args.in_nsid(), args.in_resource_name());
            default:
                rv = urd_error::not_supported;
                return {};
        }
    };

    // refuse new work while the daemon is paused
    if(m_is_paused) {
        rv = urd_error::accept_paused;
        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                    static_cast<uint32_t>(io::task_status::finished_with_error),
                    static_cast<uint32_t>(rv),
                    0,
                    0);
        return;
    }

    auto src_rinfo = create_rinfo(src_rtype);
    auto dst_rinfo = create_rinfo(data::resource_type::remote_resource);

    // create_rinfo() returns an empty pointer for resource types it cannot
    // describe: reject the request here rather than handing a null
    // descriptor to the task manager
    if(!src_rinfo || !dst_rinfo) {
        rv = urd_error::not_supported;
        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                static_cast<uint32_t>(io::task_status::finished_with_error),
                static_cast<int32_t>(rv),
                0,
                0);
        return;
    }

    // TODO: actually retrieve and validate credentials, etc

    {
        boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
        src_backend = m_namespace_mgr->find(args.in_nsid());
    }

    if(!src_backend) {
        rv = urd_error::no_such_namespace;
        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                static_cast<uint32_t>(io::task_status::finished_with_error),
                static_cast<int32_t>(rv),
                0,
                0);
        return;
    }

    LOGGER_DEBUG("nsid: {}, bptr: {}", args.in_nsid(), src_backend);

    dst_backend = 
        std::make_shared<storage::detail::remote_backend>(args.out_nsid());

    // from here on the request is owned by the shared context handed to the
    // task
    auto ctx =
        std::make_shared<hermes::request<rpc::pull_resource>>(std::move(req));

    std::tie(rv, tsk) =
        m_task_mgr->create_remote_initiated_task(
                iotask_type::remote_transfer, auth, 
                ctx, *src_backend, src_rinfo, 
                dst_backend, dst_rinfo);

    if(rv == urd_error::success) {
        // run the task and check that it started correctly
        (*tsk)();

        // take the request back out of the shared context so that an error
        // reply can be sent if needed
        auto pending_req = std::move(*ctx);

        if(tsk->info()->status() == io::task_status::finished_with_error) {
            rv = tsk->info()->task_error();
            LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
            m_network_service->respond(std::move(pending_req), 
                    static_cast<uint32_t>(io::task_status::finished_with_error),
                    static_cast<int32_t>(tsk->info()->task_error()),
                    static_cast<int32_t>(tsk->info()->sys_error().value()),
                    0);
        }
    }
}

urd::stat_resource_handler(hermes::request<rpc::stat_resource>&& req) {

    const auto args = req.args();

    LOGGER_WARN("incoming rpc::stat_resource(\"{}:{}\")", 
                args.nsid(), 
                args.resource_name());

    urd_error rv = urd_error::success;
    boost::optional<std::shared_ptr<storage::backend>> dst_backend;

    {
        boost::shared_lock<boost::shared_mutex> lock(m_namespace_mgr_mutex);
        dst_backend = m_namespace_mgr->find(args.nsid());
    }

    if(!dst_backend) {
        rv = urd_error::no_such_namespace;
        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                                    static_cast<uint32_t>(rv),
                                    false,
                                    //XXX ENOENT should not be required:
                                    // the transfer() interface should be
                                    // richer rather than returning only a
                                    // std::error_code
                                    ENOENT,
                                    0);
        return;
    }

    auto rtype = static_cast<data::resource_type>(args.resource_type());

    const auto create_rinfo = 
        [&](const data::resource_type& rtype) -> 
            std::shared_ptr<data::resource_info> {

        switch(rtype) {
            case data::resource_type::local_posix_path:
            case data::resource_type::shared_posix_path:
                return std::make_shared<data::local_path_info>(
                        args.nsid(), args.resource_name());
            default:
                return {};
        }
    };

    auto rinfo = create_rinfo(rtype);

    if(!rinfo) {
        LOGGER_ERROR("Failed to access resource {}", rinfo->to_string());
        rv = urd_error::not_supported;

        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req), 
                                    static_cast<uint32_t>(rv),
                                    //XXX EOPNOTSUPP should not be required:
                                    // the transfer() interface should be
                                    // richer rather than returning only a
                                    // std::error_code
                                    EOPNOTSUPP, 
                                    false,
                                    0);
        return;
    }

    std::error_code ec;
    auto rsrc = (*dst_backend)->get_resource(rinfo, ec);

    if(ec) {
        LOGGER_ERROR("Failed to access resource {}", rinfo->to_string());
        rv = urd_error::no_such_resource;

        LOGGER_INFO("IOTASK_RECEIVE() = {}", utils::to_string(rv));
        m_network_service->respond(std::move(req),