Commit a454f2ec authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Initial support for transferring remote directories

parent 07905459
Loading
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -144,7 +144,13 @@ AS_IF([test "x${PROTOC}" == "x"],
AC_SEARCH_LIBS([yaml_parser_initialize], [yaml], 
               [YAML_LIBS="-lyaml"
                AC_SUBST(YAML_LIBS)], 
               [AC_MSG_ERROR([This software required libyaml >= 0.1.4])])
               [AC_MSG_ERROR([This software requires libyaml >= 0.1.4])])

# check for libtar manually (since it doesn't provide a pkgconfig file)
AC_SEARCH_LIBS([tar_open], [tar],
               [TAR_LIBS="-ltar"
                AC_SUBST(TAR_LIBS)],
               [AC_MSG_ERROR([This software requires libtar >= 1.2.0])])

# Checks for header files.

+5 −0
Original line number Diff line number Diff line
@@ -90,6 +90,7 @@ liburd_resources_la_LDFLAGS = \
    @BOOST_THREAD_LIB@ \
    @MERCURY_LIBS@ \
	@PROTOBUF_LIBS@ \
	@TAR_LIBS@ \
	-pthread


@@ -167,6 +168,8 @@ liburd_aux_la_SOURCES = \
	io/transferors/local-path-to-remote-resource.hpp \
	io/transferors/remote-resource-to-local-path.cpp \
	io/transferors/remote-resource-to-local-path.hpp \
	io/transferors/tar-archive.cpp \
	io/transferors/tar-archive.hpp \
	io/transferors/memory-to-local-path.cpp \
	io/transferors/memory-to-local-path.hpp \
	io/transferors/memory-to-shared-path.cpp \
@@ -218,6 +221,7 @@ liburd_aux_la_LDFLAGS = \
    @MERCURY_LIBS@ \
	@PROTOBUF_LIBS@ \
	@YAMLCPP_LIBS@ \
	@TAR_LIBS@ \
	liburd_resources.la \
	-pthread

@@ -293,6 +297,7 @@ urd_LDFLAGS = \
    @BOOST_THREAD_LIB@ \
    @MERCURY_LIBS@ \
	@PROTOBUF_LIBS@ \
	@TAR_LIBS@ \
	liburd_aux.la

# we also need to include it as an additional dependency, since automake
+58 −8
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>

#include "utils.hpp"
#include "logger.hpp"
@@ -38,6 +39,7 @@
#include "backends/posix-fs.hpp"
#include "hermes.hpp"
#include "rpcs.hpp"
#include "tar-archive.hpp"
#include "local-path-to-remote-resource.hpp"


