Commit 95c268b3 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Improve handling of temporary files

parent 5af171e7
Loading
Loading
Loading
Loading
Loading
+4 −1
Original line number Diff line number Diff line
@@ -189,8 +189,11 @@ liburd_aux_la_SOURCES = \
	urd.hpp	\
	utils.cpp \
	utils.hpp \
	utils/file-handle.hpp \
	utils/tar-archive.cpp \
	utils/tar-archive.hpp
	utils/tar-archive.hpp \
	utils/temporary-file.hpp \
	utils/temporary-file.cpp

nodist_liburd_aux_la_SOURCES = \
	config/defaults.cpp \
+160 −190
Original line number Diff line number Diff line
@@ -44,61 +44,82 @@

namespace {

std::tuple<std::error_code, std::shared_ptr<hermes::mapped_buffer>>
create_file(const bfs::path& filename,
            std::size_t size) {
struct archive_entry {
    bool m_is_directory;
    bfs::path m_realpath;
    bfs::path m_archive_path;
};

    std::error_code ec;
bfs::path
pack_archive(const std::string& name_pattern,
             const bfs::path& parent_path,
             const std::vector<archive_entry>& entries,
             std::error_code& ec) {

    int out_fd = 
        ::open(filename.c_str(), O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
    using norns::utils::tar;

    if(out_fd == -1) {
        ec = std::make_error_code(static_cast<std::errc>(errno));
        return std::make_tuple(ec, nullptr);
    }
    bfs::path ar_path = parent_path / bfs::unique_path(name_pattern);

    // preallocate output file
#ifdef HAVE_FALLOCATE
    if(::fallocate(out_fd, 0, 0, size) == -1) {
        if(errno != EOPNOTSUPP) {
            ec = std::make_error_code(static_cast<std::errc>(errno));
            return std::make_tuple(ec, nullptr);
        }
#endif // HAVE_FALLOCATE
    tar ar(ar_path, tar::create, ec);

        // filesystem doesn't support fallocate(), fallback to truncate()
        if(::ftruncate(out_fd, size) != 0) {
            ec = std::make_error_code(static_cast<std::errc>(errno));
            return std::make_tuple(ec, nullptr);
    if(ec) {
        LOGGER_ERROR("Failed to create archive: {}", ec.message());
        return {};
    }

#ifdef HAVE_FALLOCATE
    LOGGER_INFO("Archive created in {}", ar.path());

    for(auto&& e : entries) {
        e.m_is_directory ?
            ar.add_directory(e.m_realpath, e.m_archive_path, ec) :
            ar.add_file(e.m_realpath, e.m_archive_path, ec);

        if(ec) {
            LOGGER_ERROR("Failed to add entry to archive: {}", ec.message());
            return {};
        }
    }
#endif // HAVE_FALLOCATE

retry_close:
    if(close(out_fd) == -1) {
        if(errno == EINTR) {
            goto retry_close;
    return ar.path();
}

        ec = std::make_error_code(static_cast<std::errc>(errno));
        return std::make_tuple(ec, nullptr);
std::error_code
unpack_archive(const bfs::path& archive_path,
               const bfs::path& parent_path) {

    using norns::utils::tar;
    std::error_code ec;

    boost::system::error_code bec;
    tar ar(archive_path, tar::open, ec);

    if(ec) {
        LOGGER_ERROR("Failed to open archive {}: {}", 
                     archive_path, ec.message());
        return ec;
    }

    auto output_data = 
        std::make_shared<hermes::mapped_buffer>(
            filename.string(), 
            hermes::access_mode::write_only,
            &ec);
    ar.extract(parent_path, ec);

    if(ec) {
        LOGGER_ERROR("Failed mapping output data: {}", ec.value());
        return std::make_tuple(ec, output_data);
        LOGGER_ERROR("Failed to extract archive {} into {}: {}",
                     ar.path(), parent_path, ec.message());
        return ec;
    }

    LOGGER_DEBUG("Archive {} extracted into {}, removing archive", 
                 ar.path(), parent_path);

    bfs::remove(ar.path(), bec);

    if(bec) {
        LOGGER_ERROR("Failed to remove archive {}: {}", 
                        ar.path(), bec.message());
        ec.assign(bec.value(), std::generic_category());
        return ec;
    }

    return std::make_tuple(ec, output_data);
    return ec;
}

} // anonymous namespace
@@ -134,43 +155,44 @@ local_path_to_remote_resource_transferor::transfer(

    (void) auth;

    std::error_code ec;
    const auto& d_src = 
        reinterpret_cast<const data::local_path_resource&>(*src);
    const auto& d_dst = 
        reinterpret_cast<const data::remote_resource&>(*dst);
    auto tempfile = std::make_shared<utils::temporary_file>();

    std::string input_path = d_src.canonical_path().string();
    bfs::path input_path = 
        !d_src.is_collection() ? 
        d_src.canonical_path() : 
        [&]() -> bfs::path {

    if(d_src.is_collection()) {
        LOGGER_DEBUG("[{}] Creating archive for local directory", 
            LOGGER_DEBUG("[{}] Creating temporary archive from local directory", 
                         task_info->id());

        std::error_code ec;

        bfs::path ar_path = 
            "/tmp" / bfs::unique_path("norns-archive-%%%%-%%%%-%%%%.tar");
        
        tar ar(ar_path, tar::create, ec);
            const bfs::path ar_path =
                ::pack_archive("norns-archive-%%%%-%%%%-%%%%.tar",
                               "/tmp",
                               {{true, d_src.canonical_path(), d_dst.name()}},
                               ec);

            if(ec) {
            LOGGER_ERROR("Failed to create archive: {}", ec.message());
            return ec;
                LOGGER_ERROR("Failed to create temporary archive: {}", 
                             ec.message());
                return {};
            }

        LOGGER_INFO("Archive created in {}", ar.path());

        ar.add_directory(d_src.canonical_path(), 
                         d_dst.name(),
                         ec);
            tempfile->manage(ar_path, ec);

            if(ec) {
            LOGGER_ERROR("Failed to add directory to archive: {}", 
                LOGGER_ERROR("Failed to create temporary archive: {}", 
                             ec.message());
            return ec;
                return {};
            }

        input_path = ar.path().string();
    }
            return tempfile->path();
        }(); // <<== XXX (IILE)


    LOGGER_DEBUG("[{}] start_transfer: {} -> {}", 
                 task_info->id(), d_src.canonical_path(), d_dst.to_string());
@@ -178,8 +200,7 @@ local_path_to_remote_resource_transferor::transfer(
    hermes::endpoint endp = m_network_endpoint->lookup(d_dst.address());

    try {
        std::error_code ec;
        hermes::mapped_buffer input_data(input_path,
        hermes::mapped_buffer input_buffer(input_path.string(),
                                           hermes::access_mode::read_only,
                                           &ec);

@@ -189,7 +210,7 @@ local_path_to_remote_resource_transferor::transfer(
        }

        std::vector<hermes::mutable_buffer> bufvec{
            hermes::mutable_buffer{input_data.data(), input_data.size()}
            hermes::mutable_buffer{input_buffer.data(), input_buffer.size()}
        };

        auto buffers = 
@@ -205,75 +226,39 @@ local_path_to_remote_resource_transferor::transfer(
                d_dst.name(),
                buffers);

        LOGGER_DEBUG("rpc::in::args{{");
        LOGGER_DEBUG("  address: \"{}\",", m_network_endpoint->self_address()); 
        LOGGER_DEBUG("  in_nsid: \"{}\",", d_src.parent()->nsid());
        LOGGER_DEBUG("  out_nsid: \"{}\",", d_dst.parent()->nsid());
        LOGGER_DEBUG("  btype: {} ({}),",
                     static_cast<uint32_t>(backend_type::posix_filesystem),
                     utils::to_string(backend_type::posix_filesystem));
        LOGGER_DEBUG("  rtype: {} ({}),",
                     static_cast<uint32_t>(data::resource_type::local_posix_path), 
                     utils::to_string(data::resource_type::local_posix_path));
        LOGGER_DEBUG("  rname: \"{}\",", d_dst.name());
        LOGGER_DEBUG("  buffers: {{...}}");
        LOGGER_DEBUG("};");
        LOGGER_FLUSH();

        auto resp = 
            m_network_endpoint->post<rpc::remote_transfer>(endp, args).get();
            m_network_endpoint->post<rpc::remote_transfer>(
                endp, 
                rpc::remote_transfer::input{
                    m_network_endpoint->self_address(),
                    d_src.parent()->nsid(),
                    d_dst.parent()->nsid(), 
                    static_cast<uint32_t>(backend_type::posix_filesystem),
                    static_cast<uint32_t>(
                        data::resource_type::local_posix_path), 
                    d_src.is_collection(),
                    d_dst.name(),
                    buffers
                }).get();

        if(static_cast<task_status>(resp.at(0).status()) ==
            task_status::finished_with_error) {

            // XXX it would probably be worth it to define
            // a temporary_file class that cleans itself when
            // destroyed to ease removal and avoid code duplication
            // below
            if(src->is_collection()) {
                boost::system::error_code bec;

                bfs::remove(input_path, bec);

                if(bec) {
                    LOGGER_ERROR("Failed to remove archive {}: {}", 
                                input_path, ec.message());
                    //TODO
                    return std::make_error_code(
                        static_cast<std::errc>(bec.value()));
                }
            }

            // XXX error interface should be improved
            return std::make_error_code(
                static_cast<std::errc>(resp.at(0).sys_errnum()));
        }

        task_info->record_transfer(input_data.size(), 
        task_info->record_transfer(input_buffer.size(), 
                                   resp.at(0).elapsed_time());

        LOGGER_DEBUG("Remote request completed with output "
                     "{{status: {}, task_error: {}, sys_errnum: {}}} "
                     "({} bytes, {} usecs)",
                    resp.at(0).status(), resp.at(0).task_error(), 
                    resp.at(0).sys_errnum(), input_data.size(), 
                    resp.at(0).sys_errnum(), input_buffer.size(), 
                    resp.at(0).elapsed_time());

        if(src->is_collection()) {
            boost::system::error_code bec;

            bfs::remove(input_path, bec);

            if(bec) {
                LOGGER_ERROR("Failed to remove archive {}: {}", 
                             input_path, ec.message());
                //TODO
                return std::make_error_code(
                    static_cast<std::errc>(bec.value()));
            }
        }

        return std::make_error_code(static_cast<std::errc>(0));
        return ec;
    }
    catch(const std::exception& ex) {
        LOGGER_ERROR(ex.what());
@@ -290,26 +275,22 @@ local_path_to_remote_resource_transferor::accept_transfer(
        const std::shared_ptr<const data::resource>& dst) const {

    (void) auth;
    (void) src;
    (void) dst;

    using utils::tar;

    std::error_code ec;
    const auto& d_src = 
        reinterpret_cast<const data::remote_resource&>(*src);
    const auto& d_dst = 
        reinterpret_cast<const data::local_path_resource&>(*dst);

    // retrieve task context
    const auto ctx = boost::any_cast<
        std::shared_ptr<
            hermes::request<rpc::remote_transfer>>>(task_info->context());
    auto req = std::move(*ctx);

    LOGGER_DEBUG("[{}] accept_transfer: {} -> {}", task_info->id(),
    LOGGER_DEBUG("[{}] accept_push: {} -> {}", task_info->id(),
            d_src.to_string(), d_dst.canonical_path());

    LOGGER_WARN("Invoking rpc::remote_transfer({}) on {}", d_dst.name(), "xxx");

    hermes::exposed_memory remote_buffers = d_src.buffers();

    LOGGER_DEBUG("remote_buffers{{count={}, total_size={}}}",
@@ -320,111 +301,100 @@ local_path_to_remote_resource_transferor::accept_transfer(

    bool is_collection = d_src.is_collection();

    bfs::path output_path = 
        is_collection ? "/tmp/test.tar" : d_dst.canonical_path();
    auto tempfile = 
        std::make_shared<utils::temporary_file>(
            /* output_path */
            std::string{is_collection ? 
                "norns-archive-%%%%-%%%%-%%%%.tar" :
                d_dst.name()},
            /* parent_path */
            bfs::path{is_collection ?
                "/tmp" :
                d_dst.parent()->mount()},
            remote_buffers.size(),
            ec);

    LOGGER_DEBUG("creating local resource: {}", output_path);
    if(ec) {
        LOGGER_ERROR("Failed to create temporary file: {}", ec.message());
        *ctx = std::move(req); // restore ctx
        return ec;
    }

    std::error_code ec;
    std::shared_ptr<hermes::mapped_buffer> output_data;
    LOGGER_DEBUG("created local resource: {}", tempfile->path());

    std::tie(ec, output_data) = 
        ::create_file(output_path, remote_buffers.size());
    auto output_buffer = 
        std::make_shared<hermes::mapped_buffer>(
                tempfile->path().string(),
                hermes::access_mode::write_only,
                &ec);

    if(ec) {
        *ctx = std::move(req);
        LOGGER_ERROR("Failed mmapping output buffer: {}", ec.value());
        *ctx = std::move(req); // restore ctx
        return ec;
    }

    // let's prepare some local buffers
    
    std::vector<hermes::mutable_buffer> bufseq{
        hermes::mutable_buffer{output_data->data(), output_data->size()}
        hermes::mutable_buffer{output_buffer->data(), output_buffer->size()}
    };

    hermes::exposed_memory local_buffers =
        m_network_endpoint->expose(bufseq, hermes::access_mode::write_only);

    LOGGER_DEBUG("pulling remote data into {}", output_path);
    LOGGER_DEBUG("pulling remote data into {}", tempfile->path());

    auto start = std::chrono::steady_clock::now();

    // N.B. IMPORTANT: we NEED to capture output_data by value here so that
    // N.B. IMPORTANT: we NEED to capture output_buffer by value here so that
    // the mapped_buffer doesn't get released before completion_callback()
    // is called.
    const auto completion_callback = 
        [this, is_collection, output_path, d_dst, output_data, start](
        [this, is_collection, tempfile, d_dst, output_buffer, start](
            hermes::request<rpc::remote_transfer>&& req) {

        uint32_t usecs = 
            std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now() - start).count();

        // default response
        rpc::remote_transfer::output out(
        //TODO: hermes offers no way to check for an error yet
        LOGGER_DEBUG("Pull completed ({} usecs)", usecs);

        // default response (success)
        rpc::remote_transfer::output out = {
                static_cast<uint32_t>(task_status::finished),
                static_cast<uint32_t>(urd_error::success),
                0,
                usecs);

        //TODO: hermes offers no way to check for an error yet
        LOGGER_DEBUG("Pull completed ({} usecs)", usecs);
                0};

        if(is_collection) {
            std::error_code ec;
            boost::system::error_code bec;
            tar ar(output_path, tar::open, ec);
            std::error_code ec = 
                ::unpack_archive(tempfile->path(), d_dst.parent()->mount());

            if(ec) {
                LOGGER_ERROR("Failed to open archive {}: {}", 
                             output_path, ec.message());

                out = std::move(rpc::remote_transfer::output{
                    static_cast<uint32_t>(task_status::finished_with_error),
                out = rpc::remote_transfer::output{
                        static_cast<uint32_t>(
                            task_status::finished_with_error),
                        static_cast<uint32_t>(urd_error::system_error),
                        static_cast<uint32_t>(ec.value()),
                    0
                });
                goto respond;
            }
                        0};

            ar.extract(d_dst.parent()->mount(), ec);

            if(ec) {
                LOGGER_ERROR("Failed to extract archive into {}: {}",
                             ar.path(), d_dst.parent()->mount(), 
                             ec.message());
                out = std::move(rpc::remote_transfer::output{
                    static_cast<uint32_t>(task_status::finished_with_error),
                    static_cast<uint32_t>(urd_error::system_error),
                    static_cast<uint32_t>(ec.value()),
                    0
                });
                goto respond;
            }

            LOGGER_DEBUG("Archive {} extracted into {}", 
                         ar.path(), d_dst.parent()->mount());

            bfs::remove(ar.path(), bec);
                         tempfile->path(), d_dst.parent()->mount());

            if(bec) {
                LOGGER_ERROR("Failed to remove archive {}: {}", 
                             ar.path(), ec.message());
                out = std::move(rpc::remote_transfer::output{
                    static_cast<uint32_t>(task_status::finished_with_error),
                    static_cast<uint32_t>(urd_error::system_error),
                    static_cast<uint32_t>(bec.value()),
                    0
                });
            }
            goto respond;
        }

        // prevent output file from being removed by tempfile's destructor
        (void) tempfile->release();

respond:
        if(req.requires_response()) {
            m_network_endpoint->respond<rpc::remote_transfer>(
                    std::move(req), 
                    out);
                    std::move(req), out);
        }
    };

@@ -433,7 +403,7 @@ respond:
                                   std::move(req),
                                   completion_callback);

    return std::make_error_code(static_cast<std::errc>(0));
    return ec;
}

std::string 
+159 −163

File changed.

Preview size limit exceeded, changes collapsed.

+1 −0
Original line number Diff line number Diff line
@@ -37,6 +37,7 @@

#include "common.hpp"
#include "utils/tar-archive.hpp"
#include "utils/temporary-file.hpp"

namespace norns {
namespace utils {
+87 −0
Original line number Diff line number Diff line
/*************************************************************************
 * Copyright (C) 2017-2019 Barcelona Supercomputing Center               *
 *                         Centro Nacional de Supercomputacion           *
 * All rights reserved.                                                  *
 *                                                                       *
 * This file is part of the NORNS Data Scheduler, a service that allows  *
 * other programs to start, track and manage asynchronous transfers of   *
 * data resources transfers requests between different storage backends. *
 *                                                                       *
 * See AUTHORS file in the top level directory for information           *
 * regarding developers and contributors.                                *
 *                                                                       *
 * The NORNS Data Scheduler is free software: you can redistribute it    *
 * and/or modify it under the terms of the GNU Lesser General Public     *
 * License as published by the Free Software Foundation, either          *
 * version 3 of the License, or (at your option) any later version.      *
 *                                                                       *
 * The NORNS Data Scheduler is distributed in the hope that it will be   *
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty   *
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU  *
 * Lesser General Public License for more details.                       *
 *                                                                       *
 * You should have received a copy of the GNU Lesser General             *
 * Public License along with the NORNS Data Scheduler.  If not, see      *
 * <http://www.gnu.org/licenses/>.                                       *
 *************************************************************************/

#include "logger.hpp"

namespace norns {
namespace utils {

struct file_handle {

    constexpr static const int init_value{-1};

    file_handle() = default;

    explicit file_handle(int fd) noexcept :
        m_fd(fd) { }

    file_handle(file_handle&& rhs) = default;
    file_handle(const file_handle& other) = delete;
    file_handle& operator=(file_handle&& rhs) = default;
    file_handle& operator=(const file_handle& other) = delete;

    explicit operator bool() const noexcept {
        return valid();
    }

    bool operator!() const noexcept {
        return !valid();
    }

    bool
    valid() const noexcept {
        return m_fd != init_value;
    }

    int
    native() const noexcept {
        return m_fd;
    }

    int
    release() noexcept {
        int ret = m_fd;
        m_fd = init_value;
        return ret;
    }

    ~file_handle() {
        if(m_fd != init_value) {
            if(::close(m_fd) == -1) {
                LOGGER_ERROR("Failed to close file descriptor: {}",
                             logger::errno_message(errno));
            }
        }
    }

    int m_fd;
};



} // namespace utils
} // namespace norns
Loading