diff --git a/.gitignore b/.gitignore index a34243d89557879056c191c2d01f7041d5ea2930..cf4f6963f2cb22cc1b858468518400b392dd4b1b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ build* -cmake-build-debug +cmake-build-* .idea -.vscode +.vscode \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 5fffd325c8d55c43739a3b67415d1232f4e7574d..7dd8c7b9646c64358ccac3eee992ba21eb8c9bdd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.3.5 + VERSION 0.3.6 LANGUAGES C CXX ) @@ -281,6 +281,17 @@ if (Expand_FOUND) message(STATUS "[${PROJECT_NAME}] Found Expand") endif() +### DataClay: Optional for DataClay +find_package(DataClay) +if (DataClay_FOUND) + add_compile_definitions(DATACLAY_PLUGIN) + add_compile_definitions(DataClay_PATH="${DataClay_MODEL_DIR}/dataclay-plugin") + message(STATUS "[${PROJECT_NAME}] Found DataClay") +endif() + + + + ### Threads: required by ASIO find_package(Threads REQUIRED) diff --git a/COPYRIGHT_NOTICE b/COPYRIGHT_NOTICE index e8a05efb8903fe5e57032d0393743f7b9054714b..4c06aeba626865bb26fe07bf78ae321d6f8e5449 100644 --- a/COPYRIGHT_NOTICE +++ b/COPYRIGHT_NOTICE @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * Copyright 2022-2024, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). diff --git a/README.md b/README.md index c43948aa6d1df5809ec4ca35cc1842c357e594a1..b19c751928edc61ff2957f7aa35bd2c6cd5e06c8 100644 --- a/README.md +++ b/README.md @@ -190,8 +190,25 @@ cli/ccp --server ofi+tcp://127.0.0.1:62000 --input /directory/subdir --output /d `--if or --of` can be: posix, gekkofs, hercules, dataclay, expand and parallel (for MPIIO requests, but only one side is allowed). 
Typically you should use posix or parallel and then one specialized adhocfs. Posix is also able to be used with LD_PRELOAD, however -higher performance and flexibility can be obtained using the specific configuration. +higher performance and flexibility can be obtained using the specific configuration. Some backends are only available with directory support for stage-in. On the other hand, MPIIO (parallel) uses normally file locking so there is a performance impact, and posix is faster (we suppose no external modifications are done). -Other commands are `ping`, `shutdown` and `shaping` (for bw control). \ No newline at end of file +Other commands are `ping`, `shutdown`, `shaping` (for bw control) and `cargo_ftio` to interact with ftio (stage-out and gekkofs) + +`cargo_ftio` provides --resume, --pause and --run options to pause and resume the ftio related transfers. We consider ftio transfers to be those that have gekkofs as --of and that have been set up after a ftio command. + +```shell +#SETUP FTIO, this enables stage-out to be delayed (10000 seconds) +cargo_ftio --server tcp://127.0.0.1:62000 -c -1 -p -1 -t 10000 +#SETUP Stage-out (monitors data directory and subdirs for new files) +ccp --server tcp://127.0.0.1:62000 --input /data --output ~/stage-out --if gekkofs --of parallel +#UPDATE FTIO (as needed, each 25 seconds will do the transfer order) +cargo_ftio --server tcp://127.0.0.1:62000 -c -1 -p -1 -t 25 +``` + +## User libraries for adhocfs +If Cargo finds the adhoc fs libraries (we support GekkoFS and dataClay in this release), it will automatically use them. +The CMake command will show which adhocfs are detected. + +On the other hand, LD_PRELOAD techniques could be used. 
\ No newline at end of file diff --git a/cli/CMakeLists.txt b/cli/CMakeLists.txt index 68fea90ffd7cae09c4422a0be73facf9bcb65f93..f779733b61c59bd77f8500afe75e609b16c359f0 100644 --- a/cli/CMakeLists.txt +++ b/cli/CMakeLists.txt @@ -101,7 +101,26 @@ target_link_libraries(shaping cargo ) -install(TARGETS cargo_ping cargo_shutdown ccp shaping + +################################################################################ +## ftio: A CLI tool to send the ftio info to a Cargo server +add_executable(cargo_ftio) + +target_sources(cargo_ftio + PRIVATE + ftio.cpp +) + +target_link_libraries(cargo_ftio + PUBLIC + fmt::fmt + CLI11::CLI11 + net::rpc_client + cargo +) + + +install(TARGETS cargo_ping cargo_shutdown ccp shaping cargo_ftio RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} ) diff --git a/cli/copy.cpp b/cli/copy.cpp index f55a40281d54680248497c278a3d58d0d9fdf95b..dcccfa2e5bbada26fa19f205523367d68501b5a8 100644 --- a/cli/copy.cpp +++ b/cli/copy.cpp @@ -78,6 +78,7 @@ parse_command_line(int argc, char* argv[]) { "Flags for input datasets. Accepted values\n" " - posix: read data using POSIX (default)\n" " - parallel: read data using MPI-IO\n" + " - dataclay: read data using DATACLAY\n" " - gekkofs: read data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, @@ -87,6 +88,7 @@ parse_command_line(int argc, char* argv[]) { "Flags for output datasets. 
Accepted values\n" " - posix: write data using POSIX (default)\n" " - parallel: write data using MPI-IO\n" + " - dataclay: write data using DATACLAY\n" " - gekkofs: write data using gekkofs user library\n") ->option_text("FLAGS") ->transform(CLI::CheckedTransformer(dataset_flags_map, diff --git a/cli/ftio.cpp b/cli/ftio.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ddeb25ec5e113f1ffd28e541eb1dacaa60005973 --- /dev/null +++ b/cli/ftio.cpp @@ -0,0 +1,133 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include + +struct ftio_config { + std::string progname; + std::string server_address; + float confidence; + float probability; + float period; + bool run{false}; + bool pause{false}; + bool resume{false}; +}; + +ftio_config +parse_command_line(int argc, char* argv[]) { + + ftio_config cfg; + + cfg.progname = std::filesystem::path{argv[0]}.filename().string(); + + CLI::App app{"Cargo ftio client", cfg.progname}; + + app.add_option("-s,--server", cfg.server_address, "Server address") + ->option_text("ADDRESS") + ->required(); + + app.add_option("-c,--conf", cfg.confidence, "confidence") + ->option_text("float") + ->default_val("-1.0"); + + app.add_option("-p,--probability", cfg.probability, "probability") + ->option_text("float") + ->default_val("-1.0"); + + app.add_option("-t,--period", cfg.period, "period") + ->option_text("float") + ->default_val("-1.0"); + + app.add_flag( + "--run", cfg.run, + "Trigger stage operation to run now. Has no effect when period is set > 0"); + + app.add_flag( + "--pause", cfg.pause, + "Trigger stage operation to slow down. Other parameters not used"); + + app.add_flag("--resume", cfg.resume, "Trigger stage operation to resume, only pause or resume will take into account. 
Others parameters not used."); + + try { + app.parse(argc, argv); + return cfg; + } catch(const CLI::ParseError& ex) { + std::exit(app.exit(ex)); + } +} + +auto +parse_address(const std::string& address) { + const auto pos = address.find("://"); + if(pos == std::string::npos) { + throw std::runtime_error(fmt::format("Invalid address: {}", address)); + } + + const auto protocol = address.substr(0, pos); + return std::make_pair(protocol, address); +} + + +int +main(int argc, char* argv[]) { + + ftio_config cfg = parse_command_line(argc, argv); + + try { + const auto [protocol, address] = parse_address(cfg.server_address); + network::client rpc_client{protocol}; + + if(const auto result = rpc_client.lookup(address); result.has_value()) { + const auto& endpoint = result.value(); + const auto retval = + endpoint.call("ftio_int", cfg.confidence, cfg.probability, + cfg.period, cfg.run, cfg.pause, cfg.resume); + + if(retval.has_value()) { + + auto error_code = int{retval.value()}; + + fmt::print("ftio_int RPC was successful!\n"); + fmt::print(" (server replied with: {})\n", error_code); + return EXIT_SUCCESS; + } + + fmt::print(stderr, "ftio_int RPC failed\n"); + return EXIT_FAILURE; + + } else { + fmt::print(stderr, "Failed to lookup address: {}\n", address); + return EXIT_FAILURE; + } + } catch(const std::exception& ex) { + fmt::print(stderr, "Error: {}\n", ex.what()); + return EXIT_FAILURE; + } +} diff --git a/cmake/FindDataClay.cmake b/cmake/FindDataClay.cmake new file mode 100644 index 0000000000000000000000000000000000000000..30ae1c097efccce92fb84e6fb668309bb68ab346 --- /dev/null +++ b/cmake/FindDataClay.cmake @@ -0,0 +1,74 @@ +################################################################################ +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of cargo. 
# +# # +# cargo is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# cargo is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with cargo. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + + +find_path(DataClay_INCLUDE_DIR + NAMES dataclayplugin.h + PREFIX dataclay-plugin +) + +find_path(DataClay_MODEL_DIR + NAMES client.py + PREFIX dataclay-plugin +) +message(STATUS "[${PROJECT_NAME}] DataClay library MODEL DIR ${DataClay_MODEL_DIR}") + + + + +find_library(DataClay_LIBRARY + NAMES dataclay-plugin/libdataclayplugin.so +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + DataClay + DEFAULT_MSG + DataClay_INCLUDE_DIR + DataClay_LIBRARY + DataClay_MODEL_DIR +) + +if(DataClay_FOUND) + find_package(Python3 REQUIRED Development) + message(STATUS "[${PROJECT_NAME}] DataClay library needs Python include ${Python3_INCLUDE_DIRS}") + + set(DataClay_LIBRARIES ${DataClay_LIBRARY}) + set(DataClay_INCLUDE_DIRS ${DataClay_INCLUDE_DIR} ) + + + if(NOT TARGET DataClay::DataClay) + add_library(DataClay::DataClay UNKNOWN IMPORTED) + set_target_properties(DataClay::DataClay PROPERTIES + IMPORTED_LOCATION "${DataClay_LIBRARY}" + INTERFACE_INCLUDE_DIRECTORIES "${DataClay_INCLUDE_DIR};${Python3_INCLUDE_DIRS}" + ) + endif() +endif() + + +mark_as_advanced( + DataClay_INCLUDE_DIR + DataClay_LIBRARY +) diff --git a/cmake/FindExpand.cmake b/cmake/FindExpand.cmake index 
d7806046e4ba72214dc91c7bc96fb88b6b92cb27..bcf79f3feb918b22550fad7b5549e1e9c6dbf27e 100644 --- a/cmake/FindExpand.cmake +++ b/cmake/FindExpand.cmake @@ -1,27 +1,23 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # # # -# This software was partially supported by the # -# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # # # -# This software was partially supported by the # -# ADA-FS project under the SPPEXA project funded by the DFG. # +# This file is part of cargo. # # # -# This file is part of GekkoFS. # -# # -# GekkoFS is free software: you can redistribute it and/or modify # +# cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # -# GekkoFS is distributed in the hope that it will be useful, # +# cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # -# along with GekkoFS. If not, see . # +# along with cargo. If not, see . 
# # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ diff --git a/cmake/FindGekkoFS.cmake b/cmake/FindGekkoFS.cmake index 7aebff47b72eb2ea8cf79dc26535f3a54f488103..26f05cf6095e11ef2bada403cde79f11d0145f81 100644 --- a/cmake/FindGekkoFS.cmake +++ b/cmake/FindGekkoFS.cmake @@ -1,27 +1,23 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # # # -# This software was partially supported by the # -# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # # # -# This software was partially supported by the # -# ADA-FS project under the SPPEXA project funded by the DFG. # +# This file is part of cargo. # # # -# This file is part of GekkoFS. # -# # -# GekkoFS is free software: you can redistribute it and/or modify # +# cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # -# GekkoFS is distributed in the hope that it will be useful, # +# cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # -# along with GekkoFS. If not, see . # +# along with cargo. If not, see . 
# # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ diff --git a/cmake/FindHercules.cmake b/cmake/FindHercules.cmake index 2f877abd278d99163b8eeb81e6ff05c07a4a7272..1654e4fa357f27b6a6442b219efd2509900c0a62 100644 --- a/cmake/FindHercules.cmake +++ b/cmake/FindHercules.cmake @@ -1,27 +1,23 @@ ################################################################################ -# Copyright 2018-2023, Barcelona Supercomputing Center (BSC), Spain # -# Copyright 2015-2023, Johannes Gutenberg Universitaet Mainz, Germany # +# Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain # # # -# This software was partially supported by the # -# EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu). # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # # # -# This software was partially supported by the # -# ADA-FS project under the SPPEXA project funded by the DFG. # +# This file is part of cargo. # # # -# This file is part of GekkoFS. # -# # -# GekkoFS is free software: you can redistribute it and/or modify # +# cargo is free software: you can redistribute it and/or modify # # it under the terms of the GNU General Public License as published by # # the Free Software Foundation, either version 3 of the License, or # # (at your option) any later version. # # # -# GekkoFS is distributed in the hope that it will be useful, # +# cargo is distributed in the hope that it will be useful, # # but WITHOUT ANY WARRANTY; without even the implied warranty of # # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # # GNU General Public License for more details. # # # # You should have received a copy of the GNU General Public License # -# along with GekkoFS. If not, see . # +# along with cargo. If not, see . 
# # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ diff --git a/spack/packages/cargo/package.py b/spack/packages/cargo/package.py index ec8198ee6a60ec2f8325d2efee32a58fff5429c6..57976a8c6c10f353522360d31d628f7756eedc24 100644 --- a/spack/packages/cargo/package.py +++ b/spack/packages/cargo/package.py @@ -27,7 +27,7 @@ class Cargo(CMakePackage): """A parallel data stager for malleable applications.""" homepage = "https://storage.bsc.es/gitlab/hpc/cargo" - url = "https://storage.bsc.es/gitlab/hpc/cargo/-/archive/v0.3.5/cargo-v0.3.5.tar.bz2" + url = "https://storage.bsc.es/gitlab/hpc/cargo/-/archive/v0.3.6/cargo-v0.3.6.tar.bz2" git = "https://storage.bsc.es/gitlab/hpc/cargo.git" maintainers("alberto-miranda") @@ -40,8 +40,8 @@ class Cargo(CMakePackage): version("0.3.2", sha256="ceb6bcb738a35fb41f40b7b1cdd8a806d99995a227980e8ced61dd90418e5960", deprecated=True) version("0.3.3", sha256="1c4ab215e41905cc359894fa1df9006be16730ddc37c5b1369a9ea759bcb61cd", deprecated=True) version("0.3.4", sha256="42b740fb7e82c49d73dfb6caf7549876f72913afb75996c6558e956ea63de3da", deprecated=True) - version("0.3.5", sha256="5c2e998aa96b15bdf513e8c2fce5f20859cf9a6a51882c59b80d5d801a10edd8") - + version("0.3.5", sha256="5c2e998aa96b15bdf513e8c2fce5f20859cf9a6a51882c59b80d5d801a10edd8", deprecated=True) + version("0.3.6") # build variants variant('build_type', default='Release', diff --git a/src/master.cpp b/src/master.cpp index ae46240b7eedd25ebb74ad17cb6e319cfee66c94..47539369b71e2eb2313088016bc4836f9ef68152 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -66,7 +66,7 @@ make_message(std::uint64_t tid, std::uint32_t seqno, } return std::make_tuple( - static_cast(cargo::tag::seq_mixed), + static_cast(cargo::tag::sequential), cargo::transfer_message{tid, seqno, input.path(), static_cast(input.get_type()), output.path(), @@ -88,7 +88,12 @@ master_server::master_server(std::string name, std::string address, 
provider(m_network_engine, 0), m_mpi_listener_ess(thallium::xstream::create()), m_mpi_listener_ult(m_mpi_listener_ess->make_thread( - [this]() { mpi_listener_ult(); })) { + [this]() { mpi_listener_ult(); })), + m_ftio_listener_ess(thallium::xstream::create()), + m_ftio_listener_ult(m_ftio_listener_ess->make_thread( + [this]() { ftio_scheduling_ult(); })) + +{ #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); @@ -97,6 +102,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(transfer_status)); provider::define(EXPAND(bw_control)); provider::define(EXPAND(transfer_statuses)); + provider::define(EXPAND(ftio_int)); #undef EXPAND @@ -110,6 +116,10 @@ master_server::master_server(std::string name, std::string address, m_mpi_listener_ult = thallium::managed{}; m_mpi_listener_ess->join(); m_mpi_listener_ess = thallium::managed{}; + m_ftio_listener_ult->join(); + m_ftio_listener_ult = thallium::managed{}; + m_ftio_listener_ess->join(); + m_ftio_listener_ess = thallium::managed{}; }); } @@ -125,7 +135,7 @@ master_server::mpi_listener_ult() { if(!msg) { std::this_thread::sleep_for(10ms); - //thallium::thread::self().sleep(m_network_engine, 10); + // thallium::thread::self().sleep(m_network_engine, 10); continue; } @@ -163,6 +173,94 @@ master_server::mpi_listener_ult() { LOGGER_INFO("Exit"); } + +void +master_server::ftio_scheduling_ult() { + + while(!m_shutting_down) { + + if(!m_pending_transfer.m_work or !m_ftio_run) { + std::this_thread::sleep_for(1000ms); + } + // if(!m_pending_transfer.m_work or m_period < 0.0f) { + // std::this_thread::sleep_for(1000ms); + // } + + + // Do something with the confidence and probability + + // if(m_ftio_run) { + // m_ftio_run = false; + // LOGGER_INFO("Confidence is {}, probability is {} and + // period is {}", + // m_confidence, m_probability, m_period); + // } + + if(!m_pending_transfer.m_work) + continue; + if(m_period > 0) { + LOGGER_INFO("Waiting 
period : {}", m_period); + } else { + LOGGER_INFO("Waiting for run trigger ..."); + } + // Wait in small periods, just in case we change it, This should be + // mutexed... + auto elapsed = m_period; + while(elapsed > 0) { + std::this_thread::sleep_for(std::chrono::seconds((int) (1))); + elapsed -= 1; + // reset elapsed value when new RPC comes in + if(m_ftio_run) { + elapsed = m_period; + m_ftio_run = false; + } + } + if(!m_ftio_run) { + continue; + } + + LOGGER_INFO("Checking if there is work to do in {}", + m_pending_transfer.m_sources); + transfer_dataset_internal(m_pending_transfer); + // This launches the workers to do the work... + // We wait until this transfer is finished + LOGGER_INFO("Transferring : {}", m_pending_transfer.m_expanded_sources); + bool finished = false; + while(!finished) { + std::this_thread::sleep_for(10ms); + m_request_manager.lookup(m_pending_transfer.m_p.tid()) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Failed to lookup request: {}", ec); + }) + .map([&](auto&& rs) { + if(rs.state() == transfer_state::completed) { + finished = true; + } + }); + } + + if(finished) { + // Delete all source files + LOGGER_INFO("Transfer finished for {}", + m_pending_transfer.m_expanded_sources); + auto fs = FSPlugin::make_fs(cargo::FSPlugin::type::gekkofs); + for(auto& file : m_pending_transfer.m_expanded_sources) { + LOGGER_INFO("Deleting {}", file.path()); + // We need to use gekkofs to delete + fs->unlink(file.path()); + } + } + if(m_period > 0) { + // always run whenever period is set + m_ftio_run = true; + } else { + m_ftio_run = false; + } + } + + LOGGER_INFO("Shutting down."); +} + #define RPC_NAME() (__FUNCTION__) void @@ -220,6 +318,120 @@ master_server::shutdown(const network::request& req) { server::shutdown(); } +// Function that gets a pending_request, fills the request and sends the mpi +// message for the transfer We only put files that has mtime < actual +// timestamp , intended for stage-out and ftio +void 
+master_server::transfer_dataset_internal(pending_transfer& pt) { + + mpi::communicator world; + std::vector v_s_new; + std::vector v_d_new; + time_t now = time(0); + now = now - 5; // Threshold for mtime + for(auto i = 0u; i < pt.m_sources.size(); ++i) { + + const auto& s = pt.m_sources[i]; + const auto& d = pt.m_targets[i]; + + // We need to expand directories to single files on the s + // Then create a new message for each file and append the + // file to the d prefix + // We will asume that the path is the original absolute + // The prefix selects the method of transfer + // And if not specified then we will use none + // i.e. ("xxxx:/xyyy/bbb -> gekko:/cccc/ttt ) then + // bbb/xxx -> ttt/xxx + const auto& p = s.path(); + + std::vector files; + // Check stat of p using FSPlugin class + auto fs = FSPlugin::make_fs( + static_cast(s.get_type())); + struct stat buf; + fs->stat(p, &buf); + if(buf.st_mode & S_IFDIR) { + LOGGER_INFO("Expanding input directory {}", p); + files = fs->readdir(p); + + /* + We have all the files expanded. Now create a new + cargo::dataset for each file as s and a new + cargo::dataset appending the base directory in d to the + file name. 
+ */ + for(const auto& f : files) { + cargo::dataset s_new(s); + cargo::dataset d_new(d); + s_new.path(f); + // We need to get filename from the original root + // path (d.path) plus the path from f, removing the + // initial path p (taking care of the trailing /) + auto leading = p.size(); + if(leading > 0 and p.back() == '/') { + leading--; + } + + d_new.path(d.path() / + std::filesystem::path(f.substr(leading + 1))); + + LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), + d_new.path()); + fs->stat(s_new.path(), &buf); + if(buf.st_mtime < now) { + v_s_new.push_back(s_new); + v_d_new.push_back(d_new); + } + } + } else { + fs->stat(s.path(), &buf); + if(buf.st_mtime < now) { + v_s_new.push_back(s); + v_d_new.push_back(d); + } + } + } + + // empty m_expanded_sources + pt.m_expanded_sources.assign(v_s_new.begin(), v_s_new.end()); + pt.m_expanded_targets.assign(v_d_new.begin(), v_d_new.end()); + + // We have two vectors, so we process the transfer + // [1] Update request_manager + // [2] Send message to worker + + auto ec = m_request_manager.update(pt.m_p.tid(), v_s_new.size(), + pt.m_p.nworkers()); + if(ec != error_code::success) { + LOGGER_ERROR("Failed to update request: {}", ec); + return; + }; + + assert(v_s_new.size() == v_d_new.size()); + + // For all the transfers + for(std::size_t i = 0; i < v_s_new.size(); ++i) { + const auto& s = v_s_new[i]; + const auto& d = v_d_new[i]; + + // Create the directory if it does not exist (only in + // parallel transfer) + if(!std::filesystem::path(d.path()).parent_path().empty() and + d.supports_parallel_transfer()) { + std::filesystem::create_directories( + std::filesystem::path(d.path()).parent_path()); + } + + + // Send message to worker + for(std::size_t rank = 1; rank <= pt.m_p.nworkers(); ++rank) { + const auto [t, m] = make_message(pt.m_p.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } + } +} + void master_server::transfer_datasets(const network::request& 
req, const std::vector& sources, @@ -236,8 +448,8 @@ master_server::transfer_datasets(const network::request& req, targets); - // As we accept directories expanding directories should be done before and - // update sources and targets. + // As we accept directories expanding directories should be done before + // and update sources and targets. std::vector v_s_new; std::vector v_d_new; @@ -258,15 +470,16 @@ master_server::transfer_datasets(const network::request& req, // bbb/xxx -> ttt/xxx const auto& p = s.path(); - std::vector files; - if(std::filesystem::is_directory(p)) { + std::vector files; + // Check stat of p using FSPlugin class + auto fs = FSPlugin::make_fs( + static_cast(s.get_type())); + struct stat buf; + fs->stat(p, &buf); + if(buf.st_mode & S_IFDIR) { LOGGER_INFO("Expanding input directory {}", p); - for(const auto& f : - std::filesystem::recursive_directory_iterator(p)) { - if(std::filesystem::is_regular_file(f)) { - files.push_back(f.path()); - } - } + files = fs->readdir(p); + /* We have all the files expanded. Now create a new @@ -286,8 +499,8 @@ master_server::transfer_datasets(const network::request& req, leading--; } - d_new.path(d.path() / std::filesystem::path( - f.string().substr(leading + 1))); + d_new.path(d.path() / + std::filesystem::path(f.substr(leading + 1))); LOGGER_DEBUG("Expanded file {} -> {}", s_new.path(), d_new.path()); @@ -309,8 +522,20 @@ master_server::transfer_datasets(const network::request& req, }) .map([&](auto&& r) { assert(v_s_new.size() == v_d_new.size()); - - // For all the files + if(m_ftio) { + if(sources[0].get_type() == cargo::dataset::type::gekkofs) { + + // We have only one pendingTransfer for FTIO + // that can be updated, the issue is that we + // need the tid. 
+ m_pending_transfer.m_p = r; + m_pending_transfer.m_sources = sources; + m_pending_transfer.m_targets = targets; + m_pending_transfer.m_work = true; + LOGGER_INFO("Stored stage-out information"); + } + } + // For all the transfers for(std::size_t i = 0; i < v_s_new.size(); ++i) { const auto& s = v_s_new[i]; const auto& d = v_d_new[i]; @@ -325,10 +550,20 @@ master_server::transfer_datasets(const network::request& req, std::filesystem::path(d.path()).parent_path()); } - for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { - const auto [t, m] = make_message(r.tid(), i, s, d); - LOGGER_INFO("msg <= to: {} body: {}", rank, m); - world.send(static_cast(rank), t, m); + // If we are not using ftio start transfer if we are on + // stage-out + if(!m_ftio) { + // If we are on stage-out + + + for(std::size_t rank = 1; rank <= r.nworkers(); + ++rank) { + const auto [t, m] = make_message(r.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } + } else { + m_ftio_tid = r.tid(); } } LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, @@ -417,4 +652,50 @@ master_server::transfer_statuses(const network::request& req, }); } + +void +master_server::ftio_int(const network::request& req, float conf, float prob, + float period, bool run, bool pause, bool resume) { + using network::get_address; + using network::rpc_info; + using proto::generic_response; + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + if(pause) { + // send shaping info + for(int rank = 1; rank < world.size(); ++rank) { + // Slowdown 1 second per block + const auto m = cargo::shaper_message{m_ftio_tid, +10}; + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), + static_cast(tag::bw_shaping), m); + } + } else if(resume) { + for(int rank = 1; rank < world.size(); ++rank) { + // Restart operation + const auto m = cargo::shaper_message{m_ftio_tid, -1}; + LOGGER_INFO("msg <= to: {} 
body: {}", rank, m); + world.send(static_cast(rank), + static_cast(tag::bw_shaping), m); + } + } else { + m_confidence = conf; + m_probability = prob; + m_period = period; + m_ftio_run = run; + if(m_period > 0) + m_ftio_run = true; + m_ftio = true; + } + LOGGER_INFO( + "rpc {:>} body: {{confidence: {}, probability: {}, period: {}, run: {}, pause: {}, resume: {}}}", + rpc, conf, prob, period, run, pause, resume); + + const auto resp = generic_response{rpc.id(), error_code::success}; + + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + + req.respond(resp); +} + } // namespace cargo diff --git a/src/master.hpp b/src/master.hpp index f671e3a67f95d37ae07991f682e15744838e6996..612ae8aab232a75f46ce0ea9b5ac78a44f6b1ddf 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -28,9 +28,25 @@ #include "net/server.hpp" #include "cargo.hpp" #include "request_manager.hpp" +#include "parallel_request.hpp" namespace cargo { +class pending_transfer { +public: + pending_transfer():m_p(cargo::parallel_request(0,0,0)) { + m_work = false; + } + + bool m_work; + cargo::parallel_request m_p; + std::vector m_sources; + std::vector m_targets; + // Expanded sources and targets (those that are being processed by the worker) + std::vector m_expanded_sources; + std::vector m_expanded_targets; +}; + class master_server : public network::server, public network::provider { public: @@ -44,6 +60,9 @@ private: void mpi_listener_ult(); + void + ftio_scheduling_ult(); + void ping(const network::request& req); @@ -61,16 +80,42 @@ private: void transfer_statuses(const network::request& req, std::uint64_t tid); - // Receives a request to increase or decrease BW + // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void - bw_control(const network::request& req, std::uint64_t tid, std::int16_t shaping); + bw_control(const network::request& req, std::uint64_t tid, + std::int16_t shaping); + + + void + ftio_int(const network::request& req, float confidence, float 
probability, + float period, bool run, bool pause, bool resume); private: // Dedicated execution stream for the MPI listener ULT thallium::managed m_mpi_listener_ess; // ULT for the MPI listener thallium::managed m_mpi_listener_ult; + // Dedicated execution stream for the ftio scheduler + thallium::managed m_ftio_listener_ess; + // ULT for the ftio scheduler + thallium::managed m_ftio_listener_ult; + // FTIO decision values (below 0, implies not used) + float m_confidence = -1.0f; + float m_probability = -1.0f; + float m_period = -1.0f; + bool m_ftio_run = true; + // We store the tid of the ftio transfer to proper slow it down. + std::uint64_t m_ftio_tid = 0; + // FTIO enabled flag, we need to call ftio once. + bool m_ftio = false; + + + pending_transfer m_pending_transfer; + + + void + transfer_dataset_internal(pending_transfer& pt); // Request manager request_manager m_request_manager; }; diff --git a/src/posix_file/CMakeLists.txt b/src/posix_file/CMakeLists.txt index f539920fc695a74d3a073f2e39a8bb5aea4d6c19..68446ad8798e9e01b808d416ea02725fe6b440ff 100644 --- a/src/posix_file/CMakeLists.txt +++ b/src/posix_file/CMakeLists.txt @@ -32,9 +32,11 @@ if (Expand_FOUND) set(EXPAND_INCLUDES posix_file/fs_plugin/expand_plugin.hpp posix_file/fs_plugin/expand_plugin.cpp) endif() if (Hercules_FOUND) - set(HERCULES_INCLUDES posix_file/fs_plugin/hercules_plugin.chpp posix_file/fs_plugin/hercules_plugin.cpp) + set(HERCULES_INCLUDES posix_file/fs_plugin/hercules_plugin.hpp posix_file/fs_plugin/hercules_plugin.cpp) +endif() +if (DataClay_FOUND) + set(DATACLAY_INCLUDES posix_file/fs_plugin/dataclay_plugin.hpp posix_file/fs_plugin/dataclay_plugin.cpp) endif() - target_sources( posix_file @@ -52,6 +54,7 @@ target_sources( ${GEKKO_INCLUDES} ${HERCULES_INCLUDES} ${EXPAND_INCLUDES} + ${DATACLAY_INCLUDES} ) @@ -73,6 +76,9 @@ if (Hercules_FOUND) set(ADHOC ${ADHOC} Hercules::Hercules) endif() +if (DataClay_FOUND) + set(ADHOC ${ADHOC} DataClay::DataClay) +endif() 
target_link_libraries(posix_file INTERFACE fmt::fmt tl::expected PRIVATE ${ADHOC}) diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 03d6fbec7bee510d713b6aad78f55f4ea5735d56..0eb65d9195a0c5913f3858566ec81950e7985f03 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -155,15 +155,16 @@ class file { public: - file(cargo::FSPlugin::type t) { + explicit file(cargo::FSPlugin::type t) { m_fs_plugin = cargo::FSPlugin::make_fs(t); }; - explicit file(std::filesystem::path filepath) noexcept - : m_path(std::move(filepath)) {} + explicit file(std::filesystem::path filepath, + cargo::FSPlugin::type t) noexcept + : m_path(std::move(filepath)), m_fs_plugin(cargo::FSPlugin::make_fs(t)) {} file(std::filesystem::path filepath, int fd, - std::unique_ptr fs_plugin) noexcept + std::shared_ptr fs_plugin) noexcept : m_path(std::move(filepath)), m_handle(fd), m_fs_plugin(std::move(fs_plugin)) {} @@ -180,12 +181,12 @@ public: std::size_t size() const noexcept { - return std::filesystem::file_size(m_path); + return m_fs_plugin->size(m_path); } auto remove() noexcept { - return std::filesystem::remove(m_path); + return m_fs_plugin->unlink(m_path); } void @@ -196,13 +197,18 @@ public: } int ret = m_fs_plugin->fallocate(m_handle.native(), mode, offset, - static_cast(len)); + static_cast(len)); if(ret == -1) { throw io_error("posix_file::file::fallocate", errno); } } + void + close() noexcept { + m_fs_plugin->close(m_handle.native()); + } + template std::size_t pread(MemoryBuffer&& buf, offset offset, std::size_t size) const { @@ -284,7 +290,7 @@ public: protected: const std::filesystem::path m_path; file_handle m_handle; - std::unique_ptr m_fs_plugin; + std::shared_ptr m_fs_plugin; }; @@ -292,7 +298,7 @@ static inline file open(const std::filesystem::path& filepath, int flags, ::mode_t mode, cargo::FSPlugin::type t) { - std::unique_ptr fs_plugin; + std::shared_ptr fs_plugin; fs_plugin = 
cargo::FSPlugin::make_fs(t); // We don't check if it exists, we just create it if flags is set to O_CREAT @@ -300,9 +306,9 @@ open(const std::filesystem::path& filepath, int flags, ::mode_t mode, if(flags & O_CREAT) { fs_plugin->mkdir(filepath.parent_path().c_str(), 0755); } - + int fd = fs_plugin->open(filepath.c_str(), flags, mode); - + if(fd == -1) { throw io_error("posix_file::open ", errno); } diff --git a/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7181be21dc06027d82ca00aa1bd6ffd4c7c87ef3 --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.cpp @@ -0,0 +1,98 @@ + +#include "fs_plugin.hpp" +#include "dataclay_plugin.hpp" +extern "C" { +#include +} +#include +namespace cargo { +dataclay_plugin::dataclay_plugin() { + ::dataclay_plugin("cargo", DataClay_PATH); + std::cout << "dataclay_plugin loaded" << std::endl; +} + +dataclay_plugin::~dataclay_plugin() {} +// Override the open function +int +dataclay_plugin::open(const std::string& path, int flags, unsigned int mode) { + return dataclay_open((char *)path.c_str(), flags, mode); +} + +// Override the pread function +ssize_t +dataclay_plugin::pread(int fd, void* buf, size_t count, off_t offset) { + return dataclay_pread(fd, (char*) buf, count, offset); +} + +// Override the pwrite function +ssize_t +dataclay_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { + return dataclay_pwrite(fd, (char*) buf, count, offset); +} + + +bool +dataclay_plugin::mkdir(const std::string& path, mode_t mode) { + (void) path; + (void) mode; + return true; // We don't have directories +} + +bool +dataclay_plugin::close(int fd) { + dataclay_close(fd); + return true; +} + +off_t +dataclay_plugin::lseek(int fd, off_t offset, int whence) { + (void) fd; + (void) offset; + (void) whence; + std::cerr << "dataclay_plugin lseek not supported" << std::endl; + return 0; +} 
+ +off_t +dataclay_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { + (void) fd; + (void) mode; + (void) offset; + (void) len; + return len; +} + +int +dataclay_plugin::unlink(const std::string& path) { + + (void) path; + std::cerr << "dataclay_plugin unlink not supported" << std::endl; + return 0; +} + + +std::vector +dataclay_plugin::readdir(const std::string& path) { + (void) path; + std::cerr << "dataclay_plugin readdir not supported" << std::endl; + return {}; +} + +// stat +int +dataclay_plugin::stat(const std::string& path, struct stat* buf) { + (void) path; + (void) buf; + std::cerr << "dataclay_plugin stat not supported" << std::endl; + return 0; +} + +ssize_t +dataclay_plugin::size(const std::string& path) { + (void) path; + std::cerr << "dataclay_plugin size not supported" << std::endl; + return 0; +} + + +} // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp new file mode 100644 index 0000000000000000000000000000000000000000..2d18de5f5d65bc15df3009e5453c5fb91b133e7e --- /dev/null +++ b/src/posix_file/posix_file/fs_plugin/dataclay_plugin.hpp @@ -0,0 +1,39 @@ + +#ifndef DATACLAY_PLUGIN_HPP +#define DATACLAY_PLUGIN_HPP + +#include "fs_plugin.hpp" + +namespace cargo { +class dataclay_plugin : public FSPlugin { + +public: + dataclay_plugin(); + ~dataclay_plugin(); + int + open(const std::string& path, int flags, unsigned int mode) final; + bool + close(int fd) final; + ssize_t + pread(int fd, void* buf, size_t count, off_t offset) final; + ssize_t + pwrite(int fd, const void* buf, size_t count, off_t offset) final; + bool + mkdir(const std::string& path, mode_t mode) final; + off_t + lseek(int fd, off_t offset, int whence) final; + // Fallocate is not needed in dataclay as pwrite takes care of it. 
+ off_t + fallocate(int fd, int mode, off_t offset, off_t len) final; + std::vector + readdir(const std::string& path) final; + int + unlink(const std::string& path) final; + int + stat(const std::string& path, struct stat* buf) final; + ssize_t + size(const std::string& path) final; +}; +}; // namespace cargo + +#endif // dataclay_PLUGIN_HPP diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp index 7664aff90f1a4a2c3d58d3487ade80610992b86f..f69d5cdbfae321b3aa5c3aeb0c50ee0d8cbc347b 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.cpp @@ -9,18 +9,37 @@ #ifdef EXPAND_PLUGIN #include "expand_plugin.hpp" #endif - +#ifdef DATACLAY_PLUGIN +#include "dataclay_plugin.hpp" +#endif namespace cargo { -std::unique_ptr +static std::shared_ptr m_fs_posix; +static std::shared_ptr m_fs_gekkofs; +static std::shared_ptr m_fs_dataclay; + +std::shared_ptr FSPlugin::make_fs(type t) { switch(t) { case type::posix: - return std::make_unique(); + case type::parallel: + if(m_fs_posix == nullptr) + m_fs_posix = std::make_shared(); + return m_fs_posix; #ifdef GEKKOFS_PLUGIN case type::gekkofs: - return std::make_unique(); + if(m_fs_gekkofs == nullptr) + m_fs_gekkofs = std::make_shared(); + return m_fs_gekkofs; + +#endif +#ifdef DATACLAY_PLUGIN + case type::dataclay: + if(m_fs_dataclay == nullptr) + m_fs_dataclay = std::make_shared(); + return m_fs_dataclay; + #endif #ifdef HERCULES_PLUGIN case type::hercules: diff --git a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp index c98e74a5ab75aab6ea2c713d774dde49c91cfad9..495b10fcb2c35003bc19f2fff0fe56b5e86e4845 100644 --- a/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/fs_plugin.hpp @@ -2,6 +2,7 @@ #define FS_PLUGIN_HPP #include +#include #include #include #include @@ -18,7 +19,9 @@ public: expand, dataclay }; - static 
std::unique_ptr make_fs(type); + + static std::shared_ptr make_fs(type); + // One instance per fs type virtual ~FSPlugin() = default; @@ -36,6 +39,14 @@ public: lseek(int fd, off_t offset, int whence) = 0; virtual off_t fallocate(int fd, int mode, off_t offset, off_t len) = 0; + virtual std::vector + readdir(const std::string& path) = 0; + virtual int + unlink(const std::string& path) = 0; + virtual int + stat(const std::string& path, struct stat* buf) = 0; + virtual ssize_t + size(const std::string& path) = 0; }; } // namespace cargo #endif // FS_PLUGIN_HPP \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp index 1ff0caa70c449c7f3abb18bb85de8b9f354a30a6..aff3eacfe00782ff02638b3450bf79774afb72b1 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.cpp @@ -7,14 +7,14 @@ namespace cargo { gekko_plugin::gekko_plugin() { int result = gkfs_init(); - if (result != 0) { + if(result != 0) { std::cerr << "Failed to initialize gekkofs" << std::endl; } } gekko_plugin::~gekko_plugin() { int result = gkfs_end(); - if (result != 0) { + if(result != 0) { std::cerr << "Failed to finalize gekkofs" << std::endl; } } @@ -28,13 +28,13 @@ gekko_plugin::open(const std::string& path, int flags, unsigned int mode) { // Override the pread function ssize_t gekko_plugin::pread(int fd, void* buf, size_t count, off_t offset) { - return gkfs::syscall::gkfs_pread_ws(fd, buf, count, offset); + return gkfs::syscall::gkfs_pread(fd, buf, count, offset); } // Override the pwrite function ssize_t gekko_plugin::pwrite(int fd, const void* buf, size_t count, off_t offset) { - return gkfs::syscall::gkfs_pwrite_ws(fd, buf, count, offset); + return gkfs::syscall::gkfs_pwrite(fd, buf, count, offset); } @@ -62,4 +62,57 @@ gekko_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { (void) len; return len; } + +int 
+gekko_plugin::unlink(const std::string& path) { + return gkfs::syscall::gkfs_remove(path); +} + + +std::vector +gekko_plugin::readdir(const std::string& path) { + // Fill recursively the files, checking if the file is a directory + std::vector files; + std::vector final_list; + files = gkfs::syscall::gkfs_get_file_list(path); + + for(auto& file : files) { + + struct stat buf; + stat("/" + file, &buf); + if(S_ISDIR(buf.st_mode)) { + + std::vector subfiles = readdir("/" + file); + final_list.insert(final_list.end(), subfiles.begin(), + subfiles.end()); + } else { + + if(path.size() != 1) { + final_list.push_back(path + "/" + file); + } else { + final_list.push_back("/" + file); + } + } + } + + return final_list; +} + +// stat +int +gekko_plugin::stat(const std::string& path, struct stat* buf) { + return gkfs::syscall::gkfs_stat(path, buf); +} + +ssize_t +gekko_plugin::size(const std::string& path) { + struct stat buf; + int res = gekko_plugin::stat(path, &buf); + if(res != 0) { + return -1; + } + return buf.st_size; +} + + } // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp index 2d27132f8dab81060f5582d89a4fee1d09eb7db2..fd4e5c26b4da87d0b4739aa39f25f5c33afa3bc2 100644 --- a/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/gekko_plugin.hpp @@ -25,6 +25,14 @@ public: // Fallocate is not needed in GekkoFS as pwrite takes care of it. 
off_t fallocate(int fd, int mode, off_t offset, off_t len) final; + std::vector + readdir(const std::string& path) final; + int + unlink(const std::string& path) final; + int + stat(const std::string& path, struct stat* buf) final; + ssize_t + size(const std::string& path) final; }; }; // namespace cargo diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp index 4d0f13481c94b2b0cd1c597e2ef461489e6233f9..f0ec739aedc8e98f8c2b25bdd4fd01e29b10f0d0 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.cpp @@ -6,8 +6,7 @@ namespace cargo { -posix_plugin::posix_plugin() { -} +posix_plugin::posix_plugin() {} int @@ -45,4 +44,31 @@ posix_plugin::fallocate(int fd, int mode, off_t offset, off_t len) { (void) mode; return ::posix_fallocate(fd, offset, static_cast(len)); } + +std::vector +posix_plugin::readdir(const std::string& path) { + std::vector files; + for(const auto& f : std::filesystem::recursive_directory_iterator(path)) { + if(std::filesystem::is_regular_file(f)) { + files.push_back(f.path()); + } + } + return files; +} + + +int +posix_plugin::unlink(const std::string& path) { + return ::unlink(path.c_str()); +} +int +posix_plugin::stat(const std::string& path, struct stat* buf) { + return ::stat(path.c_str(), buf); +} + +ssize_t +posix_plugin::size(const std::string& path) { + return std::filesystem::file_size(path); +} + }; // namespace cargo \ No newline at end of file diff --git a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp index 011028443b10b961597eaa1c6897c628fc09d0e7..c57048877445600de8723aa7cacbe5b8e059019e 100644 --- a/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp +++ b/src/posix_file/posix_file/fs_plugin/posix_plugin.hpp @@ -24,6 +24,15 @@ public: lseek(int fd, off_t offset, int whence) final; off_t fallocate(int fd, int mode, off_t offset, off_t len) 
final; + std::vector + readdir(const std::string& path) final; + int + unlink(const std::string& path) final; + int + stat(const std::string& path, struct stat* buf) final; + + ssize_t + size(const std::string& path) final; }; } // namespace cargo #endif // POSIX_PLUGIN_HPP diff --git a/src/request_manager.cpp b/src/request_manager.cpp index 567446d4cab3aa898a29a4063a215f51857089ff..4d697dfc3341d50f08e24743c2e0a931f0cc753b 100644 --- a/src/request_manager.cpp +++ b/src/request_manager.cpp @@ -55,9 +55,30 @@ request_manager::create(std::size_t nfiles, std::size_t nworkers) { return parallel_request{tid, nfiles, nworkers}; } +/** + * @brief Update the request for ftio processing (as it is modified by readdir) + * + * @param request + * @param nfiles + * @param nworkers + * @return error_code + */ +error_code +request_manager::update(std::uint64_t tid, std::size_t nfiles, + std::size_t nworkers) { + abt::unique_lock lock(m_mutex); + m_requests[tid] = std::vector{nfiles, + std::vector{nworkers}}; + + return error_code::success; + + +} + + error_code -request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, - transfer_state s, float bw, +request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, + std::string name, transfer_state s, float bw, std::optional ec) { abt::unique_lock lock(m_mutex); @@ -92,7 +113,7 @@ request_manager::lookup(std::uint64_t tid) { return request_status{ps}; } } - // TODO : completed should have the name of the file if its not found + // TODO : completed should have the name of the file if its not found return request_status{"", transfer_state::completed, 0.0f}; } @@ -121,7 +142,7 @@ request_manager::lookup_all(std::uint64_t tid) { // not finished rs = request_status{ps}; } - rs.bw(bw/(double)fs.size()); + rs.bw(bw / (double) fs.size()); result.push_back(rs); } return result; diff --git a/src/request_manager.hpp b/src/request_manager.hpp index 
46beddad3a703ec4c98f4f1ff668ea4a58848996..c813d45bfd2f5d053980dbe84c5be6c5ddd2ac02 100644 --- a/src/request_manager.hpp +++ b/src/request_manager.hpp @@ -58,6 +58,9 @@ public: tl::expected create(std::size_t nfiles, std::size_t nworkers); + error_code + update(std::uint64_t tid, std::size_t nfiles, std::size_t nworkers); + error_code update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, std::string name, transfer_state s, float bw, diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp index 92f0a05955f997f3bae9cf8c56bb01054695cd51..526ef1b7d673afffc56fb58a0d8f2830641c7bdd 100644 --- a/src/worker/mpio_read.cpp +++ b/src/worker/mpio_read.cpp @@ -47,6 +47,7 @@ mpio_read::operator()() { using posix_file::views::strided; m_status = error_code::transfer_in_progress; try { + const auto input_file = mpioxx::file::open( m_workers, m_input_path, mpioxx::file_open_mode::rdonly); @@ -167,9 +168,12 @@ mpio_read::progress(int ongoing_index) { using posix_file::views::strided; try { int index = 0; + // TODO : FS not defined... 
+ m_status = error_code::transfer_in_progress; for(const auto& file_range : - all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + all_of(posix_file::file{m_input_path, m_fs_i_type}) | + as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { ++index; @@ -179,13 +183,25 @@ mpio_read::progress(int ongoing_index) { return index; } } + // LOG indexes and sizes assert(m_buffer_regions[index].size() >= file_range.size()); + auto start = std::chrono::steady_clock::now(); m_output_file->pwrite(m_buffer_regions[index], file_range.offset(), file_range.size()); // Do sleep - std::this_thread::sleep_for(sleep_value()); + auto total_sleep = sleep_value(); + auto small_sleep = total_sleep / 100; + if(small_sleep == std::chrono::milliseconds(0)) + small_sleep = std::chrono::milliseconds(1); + while(total_sleep > std::chrono::milliseconds(0)) { + std::this_thread::sleep_for(small_sleep); + total_sleep -= small_sleep; + if(total_sleep > sleep_value()) { + break; + } + } auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = @@ -221,6 +237,7 @@ mpio_read::progress(int ongoing_index) { } m_status = error_code::success; + m_output_file->close(); return -1; } diff --git a/src/worker/mpio_write.cpp b/src/worker/mpio_write.cpp index 44b8070417f23d68f30ef71f90428eee0eaa6692..ab401d018ebbb914bae07e09856c0f12fec6b207 100644 --- a/src/worker/mpio_write.cpp +++ b/src/worker/mpio_write.cpp @@ -41,7 +41,11 @@ mpio_write::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; - std::size_t file_size = std::filesystem::file_size(m_input_path); + // We need to open the file and ask size (using fs_plugin) + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + + std::size_t file_size = m_input_file->size(); // compute the number of blocks in the file int total_blocks = 
static_cast(file_size / block_size); @@ -68,8 +72,6 @@ mpio_write::operator()() { block_size); } - m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); m_workers_size = workers_size; m_workers_rank = workers_rank; @@ -142,8 +144,18 @@ mpio_write::progress(int ongoing_index) { m_bytes_per_rank += n; - // Do sleep - std::this_thread::sleep_for(sleep_value()); + // Do sleep (But be a bit reactive...) + auto total_sleep = sleep_value(); + auto small_sleep = total_sleep / 100; + if (small_sleep == std::chrono::milliseconds(0)) small_sleep = std::chrono::milliseconds(1); + while( total_sleep > std::chrono::milliseconds(0)) { + std::this_thread::sleep_for(small_sleep); + total_sleep -= small_sleep; + if (total_sleep > sleep_value()) { + break; + } + } + auto end = std::chrono::steady_clock::now(); // Send transfer bw double elapsed_seconds = diff --git a/src/worker/seq_mixed.cpp b/src/worker/seq_mixed.cpp index b0871223c5bbd99cef45321813a74f2ba734494f..aca9e4c7bebb375f1890b69fab0ea350f03012a1 100644 --- a/src/worker/seq_mixed.cpp +++ b/src/worker/seq_mixed.cpp @@ -39,7 +39,9 @@ seq_mixed_operation::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; - std::size_t file_size = std::filesystem::file_size(m_input_path); + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + std::size_t file_size = m_input_file->size(); // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); @@ -66,8 +68,6 @@ seq_mixed_operation::operator()() { block_size); } - m_input_file = std::make_unique( - posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 
b91d865a902e4099b319e0914f6beaf84cc73ee2..dbd7ae3d6c1b350e2f6c376ac4557979f8e2c5c8 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -39,7 +39,10 @@ seq_operation::operator()() { const auto workers_size = m_workers.size(); const auto workers_rank = m_workers.rank(); std::size_t block_size = m_kb_size * 1024u; - std::size_t file_size = std::filesystem::file_size(m_input_path); + m_input_file = std::make_unique( + posix_file::open(m_input_path, O_RDONLY, 0, m_fs_i_type)); + std::size_t file_size = m_input_file->size(); + // compute the number of blocks in the file int total_blocks = static_cast(file_size / block_size); @@ -176,7 +179,7 @@ seq_operation::progress(int ongoing_index) { // We need to create the directory if it does not exists (using // FSPlugin) - if ( write and ongoing_index == 0 ) { + if(write and ongoing_index == 0) { m_output_file = std::make_unique(posix_file::create( m_output_path, O_WRONLY, S_IRUSR | S_IWUSR, m_fs_o_type)); @@ -187,7 +190,7 @@ seq_operation::progress(int ongoing_index) { int index = 0; m_status = error_code::transfer_in_progress; for(const auto& file_range : - all_of(posix_file::file{m_input_path}) | as_blocks(m_block_size) | + all_of(posix_file::file{m_input_path, m_fs_i_type}) | as_blocks(m_block_size) | strided(m_workers_size, m_workers_rank)) { if(index < ongoing_index) { ++index; diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index 0178b961c5206a60e77a30d3fa0fdd5a78192c53..8a6252732513a5badc9c144716c35838dbb0e68f 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -52,8 +52,8 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group, } void -update_state(int rank, std::uint64_t tid, std::uint32_t seqno, - std::string name, cargo::transfer_state st, float bw, +update_state(int rank, std::uint64_t tid, std::uint32_t seqno, std::string name, + cargo::transfer_state st, float bw, std::optional ec = std::nullopt) { mpi::communicator world; @@ -115,11 +115,30 @@ 
worker::run() { auto op = I->second.first.get(); int index = I->second.second; if(op) { + if(index == -1) { + // operation not started + // Print error message + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::running, + -1.0f); + cargo::error_code ec = (*op)(); + if(ec != cargo::error_code::transfer_in_progress) { + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::failed, + -1.0f, ec); + I = m_ops.erase(I); + break; + } + + index = 0; + } + // Operation in progress index = op->progress(index); if(index == -1) { // operation finished cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), ec ? transfer_state::failed : transfer_state::completed, 0.0f, ec); @@ -129,8 +148,9 @@ worker::run() { } else { // update only if BW is set if(op->bw() > 0.0f) { - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), - transfer_state::running, op->bw()); + update_state(op->source(), op->tid(), op->seqno(), + op->output_path(), transfer_state::running, + op->bw()); } I->second.second = index; ++I; @@ -166,7 +186,7 @@ worker::run() { t, workers, m.input_path(), m.output_path(), m_block_size, m.i_type(), m.o_type()), - 0))); + -1))); const auto op = m_ops[make_pair(m.input_path(), m.output_path())] @@ -174,16 +194,8 @@ worker::run() { op->set_comm(msg->source(), m.tid(), m.seqno(), t); - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), - transfer_state::running, -1.0f); - // Different scenarios read -> write | write -> read - - cargo::error_code ec = (*op)(); - if(ec != cargo::error_code::transfer_in_progress) { - update_state(op->source(), op->tid(), op->seqno(), op->output_path(), - transfer_state::failed, -1.0f, ec); - m_ops.erase(make_pair(m.input_path(), m.output_path())); - } + update_state(op->source(), op->tid(), op->seqno(), + 
op->output_path(), transfer_state::pending, -1.0f); break; } @@ -199,8 +211,6 @@ worker::run() { LOGGER_INFO("Operation non existent", msg->source(), m); } } - - break; } diff --git a/systemd/cargo@.service.in b/systemd/cargo@.service.in index 6e1300670b462639b50bb24bce3ddaa01277e2fd..425616bd7e6d7f903afd69e3deb11569dc84b40e 100644 --- a/systemd/cargo@.service.in +++ b/systemd/cargo@.service.in @@ -3,9 +3,9 @@ Description=Cargo parallel data stager [Service] Type=simple -EnvironmentFile=%S/cargo/%I.cfg +EnvironmentFile=%h/.config/cargo/%I.cfg ExecStart=@CMAKE_INSTALL_FULL_BINDIR@/cargoctl start -s ${CARGO_ADDRESS} -H ${CARGO_HOSTS} -n ${CARGO_NUM_NODES} ExecStop=@CMAKE_INSTALL_FULL_BINDIR@/cargoctl stop -s ${CARGO_ADDRESS} Restart=no -PrivateTmp=true +PrivateTmp=false NoNewPrivileges=true diff --git a/tests/posix_file_tests.cpp b/tests/posix_file_tests.cpp index 55265a12cbe12c86995eb9254f764a98863f759f..ee8331356ee435ba48dbe14eaa1cf6a83bea3945 100644 --- a/tests/posix_file_tests.cpp +++ b/tests/posix_file_tests.cpp @@ -48,8 +48,8 @@ struct StringMaker { struct scoped_file : public posix_file::file { - explicit scoped_file(std::filesystem::path filepath) - : posix_file::file(std::move(filepath)) {} + explicit scoped_file(std::filesystem::path filepath, cargo::FSPlugin::type type) + : posix_file::file(std::move(filepath), type) {} ~scoped_file() { remove(); @@ -72,7 +72,7 @@ create_temporary_file(std::size_t desired_size) { abort(); } - return scoped_file{std::filesystem::path{name}}; + return scoped_file{std::filesystem::path{name}, cargo::FSPlugin::type::posix}; } std::vector