@@ -68,6 +70,8 @@ local_path_to_remote_resource_transferor::transfer(
        const std::shared_ptr<const data::resource>& src,  
        const std::shared_ptr<const data::resource>& dst) const {

    using utils::tar;

    (void) auth;

    const auto& d_src = 
@@ -75,6 +79,40 @@ local_path_to_remote_resource_transferor::transfer(
    const auto& d_dst = 
        reinterpret_cast<const data::remote_resource&>(*dst);

    std::string input_path = d_src.canonical_path().string();

    if(src->is_collection()) {
        LOGGER_DEBUG("[{}] Creating archive for local directory", 
                     task_info->id());

        std::error_code ec;

        bfs::path ar_path = 
            "/tmp" / bfs::unique_path("norns-archive-%%%%-%%%%-%%%%.tar");
        
        tar ar(ar_path, tar::create, ec);

        if(ec) {
            LOGGER_ERROR("Failed to create archive: {}", 
                         logger::errno_message(ec.value()));
            return ec;
        }

        LOGGER_INFO("Archive created in {}", ar.path());

        ar.add_directory(d_src.canonical_path(), 
                         d_dst.name(),
                         ec);

        if(ec) {
            LOGGER_ERROR("Failed to add directory to archive: {}", 
                         logger::errno_message(ec.value()));
            return ec;
        }

        input_path = ar.path().string();
    }

    LOGGER_DEBUG("[{}] start_transfer: {} -> {}", 
                 task_info->id(), d_src.canonical_path(), d_dst.to_string());

@@ -82,7 +120,7 @@ local_path_to_remote_resource_transferor::transfer(

    try {
        std::error_code ec;
        hermes::mapped_buffer input_data(d_src.canonical_path().string(),
        hermes::mapped_buffer input_data(input_path,
                                         hermes::access_mode::read_only,
                                         &ec);

@@ -104,6 +142,7 @@ local_path_to_remote_resource_transferor::transfer(
                d_dst.parent()->nsid(), 
                static_cast<uint32_t>(backend_type::posix_filesystem),
                static_cast<uint32_t>(data::resource_type::local_posix_path), 
                d_src.is_collection(),
                d_dst.name(),
                buffers);

@@ -122,23 +161,34 @@ local_path_to_remote_resource_transferor::transfer(
        LOGGER_DEBUG("};");
        LOGGER_FLUSH();

        auto start = std::chrono::steady_clock::now();

        auto rpc = 
            m_network_endpoint->post<norns::rpc::remote_transfer>(endp, args);

        auto resp = rpc.get();

        double usecs = std::chrono::duration<double, std::micro>(
                std::chrono::steady_clock::now() - start).count();

        task_info->record_transfer(input_data.size(), usecs);
        task_info->record_transfer(input_data.size(), 
                                   resp.at(0).elapsed_time());

        LOGGER_DEBUG("Remote request completed with output "
                     "{{status: {}, task_error: {}, sys_errnum: {}}} "
                     "({} bytes, {} usecs)",
                    resp.at(0).status(), resp.at(0).task_error(), 
                    resp.at(0).sys_errnum(), input_data.size(), usecs);
                    resp.at(0).sys_errnum(), input_data.size(), 
                    resp.at(0).elapsed_time());

        if(src->is_collection()) {
            boost::system::error_code bec;

            bfs::remove(input_path, bec);

            if(bec) {
                LOGGER_ERROR("Failed to remove archive {}: {}", 
                             input_path, logger::errno_message(ec.value()));
                //TODO
                return std::make_error_code(
                    static_cast<std::errc>(bec.value()));
            }
        }

        return std::make_error_code(static_cast<std::errc>(0));
    }
+81 −13
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@
#include "io/task-stats.hpp"
#include "hermes.hpp"
#include "rpcs.hpp"
#include "tar-archive.hpp"
#include "remote-resource-to-local-path.hpp"

namespace {
@@ -121,6 +122,8 @@ remote_resource_to_local_path_transferor::transfer(
    (void) src;
    (void) dst;

    using utils::tar;

    const auto& d_src = 
        reinterpret_cast<const data::remote_resource&>(*src);
    const auto& d_dst = 
@@ -144,22 +147,21 @@ remote_resource_to_local_path_transferor::transfer(

    assert(remote_buffers.count() == 1);

    LOGGER_DEBUG("creating local resource: {}", d_dst.canonical_path());
    bool is_collection = d_src.is_collection();

    bfs::path output_path = 
        is_collection ? "/tmp/test.tar" : d_dst.canonical_path();

    LOGGER_DEBUG("creating local resource: {}", output_path);

    std::error_code ec;
    std::shared_ptr<hermes::mapped_buffer> output_data;

    std::tie(ec, output_data) = 
        ::create_file(d_dst.canonical_path(), remote_buffers.size());
        ::create_file(output_path, remote_buffers.size());

    if(ec) {
        if(req.requires_response()) {
            m_remote_endpoint->respond<rpc::remote_transfer>(
                    std::move(req),
                    static_cast<uint32_t>(task_status::finished_with_error),
                    static_cast<uint32_t>(urd_error::system_error),
                    static_cast<uint32_t>(ec.value()));
        }
        *ctx = std::move(req);
        return ec;
    }

@@ -172,23 +174,89 @@ remote_resource_to_local_path_transferor::transfer(
    hermes::exposed_memory local_buffers =
        m_remote_endpoint->expose(bufseq, hermes::access_mode::write_only);

    LOGGER_DEBUG("pulling remote data into {}", d_dst.canonical_path());
    LOGGER_DEBUG("pulling remote data into {}", output_path);

    auto start = std::chrono::steady_clock::now();

    // N.B. IMPORTANT: we NEED to capture output_data by value here so that
    // the mapped_buffer doesn't get released before completion_callback()
    // is called.
    const auto completion_callback = [this, output_data](
    const auto completion_callback = 
        [this, is_collection, output_path, d_dst, output_data, start](
            hermes::request<rpc::remote_transfer>&& req) {

        uint32_t usecs = 
            std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now() - start).count();

        // default response
        rpc::remote_transfer::output out(
                static_cast<uint32_t>(task_status::finished),
                static_cast<uint32_t>(urd_error::success),
                0,
                usecs);

        //TODO: hermes offers no way to check for an error yet
        LOGGER_DEBUG("pull succeeded");
        LOGGER_DEBUG("Transfer completed ({} usecs)", usecs);

        if(is_collection) {
            std::error_code ec;
            boost::system::error_code bec;
            tar ar(output_path, tar::open, ec);

            if(ec) {
                LOGGER_ERROR("Failed to open archive {}: {}", 
                             output_path, logger::errno_message(ec.value()));

                out = std::move(rpc::remote_transfer::output{
                    static_cast<uint32_t>(task_status::finished_with_error),
                    static_cast<uint32_t>(urd_error::system_error),
                    static_cast<uint32_t>(ec.value()),
                    0
                });
                goto respond;
            }

            ar.extract(d_dst.parent()->mount(), ec);

            if(ec) {
                LOGGER_ERROR("Failed to extact archive into {}: {}",
                             ar.path(), d_dst.parent()->mount(), 
                             logger::errno_message(ec.value()));
                out = std::move(rpc::remote_transfer::output{
                    static_cast<uint32_t>(task_status::finished_with_error),
                    static_cast<uint32_t>(urd_error::system_error),
                    static_cast<uint32_t>(ec.value()),
                    0
                });
                goto respond;
            }

            LOGGER_DEBUG("Archive {} extracted into {}", 
                         ar.path(), d_dst.parent()->mount());

            bfs::remove(ar.path(), bec);

            if(bec) {
                LOGGER_ERROR("Failed to remove archive {}: {}", 
                             ar.path(), logger::errno_message(ec.value()));
                out = std::move(rpc::remote_transfer::output{
                    static_cast<uint32_t>(task_status::finished_with_error),
                    static_cast<uint32_t>(urd_error::system_error),
                    static_cast<uint32_t>(ec.value()),
                    0
                });
            }
        }

respond:
        if(req.requires_response()) {
            m_remote_endpoint->respond<rpc::remote_transfer>(
                    std::move(req), 
                    static_cast<uint32_t>(task_status::finished),
                    static_cast<uint32_t>(urd_error::success),
                    0);
                    0,
                    usecs);
        }
    };

+108 −0
Original line number Diff line number Diff line
/*************************************************************************
 * Copyright (C) 2017-2019 Barcelona Supercomputing Center               *
 *                         Centro Nacional de Supercomputacion           *
 * All rights reserved.                                                  *
 *                                                                       *
 * This file is part of the NORNS Data Scheduler, a service that allows  *
 * other programs to start, track and manage asynchronous transfers of   *
 * data resources transfers requests between different storage backends. *
 *                                                                       *
 * See AUTHORS file in the top level directory for information           *
 * regarding developers and contributors.                                *
 *                                                                       *
 * The NORNS Data Scheduler is free software: you can redistribute it    *
 * and/or modify it under the terms of the GNU Lesser General Public     *
 * License as published by the Free Software Foundation, either          *
 * version 3 of the License, or (at your option) any later version.      *
 *                                                                       *
 * The NORNS Data Scheduler is distributed in the hope that it will be   *
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty   *
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU  *
 * Lesser General Public License for more details.                       *
 *                                                                       *
 * You should have received a copy of the GNU Lesser General             *
 * Public License along with the NORNS Data Scheduler.  If not, see      *
 * <http://www.gnu.org/licenses/>.                                       *
 *************************************************************************/

#include "utils.hpp"
#include "logger.hpp"
#include "tar-archive.hpp"

namespace norns {
namespace utils {

/**
 * Open (or create) the TAR archive stored at @p filename using libtar.
 *
 * @param filename  path of the archive file
 * @param op        open mode; its integer value selects the entry used
 *                  from the open(2) flag and permission tables below
 *                  (presumably tar::create == 0 and tar::open == 1 --
 *                  confirm against tar-archive.hpp)
 * @param ec        set to the errno-derived error if tar_open() fails,
 *                  reset to "no error" otherwise
 *
 * If the archive cannot be opened, the internal handle is reset to nullptr
 * so that the destructor and the other member functions never hand a
 * stale handle back to libtar.
 */
tar::tar(const bfs::path& filename, 
         openmode op,
         std::error_code& ec) :
    m_path(filename) {

    // open(2) flags, indexed by open mode
    constexpr const std::array<int, 2> flags = {
        {O_WRONLY | O_CREAT| O_EXCL, O_RDONLY}
    };

    // permission bits passed along with the flags above (only meaningful
    // for the entry that includes O_CREAT)
    constexpr const std::array<int, 2> modes = {
        {S_IRUSR | S_IWUSR, 0}
    };

    const int mode_index = static_cast<int>(op);

    if(tar_open(&m_tar, m_path.c_str(), nullptr, 
                flags[mode_index], 
                modes[mode_index], TAR_GNU) != 0) {
        // capture errno before doing anything else that might change it
        ec = std::make_error_code(static_cast<std::errc>(errno)); 

        // NOTE(review): libtar does not promise a usable (or null) handle
        // after a failed tar_open(); drop it so that ~tar() does not call
        // tar_close() on it
        m_tar = nullptr;

        LOGGER_ERROR("Failed to open archive: {}", 
                     logger::errno_message(ec.value()));
        return;
    }

    ec = std::make_error_code(static_cast<std::errc>(0)); 
}

/**
 * Append the directory tree rooted at @p real_dir to the archive, storing
 * it under the name @p archive_dir.
 *
 * @param real_dir     directory in the filesystem to add
 * @param archive_dir  name given to the directory inside the archive
 * @param ec           EINVAL if no archive is open, the errno-derived
 *                     error if tar_append_tree() fails, "no error"
 *                     otherwise
 */
void
tar::add_directory(const bfs::path& real_dir, 
                   const bfs::path& archive_dir,
                   std::error_code& ec) {

    // same guard as extract(): never hand a null handle to libtar
    if(m_tar == nullptr) {
        ec = std::make_error_code(static_cast<std::errc>(EINVAL)); 
        return;
    }

    // both paths go through remove_trailing_separator() before being
    // passed to libtar
    const bfs::path rd = 
        norns::utils::remove_trailing_separator(real_dir);
    const bfs::path ad = 
        norns::utils::remove_trailing_separator(archive_dir);

    // libtar's API takes plain char* here, hence the const_cast
    if(tar_append_tree(m_tar, const_cast<char*>(rd.c_str()),
                        const_cast<char*>(ad.c_str())) != 0) {
        ec = std::make_error_code(static_cast<std::errc>(errno));
        return;
    }

    ec = std::make_error_code(static_cast<std::errc>(0)); 
}

/**
 * Extract the whole archive below @p parent_dir.
 *
 * @param parent_dir  directory passed to tar_extract_all() as the
 *                    extraction prefix
 * @param ec          EINVAL if no archive is open, the errno-derived
 *                    error if tar_extract_all() fails, "no error"
 *                    otherwise
 */
void
tar::extract(const bfs::path& parent_dir, 
             std::error_code& ec) {

    if(m_tar == nullptr) {
        ec = std::make_error_code(static_cast<std::errc>(EINVAL)); 
        return;
    }

    // libtar's API takes plain char* here, hence the const_cast
    if(tar_extract_all(m_tar, const_cast<char*>(parent_dir.c_str())) != 0) {
        ec = std::make_error_code(static_cast<std::errc>(errno));
        return;
    }

    // explicitly report success so that a stale error left in 'ec' by the
    // caller is not mistaken for an extraction failure
    ec = std::make_error_code(static_cast<std::errc>(0)); 
}


/** Return a copy of the archive path stored in this object. */
bfs::path
tar::path() const {
    return bfs::path(m_path);
}

/**
 * Close the underlying libtar handle, if there is one. A failure to close
 * is only logged.
 */
tar::~tar() {

    // no handle, nothing to close
    if(m_tar == nullptr) {
        return;
    }

    const int rv = tar_close(m_tar);

    if(rv != 0) {
        LOGGER_ERROR("Failed to close TAR archive: {}",
                    logger::errno_message(errno));
    }
}

} // namespace utils
} // namespace norns
Loading