Loading CMakeLists.txt +3 −0 Original line number Diff line number Diff line Loading @@ -263,6 +263,9 @@ if (CARGO_BUILD_TESTS) enable_testing() ### boost::iostreams are required for some tests find_package(Boost 1.53 REQUIRED COMPONENTS iostreams) ### catch2: required for unit testing message(STATUS "[${PROJECT_NAME}] Downloading and building Catch2") FetchContent_Declare( Loading tests/CMakeLists.txt +14 −0 Original line number Diff line number Diff line Loading @@ -21,3 +21,17 @@ # # # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ include(Catch) add_executable(tests) target_sources(tests PRIVATE tests.cpp common.hpp common.cpp) target_link_libraries( tests PUBLIC Catch2::Catch2 Boost::iostreams fmt::fmt cargo ) catch_discover_tests( tests EXTRA_ARGS "-S ${CARGO_TRANSPORT_PROTOCOL}://${CARGO_BIND_ADDRESS}:${CARGO_BIND_PORT}" ) tests/common.cpp 0 → 100644 +13 −0 Original line number Diff line number Diff line #include <fmt/format.h> #include "common.hpp" std::vector<cargo::dataset> prepare_datasets(const std::string& pattern, size_t n) { std::vector<cargo::dataset> datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { datasets.emplace_back(fmt::format(fmt::runtime(pattern), i)); } return datasets; } tests/common.hpp 0 → 100644 +93 −0 Original line number Diff line number Diff line #ifndef CARGO_TESTS_COMMON_HPP #define CARGO_TESTS_COMMON_HPP #include <vector> #include <cargo.hpp> #include <unistd.h> class file_handle { private: constexpr static const int init_value{-1}; ///< initial file descriptor int m_fd{init_value}; ///< file descriptor public: file_handle() = default; explicit file_handle(int fd) noexcept : m_fd(fd) {} file_handle(file_handle&& rhs) noexcept { this->m_fd = rhs.m_fd; rhs.m_fd = init_value; } file_handle(const file_handle& other) = delete; file_handle& operator=(file_handle&& rhs) noexcept { this->m_fd = rhs.m_fd; rhs.m_fd = init_value; return *this; } 
file_handle& operator=(const file_handle& other) = delete; explicit operator bool() const noexcept { return valid(); } bool operator!() const noexcept { return !valid(); } /** * @brief Checks for valid file descriptor value. * @return boolean if valid file descriptor */ [[nodiscard]] bool valid() const noexcept { return m_fd != init_value; } /** * @brief Retusn the file descriptor value used in this file handle * operation. * @return file descriptor value */ [[nodiscard]] int native() const noexcept { return m_fd; } /** * @brief Closes file descriptor and resets it to initial value * @return boolean if file descriptor was successfully closed */ bool close() noexcept { if(m_fd != init_value) { if(::close(m_fd) < 0) { return false; } } m_fd = init_value; return true; } /** * @brief Destructor implicitly closes the internal file descriptor. */ ~file_handle() { if(m_fd != init_value) { close(); } } }; std::vector<cargo::dataset> prepare_datasets(const std::string& pattern, size_t n); #endif // CARGO_TESTS_COMMON_HPP tests/tests.cpp 0 → 100644 +351 −0 Original line number Diff line number Diff line /****************************************************************************** * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). * * This file is part of Cargo. * * Cargo is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Cargo is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Cargo. 
If not, see <https://www.gnu.org/licenses/>. * * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ #include <fmt/format.h> #include <fmt/chrono.h> #include <thread> #include <chrono> #include <string_view> #include <fstream> #include <random> #include <filesystem> #include <cargo.hpp> #include <boost/iostreams/device/mapped_file.hpp> #include <catch2/catch_config.hpp> #include <catch2/catch_session.hpp> #include <catch2/catch_test_macros.hpp> #include <catch2/generators/catch_generators_range.hpp> #include "common.hpp" using namespace std::literals; using namespace std::chrono_literals; struct scoped_file { explicit scoped_file(std::filesystem::path filepath) : m_filepath(std::move(filepath)) {} scoped_file(scoped_file&& rhs) noexcept : m_filepath(std::move(rhs.m_filepath)) {} scoped_file(scoped_file& other) = delete; scoped_file& operator=(scoped_file&& rhs) noexcept { if(this != &rhs) { m_filepath = std::move(rhs.m_filepath); } return *this; }; scoped_file& operator=(scoped_file& other) = delete; ~scoped_file() { if(!m_filepath.empty()) { std::filesystem::remove(m_filepath); } } auto path() const { return m_filepath; } std::filesystem::path m_filepath; }; struct ascii_data_generator { static constexpr std::array<char, 26> letters = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'}; using result_type = char; explicit ascii_data_generator(std::size_t block_size) : m_block_size(block_size) {} char operator()() const noexcept { const auto rv = *m_it; if((++m_index % m_block_size) == 0) { if(++m_it == letters.end()) { m_it = letters.begin(); } } return rv; } mutable std::size_t m_index = 0; mutable decltype(letters)::const_iterator m_it = letters.begin(); std::size_t m_block_size; }; struct random_data_generator { using result_type = std::mt19937::result_type; explicit random_data_generator(std::size_t seed, 
std::size_t min, std::size_t max) : m_seed(seed), m_rng(seed), m_dist(min, max) {} auto operator()() const noexcept { return m_dist(m_rng); } constexpr auto seed() const noexcept { return m_seed; } std::size_t m_seed; std::random_device m_device; mutable std::mt19937 m_rng; mutable std::uniform_int_distribution<std::mt19937::result_type> m_dist; }; template <typename DataGenerator> scoped_file create_temporary_file(const std::filesystem::path& name, std::size_t desired_size, const DataGenerator& generator) { char tmpname[] = "posix_file_tests_XXXXXX"; file_handle fh{mkstemp(tmpname)}; if(!fh) { fmt::print(stderr, "mkstemp() error: {}\n", strerror(errno)); abort(); } if(ftruncate(fh.native(), static_cast<off_t>(desired_size)) != 0) { fmt::print(stderr, "ftruncate() error: {}\n", strerror(errno)); abort(); } using result_type = DataGenerator::result_type; assert(desired_size % sizeof(result_type) == 0); const std::size_t n = desired_size / sizeof(result_type); std::vector<result_type> data(n); std::generate(data.begin(), data.end(), std::ref(generator)); std::fstream f{tmpname, std::ios::out | std::ios::binary}; f.write(reinterpret_cast<char*>(data.data()), desired_size); f.close(); std::filesystem::rename(tmpname, name); REQUIRE(std::filesystem::exists(name)); REQUIRE(std::filesystem::file_size(name) == desired_size); return scoped_file{name}; } bool equal(const std::filesystem::path& filepath1, const std::filesystem::path& filepath2) { namespace io = boost::iostreams; io::mapped_file_source f1(filepath1); io::mapped_file_source f2(filepath2); const auto start_time = std::chrono::steady_clock::now(); bool rv = f1.size() == f2.size() && std::equal(f1.data(), f1.data() + f1.size(), f2.data()); const auto end_time = std::chrono::steady_clock::now(); const auto duration = std::chrono::duration<double, std::micro>(end_time - start_time); fmt::print(stderr, "::equal(\"{}\", \"{}\"): {}\n", filepath1.string(), filepath2.string(), duration); return rv; } 
std::tuple<std::string, std::string> split(const std::string& id) { constexpr auto delim = "://"sv; const auto n = id.find(delim); if(n == std::string::npos) { return {std::string{}, id}; } return {id.substr(0, n), id.substr(n + delim.length(), id.length())}; } uint32_t catch2_seed; std::string server_address; #define NDATASETS 10 SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { random_data_generator rng{catch2_seed, 0, std::numeric_limits<std::uint64_t>::max() - 1u}; GIVEN("Some input datasets from a PFS") { REQUIRE(!server_address.empty()); cargo::server server{server_address}; const auto sources = prepare_datasets("lustre://source-dataset-{}", NDATASETS); const auto targets = prepare_datasets("target-dataset-{}", NDATASETS); static std::vector<scoped_file> input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( create_temporary_file(filepath, 1000, rng)); } // ensure there are no dangling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { std::filesystem::remove(dataset.id()); }); WHEN("Transferring datasets to a POSIX storage system") { const auto tx = cargo::transfer_datasets(server, sources, targets); (void) tx; // give time for transfers to complete before removing input files // FIXME: replace with proper status checking for the transfer std::this_thread::sleep_for(1s); THEN("Output datasets are identical to input datasets") { std::vector<scoped_file> output_files; std::transform( targets.cbegin(), targets.cend(), std::back_inserter(output_files), [](auto&& target) { return scoped_file{target.id()}; }); for(std::size_t i = 0; i < input_files.size(); ++i) { REQUIRE(std::filesystem::exists(output_files[i].path())); REQUIRE(::equal(input_files[i].path(), output_files[i].path())); } } } } } SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { std::size_t file_size = 10000; // 
GENERATE(1000, 10000); [[maybe_unused]] ascii_data_generator ascii_gen{512}; [[maybe_unused]] random_data_generator rng{ catch2_seed, 0, std::numeric_limits<std::uint64_t>::max() - 1u}; GIVEN("Some input datasets from a POSIX storage system") { REQUIRE(!server_address.empty()); cargo::server server{server_address}; const auto sources = prepare_datasets("source-dataset-{}", NDATASETS); const auto targets = prepare_datasets("lustre://target-dataset-{}", NDATASETS); static std::vector<scoped_file> input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( create_temporary_file(d.id(), file_size, rng)); } // ensure there are no danling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { std::filesystem::remove(dataset.id()); }); WHEN("Transferring datasets to a PFS") { const auto tx = cargo::transfer_datasets(server, sources, targets); (void) tx; // give time for transfers to complete before removing input files // FIXME: replace with proper status checking for the transfer std::this_thread::sleep_for(1s); THEN("Output datasets are identical to input datasets") { std::vector<scoped_file> output_files; std::transform(targets.cbegin(), targets.cend(), std::back_inserter(output_files), [](auto&& target) { const auto& [prefix, filepath] = ::split(target.id()); return scoped_file{filepath}; }); for(std::size_t i = 0; i < input_files.size(); ++i) { const auto& [prefix, filepath] = ::split(targets[i].id()); REQUIRE(::equal(input_files[i].path(), filepath)); } } } } } int main(int argc, char* argv[]) { Catch::Session session; // There must be exactly one instance // Build a new parser on top of Catch2's using namespace Catch::Clara; auto cli = session.cli() // Get Catch2's command line parser | Opt(server_address, "server_address")["-S"]["--server-address"]("server address"); // Now pass the new composite back to Catch2, so it uses 
that session.cli(cli); // Let Catch2 (using Clara) parse the command line if(int returnCode = session.applyCommandLine(argc, argv); returnCode != 0) { return returnCode; } catch2_seed = session.config().rngSeed(); return session.run(); } Loading
CMakeLists.txt +3 −0 Original line number Diff line number Diff line Loading @@ -263,6 +263,9 @@ if (CARGO_BUILD_TESTS) enable_testing() ### boost::iostreams are required for some tests find_package(Boost 1.53 REQUIRED COMPONENTS iostreams) ### catch2: required for unit testing message(STATUS "[${PROJECT_NAME}] Downloading and building Catch2") FetchContent_Declare( Loading
#                                                                              #
# SPDX-License-Identifier: GPL-3.0-or-later                                    #
################################################################################

