From ab00c5922105e6c1c8f389e708d326a3f083bc7c Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 24 Nov 2023 19:17:14 +0100 Subject: [PATCH 01/11] Fix fallocate error handling in posix_file::file::fallocate --- spack/packages/cargo/package.py | 1 + src/posix_file/posix_file/file.hpp | 18 +++++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index cccab79..ae1e451 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -39,6 +39,7 @@ class Cargo(CMakePackage): version("0.3.1", sha256="613485354e24c4b97cb6d045657569f94dc1d9bbb391b5a166f8d18b3595428b") version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960") version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd") + version("0.3.4", branch="rnou/fallocate") # build variants variant('build_type', default='Release', diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 03b4bc7..83a1324 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -182,7 +182,7 @@ public: fallocate(int mode, offset offset, std::size_t len) const { if(!m_handle) { - throw io_error("posix_file::file::fallocate", EBADF); + throw io_error("posix_file::file::fallocate (handle)", EBADF); } int ret = ::fallocate(m_handle.native(), mode, offset, @@ -190,15 +190,15 @@ public: if(ret == -1) { // Try an alternative to fallocate for beegfs - if(errno == EOPNOTSUPP) { - ret = ::posix_fallocate(m_handle.native(), offset, - static_cast(len)); - if (ret == -1) { - throw io_error("posix_file::file::posix_fallocate", errno); - } else return; - } + + ret = ::posix_fallocate(m_handle.native(), offset, + static_cast(len)); + if(ret == -1) { + throw io_error("posix_file::file::posix_fallocate", errno); + } else + return; + throw io_error("posix_file::file::fallocate", errno); - } } -- GitLab From be7a3755e67bba0b45c2725199998cadfc2f19eb Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 28 Nov 2023 12:32:32 +0100 Subject: [PATCH 02/11] Fix progress loop and add comments --- src/worker/worker.cpp | 67 +++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 34 deletions(-) diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 00a3b3c..12567b5 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -74,7 +74,7 @@ worker::set_output_file(std::filesystem::path output_file) { m_output_file = std::move(output_file); } -void +void worker::set_block_size(std::uint64_t block_size) { m_block_size = block_size; } @@ -107,40 +107,39 @@ worker::run() { bool done = false; while(!done) { + // Always loop pending operations + + auto I = m_ops.begin(); + auto IE = m_ops.end(); + if(I != IE) { + auto op = I->second.first.get(); + int index = I->second.second; + if(op) { + + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + 0.0f, ec); - auto msg = world.iprobe(); - - if(!msg) { - // FIXME: sleep time should be configurable - - // Progress through all transfers - - auto I = m_ops.begin(); - auto IE = m_ops.end(); - if(I != IE) { - auto op = I->second.first.get(); - int index = I->second.second; - if(op) { - - index = op->progress(index); - if(index == -1) { - // operation finished - cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - 0.0f, ec); - - // Transfer finished - I = m_ops.erase(I); - } else { - update_state(op->source(), op->tid(), op->seqno(), - transfer_state::running, op->bw()); - I->second.second = index; - } + // Transfer finished + I = m_ops.erase(I); + } else { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running, op->bw()); + I->second.second = index; } } + } + + auto msg = world.iprobe(); + + if(!msg) { + // Only wait if there are no pending operations and no messages if(m_ops.size() == 0) { std::this_thread::sleep_for(150ms); } @@ -158,9 +157,9 @@ worker::run() { LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); m_ops.emplace(std::make_pair( make_pair(m.input_path(), m.output_path()), - make_pair(operation::make_operation(t, workers, - m.input_path(), - m.output_path(), m_block_size), + make_pair(operation::make_operation( + t, workers, m.input_path(), + m.output_path(), m_block_size), 0))); const auto op = -- GitLab From 0875bbcdb8cb68649f078c4157d00e4d81eec999 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 28 Nov 2023 12:54:21 +0100 Subject: [PATCH 03/11] Fix update_state() function to only update if BW is set --- src/worker/worker.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 12567b5..d98000e 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -115,7 +115,6 @@ worker::run() { auto op = I->second.first.get(); int index = I->second.second; if(op) { - index = op->progress(index); if(index == -1) { // operation finished @@ -128,8 +127,11 @@ worker::run() { // Transfer finished I = m_ops.erase(I); } else { - update_state(op->source(), op->tid(), op->seqno(), - transfer_state::running, op->bw()); + // update only if BW is set + if(op->bw() > 0.0f) { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running, op->bw()); + } I->second.second = index; } } @@ -188,7 +190,6 @@ worker::run() { for(auto I = m_ops.begin(); I != m_ops.end(); I++) { const auto op = I->second.first.get(); if(op) { - op->set_bw_shaping(m.shaping()); } else { LOGGER_INFO("Operation non existent", msg->source(), m); -- GitLab From b9748419f4b4345a81177514d0a64fb388cd1714 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 28 Nov 2023 13:20:43 +0100 Subject: [PATCH 04/11] Add progress logging and fix iterator increment --- src/worker/worker.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index d98000e..140c676 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -116,6 +116,7 @@ worker::run() { int index = I->second.second; if(op) { index = op->progress(index); + LOGGER_INFO("Progress: {}", index); if(index == -1) { // operation finished cargo::error_code ec = op->progress(); @@ -133,6 +134,7 @@ worker::run() { transfer_state::running, op->bw()); } I->second.second = index; + ++I; } } } -- GitLab From 538ae154531beaac22573686ad38d1b257383419 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 28 Nov 2023 13:42:10 +0100 Subject: [PATCH 05/11] BW needs to take into account sleep --- src/worker/mpio_read.cpp | 7 ++++--- src/worker/mpio_write.cpp | 9 +++++---- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index e9f6024..89dc8c8 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -32,7 +32,8 @@ namespace cargo { mpio_read::mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) + std::filesystem::path output_path, + std::uint64_t block_size) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} @@ -179,6 +180,8 @@ mpio_read::progress(int ongoing_index) { auto start = std::chrono::steady_clock::now(); m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), file_range.size()); + // Do sleep + std::this_thread::sleep_for(sleep_value()); auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -192,8 +195,6 @@ mpio_read::progress(int ongoing_index) { m_block_size / 1024.0, elapsed_seconds, bw(), sleep_value()); } - // Do sleep - std::this_thread::sleep_for(sleep_value()); ++index; } diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 5a84ca7..bcfb80a 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -40,7 +40,7 @@ mpio_write::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); - std::size_t block_size = m_kb_size * 1024u; + std::size_t block_size = m_kb_size * 1024u; std::size_t file_size = std::filesystem::file_size(m_input_path); // compute the number of blocks in the file @@ -140,6 +140,10 @@ mpio_write::progress(int ongoing_index) { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -153,9 +157,6 @@ mpio_write::progress(int ongoing_index) { sleep_value()); } - m_bytes_per_rank += n; - // Do sleep - std::this_thread::sleep_for(sleep_value()); ++index; } -- GitLab From 12818a8b7b2ff8ff1c13b7133c8de155624d8cd3 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 28 Nov 2023 14:48:55 +0100 Subject: [PATCH 06/11] Remove progress logging statement in worker::run() function --- src/worker/worker.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 140c676..04a941a 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -116,7 +116,6 @@ worker::run() { int index = I->second.second; if(op) { index = op->progress(index); - LOGGER_INFO("Progress: {}", index); if(index == -1) { // operation finished cargo::error_code ec = op->progress(); -- GitLab From f7621c66059621cbab1e9b100681d6d411f382d3 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 1 Dec 2023 14:26:46 +0100 Subject: [PATCH 07/11] Update to use gkfs_user_library (branch fmt10)) --- CMakeLists.txt | 5 ++ cli/copy.cpp | 25 +++---- cmake/FindGekkoFS.cmake | 65 +++++++++++++++++++ lib/cargo.hpp | 5 +- lib/libcargo.cpp | 5 ++ src/master.cpp | 12 ++-- src/posix_file/CMakeLists.txt | 23 ++++++- src/posix_file/posix_file/file.hpp | 63 +++++++++++------- .../posix_file/fs_plugin/fs_plugin.cpp | 22 +++++++ .../posix_file/fs_plugin/fs_plugin.hpp | 41 ++++++++++++ .../posix_file/fs_plugin/gekko_plugin.cpp | 65 +++++++++++++++++++ .../posix_file/fs_plugin/gekko_plugin.hpp | 34 ++++++++++ .../posix_file/fs_plugin/posix_plugin.cpp | 49 ++++++++++++++ .../posix_file/fs_plugin/posix_plugin.hpp | 29 +++++++++ src/proto/mpi/message.hpp | 12 +++- src/worker/mpio_read.cpp | 7 +- src/worker/mpio_read.hpp | 5 +- src/worker/mpio_write.cpp | 2 +- src/worker/mpio_write.hpp | 5 +- src/worker/ops.cpp | 8 +-- src/worker/ops.hpp | 3 +- src/worker/sequential.hpp | 5 +- src/worker/worker.cpp | 2 +- 23 files changed, 429 insertions(+), 63 deletions(-) create mode 100644 cmake/FindGekkoFS.cmake create mode 100644 src/posix_file/posix_file/fs_plugin/fs_plugin.cpp create mode 100644 src/posix_file/posix_file/fs_plugin/fs_plugin.hpp create mode 100644 src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp create mode 100644 src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp create mode 100644 src/posix_file/posix_file/fs_plugin/posix_plugin.cpp create mode 100644 src/posix_file/posix_file/fs_plugin/posix_plugin.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index f76e44f..918361f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -260,6 +260,11 @@ FetchContent_Declare( FetchContent_MakeAvailable(expected) +### GekkoFS: Optional for gekkofs +find_package(GekkoFS) +if (GekkoFS_FOUND) + add_compile_definitions(GEKKOFS_PLUGIN) +endif() ### Threads: required by ASIO find_package(Threads REQUIRED) diff --git a/cli/copy.cpp b/cli/copy.cpp index 745263a..8f0188d 100644 --- a/cli/copy.cpp +++ b/cli/copy.cpp @@ -29,19 +29,24 @@ #include #include -enum class dataset_flags { posix, mpio }; +enum class dataset_flags { posix, parallel, none, gekkofs, hercules, expand, dataclay }; -std::map dataset_flags_map{ - {"posix", dataset_flags::posix}, - {"mpio", dataset_flags::mpio}}; +std::map dataset_flags_map{ + {"posix", cargo::dataset::type::posix}, + {"parallel", cargo::dataset::type::parallel}, + {"none", cargo::dataset::type::none}, + {"gekkofs", cargo::dataset::type::gekkofs}, + {"hercules", cargo::dataset::type::hercules}, + {"expand", cargo::dataset::type::expand}, + {"dataclay", cargo::dataset::type::dataclay}}; struct copy_config { std::string progname; std::string server_address; std::vector inputs; - dataset_flags input_flags = dataset_flags::posix; + cargo::dataset::type input_flags = cargo::dataset::type::posix; std::vector outputs; - dataset_flags output_flags = dataset_flags::posix; + cargo::dataset::type output_flags = cargo::dataset::type::posix; }; copy_config @@ -119,17 +124,13 @@ main(int argc, char* argv[]) { std::transform(cfg.inputs.cbegin(), cfg.inputs.cend(), std::back_inserter(inputs), [&](const auto& src) { return cargo::dataset{ - src, cfg.input_flags == dataset_flags::mpio - ? cargo::dataset::type::parallel - : cargo::dataset::type::posix}; + src, cfg.input_flags}; }); std::transform(cfg.outputs.cbegin(), cfg.outputs.cend(), std::back_inserter(outputs), [&cfg](const auto& tgt) { return cargo::dataset{ - tgt, cfg.output_flags == dataset_flags::mpio - ? cargo::dataset::type::parallel - : cargo::dataset::type::posix}; + tgt, cfg.output_flags}; }); const auto tx = cargo::transfer_datasets(server, inputs, outputs); diff --git a/cmake/FindGekkoFS.cmake b/cmake/FindGekkoFS.cmake new file mode 100644 index 0000000..4e98d61 --- /dev/null +++ b/cmake/FindGekkoFS.cmake @@ -0,0 +1,65 @@ +################################################################################ +# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(GekkoFS_INCLUDE_DIR + NAMES user_functions.hpp + PREFIX gkfs +) + +find_library(GekkoFS_LIBRARY + NAMES libgkfs_user_lib.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + GekkoFS + DEFAULT_MSG + GekkoFS_INCLUDE_DIR + GekkoFS_LIBRARY +) + +if(GekkoFS_FOUND) + set(GekkoFS_LIBRARIES ${GekkoFS__LIBRARY}) + set(GekkoFS_INCLUDE_DIRS ${GekkoFS__INCLUDE_DIR}) + + + if(NOT TARGET GekkoFS::GekkoFS) + add_library(GekkoFS::GekkoFS UNKNOWN IMPORTED) + set_target_properties(GekkoFS::GekkoFS PROPERTIES + IMPORTED_LOCATION "${GekkoFS_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${GekkoFS_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + GekkoFS_INCLUDE_DIR + GekkoFS_LIBRARY +) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index ca15aa7..1c62d1c 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -77,6 +77,9 @@ public: void path(std::string path); + [[nodiscard]] dataset::type + get_type() const; + template void serialize(Archive& ar) { @@ -86,7 +89,7 @@ public: private: std::string m_path; - dataset::type m_type = dataset::type::none; + dataset::type m_type; }; diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 65fc726..7968eda 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -77,6 +77,11 @@ dataset::path(std::string path) { m_path = std::move(path); }; +dataset::type +dataset::get_type() const { + return m_type; +}; + bool dataset::supports_parallel_transfer() const noexcept { return m_type == dataset::type::parallel; diff --git a/src/master.cpp b/src/master.cpp index a6c0a4d..350bbf4 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -50,18 +50,18 @@ make_message(std::uint64_t tid, std::uint32_t seqno, if(input.supports_parallel_transfer()) { return std::make_tuple(static_cast(cargo::tag::pread), cargo::transfer_message{tid, seqno, input.path(), - output.path()}); + output.path(), static_cast(output.get_type())}); } if(output.supports_parallel_transfer()) { return std::make_tuple(static_cast(cargo::tag::pwrite), cargo::transfer_message{tid, seqno, input.path(), - output.path()}); + output.path(), static_cast(input.get_type())}); } return std::make_tuple( static_cast(cargo::tag::sequential), - cargo::transfer_message{tid, seqno, input.path(), output.path()}); + cargo::transfer_message{tid, seqno, input.path(), output.path(), static_cast(input.get_type())}); } } // namespace @@ -303,12 +303,12 @@ master_server::transfer_datasets(const network::request& req, const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; - // Create the directory if it does not exist - if(!std::filesystem::path(d.path()).parent_path().empty()) { + // Create the directory if it does not exist (only in parallel transfer) + if(!std::filesystem::path(d.path()).parent_path().empty() and d.supports_parallel_transfer()) { std::filesystem::create_directories( std::filesystem::path(d.path()).parent_path()); } - + for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), i, s, d); LOGGER_INFO("msg <= to: {} body: {}", rank, m); diff --git a/src/posix_file/CMakeLists.txt b/src/posix_file/CMakeLists.txt index b916a85..acf0816 100644 --- a/src/posix_file/CMakeLists.txt +++ b/src/posix_file/CMakeLists.txt @@ -22,21 +22,38 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_library(posix_file INTERFACE) +add_library(posix_file STATIC) +set(GEKKO_INCLUDES "") +if (GEKKOFS_PLUGIN) + set(GEKKO_INCLUDES posix_file/fs_plugin/gekko_plugin.cpp posix_file/fs_plugin/gekko_plugin.cpp) +endif() target_sources( posix_file - INTERFACE posix_file/types.hpp + PRIVATE posix_file/types.hpp posix_file/file.hpp posix_file/ranges.hpp posix_file/views.hpp posix_file/math.hpp posix_file/views/block_iterator.hpp posix_file/views/strided_iterator.hpp + posix_file/fs_plugin/fs_plugin.hpp + posix_file/fs_plugin/posix_plugin.hpp + posix_file/fs_plugin/fs_plugin.cpp + posix_file/fs_plugin/posix_plugin.cpp + ${GEKKO_INCLUDES} + ) + target_include_directories(posix_file INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) set_property(TARGET posix_file PROPERTY POSITION_INDEPENDENT_CODE ON) +set(ADHOC "") + +if (GEKKOFS_PLUGIN) +set(ADHOC ${$ADHOC} GekkoFS::GekkoFS) +endif() + +target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected PRIVATE ${ADHOC}) -target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 83a1324..5114885 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -31,7 +31,9 @@ #include #include #include - +#include "fs_plugin/fs_plugin.hpp" +#include "cargo.hpp" +#include extern "C" { #include }; @@ -151,12 +153,20 @@ private: class file { + public: + file(cargo::FSPlugin::type t) { + m_fs_plugin = cargo::FSPlugin::make_fs(t); + }; + explicit file(std::filesystem::path filepath) noexcept : m_path(std::move(filepath)) {} - file(std::filesystem::path filepath, int fd) noexcept - : m_path(std::move(filepath)), m_handle(fd) {} + file(std::filesystem::path filepath, int fd, + std::unique_ptr fs_plugin) noexcept + : m_path(std::move(filepath)), m_handle(fd), + m_fs_plugin(std::move(fs_plugin)) {} + std::filesystem::path path() const noexcept { @@ -185,19 +195,10 @@ public: throw io_error("posix_file::file::fallocate (handle)", EBADF); } - int ret = ::fallocate(m_handle.native(), mode, offset, + int ret = m_fs_plugin->fallocate(m_handle.native(), mode, offset, static_cast(len)); if(ret == -1) { - // Try an alternative to fallocate for beegfs - - ret = ::posix_fallocate(m_handle.native(), offset, - static_cast(len)); - if(ret == -1) { - throw io_error("posix_file::file::posix_fallocate", errno); - } else - return; - throw io_error("posix_file::file::fallocate", errno); } } @@ -217,8 +218,9 @@ public: while(bytes_read < size) { - ssize_t n = ::pread(m_handle.native(), buf.data() + bytes_read, - bytes_left, offset + bytes_read); + ssize_t n = m_fs_plugin->pread(m_handle.native(), + buf.data() + bytes_read, bytes_left, + offset + bytes_read); if(n == 0) { // EOF @@ -257,8 +259,9 @@ public: while(bytes_written < size) { - ssize_t n = ::pwrite(m_handle.native(), buf.data() + bytes_written, - bytes_left, offset + bytes_written); + ssize_t n = m_fs_plugin->pwrite(m_handle.native(), + buf.data() + bytes_written, + bytes_left, offset + bytes_written); if(n == -1) { // Interrupted by a signal, retry @@ -277,26 +280,40 @@ public: return bytes_written; } + protected: const std::filesystem::path m_path; file_handle m_handle; + std::unique_ptr m_fs_plugin; }; + static inline file -open(const std::filesystem::path& filepath, int flags, ::mode_t mode = 0) { +open(const std::filesystem::path& filepath, int flags, ::mode_t mode, + cargo::FSPlugin::type t) { - int fd = ::open(filepath.c_str(), flags, mode); + std::unique_ptr fs_plugin; + fs_plugin = cargo::FSPlugin::make_fs(t); + // We don't check if it exists, we just create it if flags is set to O_CREAT + + if(flags & O_CREAT) { + fs_plugin->mkdir(filepath.parent_path().c_str(), 0755); + } + std::cout << "Opening file " << filepath << std::endl; + int fd = fs_plugin->open(filepath.c_str(), flags, mode); + std::cout << "File opened? " << fd << " -- " << flags << " mode: " << mode << std::endl; if(fd == -1) { - throw io_error("posix_file::open", errno); + throw io_error("posix_file::open_gekko", errno); } - return file{filepath, fd}; + return file{filepath, fd, std::move(fs_plugin)}; } static inline file -create(const std::filesystem::path& filepath, int flags, ::mode_t mode) { - return open(filepath, O_CREAT | flags, mode); +create(const std::filesystem::path& filepath, int flags, ::mode_t mode, + cargo::FSPlugin::type t) { + return open(filepath, O_CREAT | flags, mode, t); } } // namespace posix_file diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp new file mode 100644 index 0000000..8d45833 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -0,0 +1,22 @@ +#include "fs_plugin.hpp" +#include "posix_plugin.hpp" +#ifdef GEKKO_PLUGIN +#include "gekko_plugin.hpp" +#endif +namespace cargo { + +std::unique_ptr +FSPlugin::make_fs(type t) { + + switch(t) { + case type::posix: + return std::make_unique(); +#ifdef GEKKO_PLUGIN + case type::gekkofs: + return std::make_unique(); +#endif + default: + return {}; + } +} +} // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp new file mode 100644 index 0000000..c98e74a --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp @@ -0,0 +1,41 @@ +#ifndef FS_PLUGIN_HPP +#define FS_PLUGIN_HPP + +#include +#include +#include +#include + +namespace cargo { +class FSPlugin { +public: + enum class type { + posix, + parallel, + none, + gekkofs, + hercules, + expand, + dataclay + }; + static std::unique_ptr make_fs(type); + + virtual ~FSPlugin() = default; + + virtual int + open(const std::string& path, int flags, unsigned int mode) = 0; + virtual bool + close(int fd) = 0; + virtual ssize_t + pread(int fd, void* buf, size_t count, off_t offset) = 0; + virtual ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) = 0; + virtual bool + mkdir(const std::string& path, mode_t mode) = 0; + virtual off_t + lseek(int fd, off_t offset, int whence) = 0; + virtual off_t + fallocate(int fd, int mode, off_t offset, off_t len) = 0; +}; +} // namespace cargo +#endif // FS_PLUGIN_HPP \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp new file mode 100644 index 0000000..1ff0caa --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -0,0 +1,65 @@ + +#include "fs_plugin.hpp" +#include "gekko_plugin.hpp" +#include + +#include +namespace cargo { +gekko_plugin::gekko_plugin() { + int result = gkfs_init(); + if (result != 0) { + std::cerr << "Failed to initialize gekkofs" << std::endl; + } +} + +gekko_plugin::~gekko_plugin() { + int result = gkfs_end(); + if (result != 0) { + std::cerr << "Failed to finalize gekkofs" << std::endl; + } +} +// Override the open function +int +gekko_plugin::open(const std::string& path, int flags, unsigned int mode) { + // Call to gekkofs has the signature inverted + return gkfs::syscall::gkfs_open(path, mode, flags); +} + +// Override the pread function +ssize_t +gekko_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return gkfs::syscall::gkfs_pread_ws(fd, buf, count, offset); +} + +// Override the pwrite function +ssize_t +gekko_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, offset); +} + + +bool +gekko_plugin::mkdir(const std::string& path, mode_t mode) { + int result = gkfs::syscall::gkfs_create(path, mode | S_IFDIR); + return result; +} + +bool +gekko_plugin::close(int fd) { + return gkfs::syscall::gkfs_close(fd); +} + +off_t +gekko_plugin::lseek(int fd, off_t offset, int whence) { + return gkfs::syscall::gkfs_lseek(fd, offset, whence); +} + +off_t +gekko_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp new file mode 100644 index 0000000..dee1e3d --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp @@ -0,0 +1,34 @@ + +#ifndef GEKKO_PLUGIN_HPP +#define GEKKO_PLUGIN_HPP + +#include "fs_plugin.hpp" + + +namespace cargo { +class gekko_plugin : public FSPlugin { + +public: + gekko_plugin(); + ~gekko_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; + +private: + void* m_libHandle; +}; +}; // namespace cargo + +#endif // GEKKO_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp new file mode 100644 index 0000000..1e81ee5 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp @@ -0,0 +1,49 @@ +#include +#include +#include +#include +#include "posix_plugin.hpp" + +namespace cargo { + +posix_plugin::posix_plugin() { + std::cout << "POSIXPlugin::POSIXPlugin()" << std::endl; +} + + +int +posix_plugin::open(const std::string& path, int flags, unsigned int mode) { + return ::open(path.c_str(), flags, mode); +} + +bool +posix_plugin::close(int fd) { + return ::close(fd) == 0; +} + +ssize_t +posix_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return ::pread(fd, buf, count, offset); +} + +ssize_t +posix_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return ::pwrite(fd, buf, count, offset); +} + +bool +posix_plugin::mkdir(const std::string& path, mode_t mode) { + return ::mkdir(path.c_str(), mode) == 0; +} + +off_t +posix_plugin::lseek(int fd, off_t offset, int whence) { + return ::lseek(fd, offset, whence); +} + +off_t +posix_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) mode; + return ::posix_fallocate(fd, offset, static_cast(len)); +} +}; // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp new file mode 100644 index 0000000..0110284 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp @@ -0,0 +1,29 @@ + +#ifndef POSIX_PLUGIN_HPP +#define POSIX_PLUGIN_HPP + +#include "fs_plugin.hpp" +#include +namespace cargo { +class posix_plugin : public FSPlugin { + +public: + posix_plugin(); + + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +} // namespace cargo +#endif // POSIX_PLUGIN_HPP diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index bc618f2..cfebaac 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -32,6 +32,7 @@ #include #include "cargo.hpp" #include "boost_serialization_std_optional.hpp" +#include "posix_file/file.hpp" namespace cargo { @@ -52,9 +53,9 @@ public: transfer_message() = default; transfer_message(std::uint64_t tid, std::uint32_t seqno, - std::string input_path, std::string output_path) + std::string input_path, std::string output_path, std::uint32_t type) : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)) {} + m_output_path(std::move(output_path)), m_type(type) {} [[nodiscard]] std::uint64_t tid() const { @@ -75,6 +76,11 @@ public: output_path() const { return m_output_path; } + /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */ + [[nodiscard]] cargo::FSPlugin::type + type() const { + return static_cast(m_type); + } private: template @@ -86,12 +92,14 @@ private: ar& m_seqno; ar& m_input_path; ar& m_output_path; + ar& m_type; } std::uint64_t m_tid{}; std::uint32_t m_seqno{}; std::string m_input_path; std::string m_output_path; + std::uint32_t m_type{}; }; class status_message { diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 89dc8c8..47856ff 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -33,9 +33,9 @@ namespace cargo { mpio_read::mpio_read(mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, - std::uint64_t block_size) + std::uint64_t block_size, FSPlugin::type fs_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {} cargo::error_code mpio_read::operator()() { @@ -123,8 +123,9 @@ mpio_read::operator()() { } // step3. POSIX write data + // We need to create the directory if it does not exists (using FSPlugin) m_output_file = std::make_unique( - posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR)); + posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_type)); m_output_file->fallocate(0, 0, file_size); diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index ce17c98..16a59ff 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -38,7 +38,7 @@ class mpio_read : public operation { public: mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size); + std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type); cargo::error_code operator()() final; @@ -50,12 +50,12 @@ public: progress(int ongoing_index ) final; private: + mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; cargo::error_code m_status; - std::unique_ptr m_output_file; int m_workers_size; int m_workers_rank; @@ -63,6 +63,7 @@ private: memory_buffer m_buffer; std::vector m_buffer_regions; std::uint64_t m_kb_size; + FSPlugin::type m_fs_type; }; diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index bcfb80a..f95ff3f 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -69,7 +69,7 @@ mpio_write::operator()() { } m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY)); + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_type)); m_workers_size = workers_size; m_workers_rank = workers_rank; diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 05755ff..ee25864 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -38,9 +38,9 @@ class mpio_write : public operation { public: mpio_write(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) + std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {} cargo::error_code operator()() final; @@ -71,6 +71,7 @@ private: std::vector m_buffer_regions; std::size_t m_bytes_per_rank; std::uint64_t m_kb_size; + FSPlugin::type m_fs_type; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 3e380dd..6f412c4 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -34,21 +34,21 @@ namespace cargo { std::unique_ptr operation::make_operation(cargo::tag t, mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) { + std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type) { using cargo::tag; switch(t) { case tag::pread: return std::make_unique(std::move(workers), std::move(input_path), - std::move(output_path), block_size); + std::move(output_path), block_size, fs_type); case tag::pwrite: return std::make_unique(std::move(workers), std::move(input_path), - std::move(output_path), block_size); + std::move(output_path), block_size, fs_type); case tag::sequential: return std::make_unique(std::move(workers), std::move(input_path), - std::move(output_path), block_size); + std::move(output_path), block_size, fs_type); default: return {}; } diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index b7e0eb8..0d07515 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -30,6 +30,7 @@ #include #include "proto/mpi/message.hpp" #include "cargo.hpp" +#include "posix_file/file.hpp" namespace cargo { /** @@ -41,7 +42,7 @@ public: static std::unique_ptr make_operation(cargo::tag t, boost::mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size); + std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type); virtual ~operation() = default; diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index cb5860b..99aa0f4 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -35,9 +35,9 @@ class seq_operation : public operation { public: seq_operation(mpi::communicator comm, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) + std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type) : m_comm(std::move(comm)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {} cargo::error_code operator()() final; @@ -53,6 +53,7 @@ private: std::filesystem::path m_output_path; cargo::error_code m_status; std::uint64_t m_kb_size; + FSPlugin::type m_fs_type; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 04a941a..37ddbc8 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -162,7 +162,7 @@ worker::run() { make_pair(m.input_path(), m.output_path()), make_pair(operation::make_operation( t, workers, m.input_path(), - m.output_path(), m_block_size), + m.output_path(), m_block_size, m.type()), 0))); const auto op = -- GitLab From d5ddfc88a20f51276d597d91fa905f83abfa04b4 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 4 Dec 2023 11:12:04 +0100 Subject: [PATCH 08/11] Added Expand and Hercules fs --- CMakeLists.txt | 16 +++++ cmake/FindExpand.cmake | 65 +++++++++++++++++++ cmake/FindGekkoFS.cmake | 4 +- cmake/FindHercules.cmake | 65 +++++++++++++++++++ src/posix_file/CMakeLists.txt | 23 ++++++- .../posix_file/fs_plugin/expand_plugin.cpp | 64 ++++++++++++++++++ .../posix_file/fs_plugin/expand_plugin.hpp | 30 +++++++++ .../posix_file/fs_plugin/fs_plugin.cpp | 12 ++++ .../posix_file/fs_plugin/gekko_plugin.hpp | 5 +- .../posix_file/fs_plugin/hercules_plugin.cpp | 64 ++++++++++++++++++ .../posix_file/fs_plugin/hercules_plugin.hpp | 30 +++++++++ 11 files changed, 370 insertions(+), 8 deletions(-) create mode 100644 cmake/FindExpand.cmake create mode 100644 cmake/FindHercules.cmake create mode 100644 src/posix_file/posix_file/fs_plugin/expand_plugin.cpp create mode 100644 src/posix_file/posix_file/fs_plugin/expand_plugin.hpp create mode 100644 src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp create mode 100644 src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 918361f..7f3bc4b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -264,7 +264,23 @@ FetchContent_MakeAvailable(expected) find_package(GekkoFS) if (GekkoFS_FOUND) add_compile_definitions(GEKKOFS_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found GekkoFS") endif() + +### Hercules: Optional for hercules +find_package(Hercules) +if (Hercules_FOUND) + add_compile_definitions(HERCULES_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found Hercules") +endif() + +### Expand: Optional for expand +find_package(Expand) +if (Expand_FOUND) + add_compile_definitions(EXPAND_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found Expand") +endif() + ### Threads: required by ASIO find_package(Threads REQUIRED) diff --git a/cmake/FindExpand.cmake b/cmake/FindExpand.cmake new file mode 100644 index 0000000..d780604 --- /dev/null +++ b/cmake/FindExpand.cmake @@ -0,0 +1,65 @@ +################################################################################ +# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(Expand_INCLUDE_DIR + NAMES user_functions.hpp + PREFIX gkfs +) + +find_library(Expand_LIBRARY + NAMES libexpand_user_lib.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + Expand + DEFAULT_MSG + Expand_INCLUDE_DIR + Expand_LIBRARY +) + +if(Expand_FOUND) + set(Expand_LIBRARIES ${Expand_LIBRARY}) + set(Expand_INCLUDE_DIRS ${Expand_INCLUDE_DIR}) + + + if(NOT TARGET Expand::Expand) + add_library(Expand::Expand UNKNOWN IMPORTED) + set_target_properties(Expand::Expand PROPERTIES + IMPORTED_LOCATION "${Expand_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${Expand_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + Expand_INCLUDE_DIR + Expand_LIBRARY +) diff --git a/cmake/FindGekkoFS.cmake b/cmake/FindGekkoFS.cmake index 4e98d61..7aebff4 100644 --- a/cmake/FindGekkoFS.cmake +++ b/cmake/FindGekkoFS.cmake @@ -45,8 +45,8 @@ find_package_handle_standard_args( ) if(GekkoFS_FOUND) - set(GekkoFS_LIBRARIES ${GekkoFS__LIBRARY}) - set(GekkoFS_INCLUDE_DIRS ${GekkoFS__INCLUDE_DIR}) + set(GekkoFS_LIBRARIES ${GekkoFS_LIBRARY}) + set(GekkoFS_INCLUDE_DIRS ${GekkoFS_INCLUDE_DIR}) if(NOT TARGET GekkoFS::GekkoFS) diff --git a/cmake/FindHercules.cmake b/cmake/FindHercules.cmake new file mode 100644 index 0000000..2f877ab --- /dev/null +++ b/cmake/FindHercules.cmake @@ -0,0 +1,65 @@ +################################################################################ +# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(Hercules_INCLUDE_DIR + NAMES user_functions.hpp + PREFIX hercules +) + +find_library(Hercules_LIBRARY + NAMES libhercules_user_lib.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + Hercules + DEFAULT_MSG + Hercules_INCLUDE_DIR + Hercules_LIBRARY +) + +if(Hercules_FOUND) + set(Hercules_LIBRARIES ${Hercules_LIBRARY}) + set(Hercules_INCLUDE_DIRS ${Hercules_INCLUDE_DIR}) + + + if(NOT TARGET Hercules::Hercules) + add_library(Hercules::Hercules UNKNOWN IMPORTED) + set_target_properties(Hercules::Hercules PROPERTIES + IMPORTED_LOCATION "${Hercules_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${Hercules_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + Hercules_INCLUDE_DIR + Hercules_LIBRARY +) diff --git a/src/posix_file/CMakeLists.txt b/src/posix_file/CMakeLists.txt index acf0816..cfbc971 100644 --- a/src/posix_file/CMakeLists.txt +++ b/src/posix_file/CMakeLists.txt @@ -26,8 +26,16 @@ add_library(posix_file STATIC) set(GEKKO_INCLUDES "") if (GEKKOFS_PLUGIN) - set(GEKKO_INCLUDES posix_file/fs_plugin/gekko_plugin.cpp posix_file/fs_plugin/gekko_plugin.cpp) + set(GEKKO_INCLUDES posix_file/fs_plugin/gekko_plugin.hpp posix_file/fs_plugin/gekko_plugin.cpp) endif() +if (EXPAND_plugin) + set(EXPAND_INCLUDES posix_file/fs_plugin/expand_plugin.hpp posix_file/fs_plugin/expand_plugin.cpp) +endif() +if (HERCULES_plugin) + set(HERCULES_INCLUDES posix_file/fs_plugin/hercules_plugin.chpp posix_file/fs_plugin/hercules_plugin.cpp) +endif() + + target_sources( posix_file PRIVATE posix_file/types.hpp @@ -42,6 +50,8 @@ target_sources( posix_file/fs_plugin/fs_plugin.cpp posix_file/fs_plugin/posix_plugin.cpp ${GEKKO_INCLUDES} + ${HERCULES_INCLUDES} + ${EXPAND_INCLUDES} ) @@ -52,8 +62,17 @@ set_property(TARGET posix_file PROPERTY POSITION_INDEPENDENT_CODE ON) set(ADHOC "") if (GEKKOFS_PLUGIN) -set(ADHOC ${$ADHOC} GekkoFS::GekkoFS) + set(ADHOC ${ADHOC} GekkoFS::GekkoFS) endif() +if (EXPAND_PLUGIN) + set(ADHOC ${ADHOC} Expand::Expand) +endif() + +if (HERCULES_PLUGIN) + set(ADHOC ${ADHOC} Hercules::Hercules) +endif() + + target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected PRIVATE ${ADHOC}) diff --git a/src/posix_file/posix_file/fs_plugin/expand_plugin.cpp b/src/posix_file/posix_file/fs_plugin/expand_plugin.cpp new file mode 100644 index 0000000..ef510b0 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/expand_plugin.cpp @@ -0,0 +1,64 @@ + +#include "fs_plugin.hpp" +#include "expand_plugin.hpp" + + +#include +namespace cargo { +expand_plugin::expand_plugin() { + int result = expand_init(); + if (result != 0) { + std::cerr << "Failed to initialize expand" << std::endl; + } +} + +expand_plugin::~expand_plugin() { + int result = expand_end(); + if (result != 0) { + std::cerr << "Failed to finalize expand" << std::endl; + } +} +// Override the open function +int +expand_plugin::open(const std::string& path, int flags, unsigned int mode) { + return expand_open(path, flags, mode); +} + +// Override the pread function +ssize_t +expand_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return expand_pread_ws(fd, buf, count, offset); +} + +// Override the pwrite function +ssize_t +expand_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return expand_pwrite_ws(fd, buf, count, offset); +} + + +bool +expand_plugin::mkdir(const std::string& path, mode_t mode) { + int result = expand_create(path, mode | S_IFDIR); + return result; +} + +bool +expand_plugin::close(int fd) { + return expand_close(fd); +} + +off_t +expand_plugin::lseek(int fd, off_t offset, int whence) { + return expand_lseek(fd, offset, whence); +} + +off_t +expand_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/expand_plugin.hpp b/src/posix_file/posix_file/fs_plugin/expand_plugin.hpp new file mode 100644 index 0000000..78a7dfc --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/expand_plugin.hpp @@ -0,0 +1,30 @@ + +#ifndef EXPAND_PLUGIN_HPP +#define EXPAND_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class expand_plugin : public FSPlugin { + +public: + expand_plugin(); + ~expand_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +}; // namespace cargo + +#endif // EXPAND_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp index 8d45833..bcd6fe3 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -3,6 +3,10 @@ #ifdef GEKKO_PLUGIN #include "gekko_plugin.hpp" #endif +#ifdef HERCULES_PLUGIN +#include "hercules_plugin.hpp" +#endif + namespace cargo { std::unique_ptr @@ -14,6 +18,14 @@ FSPlugin::make_fs(type t) { #ifdef GEKKO_PLUGIN case type::gekkofs: return std::make_unique(); +#endif +#ifdef HERCULES_PLUGIN + case type::hercules: + return std::make_unique(); +#endif +#ifdef EXPAND_PLUGIN + case type::expand: + return std::make_unique(); #endif default: return {}; diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp index dee1e3d..2d27132 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp @@ -4,7 +4,6 @@ #include "fs_plugin.hpp" - namespace cargo { class gekko_plugin : public FSPlugin { @@ -23,11 +22,9 @@ public: mkdir(const std::string& path, mode_t mode) final; off_t lseek(int fd, off_t offset, int whence) final; + // Fallocate is not needed in GekkoFS as pwrite takes care of it. off_t fallocate(int fd, int mode, off_t offset, off_t len) final; - -private: - void* m_libHandle; }; }; // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp b/src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp new file mode 100644 index 0000000..f3c4a11 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp @@ -0,0 +1,64 @@ + +#include "fs_plugin.hpp" +#include "hercules_plugin.hpp" + + +#include +namespace cargo { +hercules_plugin::hercules_plugin() { + int result = hercules_init(); + if (result != 0) { + std::cerr << "Failed to initialize hercules" << std::endl; + } +} + +hercules_plugin::~hercules_plugin() { + int result = hercules_end(); + if (result != 0) { + std::cerr << "Failed to finalize hercules" << std::endl; + } +} +// Override the open function +int +hercules_plugin::open(const std::string& path, int flags, unsigned int mode) { + return hercules_open(path, flags, mode); +} + +// Override the pread function +ssize_t +hercules_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return hercules_pread_ws(fd, buf, count, offset); +} + +// Override the pwrite function +ssize_t +hercules_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return hercules_pwrite_ws(fd, buf, count, offset); +} + + +bool +hercules_plugin::mkdir(const std::string& path, mode_t mode) { + int result = hercules_create(path, mode | S_IFDIR); + return result; +} + +bool +hercules_plugin::close(int fd) { + return hercules_close(fd); +} + +off_t +hercules_plugin::lseek(int fd, off_t offset, int whence) { + return hercules_lseek(fd, offset, whence); +} + +off_t +hercules_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp b/src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp new file mode 100644 index 0000000..fdef892 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp @@ -0,0 +1,30 @@ + +#ifndef HERCULES_PLUGIN_HPP +#define HERCULES_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class hercules_plugin : public FSPlugin { + +public: + hercules_plugin(); + ~hercules_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +}; // namespace cargo + +#endif // HERCULES_PLUGIN_HPP -- GitLab From 94b47875d350576ef8d82fe7fad252f5ce88f226 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 4 Dec 2023 13:24:56 +0100 Subject: [PATCH 09/11] Implemented sequential for potential adhoc types --- src/master.cpp | 48 +++-- .../posix_file/fs_plugin/fs_plugin.cpp | 3 + src/proto/mpi/message.hpp | 22 +- src/worker/mpio_read.cpp | 13 +- src/worker/mpio_read.hpp | 12 +- src/worker/mpio_write.cpp | 2 +- src/worker/mpio_write.hpp | 10 +- src/worker/ops.cpp | 25 ++- src/worker/ops.hpp | 3 +- src/worker/sequential.cpp | 203 +++++++++++++++++- src/worker/sequential.hpp | 39 +++- src/worker/worker.cpp | 3 +- 12 files changed, 317 insertions(+), 66 deletions(-) diff --git a/src/master.cpp b/src/master.cpp index 350bbf4..37a5eb1 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -48,20 +48,29 @@ make_message(std::uint64_t tid, std::uint32_t seqno, const cargo::dataset& input, const cargo::dataset& output) { if(input.supports_parallel_transfer()) { - return std::make_tuple(static_cast(cargo::tag::pread), - cargo::transfer_message{tid, seqno, input.path(), - output.path(), static_cast(output.get_type())}); + return std::make_tuple( + static_cast(cargo::tag::pread), + cargo::transfer_message{ + tid, seqno, input.path(), + static_cast(input.get_type()), output.path(), + static_cast(output.get_type())}); } if(output.supports_parallel_transfer()) { - return std::make_tuple(static_cast(cargo::tag::pwrite), - cargo::transfer_message{tid, seqno, input.path(), - output.path(), static_cast(input.get_type())}); + return std::make_tuple( + static_cast(cargo::tag::pwrite), + cargo::transfer_message{ + tid, seqno, input.path(), + static_cast(input.get_type()), output.path(), + static_cast(output.get_type())}); } return std::make_tuple( static_cast(cargo::tag::sequential), - cargo::transfer_message{tid, seqno, input.path(), output.path(), static_cast(input.get_type())}); + cargo::transfer_message{tid, seqno, input.path(), + static_cast(input.get_type()), + output.path(), + static_cast(input.get_type())}); } } // namespace @@ -71,10 +80,11 @@ using namespace std::literals; namespace cargo { master_server::master_server(std::string name, std::string address, - bool daemonize, std::filesystem::path rundir, std::uint64_t block_size, + bool daemonize, std::filesystem::path rundir, + std::uint64_t block_size, std::optional pidfile) - : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size), - std::move(pidfile)), + : server(std::move(name), std::move(address), daemonize, std::move(rundir), + std::move(block_size), std::move(pidfile)), provider(m_network_engine, 0), m_mpi_listener_ess(thallium::xstream::create()), m_mpi_listener_ult(m_mpi_listener_ess->make_thread( @@ -122,7 +132,7 @@ master_server::mpi_listener_ult() { status_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_DEBUG("msg => from: {} body: {{payload: {}}}", - msg->source(), m); + msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, m.state(), m.bw(), m.error_code()); @@ -239,12 +249,12 @@ master_server::transfer_datasets(const network::request& req, // Then create a new message for each file and append the // file to the d prefix // We will asume that the path is the original absolute - // The prefix selects the method of transfer + // The prefix selects the method of transfer // And if not specified then we will use none // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then // bbb/xxx -> ttt/xxx const auto& p = s.path(); - + std::vector files; if(std::filesystem::is_directory(p)) { LOGGER_INFO("Expanding input directory {}", p); @@ -269,7 +279,7 @@ master_server::transfer_datasets(const network::request& req, // path (d.path) plus the path from f, removing the // initial path p (taking care of the trailing /) auto leading = p.size(); - if(leading>0 and p.back() == '/') { + if(leading > 0 and p.back() == '/') { leading--; } @@ -303,12 +313,16 @@ master_server::transfer_datasets(const network::request& req, const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; - // Create the directory if it does not exist (only in parallel transfer) - if(!std::filesystem::path(d.path()).parent_path().empty() and d.supports_parallel_transfer()) { + // Create the directory if it does not exist (only in + // parallel transfer) + if(!std::filesystem::path(d.path()) + .parent_path() + .empty() and + d.supports_parallel_transfer()) { std::filesystem::create_directories( std::filesystem::path(d.path()).parent_path()); } - + for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { const auto [t, m] = make_message(r.tid(), i, s, d); LOGGER_INFO("msg <= to: {} body: {}", rank, m); diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp index bcd6fe3..af02eea 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -6,6 +6,9 @@ #ifdef HERCULES_PLUGIN #include "hercules_plugin.hpp" #endif +#ifdef EXPAND_PLUGIN +#include "expand_plugin.hpp" +#endif namespace cargo { diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index cfebaac..5be72c6 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -53,9 +53,11 @@ public: transfer_message() = default; transfer_message(std::uint64_t tid, std::uint32_t seqno, - std::string input_path, std::string output_path, std::uint32_t type) + std::string input_path, std::uint32_t i_type, + std::string output_path, std::uint32_t o_type) : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_type(type) {} + m_i_type(i_type), m_output_path(std::move(output_path)), + m_o_type(o_type) {} [[nodiscard]] std::uint64_t tid() const { @@ -78,8 +80,14 @@ public: } /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */ [[nodiscard]] cargo::FSPlugin::type - type() const { - return static_cast(m_type); + o_type() const { + return static_cast(m_o_type); + } + + /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */ + [[nodiscard]] cargo::FSPlugin::type + i_type() const { + return static_cast(m_i_type); } private: @@ -92,14 +100,16 @@ private: ar& m_seqno; ar& m_input_path; ar& m_output_path; - ar& m_type; + ar& m_i_type; + ar& m_o_type; } std::uint64_t m_tid{}; std::uint32_t m_seqno{}; std::string m_input_path; + std::uint32_t m_i_type{}; std::string m_output_path; - std::uint32_t m_type{}; + std::uint32_t m_o_type{}; }; class status_message { diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 47856ff..92f0a05 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -33,9 +33,11 @@ namespace cargo { mpio_read::mpio_read(mpi::communicator workers, std::filesystem::path input_path, std::filesystem::path output_path, - std::uint64_t block_size, FSPlugin::type fs_type) + std::uint64_t block_size, FSPlugin::type fs_i_type, + FSPlugin::type fs_o_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), + m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type) {} cargo::error_code mpio_read::operator()() { @@ -123,9 +125,10 @@ mpio_read::operator()() { } // step3. POSIX write data - // We need to create the directory if it does not exists (using FSPlugin) - m_output_file = std::make_unique( - posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_type)); + // We need to create the directory if it does not exists (using + // FSPlugin) + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); m_output_file->fallocate(0, 0, file_size); diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index 16a59ff..bd72a6e 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -38,7 +38,8 @@ class mpio_read : public operation { public: mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type); + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type); cargo::error_code operator()() final; @@ -47,15 +48,14 @@ public: progress() const final; int - progress(int ongoing_index ) final; + progress(int ongoing_index) final; private: - mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; cargo::error_code m_status; - + std::unique_ptr m_output_file; int m_workers_size; int m_workers_rank; @@ -63,8 +63,8 @@ private: memory_buffer m_buffer; std::vector m_buffer_regions; std::uint64_t m_kb_size; - FSPlugin::type m_fs_type; - + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index f95ff3f..44b8070 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -69,7 +69,7 @@ mpio_write::operator()() { } m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY, 0, m_fs_type)); + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); m_workers_size = workers_size; m_workers_rank = workers_rank; diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index ee25864..eb50129 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -38,9 +38,12 @@ class mpio_write : public operation { public: mpio_write(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type) + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {} + m_output_path(std::move(output_path)), + m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), + m_fs_o_type(fs_o_type) {} cargo::error_code operator()() final; @@ -71,7 +74,8 @@ private: std::vector m_buffer_regions; std::size_t m_bytes_per_rank; std::uint64_t m_kb_size; - FSPlugin::type m_fs_type; + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 6f412c4..5d30d95 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -34,21 +34,23 @@ namespace cargo { std::unique_ptr operation::make_operation(cargo::tag t, mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type) { + std::filesystem::path output_path, + std::uint64_t block_size, FSPlugin::type fs_i_type, + FSPlugin::type fs_o_type) { using cargo::tag; switch(t) { case tag::pread: - return std::make_unique(std::move(workers), - std::move(input_path), - std::move(output_path), block_size, fs_type); + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); case tag::pwrite: - return std::make_unique(std::move(workers), - std::move(input_path), - std::move(output_path), block_size, fs_type); + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); case tag::sequential: - return std::make_unique(std::move(workers), - std::move(input_path), - std::move(output_path), block_size, fs_type); + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); default: return {}; } @@ -90,7 +92,8 @@ operation::bw() { return m_bw; } -void operation::bw(float_t bw) { +void +operation::bw(float_t bw) { m_bw = bw; } void diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index 0d07515..ba3c156 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -42,7 +42,8 @@ public: static std::unique_ptr make_operation(cargo::tag t, boost::mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type); + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type); virtual ~operation() = default; diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 40f5e4c..eafd205 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -24,15 +24,72 @@ #include #include "sequential.hpp" - +#include namespace cargo { cargo::error_code seq_operation::operator()() { - LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); - m_status = cargo::error_code::not_implemented; - return cargo::error_code::not_implemented; + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + m_status = error_code::transfer_in_progress; + try { + + const auto workers_size = m_workers.size(); + const auto workers_rank = m_workers.rank(); + std::size_t block_size = m_kb_size * 1024u; + std::size_t file_size = std::filesystem::file_size(m_input_path); + + // compute the number of blocks in the file + int total_blocks = static_cast(file_size / block_size); + + if(file_size % block_size != 0) { + ++total_blocks; + } + + // find how many blocks this rank is responsible for + std::size_t blocks_per_rank = total_blocks / workers_size; + + if(int64_t n = total_blocks % workers_size; + n != 0 && workers_rank < n) { + ++blocks_per_rank; + } + + // step 1. acquire buffers + + m_buffer.resize(blocks_per_rank * block_size); + m_buffer_regions.reserve(blocks_per_rank); + + for(std::size_t i = 0; i < blocks_per_rank; ++i) { + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); + } + + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + m_file_size = file_size; + m_total_blocks = total_blocks; + + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } + + return error_code::transfer_in_progress; } cargo::error_code @@ -42,9 +99,143 @@ seq_operation::progress() const { int seq_operation::progress(int ongoing_index) { - ongoing_index++; - m_status = cargo::error_code::not_implemented; + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + // compute the number of blocks in the file + + int index = 0; + if(write == false) { + if(ongoing_index == 0) { + m_bytes_per_rank = 0; + } + try { + for(const auto& file_range : + all_of(*m_input_file) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + m_status = error_code::transfer_in_progress; + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + const std::size_t n = m_input_file->pread( + m_buffer_regions[index], file_range.offset(), + file_range.size()); + + LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", + fmt::join(buffer_regions[index].begin(), + buffer_regions[index].begin() + 10, ""), + fmt::join(buffer_regions[index].end() - 10, + buffer_regions[index].end(), "")); + + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast< + std::chrono::duration>(end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_DEBUG( + "BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + ++index; + } + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + } + write = true; + // We finished reading + // step3. POSIX write data + // We need to create the directory if it does not exists (using + // FSPlugin) + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + + m_output_file->fallocate(0, 0, m_file_size); + + + try { + int index = 0; + m_status = error_code::transfer_in_progress; + for(const auto& file_range : + all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), + file_range.size()); + // Do sleep + std::this_thread::sleep_for(sleep_value()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast>( + end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_DEBUG( + "BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + ++index; + } + + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + + m_status = error_code::success; return -1; } + } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index 99aa0f4..508c197 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -26,6 +26,10 @@ #define CARGO_WORKER_SEQUENTIAL_HPP #include "ops.hpp" +#include +#include +#include "ops.hpp" +#include "memory.hpp" namespace mpi = boost::mpi; @@ -34,26 +38,43 @@ namespace cargo { class seq_operation : public operation { public: - seq_operation(mpi::communicator comm, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size, FSPlugin::type fs_type) - : m_comm(std::move(comm)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), m_fs_type(fs_type) {} + seq_operation(mpi::communicator workers, std::filesystem::path input_path, + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + : m_workers(std::move(workers)), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)), + m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), + m_fs_o_type(fs_o_type) {} cargo::error_code operator()() final; cargo::error_code progress() const; - int - progress(int ongoing_index ) final; + int + progress(int ongoing_index) final; private: - mpi::communicator m_comm; + mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; - cargo::error_code m_status; + + std::unique_ptr m_input_file; + std::unique_ptr m_output_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + std::size_t m_file_size; + int m_total_blocks; + + memory_buffer m_buffer; + std::vector m_buffer_regions; + std::size_t m_bytes_per_rank; std::uint64_t m_kb_size; - FSPlugin::type m_fs_type; + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; + cargo::error_code m_status; + bool write{}; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 37ddbc8..fe5f853 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -162,7 +162,8 @@ worker::run() { make_pair(m.input_path(), m.output_path()), make_pair(operation::make_operation( t, workers, m.input_path(), - m.output_path(), m_block_size, m.type()), + m.output_path(), m_block_size, + m.i_type(), m.o_type()), 0))); const auto op = -- GitLab From 8acb665c764385c0571501ab6f63281ef09bf720 Mon Sep 17 00:00:00 2001 From: rnou Date: Mon, 4 Dec 2023 13:27:53 +0100 Subject: [PATCH 10/11] Removed debug messages on posix backend --- src/posix_file/posix_file/file.hpp | 6 +++--- src/posix_file/posix_file/fs_plugin/posix_plugin.cpp | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 5114885..03d6fbe 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -300,11 +300,11 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode, if(flags & O_CREAT) { fs_plugin->mkdir(filepath.parent_path().c_str(), 0755); } - std::cout << "Opening file " << filepath << std::endl; + int fd = fs_plugin->open(filepath.c_str(), flags, mode); - std::cout << "File opened? " << fd << " -- " << flags << " mode: " << mode << std::endl; + if(fd == -1) { - throw io_error("posix_file::open_gekko", errno); + throw io_error("posix_file::open ", errno); } return file{filepath, fd, std::move(fs_plugin)}; diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp index 1e81ee5..4d0f134 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp @@ -7,7 +7,6 @@ namespace cargo { posix_plugin::posix_plugin() { - std::cout << "POSIXPlugin::POSIXPlugin()" << std::endl; } -- GitLab From 224fc9bae142b8e173a8a9904d646e30b45c4be1 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 12 Dec 2023 14:24:00 +0100 Subject: [PATCH 11/11] Add seq_mixed operation and related files --- cli/copy.cpp | 6 +- src/CMakeLists.txt | 2 + src/master.cpp | 4 +- src/proto/mpi/message.hpp | 1 + src/worker/ops.cpp | 5 + src/worker/seq_mixed.cpp | 188 ++++++++++++++++++++++++++++++++++++++ src/worker/seq_mixed.hpp | 82 +++++++++++++++++ src/worker/sequential.cpp | 8 +- src/worker/worker.cpp | 2 + 9 files changed, 291 insertions(+), 7 deletions(-) create mode 100644 src/worker/seq_mixed.cpp create mode 100644 src/worker/seq_mixed.hpp diff --git a/cli/copy.cpp b/cli/copy.cpp index 8f0188d..f55a402 100644 --- a/cli/copy.cpp +++ b/cli/copy.cpp @@ -77,7 +77,8 @@ parse_command_line(int argc, char* argv[]) { app.add_option("--if", cfg.input_flags, "Flags for input datasets. Accepted values\n" " - posix: read data using POSIX (default)\n" - " - mpio: read data using MPI-IO") + " - parallel: read data using MPI-IO\n" + " - gekkofs: read data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, CLI::ignore_case)); @@ -85,7 +86,8 @@ parse_command_line(int argc, char* argv[]) { app.add_option("--of", cfg.output_flags, "Flags for output datasets. Accepted values\n" " - posix: write data using POSIX (default)\n" - " - mpio: write data using MPI-IO") + " - parallel: write data using MPI-IO\n" + " - gekkofs: write data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, CLI::ignore_case)); diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ff1a5c3..9bc9ce8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -43,6 +43,8 @@ target_sources( worker/ops.hpp worker/sequential.cpp worker/sequential.hpp + worker/seq_mixed.cpp + worker/seq_mixed.hpp worker/worker.cpp worker/worker.hpp env.hpp diff --git a/src/master.cpp b/src/master.cpp index 37a5eb1..594f6ac 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -66,7 +66,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno, } return std::make_tuple( - static_cast(cargo::tag::sequential), + static_cast(cargo::tag::seq_mixed), cargo::transfer_message{tid, seqno, input.path(), static_cast(input.get_type()), output.path(), @@ -123,7 +123,7 @@ master_server::mpi_listener_ult() { auto msg = world.iprobe(); if(!msg) { - thallium::thread::self().sleep(m_network_engine, 150); + thallium::thread::self().sleep(m_network_engine, 10); continue; } diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index 5be72c6..a6b1d2c 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -40,6 +40,7 @@ enum class tag : int { pread, pwrite, sequential, + seq_mixed, bw_shaping, status, shutdown diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 5d30d95..0d72c18 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -26,6 +26,7 @@ #include "mpio_read.hpp" #include "mpio_write.hpp" #include "sequential.hpp" +#include "seq_mixed.hpp" namespace mpi = boost::mpi; @@ -51,6 +52,10 @@ operation::make_operation(cargo::tag t, mpi::communicator workers, return std::make_unique( std::move(workers), std::move(input_path), std::move(output_path), block_size, fs_i_type, fs_o_type); + case tag::seq_mixed: + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); default: return {}; } diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp new file mode 100644 index 0000000..b6dec2f --- /dev/null +++ b/src/worker/seq_mixed.cpp @@ -0,0 +1,188 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include "seq_mixed.hpp" +#include + +namespace cargo { + +cargo::error_code +seq_mixed_operation::operator()() { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + m_status = error_code::transfer_in_progress; + try { + + const auto workers_size = m_workers.size(); + const auto workers_rank = m_workers.rank(); + std::size_t block_size = m_kb_size * 1024u; + std::size_t file_size = std::filesystem::file_size(m_input_path); + + // compute the number of blocks in the file + int total_blocks = static_cast(file_size / block_size); + + if(file_size % block_size != 0) { + ++total_blocks; + } + + // find how many blocks this rank is responsible for + std::size_t blocks_per_rank = total_blocks / workers_size; + + if(int64_t n = total_blocks % workers_size; + n != 0 && workers_rank < n) { + ++blocks_per_rank; + } + + // step 1. acquire buffers + + m_buffer.resize(blocks_per_rank * block_size); + m_buffer_regions.reserve(blocks_per_rank); + + for(std::size_t i = 0; i < blocks_per_rank; ++i) { + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); + } + + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + + m_output_file->fallocate(0, 0, file_size); + + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + m_file_size = file_size; + m_total_blocks = total_blocks; + + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } + + return error_code::transfer_in_progress; +} + +cargo::error_code +seq_mixed_operation::progress() const { + return m_status; +} + +int +seq_mixed_operation::progress(int ongoing_index) { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + // compute the number of blocks in the file + + int index = 0; + if(write == false) { + if(ongoing_index == 0) { + m_bytes_per_rank = 0; + } + try { + for(const auto& file_range : + all_of(*m_input_file) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + m_status = error_code::transfer_in_progress; + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + const std::size_t n = m_input_file->pread( + m_buffer_regions[index], file_range.offset(), + file_range.size()); + + LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", + fmt::join(buffer_regions[index].begin(), + buffer_regions[index].begin() + 10, ""), + fmt::join(buffer_regions[index].end() - 10, + buffer_regions[index].end(), "")); + +/* Do write */ + m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), + file_range.size()); + + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast< + std::chrono::duration>(end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_DEBUG( + "BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + ++index; + } + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + } + + m_status = error_code::success; + return -1; +} + + +} // namespace cargo diff --git a/src/worker/seq_mixed.hpp b/src/worker/seq_mixed.hpp new file mode 100644 index 0000000..c3545cc --- /dev/null +++ b/src/worker/seq_mixed.hpp @@ -0,0 +1,82 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_WORKER_SEQ_MIXED_HPP +#define CARGO_WORKER_SEQ_MIXED_HPP + +#include "ops.hpp" +#include +#include +#include "ops.hpp" +#include "memory.hpp" + +namespace mpi = boost::mpi; + +namespace cargo { + +class seq_mixed_operation : public operation { + +public: + seq_mixed_operation(mpi::communicator workers, std::filesystem::path input_path, + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + : m_workers(std::move(workers)), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)), + m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), + m_fs_o_type(fs_o_type) {} + + cargo::error_code + operator()() final; + cargo::error_code + progress() const; + + int + progress(int ongoing_index) final; + +private: + mpi::communicator m_workers; + std::filesystem::path m_input_path; + std::filesystem::path m_output_path; + + std::unique_ptr m_input_file; + std::unique_ptr m_output_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + std::size_t m_file_size; + int m_total_blocks; + + memory_buffer m_buffer; + std::vector m_buffer_regions; + std::size_t m_bytes_per_rank; + std::uint64_t m_kb_size; + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; + cargo::error_code m_status; + bool write{}; +}; + +} // namespace cargo + +#endif // CARGO_WORKER_SEQUENTIAL_HPP diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index eafd205..4254d2c 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -175,11 +175,13 @@ seq_operation::progress(int ongoing_index) { // step3. POSIX write data // We need to create the directory if it does not exists (using // FSPlugin) - m_output_file = std::make_unique(posix_file::create( - m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); - m_output_file->fallocate(0, 0, m_file_size); + if ( write and ongoing_index == 0 ) { + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + m_output_file->fallocate(0, 0, m_file_size); + } try { int index = 0; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index fe5f853..a680c84 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -154,6 +154,8 @@ worker::run() { [[fallthrough]]; case tag::pwrite: [[fallthrough]]; + case tag::seq_mixed: + [[fallthrough]]; case tag::sequential: { transfer_message m; world.recv(msg->source(), msg->tag(), m); -- GitLab