Loading src/Makefile.am +2 −2 Original line number Diff line number Diff line Loading @@ -107,8 +107,8 @@ liburd_aux_la_CPPFLAGS = \ -I$(top_builddir)/rpc liburd_aux_la_LDFLAGS = \ @TBB_LIBS@ \ @BOOST_ASIO_LIB@ \ @BOOST_FILESYSTEM_LIB@ \ @BOOST_LDFLAGS@ \ @BOOST_PROGRAM_OPTIONS_LIB@ \ @BOOST_SYSTEM_LIB@ \ Loading Loading @@ -171,8 +171,8 @@ urd_CPPFLAGS = \ # '-Wl,--whole-archive,.libs/liburd_aux.a,--no-whole-archive' and to include # several extra dependencies which would not be needed otherwise urd_LDFLAGS = \ @TBB_LIBS@ \ @BOOST_ASIO_LIB@ \ @BOOST_FILESYSTEM_LIB@ \ @BOOST_LDFLAGS@ \ @BOOST_PROGRAM_OPTIONS_LIB@ \ @BOOST_SYSTEM_LIB@ \ Loading src/io-task.cpp +36 −7 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ * * *************************************************************************/ #include <system_error> #include <atomic> #include "norns.h" Loading Loading @@ -69,16 +70,44 @@ void task::operator()() const { LOGGER_WARN("[{}] FROM: {}", m_id, m_src->to_string()); LOGGER_WARN("[{}] TO: {}", m_id, m_dst->to_string()); auto input_stream = data::make_stream(m_src); auto output_stream = data::make_stream(m_dst); // helper lambda for creating streams and reporting errors auto create_stream = [&] (const resource_ptr res, data::stream_type type) -> std::shared_ptr<data::stream> { try { return data::make_stream(res, type); } catch(const std::system_error& error) { LOGGER_ERROR("[{}] Error creating {} stream for {}: \"{}\"", m_id, (type == data::stream_type::input ? "input" : "output"), res->to_string(), error.code().message()); throw; } }; try { auto input_stream = create_stream(m_src, data::stream_type::input); auto output_stream = create_stream(m_dst, data::stream_type::output); //XXX using something like backend->preferred_transfer_size() //would be nice data::buffer b(8192); std::size_t bytes_read = 0; while((bytes_read = input_stream->read(b)) != 0) { while(input_stream->read(b) != 0) { output_stream->write(b); LOGGER_WARN("[{}] {} bytes read from {}", m_id, bytes_read, m_src->to_string()); std::size_t bytes_written = output_stream->write(b); LOGGER_WARN("[{}] {} bytes written to {}", m_id, bytes_written, m_dst->to_string()); } LOGGER_WARN("[{}] I/O task complete", m_id); LOGGER_WARN("[{}] I/O task completed successfully", m_id); } catch(const std::exception& ex) { LOGGER_WARN("[{}] I/O task completed with error", m_id); } } } // namespace io Loading src/resources/local-path.cpp +134 −14 Original line number Diff line number Diff line Loading @@ -25,9 +25,18 @@ * * *************************************************************************/ #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <system_error> #include <boost/filesystem.hpp> #include "logger.hpp" #include "backends.hpp" #include "local-path.hpp" namespace bfs = boost::filesystem; namespace data { /*! Remote path data */ Loading @@ -53,37 +62,148 @@ std::string local_path::to_string() const { return "LOCAL_PATH[\"" + m_nsid + "\", \"" + m_datapath + "\"]"; } std::string local_path::datapath() const { return m_datapath; } namespace detail { resource_impl<resource_type::local_posix_path>::resource_impl(std::shared_ptr<resource_info> base_info) : /* Specific resource implementation */ local_path_resource::resource_impl(std::shared_ptr<resource_info> base_info) : m_backend(), m_resource_info(std::static_pointer_cast<local_path>(base_info)) { } std::string resource_impl<resource_type::local_posix_path>::to_string() const { std::string local_path_resource::to_string() const { return m_backend->to_string() + m_resource_info->to_string(); } resource_type resource_impl<resource_type::local_posix_path>::type() const { return resource_type::local_posix_path; // resource_type local_path_resource::type() const { // return resource_type::local_posix_path; // } std::shared_ptr<resource_info> local_path_resource::info() const { return m_resource_info; } void resource_impl<resource_type::local_posix_path>::set_backend(const backend_ptr backend) { std::shared_ptr<storage::backend> local_path_resource::backend() const { return m_backend; } void local_path_resource::set_backend(const std::shared_ptr<storage::backend> backend) { m_backend = backend; } /* Stream implementation */ stream_impl<resource_type::local_posix_path>::stream_impl(std::shared_ptr<resource> resource) { (void) resource; /* Specific stream implementation */ local_path_stream::stream_impl(std::shared_ptr<resource> resource, stream_type type) { // downcast generic resource_info to specific implementation auto info = std::static_pointer_cast<local_path>(resource->info()); bfs::path mount_point = bfs::canonical(resource->backend()->mount()); bfs::path rsrc_path = info->datapath(); bfs::path rsrc_abs_path = mount_point / rsrc_path; // LOGGER_ERROR("rsrc_abs_path: {}", rsrc_abs_path); // if(info->is_directory()) { <--- we can't know unless the user provides it // } // else { // ... // code below // } bfs::path filename = rsrc_abs_path.filename(); bfs::path parents = rsrc_abs_path.parent_path(); switch(type) { case stream_type::input: m_fd = open(rsrc_abs_path.c_str(), O_RDONLY); break; case stream_type::output: m_fd = open(rsrc_abs_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, S_IRUSR | S_IWUSR); break; } if(m_fd == -1) { throw std::system_error(errno, std::generic_category()); } } local_path_stream::~local_path_stream() { if(m_fd != -1) { if(close(m_fd) == -1) { LOGGER_ERROR("Error when closing stream's file descriptor: {}", strerror(errno)); } } } std::size_t local_path_stream::read(buffer& b) { if(m_fd == -1) { throw std::system_error(EBADF, std::generic_category()); } size_t size = b.size(); // max buffer size size_t brecvd = 0; // bytes read size_t bleft = size; // bytes left to read ssize_t n = 0; while(brecvd < size) { n = ::read(m_fd, &b[0] + brecvd, bleft); if(n == -1 || n == 0) { if(errno == EINTR) { continue; } break; } brecvd += n; bleft -= n; } if(n == -1) { throw std::system_error(errno, std::generic_category()); } if(brecvd < size) { b.resize(brecvd); } return brecvd; } std::size_t local_path_stream::write(const buffer& b) { if(m_fd == -1) { throw std::system_error(EBADF, std::generic_category()); } size_t size = b.size(); size_t bsent = 0; // bytes written size_t bleft = size; // bytes left to write ssize_t n = 0; while(bsent < size) { n = ::write(m_fd, &b[0] + bsent, bleft); if(n == -1) { if(errno == EINTR) { continue; } break; } bsent += n; bleft -= n; } std::size_t stream_impl<resource_type::local_posix_path>::read(buffer& b) { (void) b; return 0; if(n == -1) { throw std::system_error(errno, std::generic_category()); } std::size_t stream_impl<resource_type::local_posix_path>::write(const buffer& b) { (void) b; return 0; return bsent; } } // namespace detail Loading src/resources/local-path.hpp +12 −6 Original line number Diff line number Diff line Loading @@ -42,6 +42,8 @@ struct local_path : public resource_info { bool is_remote() const override; std::string to_string() const override; std::string datapath() const; std::string m_nsid; std::string m_datapath; }; Loading @@ -52,22 +54,26 @@ namespace detail { template <> struct resource_impl<resource_type::local_posix_path> : public resource { using backend_ptr = std::shared_ptr<storage::backend>; resource_impl(std::shared_ptr<resource_info> base_info); std::string to_string() const override; resource_type type() const override; void set_backend(const backend_ptr backend); // resource_type type() const override; std::shared_ptr<resource_info> info() const override; std::shared_ptr<storage::backend> backend() const override; void set_backend(const std::shared_ptr<storage::backend> backend) override; backend_ptr m_backend; std::shared_ptr<storage::backend> m_backend; std::shared_ptr<local_path> m_resource_info; }; template <> struct stream_impl<resource_type::local_posix_path> : public data::stream { stream_impl(std::shared_ptr<resource> resource); stream_impl(std::shared_ptr<resource> resource, stream_type type); ~stream_impl(); std::size_t read(buffer& b) override; std::size_t write(const buffer& b) override; int m_fd = -1; }; } // namespace detail Loading src/resources/make-resource.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -39,7 +39,7 @@ namespace data { inline std::shared_ptr<resource> make_resource(std::shared_ptr<resource_info> rinfo) { switch(rinfo->type()) { case data::resource_type::memory_region: return std::make_shared<data::memory_buffer_resource>(rinfo); return std::make_shared<data::memory_region_resource>(rinfo); case data::resource_type::local_posix_path: return std::make_shared<data::local_path_resource>(rinfo); case data::resource_type::shared_posix_path: Loading Loading
src/Makefile.am +2 −2 Original line number Diff line number Diff line Loading @@ -107,8 +107,8 @@ liburd_aux_la_CPPFLAGS = \ -I$(top_builddir)/rpc liburd_aux_la_LDFLAGS = \ @TBB_LIBS@ \ @BOOST_ASIO_LIB@ \ @BOOST_FILESYSTEM_LIB@ \ @BOOST_LDFLAGS@ \ @BOOST_PROGRAM_OPTIONS_LIB@ \ @BOOST_SYSTEM_LIB@ \ Loading Loading @@ -171,8 +171,8 @@ urd_CPPFLAGS = \ # '-Wl,--whole-archive,.libs/liburd_aux.a,--no-whole-archive' and to include # several extra dependencies which would not be needed otherwise urd_LDFLAGS = \ @TBB_LIBS@ \ @BOOST_ASIO_LIB@ \ @BOOST_FILESYSTEM_LIB@ \ @BOOST_LDFLAGS@ \ @BOOST_PROGRAM_OPTIONS_LIB@ \ @BOOST_SYSTEM_LIB@ \ Loading
src/io-task.cpp +36 −7 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ * * *************************************************************************/ #include <system_error> #include <atomic> #include "norns.h" Loading Loading @@ -69,16 +70,44 @@ void task::operator()() const { LOGGER_WARN("[{}] FROM: {}", m_id, m_src->to_string()); LOGGER_WARN("[{}] TO: {}", m_id, m_dst->to_string()); auto input_stream = data::make_stream(m_src); auto output_stream = data::make_stream(m_dst); // helper lambda for creating streams and reporting errors auto create_stream = [&] (const resource_ptr res, data::stream_type type) -> std::shared_ptr<data::stream> { try { return data::make_stream(res, type); } catch(const std::system_error& error) { LOGGER_ERROR("[{}] Error creating {} stream for {}: \"{}\"", m_id, (type == data::stream_type::input ? "input" : "output"), res->to_string(), error.code().message()); throw; } }; try { auto input_stream = create_stream(m_src, data::stream_type::input); auto output_stream = create_stream(m_dst, data::stream_type::output); //XXX using something like backend->preferred_transfer_size() //would be nice data::buffer b(8192); std::size_t bytes_read = 0; while((bytes_read = input_stream->read(b)) != 0) { while(input_stream->read(b) != 0) { output_stream->write(b); LOGGER_WARN("[{}] {} bytes read from {}", m_id, bytes_read, m_src->to_string()); std::size_t bytes_written = output_stream->write(b); LOGGER_WARN("[{}] {} bytes written to {}", m_id, bytes_written, m_dst->to_string()); } LOGGER_WARN("[{}] I/O task complete", m_id); LOGGER_WARN("[{}] I/O task completed successfully", m_id); } catch(const std::exception& ex) { LOGGER_WARN("[{}] I/O task completed with error", m_id); } } } // namespace io Loading
src/resources/local-path.cpp +134 −14 Original line number Diff line number Diff line Loading @@ -25,9 +25,18 @@ * * *************************************************************************/ #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <system_error> #include <boost/filesystem.hpp> #include "logger.hpp" #include "backends.hpp" #include "local-path.hpp" namespace bfs = boost::filesystem; namespace data { /*! Remote path data */ Loading @@ -53,37 +62,148 @@ std::string local_path::to_string() const { return "LOCAL_PATH[\"" + m_nsid + "\", \"" + m_datapath + "\"]"; } std::string local_path::datapath() const { return m_datapath; } namespace detail { resource_impl<resource_type::local_posix_path>::resource_impl(std::shared_ptr<resource_info> base_info) : /* Specific resource implementation */ local_path_resource::resource_impl(std::shared_ptr<resource_info> base_info) : m_backend(), m_resource_info(std::static_pointer_cast<local_path>(base_info)) { } std::string resource_impl<resource_type::local_posix_path>::to_string() const { std::string local_path_resource::to_string() const { return m_backend->to_string() + m_resource_info->to_string(); } resource_type resource_impl<resource_type::local_posix_path>::type() const { return resource_type::local_posix_path; // resource_type local_path_resource::type() const { // return resource_type::local_posix_path; // } std::shared_ptr<resource_info> local_path_resource::info() const { return m_resource_info; } void resource_impl<resource_type::local_posix_path>::set_backend(const backend_ptr backend) { std::shared_ptr<storage::backend> local_path_resource::backend() const { return m_backend; } void local_path_resource::set_backend(const std::shared_ptr<storage::backend> backend) { m_backend = backend; } /* Stream implementation */ stream_impl<resource_type::local_posix_path>::stream_impl(std::shared_ptr<resource> resource) { (void) resource; /* Specific stream implementation */ local_path_stream::stream_impl(std::shared_ptr<resource> resource, stream_type type) { // downcast generic resource_info to specific implementation auto info = std::static_pointer_cast<local_path>(resource->info()); bfs::path mount_point = bfs::canonical(resource->backend()->mount()); bfs::path rsrc_path = info->datapath(); bfs::path rsrc_abs_path = mount_point / rsrc_path; // LOGGER_ERROR("rsrc_abs_path: {}", rsrc_abs_path); // if(info->is_directory()) { <--- we can't know unless the user provides it // } // else { // ... // code below // } bfs::path filename = rsrc_abs_path.filename(); bfs::path parents = rsrc_abs_path.parent_path(); switch(type) { case stream_type::input: m_fd = open(rsrc_abs_path.c_str(), O_RDONLY); break; case stream_type::output: m_fd = open(rsrc_abs_path.c_str(), O_CREAT | O_EXCL | O_WRONLY, S_IRUSR | S_IWUSR); break; } if(m_fd == -1) { throw std::system_error(errno, std::generic_category()); } } local_path_stream::~local_path_stream() { if(m_fd != -1) { if(close(m_fd) == -1) { LOGGER_ERROR("Error when closing stream's file descriptor: {}", strerror(errno)); } } } std::size_t local_path_stream::read(buffer& b) { if(m_fd == -1) { throw std::system_error(EBADF, std::generic_category()); } size_t size = b.size(); // max buffer size size_t brecvd = 0; // bytes read size_t bleft = size; // bytes left to read ssize_t n = 0; while(brecvd < size) { n = ::read(m_fd, &b[0] + brecvd, bleft); if(n == -1 || n == 0) { if(errno == EINTR) { continue; } break; } brecvd += n; bleft -= n; } if(n == -1) { throw std::system_error(errno, std::generic_category()); } if(brecvd < size) { b.resize(brecvd); } return brecvd; } std::size_t local_path_stream::write(const buffer& b) { if(m_fd == -1) { throw std::system_error(EBADF, std::generic_category()); } size_t size = b.size(); size_t bsent = 0; // bytes written size_t bleft = size; // bytes left to write ssize_t n = 0; while(bsent < size) { n = ::write(m_fd, &b[0] + bsent, bleft); if(n == -1) { if(errno == EINTR) { continue; } break; } bsent += n; bleft -= n; } std::size_t stream_impl<resource_type::local_posix_path>::read(buffer& b) { (void) b; return 0; if(n == -1) { throw std::system_error(errno, std::generic_category()); } std::size_t stream_impl<resource_type::local_posix_path>::write(const buffer& b) { (void) b; return 0; return bsent; } } // namespace detail Loading
src/resources/local-path.hpp +12 −6 Original line number Diff line number Diff line Loading @@ -42,6 +42,8 @@ struct local_path : public resource_info { bool is_remote() const override; std::string to_string() const override; std::string datapath() const; std::string m_nsid; std::string m_datapath; }; Loading @@ -52,22 +54,26 @@ namespace detail { template <> struct resource_impl<resource_type::local_posix_path> : public resource { using backend_ptr = std::shared_ptr<storage::backend>; resource_impl(std::shared_ptr<resource_info> base_info); std::string to_string() const override; resource_type type() const override; void set_backend(const backend_ptr backend); // resource_type type() const override; std::shared_ptr<resource_info> info() const override; std::shared_ptr<storage::backend> backend() const override; void set_backend(const std::shared_ptr<storage::backend> backend) override; backend_ptr m_backend; std::shared_ptr<storage::backend> m_backend; std::shared_ptr<local_path> m_resource_info; }; template <> struct stream_impl<resource_type::local_posix_path> : public data::stream { stream_impl(std::shared_ptr<resource> resource); stream_impl(std::shared_ptr<resource> resource, stream_type type); ~stream_impl(); std::size_t read(buffer& b) override; std::size_t write(const buffer& b) override; int m_fd = -1; }; } // namespace detail Loading
src/resources/make-resource.hpp +1 −1 Original line number Diff line number Diff line Loading @@ -39,7 +39,7 @@ namespace data { inline std::shared_ptr<resource> make_resource(std::shared_ptr<resource_info> rinfo) { switch(rinfo->type()) { case data::resource_type::memory_region: return std::make_shared<data::memory_buffer_resource>(rinfo); return std::make_shared<data::memory_region_resource>(rinfo); case data::resource_type::local_posix_path: return std::make_shared<data::local_path_resource>(rinfo); case data::resource_type::shared_posix_path: Loading