include(Catch)

# test driver executable
add_executable(tests)

target_sources(tests PRIVATE tests.cpp common.hpp common.cpp)

target_link_libraries(tests PUBLIC Catch2::Catch2 Boost::iostreams fmt::fmt
                      cargo)

# NOTE(review): the quoted EXTRA_ARGS value is passed to the test binary as a
# single argv token containing a space ("-S <addr>"); confirm Catch2's Clara
# parser accepts that form, otherwise split it into "-S" "<addr>".
catch_discover_tests(
  tests EXTRA_ARGS
  "-S ${CARGO_TRANSPORT_PROTOCOL}://${CARGO_BIND_ADDRESS}:${CARGO_BIND_PORT}")
tests/common.cpp 0 → 100644 +13 −0 Original line number Diff line number Diff line #include <fmt/format.h> #include "common.hpp" std::vector<cargo::dataset> prepare_datasets(const std::string& pattern, size_t n) { std::vector<cargo::dataset> datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { datasets.emplace_back(fmt::format(fmt::runtime(pattern), i)); } return datasets; }
tests/common.hpp 0 → 100644 +93 −0 Original line number Diff line number Diff line #ifndef CARGO_TESTS_COMMON_HPP #define CARGO_TESTS_COMMON_HPP #include <vector> #include <cargo.hpp> #include <unistd.h> class file_handle { private: constexpr static const int init_value{-1}; ///< initial file descriptor int m_fd{init_value}; ///< file descriptor public: file_handle() = default; explicit file_handle(int fd) noexcept : m_fd(fd) {} file_handle(file_handle&& rhs) noexcept { this->m_fd = rhs.m_fd; rhs.m_fd = init_value; } file_handle(const file_handle& other) = delete; file_handle& operator=(file_handle&& rhs) noexcept { this->m_fd = rhs.m_fd; rhs.m_fd = init_value; return *this; } file_handle& operator=(const file_handle& other) = delete; explicit operator bool() const noexcept { return valid(); } bool operator!() const noexcept { return !valid(); } /** * @brief Checks for valid file descriptor value. * @return boolean if valid file descriptor */ [[nodiscard]] bool valid() const noexcept { return m_fd != init_value; } /** * @brief Retusn the file descriptor value used in this file handle * operation. * @return file descriptor value */ [[nodiscard]] int native() const noexcept { return m_fd; } /** * @brief Closes file descriptor and resets it to initial value * @return boolean if file descriptor was successfully closed */ bool close() noexcept { if(m_fd != init_value) { if(::close(m_fd) < 0) { return false; } } m_fd = init_value; return true; } /** * @brief Destructor implicitly closes the internal file descriptor. */ ~file_handle() { if(m_fd != init_value) { close(); } } }; std::vector<cargo::dataset> prepare_datasets(const std::string& pattern, size_t n); #endif // CARGO_TESTS_COMMON_HPP
tests/tests.cpp 0 → 100644 +351 −0 Original line number Diff line number Diff line /****************************************************************************** * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). * * This file is part of Cargo. * * Cargo is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * Cargo is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with Cargo. If not, see <https://www.gnu.org/licenses/>. 
* * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ #include <fmt/format.h> #include <fmt/chrono.h> #include <thread> #include <chrono> #include <string_view> #include <fstream> #include <random> #include <filesystem> #include <cargo.hpp> #include <boost/iostreams/device/mapped_file.hpp> #include <catch2/catch_config.hpp> #include <catch2/catch_session.hpp> #include <catch2/catch_test_macros.hpp> #include <catch2/generators/catch_generators_range.hpp> #include "common.hpp" using namespace std::literals; using namespace std::chrono_literals; struct scoped_file { explicit scoped_file(std::filesystem::path filepath) : m_filepath(std::move(filepath)) {} scoped_file(scoped_file&& rhs) noexcept : m_filepath(std::move(rhs.m_filepath)) {} scoped_file(scoped_file& other) = delete; scoped_file& operator=(scoped_file&& rhs) noexcept { if(this != &rhs) { m_filepath = std::move(rhs.m_filepath); } return *this; }; scoped_file& operator=(scoped_file& other) = delete; ~scoped_file() { if(!m_filepath.empty()) { std::filesystem::remove(m_filepath); } } auto path() const { return m_filepath; } std::filesystem::path m_filepath; }; struct ascii_data_generator { static constexpr std::array<char, 26> letters = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'}; using result_type = char; explicit ascii_data_generator(std::size_t block_size) : m_block_size(block_size) {} char operator()() const noexcept { const auto rv = *m_it; if((++m_index % m_block_size) == 0) { if(++m_it == letters.end()) { m_it = letters.begin(); } } return rv; } mutable std::size_t m_index = 0; mutable decltype(letters)::const_iterator m_it = letters.begin(); std::size_t m_block_size; }; struct random_data_generator { using result_type = std::mt19937::result_type; explicit random_data_generator(std::size_t seed, std::size_t min, std::size_t max) : m_seed(seed), 
m_rng(seed), m_dist(min, max) {} auto operator()() const noexcept { return m_dist(m_rng); } constexpr auto seed() const noexcept { return m_seed; } std::size_t m_seed; std::random_device m_device; mutable std::mt19937 m_rng; mutable std::uniform_int_distribution<std::mt19937::result_type> m_dist; }; template <typename DataGenerator> scoped_file create_temporary_file(const std::filesystem::path& name, std::size_t desired_size, const DataGenerator& generator) { char tmpname[] = "posix_file_tests_XXXXXX"; file_handle fh{mkstemp(tmpname)}; if(!fh) { fmt::print(stderr, "mkstemp() error: {}\n", strerror(errno)); abort(); } if(ftruncate(fh.native(), static_cast<off_t>(desired_size)) != 0) { fmt::print(stderr, "ftruncate() error: {}\n", strerror(errno)); abort(); } using result_type = DataGenerator::result_type; assert(desired_size % sizeof(result_type) == 0); const std::size_t n = desired_size / sizeof(result_type); std::vector<result_type> data(n); std::generate(data.begin(), data.end(), std::ref(generator)); std::fstream f{tmpname, std::ios::out | std::ios::binary}; f.write(reinterpret_cast<char*>(data.data()), desired_size); f.close(); std::filesystem::rename(tmpname, name); REQUIRE(std::filesystem::exists(name)); REQUIRE(std::filesystem::file_size(name) == desired_size); return scoped_file{name}; } bool equal(const std::filesystem::path& filepath1, const std::filesystem::path& filepath2) { namespace io = boost::iostreams; io::mapped_file_source f1(filepath1); io::mapped_file_source f2(filepath2); const auto start_time = std::chrono::steady_clock::now(); bool rv = f1.size() == f2.size() && std::equal(f1.data(), f1.data() + f1.size(), f2.data()); const auto end_time = std::chrono::steady_clock::now(); const auto duration = std::chrono::duration<double, std::micro>(end_time - start_time); fmt::print(stderr, "::equal(\"{}\", \"{}\"): {}\n", filepath1.string(), filepath2.string(), duration); return rv; } std::tuple<std::string, std::string> split(const std::string& id) { 
constexpr auto delim = "://"sv; const auto n = id.find(delim); if(n == std::string::npos) { return {std::string{}, id}; } return {id.substr(0, n), id.substr(n + delim.length(), id.length())}; } uint32_t catch2_seed; std::string server_address; #define NDATASETS 10 SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { random_data_generator rng{catch2_seed, 0, std::numeric_limits<std::uint64_t>::max() - 1u}; GIVEN("Some input datasets from a PFS") { REQUIRE(!server_address.empty()); cargo::server server{server_address}; const auto sources = prepare_datasets("lustre://source-dataset-{}", NDATASETS); const auto targets = prepare_datasets("target-dataset-{}", NDATASETS); static std::vector<scoped_file> input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( create_temporary_file(filepath, 1000, rng)); } // ensure there are no dangling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { std::filesystem::remove(dataset.id()); }); WHEN("Transferring datasets to a POSIX storage system") { const auto tx = cargo::transfer_datasets(server, sources, targets); (void) tx; // give time for transfers to complete before removing input files // FIXME: replace with proper status checking for the transfer std::this_thread::sleep_for(1s); THEN("Output datasets are identical to input datasets") { std::vector<scoped_file> output_files; std::transform( targets.cbegin(), targets.cend(), std::back_inserter(output_files), [](auto&& target) { return scoped_file{target.id()}; }); for(std::size_t i = 0; i < input_files.size(); ++i) { REQUIRE(std::filesystem::exists(output_files[i].path())); REQUIRE(::equal(input_files[i].path(), output_files[i].path())); } } } } } SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { std::size_t file_size = 10000; // GENERATE(1000, 10000); [[maybe_unused]] ascii_data_generator ascii_gen{512}; 
[[maybe_unused]] random_data_generator rng{ catch2_seed, 0, std::numeric_limits<std::uint64_t>::max() - 1u}; GIVEN("Some input datasets from a POSIX storage system") { REQUIRE(!server_address.empty()); cargo::server server{server_address}; const auto sources = prepare_datasets("source-dataset-{}", NDATASETS); const auto targets = prepare_datasets("lustre://target-dataset-{}", NDATASETS); static std::vector<scoped_file> input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( create_temporary_file(d.id(), file_size, rng)); } // ensure there are no danling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { std::filesystem::remove(dataset.id()); }); WHEN("Transferring datasets to a PFS") { const auto tx = cargo::transfer_datasets(server, sources, targets); (void) tx; // give time for transfers to complete before removing input files // FIXME: replace with proper status checking for the transfer std::this_thread::sleep_for(1s); THEN("Output datasets are identical to input datasets") { std::vector<scoped_file> output_files; std::transform(targets.cbegin(), targets.cend(), std::back_inserter(output_files), [](auto&& target) { const auto& [prefix, filepath] = ::split(target.id()); return scoped_file{filepath}; }); for(std::size_t i = 0; i < input_files.size(); ++i) { const auto& [prefix, filepath] = ::split(targets[i].id()); REQUIRE(::equal(input_files[i].path(), filepath)); } } } } } int main(int argc, char* argv[]) { Catch::Session session; // There must be exactly one instance // Build a new parser on top of Catch2's using namespace Catch::Clara; auto cli = session.cli() // Get Catch2's command line parser | Opt(server_address, "server_address")["-S"]["--server-address"]("server address"); // Now pass the new composite back to Catch2, so it uses that session.cli(cli); // Let Catch2 (using Clara) parse the command line 
if(int returnCode = session.applyCommandLine(argc, argv); returnCode != 0) { return returnCode; } catch2_seed = session.config().rngSeed(); return session.run(); }