diff --git a/CMakeLists.txt b/CMakeLists.txt index f76e44fe3636e2db3220e38f846acac5cfd4b569..7f3bc4b74883a946130aa14e94a20dff1077f039 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -260,6 +260,27 @@ FetchContent_Declare( FetchContent_MakeAvailable(expected) +### GekkoFS: Optional for gekkofs +find_package(GekkoFS) +if (GekkoFS_FOUND) + add_compile_definitions(GEKKOFS_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found GekkoFS") +endif() + +### Hercules: Optional for hercules +find_package(Hercules) +if (Hercules_FOUND) + add_compile_definitions(HERCULES_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found Hercules") +endif() + +### Expand: Optional for expand +find_package(Expand) +if (Expand_FOUND) + add_compile_definitions(EXPAND_PLUGIN) + message(STATUS "[${PROJECT_NAME}] Found Expand") +endif() + ### Threads: required by ASIO find_package(Threads REQUIRED) diff --git a/cli/copy.cpp b/cli/copy.cpp index 745263a064335c04eb29770d08fa90f2221e74e7..f55a40281d54680248497c278a3d58d0d9fdf95b 100644 --- a/cli/copy.cpp +++ b/cli/copy.cpp @@ -29,19 +29,24 @@ #include #include -enum class dataset_flags { posix, mpio }; +enum class dataset_flags { posix, parallel, none, gekkofs, hercules, expand, dataclay }; -std::map dataset_flags_map{ - {"posix", dataset_flags::posix}, - {"mpio", dataset_flags::mpio}}; +std::map dataset_flags_map{ + {"posix", cargo::dataset::type::posix}, + {"parallel", cargo::dataset::type::parallel}, + {"none", cargo::dataset::type::none}, + {"gekkofs", cargo::dataset::type::gekkofs}, + {"hercules", cargo::dataset::type::hercules}, + {"expand", cargo::dataset::type::expand}, + {"dataclay", cargo::dataset::type::dataclay}}; struct copy_config { std::string progname; std::string server_address; std::vector inputs; - dataset_flags input_flags = dataset_flags::posix; + cargo::dataset::type input_flags = cargo::dataset::type::posix; std::vector outputs; - dataset_flags output_flags = dataset_flags::posix; + cargo::dataset::type output_flags = cargo::dataset::type::posix; }; copy_config @@ -72,7 +77,8 @@ parse_command_line(int argc, char* argv[]) { app.add_option("--if", cfg.input_flags, "Flags for input datasets. Accepted values\n" " - posix: read data using POSIX (default)\n" - " - mpio: read data using MPI-IO") + " - parallel: read data using MPI-IO\n" + " - gekkofs: read data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, CLI::ignore_case)); @@ -80,7 +86,8 @@ parse_command_line(int argc, char* argv[]) { app.add_option("--of", cfg.output_flags, "Flags for output datasets. Accepted values\n" " - posix: write data using POSIX (default)\n" - " - mpio: write data using MPI-IO") + " - parallel: write data using MPI-IO\n" + " - gekkofs: write data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, CLI::ignore_case)); @@ -119,17 +126,13 @@ main(int argc, char* argv[]) { std::transform(cfg.inputs.cbegin(), cfg.inputs.cend(), std::back_inserter(inputs), [&](const auto& src) { return cargo::dataset{ - src, cfg.input_flags == dataset_flags::mpio - ? cargo::dataset::type::parallel - : cargo::dataset::type::posix}; + src, cfg.input_flags}; }); std::transform(cfg.outputs.cbegin(), cfg.outputs.cend(), std::back_inserter(outputs), [&cfg](const auto& tgt) { return cargo::dataset{ - tgt, cfg.output_flags == dataset_flags::mpio - ? cargo::dataset::type::parallel - : cargo::dataset::type::posix}; + tgt, cfg.output_flags}; }); const auto tx = cargo::transfer_datasets(server, inputs, outputs); diff --git a/cmake/FindExpand.cmake b/cmake/FindExpand.cmake new file mode 100644 index 0000000000000000000000000000000000000000..d7806046e4ba72214dc91c7bc96fb88b6b92cb27 --- /dev/null +++ b/cmake/FindExpand.cmake @@ -0,0 +1,65 @@ +################################################################################ +# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(Expand_INCLUDE_DIR + NAMES user_functions.hpp + PREFIX gkfs +) + +find_library(Expand_LIBRARY + NAMES libexpand_user_lib.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + Expand + DEFAULT_MSG + Expand_INCLUDE_DIR + Expand_LIBRARY +) + +if(Expand_FOUND) + set(Expand_LIBRARIES ${Expand_LIBRARY}) + set(Expand_INCLUDE_DIRS ${Expand_INCLUDE_DIR}) + + + if(NOT TARGET Expand::Expand) + add_library(Expand::Expand UNKNOWN IMPORTED) + set_target_properties(Expand::Expand PROPERTIES + IMPORTED_LOCATION "${Expand_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${Expand_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + Expand_INCLUDE_DIR + Expand_LIBRARY +) diff --git a/cmake/FindGekkoFS.cmake b/cmake/FindGekkoFS.cmake new file mode 100644 index 0000000000000000000000000000000000000000..7aebff47b72eb2ea8cf79dc26535f3a54f488103 --- /dev/null +++ b/cmake/FindGekkoFS.cmake @@ -0,0 +1,65 @@ +################################################################################ +# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(GekkoFS_INCLUDE_DIR + NAMES user_functions.hpp + PREFIX gkfs +) + +find_library(GekkoFS_LIBRARY + NAMES libgkfs_user_lib.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + GekkoFS + DEFAULT_MSG + GekkoFS_INCLUDE_DIR + GekkoFS_LIBRARY +) + +if(GekkoFS_FOUND) + set(GekkoFS_LIBRARIES ${GekkoFS_LIBRARY}) + set(GekkoFS_INCLUDE_DIRS ${GekkoFS_INCLUDE_DIR}) + + + if(NOT TARGET GekkoFS::GekkoFS) + add_library(GekkoFS::GekkoFS UNKNOWN IMPORTED) + set_target_properties(GekkoFS::GekkoFS PROPERTIES + IMPORTED_LOCATION "${GekkoFS_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${GekkoFS_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + GekkoFS_INCLUDE_DIR + GekkoFS_LIBRARY +) diff --git a/cmake/FindHercules.cmake b/cmake/FindHercules.cmake new file mode 100644 index 0000000000000000000000000000000000000000..2f877abd278d99163b8eeb81e6ff05c07a4a7272 --- /dev/null +++ b/cmake/FindHercules.cmake @@ -0,0 +1,65 @@ +################################################################################ +# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # +# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# # +# This software was partially supported by the # +# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# # +# This software was partially supported by the # +# ADA-FS project under the SPPEXA project funded by the DFG. # +# # +# This file is part of GekkoFS. # +# # +# GekkoFS is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# GekkoFS is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with GekkoFS. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(Hercules_INCLUDE_DIR + NAMES user_functions.hpp + PREFIX hercules +) + +find_library(Hercules_LIBRARY + NAMES libhercules_user_lib.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + Hercules + DEFAULT_MSG + Hercules_INCLUDE_DIR + Hercules_LIBRARY +) + +if(Hercules_FOUND) + set(Hercules_LIBRARIES ${Hercules_LIBRARY}) + set(Hercules_INCLUDE_DIRS ${Hercules_INCLUDE_DIR}) + + + if(NOT TARGET Hercules::Hercules) + add_library(Hercules::Hercules UNKNOWN IMPORTED) + set_target_properties(Hercules::Hercules PROPERTIES + IMPORTED_LOCATION "${Hercules_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${Hercules_INCLUDE_DIR}" + ) + endif() +endif() + + +mark_as_advanced( + Hercules_INCLUDE_DIR + Hercules_LIBRARY +) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index ca15aa75c234f9b9892a6548a3a924bf842295d4..1c62d1c493eba638df118b8948bc621f64240558 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -77,6 +77,9 @@ public: void path(std::string path); + [[nodiscard]] dataset::type + get_type() const; + template void serialize(Archive& ar) { @@ -86,7 +89,7 @@ public: private: std::string m_path; - dataset::type m_type = dataset::type::none; + dataset::type m_type; }; diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 65fc7262e9d2faa9955f83d28d051c54b2cc14ff..7968edaf55d5e4b098f6c94fb2f6724ac72868a3 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -77,6 +77,11 @@ dataset::path(std::string path) { m_path = std::move(path); }; +dataset::type +dataset::get_type() const { + return m_type; +}; + bool dataset::supports_parallel_transfer() const noexcept { return m_type == dataset::type::parallel; diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index cccab79220a9fc26f31cfc71d964684bb1c032a2..ae1e451bd9af6a7cd784c6eea6792f979c06ef5f 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -39,6 +39,7 @@ class Cargo(CMakePackage): version("0.3.1", sha256="613485354e24c4b97cb6d045657569f94dc1d9bbb391b5a166f8d18b3595428b") version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960") version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd") + version("0.3.4", branch="rnou/fallocate") # build variants variant('build_type', default='Release', diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ff1a5c3779fe132b7860e36bc539206d53a532ea..9bc9ce836dbc8b74942e2911869e546e61e92560 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -43,6 +43,8 @@ target_sources( worker/ops.hpp worker/sequential.cpp worker/sequential.hpp + worker/seq_mixed.cpp + worker/seq_mixed.hpp worker/worker.cpp worker/worker.hpp env.hpp diff --git a/src/master.cpp b/src/master.cpp index a6c0a4de5a1e882843ac6350ded02f3d1382db59..594f6ac4bebf05de3b2c645ec823931853befaea 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -48,20 +48,29 @@ make_message(std::uint64_t tid, std::uint32_t seqno, const cargo::dataset& input, const cargo::dataset& output) { if(input.supports_parallel_transfer()) { - return std::make_tuple(static_cast(cargo::tag::pread), - cargo::transfer_message{tid, seqno, input.path(), - output.path()}); + return std::make_tuple( + static_cast(cargo::tag::pread), + cargo::transfer_message{ + tid, seqno, input.path(), + static_cast(input.get_type()), output.path(), + static_cast(output.get_type())}); } if(output.supports_parallel_transfer()) { - return std::make_tuple(static_cast(cargo::tag::pwrite), - cargo::transfer_message{tid, seqno, input.path(), - output.path()}); + return std::make_tuple( + static_cast(cargo::tag::pwrite), + cargo::transfer_message{ + tid, seqno, input.path(), + static_cast(input.get_type()), output.path(), + static_cast(output.get_type())}); } return std::make_tuple( - static_cast(cargo::tag::sequential), - cargo::transfer_message{tid, seqno, input.path(), output.path()}); + static_cast(cargo::tag::seq_mixed), + cargo::transfer_message{tid, seqno, input.path(), + static_cast(input.get_type()), + output.path(), + static_cast(input.get_type())}); } } // namespace @@ -71,10 +80,11 @@ using namespace std::literals; namespace cargo { master_server::master_server(std::string name, std::string address, - bool daemonize, std::filesystem::path rundir, std::uint64_t block_size, + bool daemonize, std::filesystem::path rundir, + std::uint64_t block_size, std::optional pidfile) - : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(block_size), - std::move(pidfile)), + : server(std::move(name), std::move(address), daemonize, std::move(rundir), + std::move(block_size), std::move(pidfile)), provider(m_network_engine, 0), m_mpi_listener_ess(thallium::xstream::create()), m_mpi_listener_ult(m_mpi_listener_ess->make_thread( @@ -113,7 +123,7 @@ master_server::mpi_listener_ult() { auto msg = world.iprobe(); if(!msg) { - thallium::thread::self().sleep(m_network_engine, 150); + thallium::thread::self().sleep(m_network_engine, 10); continue; } @@ -122,7 +132,7 @@ master_server::mpi_listener_ult() { status_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_DEBUG("msg => from: {} body: {{payload: {}}}", - msg->source(), m); + msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, m.state(), m.bw(), m.error_code()); @@ -239,12 +249,12 @@ master_server::transfer_datasets(const network::request& req, // Then create a new message for each file and append the // file to the d prefix // We will asume that the path is the original absolute - // The prefix selects the method of transfer + // The prefix selects the method of transfer // And if not specified then we will use none // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then // bbb/xxx -> ttt/xxx const auto& p = s.path(); - + std::vector files; if(std::filesystem::is_directory(p)) { LOGGER_INFO("Expanding input directory {}", p); @@ -269,7 +279,7 @@ master_server::transfer_datasets(const network::request& req, // path (d.path) plus the path from f, removing the // initial path p (taking care of the trailing /) auto leading = p.size(); - if(leading>0 and p.back() == '/') { + if(leading > 0 and p.back() == '/') { leading--; } @@ -303,8 +313,12 @@ master_server::transfer_datasets(const network::request& req, const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; - // Create the directory if it does not exist - if(!std::filesystem::path(d.path()).parent_path().empty()) { + // Create the directory if it does not exist (only in + // parallel transfer) + if(!std::filesystem::path(d.path()) + .parent_path() + .empty() and + d.supports_parallel_transfer()) { std::filesystem::create_directories( std::filesystem::path(d.path()).parent_path()); } diff --git a/src/posix_file/CMakeLists.txt b/src/posix_file/CMakeLists.txt index b916a854e1a9e47203512ec216726bab12b50060..cfbc9718693dd4202bda6335b4251f0036def678 100644 --- a/src/posix_file/CMakeLists.txt +++ b/src/posix_file/CMakeLists.txt @@ -22,21 +22,57 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_library(posix_file INTERFACE) +add_library(posix_file STATIC) +set(GEKKO_INCLUDES "") + +if (GEKKOFS_PLUGIN) + set(GEKKO_INCLUDES posix_file/fs_plugin/gekko_plugin.hpp posix_file/fs_plugin/gekko_plugin.cpp) +endif() +if (EXPAND_plugin) + set(EXPAND_INCLUDES posix_file/fs_plugin/expand_plugin.hpp posix_file/fs_plugin/expand_plugin.cpp) +endif() +if (HERCULES_plugin) + set(HERCULES_INCLUDES posix_file/fs_plugin/hercules_plugin.chpp posix_file/fs_plugin/hercules_plugin.cpp) +endif() + target_sources( posix_file - INTERFACE posix_file/types.hpp + PRIVATE posix_file/types.hpp posix_file/file.hpp posix_file/ranges.hpp posix_file/views.hpp posix_file/math.hpp posix_file/views/block_iterator.hpp posix_file/views/strided_iterator.hpp + posix_file/fs_plugin/fs_plugin.hpp + posix_file/fs_plugin/posix_plugin.hpp + posix_file/fs_plugin/fs_plugin.cpp + posix_file/fs_plugin/posix_plugin.cpp + ${GEKKO_INCLUDES} + ${HERCULES_INCLUDES} + ${EXPAND_INCLUDES} + ) + target_include_directories(posix_file INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) set_property(TARGET posix_file PROPERTY POSITION_INDEPENDENT_CODE ON) +set(ADHOC "") + +if (GEKKOFS_PLUGIN) + set(ADHOC ${ADHOC} GekkoFS::GekkoFS) +endif() + +if (EXPAND_PLUGIN) + set(ADHOC ${ADHOC} Expand::Expand) +endif() + +if (HERCULES_PLUGIN) + set(ADHOC ${ADHOC} Hercules::Hercules) +endif() + + +target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected PRIVATE ${ADHOC}) -target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 03b4bc721d525d280dbd7efde528cbbdec660a3e..03d6fbec7bee510d713b6aad78f55f4ea5735d56 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -31,7 +31,9 @@ #include #include #include - +#include "fs_plugin/fs_plugin.hpp" +#include "cargo.hpp" +#include extern "C" { #include }; @@ -151,12 +153,20 @@ private: class file { + public: + file(cargo::FSPlugin::type t) { + m_fs_plugin = cargo::FSPlugin::make_fs(t); + }; + explicit file(std::filesystem::path filepath) noexcept : m_path(std::move(filepath)) {} - file(std::filesystem::path filepath, int fd) noexcept - : m_path(std::move(filepath)), m_handle(fd) {} + file(std::filesystem::path filepath, int fd, + std::unique_ptr fs_plugin) noexcept + : m_path(std::move(filepath)), m_handle(fd), + m_fs_plugin(std::move(fs_plugin)) {} + std::filesystem::path path() const noexcept { @@ -182,23 +192,14 @@ public: fallocate(int mode, offset offset, std::size_t len) const { if(!m_handle) { - throw io_error("posix_file::file::fallocate", EBADF); + throw io_error("posix_file::file::fallocate (handle)", EBADF); } - int ret = ::fallocate(m_handle.native(), mode, offset, + int ret = m_fs_plugin->fallocate(m_handle.native(), mode, offset, static_cast(len)); if(ret == -1) { - // Try an alternative to fallocate for beegfs - if(errno == EOPNOTSUPP) { - ret = ::posix_fallocate(m_handle.native(), offset, - static_cast(len)); - if (ret == -1) { - throw io_error("posix_file::file::posix_fallocate", errno); - } else return; - } throw io_error("posix_file::file::fallocate", errno); - } } @@ -217,8 +218,9 @@ public: while(bytes_read < size) { - ssize_t n = ::pread(m_handle.native(), buf.data() + bytes_read, - bytes_left, offset + bytes_read); + ssize_t n = m_fs_plugin->pread(m_handle.native(), + buf.data() + bytes_read, bytes_left, + offset + bytes_read); if(n == 0) { // EOF @@ -257,8 +259,9 @@ public: while(bytes_written < size) { - ssize_t n = ::pwrite(m_handle.native(), buf.data() + bytes_written, - bytes_left, offset + bytes_written); + ssize_t n = m_fs_plugin->pwrite(m_handle.native(), + buf.data() + bytes_written, + bytes_left, offset + bytes_written); if(n == -1) { // Interrupted by a signal, retry @@ -277,26 +280,40 @@ public: return bytes_written; } + protected: const std::filesystem::path m_path; file_handle m_handle; + std::unique_ptr m_fs_plugin; }; + static inline file -open(const std::filesystem::path& filepath, int flags, ::mode_t mode = 0) { +open(const std::filesystem::path& filepath, int flags, ::mode_t mode, + cargo::FSPlugin::type t) { - int fd = ::open(filepath.c_str(), flags, mode); + std::unique_ptr fs_plugin; + fs_plugin = cargo::FSPlugin::make_fs(t); + // We don't check if it exists, we just create it if flags is set to O_CREAT + + if(flags & O_CREAT) { + fs_plugin->mkdir(filepath.parent_path().c_str(), 0755); + } + + int fd = fs_plugin->open(filepath.c_str(), flags, mode); + if(fd == -1) { - throw io_error("posix_file::open", errno); + throw io_error("posix_file::open ", errno); } - return file{filepath, fd}; + return file{filepath, fd, std::move(fs_plugin)}; } static inline file -create(const std::filesystem::path& filepath, int flags, ::mode_t mode) { - return open(filepath, O_CREAT | flags, mode); +create(const std::filesystem::path& filepath, int flags, ::mode_t mode, + cargo::FSPlugin::type t) { + return open(filepath, O_CREAT | flags, mode, t); } } // namespace posix_file diff --git a/src/posix_file/posix_file/fs_plugin/expand_plugin.cpp b/src/posix_file/posix_file/fs_plugin/expand_plugin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ef510b0a1c5aa62b49c4ee297503d341dc1d3bcc --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/expand_plugin.cpp @@ -0,0 +1,64 @@ + +#include "fs_plugin.hpp" +#include "expand_plugin.hpp" + + +#include +namespace cargo { +expand_plugin::expand_plugin() { + int result = expand_init(); + if (result != 0) { + std::cerr << "Failed to initialize expand" << std::endl; + } +} + +expand_plugin::~expand_plugin() { + int result = expand_end(); + if (result != 0) { + std::cerr << "Failed to finalize expand" << std::endl; + } +} +// Override the open function +int +expand_plugin::open(const std::string& path, int flags, unsigned int mode) { + return expand_open(path, flags, mode); +} + +// Override the pread function +ssize_t +expand_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return expand_pread_ws(fd, buf, count, offset); +} + +// Override the pwrite function +ssize_t +expand_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return expand_pwrite_ws(fd, buf, count, offset); +} + + +bool +expand_plugin::mkdir(const std::string& path, mode_t mode) { + int result = expand_create(path, mode | S_IFDIR); + return result; +} + +bool +expand_plugin::close(int fd) { + return expand_close(fd); +} + +off_t +expand_plugin::lseek(int fd, off_t offset, int whence) { + return expand_lseek(fd, offset, whence); +} + +off_t +expand_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/expand_plugin.hpp b/src/posix_file/posix_file/fs_plugin/expand_plugin.hpp new file mode 100644 index 0000000000000000000000000000000000000000..78a7dfc8d6e7da7f696aaac09650ed9f0495d221 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/expand_plugin.hpp @@ -0,0 +1,30 @@ + +#ifndef EXPAND_PLUGIN_HPP +#define EXPAND_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class expand_plugin : public FSPlugin { + +public: + expand_plugin(); + ~expand_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +}; // namespace cargo + +#endif // EXPAND_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..af02eeaeaf193b1ec9872add2bc1aee2447d4e74 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -0,0 +1,37 @@ +#include "fs_plugin.hpp" +#include "posix_plugin.hpp" +#ifdef GEKKO_PLUGIN +#include "gekko_plugin.hpp" +#endif +#ifdef HERCULES_PLUGIN +#include "hercules_plugin.hpp" +#endif +#ifdef EXPAND_PLUGIN +#include "expand_plugin.hpp" +#endif + +namespace cargo { + +std::unique_ptr +FSPlugin::make_fs(type t) { + + switch(t) { + case type::posix: + return std::make_unique(); +#ifdef GEKKO_PLUGIN + case type::gekkofs: + return std::make_unique(); +#endif +#ifdef HERCULES_PLUGIN + case type::hercules: + return std::make_unique(); +#endif +#ifdef EXPAND_PLUGIN + case type::expand: + return std::make_unique(); +#endif + default: + return {}; + } +} +} // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c98e74a5ab75aab6ea2c713d774dde49c91cfad9 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp @@ -0,0 +1,41 @@ +#ifndef FS_PLUGIN_HPP +#define FS_PLUGIN_HPP + +#include +#include +#include +#include + +namespace cargo { +class FSPlugin { +public: + enum class type { + posix, + parallel, + none, + gekkofs, + hercules, + expand, + dataclay + }; + static std::unique_ptr make_fs(type); + + virtual ~FSPlugin() = default; + + virtual int + open(const std::string& path, int flags, unsigned int mode) = 0; + virtual bool + close(int fd) = 0; + virtual ssize_t + pread(int fd, void* buf, size_t count, off_t offset) = 0; + virtual ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) = 0; + virtual bool + mkdir(const std::string& path, mode_t mode) = 0; + virtual off_t + lseek(int fd, off_t offset, int whence) = 0; + virtual off_t + fallocate(int fd, int mode, off_t offset, off_t len) = 0; +}; +} // namespace cargo +#endif // FS_PLUGIN_HPP \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1ff0caa70c449c7f3abb18bb85de8b9f354a30a6 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -0,0 +1,65 @@ + +#include "fs_plugin.hpp" +#include "gekko_plugin.hpp" +#include + +#include +namespace cargo { +gekko_plugin::gekko_plugin() { + int result = gkfs_init(); + if (result != 0) { + std::cerr << "Failed to initialize gekkofs" << std::endl; + } +} + +gekko_plugin::~gekko_plugin() { + int result = gkfs_end(); + if (result != 0) { + std::cerr << "Failed to finalize gekkofs" << std::endl; + } +} +// Override the open function +int +gekko_plugin::open(const std::string& path, int flags, unsigned int mode) { + // Call to gekkofs has the signature inverted + return gkfs::syscall::gkfs_open(path, mode, flags); +} + +// Override the pread function +ssize_t +gekko_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return gkfs::syscall::gkfs_pread_ws(fd, buf, count, offset); +} + +// Override the pwrite function +ssize_t +gekko_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, offset); +} + + +bool +gekko_plugin::mkdir(const std::string& path, mode_t mode) { + int result = gkfs::syscall::gkfs_create(path, mode | S_IFDIR); + return result; +} + +bool +gekko_plugin::close(int fd) { + return gkfs::syscall::gkfs_close(fd); +} + +off_t +gekko_plugin::lseek(int fd, off_t offset, int whence) { + return gkfs::syscall::gkfs_lseek(fd, offset, whence); +} + +off_t +gekko_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp new file mode 100644 index 0000000000000000000000000000000000000000..2d27132f8dab81060f5582d89a4fee1d09eb7db2 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp @@ -0,0 +1,31 @@ + +#ifndef GEKKO_PLUGIN_HPP +#define GEKKO_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class gekko_plugin : public FSPlugin { + +public: + gekko_plugin(); + ~gekko_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + // Fallocate is not needed in GekkoFS as pwrite takes care of it. + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +}; // namespace cargo + +#endif // GEKKO_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp b/src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f3c4a11da301cd2d7d0c5ef78749c3833d72539d --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/hercules_plugin.cpp @@ -0,0 +1,64 @@ + +#include "fs_plugin.hpp" +#include "hercules_plugin.hpp" + + +#include +namespace cargo { +hercules_plugin::hercules_plugin() { + int result = hercules_init(); + if (result != 0) { + std::cerr << "Failed to initialize hercules" << std::endl; + } +} + +hercules_plugin::~hercules_plugin() { + int result = hercules_end(); + if (result != 0) { + std::cerr << "Failed to finalize hercules" << std::endl; + } +} +// Override the open function +int +hercules_plugin::open(const std::string& path, int flags, unsigned int mode) { + return hercules_open(path, flags, mode); +} + +// Override the pread function +ssize_t +hercules_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return hercules_pread_ws(fd, buf, count, offset); +} + +// Override the pwrite function +ssize_t +hercules_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return hercules_pwrite_ws(fd, buf, count, offset); +} + + +bool +hercules_plugin::mkdir(const std::string& path, mode_t mode) { + int result = hercules_create(path, mode | S_IFDIR); + return result; +} + +bool +hercules_plugin::close(int fd) { + return hercules_close(fd); +} + +off_t +hercules_plugin::lseek(int fd, off_t offset, int whence) { + return hercules_lseek(fd, offset, whence); +} + +off_t +hercules_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp b/src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp new file mode 100644 index 0000000000000000000000000000000000000000..fdef892f262b817bfce28c34a676a6627d2737ce --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/hercules_plugin.hpp @@ -0,0 +1,30 @@ + +#ifndef HERCULES_PLUGIN_HPP +#define HERCULES_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class hercules_plugin : public FSPlugin { + +public: + hercules_plugin(); + ~hercules_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +}; // namespace cargo + +#endif // HERCULES_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4d0f13481c94b2b0cd1c597e2ef461489e6233f9 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp @@ -0,0 +1,48 @@ +#include +#include +#include +#include +#include "posix_plugin.hpp" + +namespace cargo { + +posix_plugin::posix_plugin() { +} + + +int +posix_plugin::open(const std::string& path, int flags, unsigned int mode) { + return ::open(path.c_str(), flags, mode); +} + +bool +posix_plugin::close(int fd) { + return ::close(fd) == 0; +} + +ssize_t +posix_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return ::pread(fd, buf, count, offset); +} + +ssize_t +posix_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return ::pwrite(fd, buf, count, offset); +} + +bool +posix_plugin::mkdir(const std::string& path, mode_t mode) { + return ::mkdir(path.c_str(), mode) == 0; +} + +off_t +posix_plugin::lseek(int fd, off_t offset, int whence) { + return ::lseek(fd, offset, whence); +} + +off_t +posix_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) mode; + return ::posix_fallocate(fd, offset, static_cast(len)); +} +}; // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp new file mode 100644 index 0000000000000000000000000000000000000000..011028443b10b961597eaa1c6897c628fc09d0e7 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp @@ -0,0 +1,29 @@ + +#ifndef POSIX_PLUGIN_HPP +#define POSIX_PLUGIN_HPP + +#include "fs_plugin.hpp" +#include +namespace cargo { +class posix_plugin : public FSPlugin { + +public: + posix_plugin(); + + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; +}; +} // namespace cargo +#endif // POSIX_PLUGIN_HPP diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp index bc618f2ec05bc6acfb8ecbecac662e00bbcacfbe..a6b1d2c9ec1000d48206517806d52ba785159683 100644 --- a/src/proto/mpi/message.hpp +++ b/src/proto/mpi/message.hpp @@ -32,6 +32,7 @@ #include #include "cargo.hpp" #include "boost_serialization_std_optional.hpp" +#include "posix_file/file.hpp" namespace cargo { @@ -39,6 +40,7 @@ enum class tag : int { pread, pwrite, sequential, + seq_mixed, bw_shaping, status, shutdown @@ -52,9 +54,11 @@ public: transfer_message() = default; transfer_message(std::uint64_t tid, std::uint32_t seqno, - std::string input_path, std::string output_path) + std::string input_path, std::uint32_t i_type, + std::string output_path, std::uint32_t o_type) : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)) {} + m_i_type(i_type), m_output_path(std::move(output_path)), + m_o_type(o_type) {} [[nodiscard]] std::uint64_t tid() const { @@ -75,6 +79,17 @@ public: output_path() const { return m_output_path; } + /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */ + [[nodiscard]] cargo::FSPlugin::type + o_type() const { + return static_cast(m_o_type); + } + + /* Enum is converted from cargo::dataset::type to cargo::FSPlugin::type */ + [[nodiscard]] cargo::FSPlugin::type + i_type() const { + return static_cast(m_i_type); + } private: template @@ -86,12 +101,16 @@ private: ar& m_seqno; ar& m_input_path; ar& m_output_path; + ar& m_i_type; + ar& m_o_type; } std::uint64_t m_tid{}; std::uint32_t m_seqno{}; std::string m_input_path; + std::uint32_t m_i_type{}; std::string m_output_path; + std::uint32_t m_o_type{}; }; class status_message { diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index e9f60241bec0068033c223473f091c04d67856ab..92f0a05955f997f3bae9cf8c56bb01054695cd51 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -32,9 +32,12 @@ namespace cargo { mpio_read::mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) + std::filesystem::path output_path, + std::uint64_t block_size, FSPlugin::type fs_i_type, + FSPlugin::type fs_o_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} + m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)), + m_fs_i_type(fs_i_type), m_fs_o_type(fs_o_type) {} cargo::error_code mpio_read::operator()() { @@ -122,8 +125,10 @@ mpio_read::operator()() { } // step3. POSIX write data - m_output_file = std::make_unique( - posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR)); + // We need to create the directory if it does not exists (using + // FSPlugin) + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); m_output_file->fallocate(0, 0, file_size); @@ -179,6 +184,8 @@ mpio_read::progress(int ongoing_index) { auto start = std::chrono::steady_clock::now(); m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), file_range.size()); + // Do sleep + std::this_thread::sleep_for(sleep_value()); auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -192,8 +199,6 @@ mpio_read::progress(int ongoing_index) { m_block_size / 1024.0, elapsed_seconds, bw(), sleep_value()); } - // Do sleep - std::this_thread::sleep_for(sleep_value()); ++index; } diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp index ce17c9876e546ad0fe4afe56e22dfee3a6963bc4..bd72a6e4a38a4860bc6a1faeb2b7635f5ded0ff5 100644 --- a/src/worker/mpio_read.hpp +++ b/src/worker/mpio_read.hpp @@ -38,7 +38,8 @@ class mpio_read : public operation { public: mpio_read(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size); + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type m_fs_o_type); cargo::error_code operator()() final; @@ -47,15 +48,14 @@ public: progress() const final; int - progress(int ongoing_index ) final; + progress(int ongoing_index) final; private: mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; cargo::error_code m_status; - - + std::unique_ptr m_output_file; int m_workers_size; int m_workers_rank; @@ -63,7 +63,8 @@ private: memory_buffer m_buffer; std::vector m_buffer_regions; std::uint64_t m_kb_size; - + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; }; } // namespace cargo diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 5a84ca75ff9ce22d04315efa0849c8e0cacfb066..44b8070417f23d68f30ef71f90428eee0eaa6692 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -40,7 +40,7 @@ mpio_write::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); - std::size_t block_size = m_kb_size * 1024u; + std::size_t block_size = m_kb_size * 1024u; std::size_t file_size = std::filesystem::file_size(m_input_path); // compute the number of blocks in the file @@ -69,7 +69,7 @@ mpio_write::operator()() { } m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY)); + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); m_workers_size = workers_size; m_workers_rank = workers_rank; @@ -140,6 +140,10 @@ mpio_write::progress(int ongoing_index) { fmt::join(buffer_regions[index].end() - 10, buffer_regions[index].end(), "")); + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -153,9 +157,6 @@ mpio_write::progress(int ongoing_index) { sleep_value()); } - m_bytes_per_rank += n; - // Do sleep - std::this_thread::sleep_for(sleep_value()); ++index; } diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp index 05755ff55a2576acd2b542f4142c1b77fd8cbf6f..eb50129e7a14514e3986eac469a1d81f4f26fec8 100644 --- a/src/worker/mpio_write.hpp +++ b/src/worker/mpio_write.hpp @@ -38,9 +38,12 @@ class mpio_write : public operation { public: mpio_write(mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) : m_workers(std::move(workers)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} + m_output_path(std::move(output_path)), + m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), + m_fs_o_type(fs_o_type) {} cargo::error_code operator()() final; @@ -71,6 +74,8 @@ private: std::vector m_buffer_regions; std::size_t m_bytes_per_rank; std::uint64_t m_kb_size; + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; }; } // namespace cargo diff --git a/src/worker/ops.cpp b/src/worker/ops.cpp index 3e380dd595e809c9bc0771a1f7ed915747de0fa8..0d72c18542115371156fc5d4c9bd5967d6f89f0b 100644 --- a/src/worker/ops.cpp +++ b/src/worker/ops.cpp @@ -26,6 +26,7 @@ #include "mpio_read.hpp" #include "mpio_write.hpp" #include "sequential.hpp" +#include "seq_mixed.hpp" namespace mpi = boost::mpi; @@ -34,21 +35,27 @@ namespace cargo { std::unique_ptr operation::make_operation(cargo::tag t, mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) { + std::filesystem::path output_path, + std::uint64_t block_size, FSPlugin::type fs_i_type, + FSPlugin::type fs_o_type) { using cargo::tag; switch(t) { case tag::pread: - return std::make_unique(std::move(workers), - std::move(input_path), - std::move(output_path), block_size); + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); case tag::pwrite: - return std::make_unique(std::move(workers), - std::move(input_path), - std::move(output_path), block_size); + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); case tag::sequential: - return std::make_unique(std::move(workers), - std::move(input_path), - std::move(output_path), block_size); + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); + case tag::seq_mixed: + return std::make_unique( + std::move(workers), std::move(input_path), + std::move(output_path), block_size, fs_i_type, fs_o_type); default: return {}; } @@ -90,7 +97,8 @@ operation::bw() { return m_bw; } -void operation::bw(float_t bw) { +void +operation::bw(float_t bw) { m_bw = bw; } void diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp index b7e0eb8e1647a96e1b9ee2484262663dcb073eef..ba3c1565cbf6ac892c049fbf35b469eda1f1dab9 100644 --- a/src/worker/ops.hpp +++ b/src/worker/ops.hpp @@ -30,6 +30,7 @@ #include #include "proto/mpi/message.hpp" #include "cargo.hpp" +#include "posix_file/file.hpp" namespace cargo { /** @@ -41,7 +42,8 @@ public: static std::unique_ptr make_operation(cargo::tag t, boost::mpi::communicator workers, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size); + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type); virtual ~operation() = default; diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b6dec2f28d49ca501d3e2d50fe23b28347ac6388 --- /dev/null +++ b/src/worker/seq_mixed.cpp @@ -0,0 +1,188 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include "seq_mixed.hpp" +#include + +namespace cargo { + +cargo::error_code +seq_mixed_operation::operator()() { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + m_status = error_code::transfer_in_progress; + try { + + const auto workers_size = m_workers.size(); + const auto workers_rank = m_workers.rank(); + std::size_t block_size = m_kb_size * 1024u; + std::size_t file_size = std::filesystem::file_size(m_input_path); + + // compute the number of blocks in the file + int total_blocks = static_cast(file_size / block_size); + + if(file_size % block_size != 0) { + ++total_blocks; + } + + // find how many blocks this rank is responsible for + std::size_t blocks_per_rank = total_blocks / workers_size; + + if(int64_t n = total_blocks % workers_size; + n != 0 && workers_rank < n) { + ++blocks_per_rank; + } + + // step 1. acquire buffers + + m_buffer.resize(blocks_per_rank * block_size); + m_buffer_regions.reserve(blocks_per_rank); + + for(std::size_t i = 0; i < blocks_per_rank; ++i) { + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); + } + + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + + m_output_file->fallocate(0, 0, file_size); + + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + m_file_size = file_size; + m_total_blocks = total_blocks; + + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } + + return error_code::transfer_in_progress; +} + +cargo::error_code +seq_mixed_operation::progress() const { + return m_status; +} + +int +seq_mixed_operation::progress(int ongoing_index) { + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + // compute the number of blocks in the file + + int index = 0; + if(write == false) { + if(ongoing_index == 0) { + m_bytes_per_rank = 0; + } + try { + for(const auto& file_range : + all_of(*m_input_file) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + m_status = error_code::transfer_in_progress; + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + const std::size_t n = m_input_file->pread( + m_buffer_regions[index], file_range.offset(), + file_range.size()); + + LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", + fmt::join(buffer_regions[index].begin(), + buffer_regions[index].begin() + 10, ""), + fmt::join(buffer_regions[index].end() - 10, + buffer_regions[index].end(), "")); + +/* Do write */ + m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), + file_range.size()); + + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast< + std::chrono::duration>(end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_DEBUG( + "BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + ++index; + } + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + } + + m_status = error_code::success; + return -1; +} + + +} // namespace cargo diff --git a/src/worker/seq_mixed.hpp b/src/worker/seq_mixed.hpp new file mode 100644 index 0000000000000000000000000000000000000000..c3545ccfada3145587e51ac7ae5b68fa80955eb0 --- /dev/null +++ b/src/worker/seq_mixed.hpp @@ -0,0 +1,82 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_WORKER_SEQ_MIXED_HPP +#define CARGO_WORKER_SEQ_MIXED_HPP + +#include "ops.hpp" +#include +#include +#include "ops.hpp" +#include "memory.hpp" + +namespace mpi = boost::mpi; + +namespace cargo { + +class seq_mixed_operation : public operation { + +public: + seq_mixed_operation(mpi::communicator workers, std::filesystem::path input_path, + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + : m_workers(std::move(workers)), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)), + m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), + m_fs_o_type(fs_o_type) {} + + cargo::error_code + operator()() final; + cargo::error_code + progress() const; + + int + progress(int ongoing_index) final; + +private: + mpi::communicator m_workers; + std::filesystem::path m_input_path; + std::filesystem::path m_output_path; + + std::unique_ptr m_input_file; + std::unique_ptr m_output_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + std::size_t m_file_size; + int m_total_blocks; + + memory_buffer m_buffer; + std::vector m_buffer_regions; + std::size_t m_bytes_per_rank; + std::uint64_t m_kb_size; + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; + cargo::error_code m_status; + bool write{}; +}; + +} // namespace cargo + +#endif // CARGO_WORKER_SEQUENTIAL_HPP diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 40f5e4c038baaebb1b53a6c91ee1b7914d8122c7..4254d2c64256362ffee0369eae873ab0364be6cc 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -24,15 +24,72 @@ #include #include "sequential.hpp" - +#include namespace cargo { cargo::error_code seq_operation::operator()() { - LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); - m_status = cargo::error_code::not_implemented; - return cargo::error_code::not_implemented; + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + m_status = error_code::transfer_in_progress; + try { + + const auto workers_size = m_workers.size(); + const auto workers_rank = m_workers.rank(); + std::size_t block_size = m_kb_size * 1024u; + std::size_t file_size = std::filesystem::file_size(m_input_path); + + // compute the number of blocks in the file + int total_blocks = static_cast(file_size / block_size); + + if(file_size % block_size != 0) { + ++total_blocks; + } + + // find how many blocks this rank is responsible for + std::size_t blocks_per_rank = total_blocks / workers_size; + + if(int64_t n = total_blocks % workers_size; + n != 0 && workers_rank < n) { + ++blocks_per_rank; + } + + // step 1. acquire buffers + + m_buffer.resize(blocks_per_rank * block_size); + m_buffer_regions.reserve(blocks_per_rank); + + for(std::size_t i = 0; i < blocks_per_rank; ++i) { + m_buffer_regions.emplace_back(m_buffer.data() + i * block_size, + block_size); + } + + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + + m_workers_size = workers_size; + m_workers_rank = workers_rank; + m_block_size = block_size; + m_file_size = file_size; + m_total_blocks = total_blocks; + + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return make_system_error(e.error_code()); + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return make_system_error(e.code().value()); + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return error_code::other; + } + + return error_code::transfer_in_progress; } cargo::error_code @@ -42,9 +99,145 @@ seq_operation::progress() const { int seq_operation::progress(int ongoing_index) { - ongoing_index++; - m_status = cargo::error_code::not_implemented; + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + // compute the number of blocks in the file + + int index = 0; + if(write == false) { + if(ongoing_index == 0) { + m_bytes_per_rank = 0; + } + try { + for(const auto& file_range : + all_of(*m_input_file) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + m_status = error_code::transfer_in_progress; + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + const std::size_t n = m_input_file->pread( + m_buffer_regions[index], file_range.offset(), + file_range.size()); + + LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", + fmt::join(buffer_regions[index].begin(), + buffer_regions[index].begin() + 10, ""), + fmt::join(buffer_regions[index].end() - 10, + buffer_regions[index].end(), "")); + + + m_bytes_per_rank += n; + // Do sleep + std::this_thread::sleep_for(sleep_value()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast< + std::chrono::duration>(end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_DEBUG( + "BW (read) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + ++index; + } + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + } + write = true; + // We finished reading + // step3. POSIX write data + // We need to create the directory if it does not exists (using + // FSPlugin) + + if ( write and ongoing_index == 0 ) { + m_output_file = std::make_unique(posix_file::create( + m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); + + m_output_file->fallocate(0, 0, m_file_size); + } + + try { + int index = 0; + m_status = error_code::transfer_in_progress; + for(const auto& file_range : + all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + strided(m_workers_size, m_workers_rank)) { + if(index < ongoing_index) { + ++index; + continue; + } else { + if(index > ongoing_index) { + return index; + } + } + + assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); + m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), + file_range.size()); + // Do sleep + std::this_thread::sleep_for(sleep_value()); + auto end = std::chrono::steady_clock::now(); + // Send transfer bw + double elapsed_seconds = + std::chrono::duration_cast>( + end - start) + .count(); + if((elapsed_seconds) > 0) { + bw((m_block_size / (1024.0 * 1024.0)) / (elapsed_seconds)); + LOGGER_DEBUG( + "BW (write) Update: {} / {} = {} mb/s [ Sleep {} ]", + m_block_size / 1024.0, elapsed_seconds, bw(), + sleep_value()); + } + + ++index; + } + + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + m_status = make_system_error(e.error_code()); + return -1; + } catch(const std::system_error& e) { + LOGGER_ERROR("Unexpected system error: {}", e.what()); + m_status = make_system_error(e.code().value()); + return -1; + } catch(const std::exception& e) { + LOGGER_ERROR("Unexpected exception: {}", e.what()); + m_status = error_code::other; + return -1; + } + + m_status = error_code::success; return -1; } + } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index cb5860bd30c87f2405441c62c40926ed173bf61f..508c1979fb6f22690ee1b60b8ff33795e3922b26 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -26,6 +26,10 @@ #define CARGO_WORKER_SEQUENTIAL_HPP #include "ops.hpp" +#include +#include +#include "ops.hpp" +#include "memory.hpp" namespace mpi = boost::mpi; @@ -34,25 +38,43 @@ namespace cargo { class seq_operation : public operation { public: - seq_operation(mpi::communicator comm, std::filesystem::path input_path, - std::filesystem::path output_path, std::uint64_t block_size) - : m_comm(std::move(comm)), m_input_path(std::move(input_path)), - m_output_path(std::move(output_path)), m_kb_size(std::move(block_size)) {} + seq_operation(mpi::communicator workers, std::filesystem::path input_path, + std::filesystem::path output_path, std::uint64_t block_size, + FSPlugin::type fs_i_type, FSPlugin::type fs_o_type) + : m_workers(std::move(workers)), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)), + m_kb_size(std::move(block_size)), m_fs_i_type(fs_i_type), + m_fs_o_type(fs_o_type) {} cargo::error_code operator()() final; cargo::error_code progress() const; - int - progress(int ongoing_index ) final; + int + progress(int ongoing_index) final; private: - mpi::communicator m_comm; + mpi::communicator m_workers; std::filesystem::path m_input_path; std::filesystem::path m_output_path; - cargo::error_code m_status; + + std::unique_ptr m_input_file; + std::unique_ptr m_output_file; + int m_workers_size; + int m_workers_rank; + std::size_t m_block_size; + std::size_t m_file_size; + int m_total_blocks; + + memory_buffer m_buffer; + std::vector m_buffer_regions; + std::size_t m_bytes_per_rank; std::uint64_t m_kb_size; + FSPlugin::type m_fs_i_type; + FSPlugin::type m_fs_o_type; + cargo::error_code m_status; + bool write{}; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 00a3b3c1a83746825b98fa70d722ffbeb200539b..a680c840603be908b83db24627597a734f688675 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -74,7 +74,7 @@ worker::set_output_file(std::filesystem::path output_file) { m_output_file = std::move(output_file); } -void +void worker::set_block_size(std::uint64_t block_size) { m_block_size = block_size; } @@ -107,40 +107,42 @@ worker::run() { bool done = false; while(!done) { - - auto msg = world.iprobe(); - - if(!msg) { - // FIXME: sleep time should be configurable - - // Progress through all transfers - - auto I = m_ops.begin(); - auto IE = m_ops.end(); - if(I != IE) { - auto op = I->second.first.get(); - int index = I->second.second; - if(op) { - - index = op->progress(index); - if(index == -1) { - // operation finished - cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - 0.0f, ec); - - // Transfer finished - I = m_ops.erase(I); - } else { + // Always loop pending operations + + auto I = m_ops.begin(); + auto IE = m_ops.end(); + if(I != IE) { + auto op = I->second.first.get(); + int index = I->second.second; + if(op) { + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + 0.0f, ec); + + // Transfer finished + I = m_ops.erase(I); + } else { + // update only if BW is set + if(op->bw() > 0.0f) { update_state(op->source(), op->tid(), op->seqno(), transfer_state::running, op->bw()); - I->second.second = index; } + I->second.second = index; + ++I; } } + } + + auto msg = world.iprobe(); + + if(!msg) { + // Only wait if there are no pending operations and no messages if(m_ops.size() == 0) { std::this_thread::sleep_for(150ms); } @@ -152,15 +154,18 @@ worker::run() { [[fallthrough]]; case tag::pwrite: [[fallthrough]]; + case tag::seq_mixed: + [[fallthrough]]; case tag::sequential: { transfer_message m; world.recv(msg->source(), msg->tag(), m); LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); m_ops.emplace(std::make_pair( make_pair(m.input_path(), m.output_path()), - make_pair(operation::make_operation(t, workers, - m.input_path(), - m.output_path(), m_block_size), + make_pair(operation::make_operation( + t, workers, m.input_path(), + m.output_path(), m_block_size, + m.i_type(), m.o_type()), 0))); const auto op = @@ -189,7 +194,6 @@ worker::run() { for(auto I = m_ops.begin(); I != m_ops.end(); I++) { const auto op = I->second.first.get(); if(op) { - op->set_bw_shaping(m.shaping()); } else { LOGGER_INFO("Operation non existent", msg->source(), m);