From 647addcf28240e9e9b0a773960de063af5984e7b Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 3 Feb 2023 22:15:18 +0100 Subject: [PATCH] API: cargo::datasets can now be created as posix/parallel --- lib/cargo.hpp | 17 +++++++++---- lib/fmt_formatters.hpp | 2 +- lib/libcargo.cpp | 12 +++++++--- src/master.cpp | 29 +++++------------------ tests/common.cpp | 5 ++-- tests/common.hpp | 3 ++- tests/tests.cpp | 54 +++++++++++++++--------------------------- 7 files changed, 53 insertions(+), 69 deletions(-) diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 92008e0..c1d4ee8 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -59,20 +59,29 @@ private: class dataset { public: + enum class type { posix, parallel }; + dataset() noexcept = default; - explicit dataset(std::string id) noexcept; + + explicit dataset(std::string path, + dataset::type type = dataset::type::posix) noexcept; [[nodiscard]] std::string - id() const noexcept; + path() const noexcept; + + [[nodiscard]] bool + supports_parallel_transfer() const noexcept; template void serialize(Archive& ar) { - ar& m_id; + ar& m_path; + ar& m_type; } private: - std::string m_id; + std::string m_path; + dataset::type m_type = dataset::type::posix; }; diff --git a/lib/fmt_formatters.hpp b/lib/fmt_formatters.hpp index 90e5e78..57934dc 100644 --- a/lib/fmt_formatters.hpp +++ b/lib/fmt_formatters.hpp @@ -41,7 +41,7 @@ struct fmt::formatter : formatter { template auto format(const cargo::dataset& d, FormatContext& ctx) const { - const auto str = fmt::format("{{id: {}}}", std::quoted(d.id())); + const auto str = fmt::format("{{path: {}}}", std::quoted(d.path())); return formatter::format(str, ctx); } }; diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 070766c..3a9181e 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -61,13 +61,19 @@ server::address() const noexcept { return m_address; } -dataset::dataset(std::string id) noexcept : m_id(std::move(id)) {} +dataset::dataset(std::string path, dataset::type type) noexcept + : m_path(std::move(path)), m_type(type) {} std::string -dataset::id() const noexcept { - return m_id; +dataset::path() const noexcept { + return m_path; }; +bool +dataset::supports_parallel_transfer() const noexcept { + return m_type == dataset::type::parallel; +} + transfer::transfer(transfer_id id) noexcept : m_id(id) {} [[nodiscard]] transfer_id diff --git a/src/master.cpp b/src/master.cpp index d4772e2..38240f4 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -41,39 +41,22 @@ get_address(auto&& req) { return req.get_endpoint(); } -std::tuple -split(const std::string& id) { - - constexpr auto delim = "://"sv; - const auto n = id.find(delim); - - if(n == std::string::npos) { - return {std::string{}, id}; - } - - return {id.substr(0, n), id.substr(n + delim.length(), id.length())}; -} - cargo::transfer_request_message create_request_message(const cargo::dataset& input, const cargo::dataset& output) { cargo::transfer_type tx_type; - const auto& [input_prefix, input_path] = split(input.id()); - const auto& [output_prefix, output_path] = split(output.id()); - - // FIXME: id should offer member functions to retrieve the parent - // namespace - if(input_prefix == "lustre") { + if(input.supports_parallel_transfer()) { tx_type = cargo::parallel_read; - } else if(output_prefix == "lustre") { + } else if(output.supports_parallel_transfer()) { tx_type = cargo::parallel_write; } else { tx_type = cargo::sequential; } - return cargo::transfer_request_message{input_path, output_path, tx_type}; + return cargo::transfer_request_message{input.path(), output.path(), + tx_type}; } } // namespace @@ -128,8 +111,8 @@ transfer_datasets(const net::request& req, boost::mpi::communicator world; for(auto i = 0u; i < sources.size(); ++i) { - const auto& input_path = sources[i].id(); - const auto& output_path = targets[i].id(); + const auto& input_path = sources[i].path(); + const auto& output_path = targets[i].path(); const auto m = ::create_request_message(sources[i], targets[i]); diff --git a/tests/common.cpp b/tests/common.cpp index 5cab1ff..a6f0036 100644 --- a/tests/common.cpp +++ b/tests/common.cpp @@ -2,11 +2,12 @@ #include "common.hpp" std::vector -prepare_datasets(const std::string& pattern, size_t n) { +prepare_datasets(cargo::dataset::type type, const std::string& pattern, + size_t n) { std::vector datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { - datasets.emplace_back(fmt::format(fmt::runtime(pattern), i)); + datasets.emplace_back(fmt::format(fmt::runtime(pattern), i), type); } return datasets; diff --git a/tests/common.hpp b/tests/common.hpp index a6d6fb0..3085ed6 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -88,6 +88,7 @@ public: }; std::vector -prepare_datasets(const std::string& pattern, size_t n); +prepare_datasets(cargo::dataset::type type, const std::string& pattern, + size_t n); #endif // CARGO_TESTS_COMMON_HPP diff --git a/tests/tests.cpp b/tests/tests.cpp index 62ebd03..96bc19e 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -189,19 +189,6 @@ equal(const std::filesystem::path& filepath1, return rv; } -std::tuple -split(const std::string& id) { - - constexpr auto delim = "://"sv; - const auto n = id.find(delim); - - if(n == std::string::npos) { - return {std::string{}, id}; - } - - return {id.substr(0, n), id.substr(n + delim.length(), id.length())}; -} - uint32_t catch2_seed; std::string server_address; @@ -218,22 +205,22 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { cargo::server server{server_address}; - const auto sources = - prepare_datasets("lustre://source-dataset-{}", NDATASETS); - const auto targets = prepare_datasets("target-dataset-{}", NDATASETS); + const auto sources = prepare_datasets(cargo::dataset::type::parallel, + "source-dataset-{}", NDATASETS); + const auto targets = prepare_datasets(cargo::dataset::type::posix, + "target-dataset-{}", NDATASETS); static std::vector input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { - const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( - create_temporary_file(filepath, 1000, rng)); + create_temporary_file(d.path(), 1000, rng)); } // ensure there are no dangling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { - std::filesystem::remove(dataset.id()); + std::filesystem::remove(dataset.path()); }); WHEN("Transferring datasets to a POSIX storage system") { @@ -248,10 +235,11 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { THEN("Output datasets are identical to input datasets") { std::vector output_files; - std::transform( - targets.cbegin(), targets.cend(), - std::back_inserter(output_files), - [](auto&& target) { return scoped_file{target.id()}; }); + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(output_files), + [](auto&& target) { + return scoped_file{target.path()}; + }); for(std::size_t i = 0; i < input_files.size(); ++i) { REQUIRE(std::filesystem::exists(output_files[i].path())); @@ -277,23 +265,22 @@ SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { cargo::server server{server_address}; - const auto sources = - prepare_datasets("source-dataset-{}", NDATASETS); - const auto targets = - prepare_datasets("lustre://target-dataset-{}", NDATASETS); + const auto sources = prepare_datasets(cargo::dataset::type::posix, + "source-dataset-{}", NDATASETS); + const auto targets = prepare_datasets(cargo::dataset::type::parallel, + "target-dataset-{}", NDATASETS); static std::vector input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { - const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( - create_temporary_file(d.id(), file_size, rng)); + create_temporary_file(d.path(), file_size, rng)); } // ensure there are no danling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { - std::filesystem::remove(dataset.id()); + std::filesystem::remove(dataset.path()); }); WHEN("Transferring datasets to a PFS") { @@ -311,14 +298,11 @@ SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { std::transform(targets.cbegin(), targets.cend(), std::back_inserter(output_files), [](auto&& target) { - const auto& [prefix, filepath] = - ::split(target.id()); - return scoped_file{filepath}; + return scoped_file{target.path()}; }); for(std::size_t i = 0; i < input_files.size(); ++i) { - const auto& [prefix, filepath] = ::split(targets[i].id()); - REQUIRE(::equal(input_files[i].path(), filepath)); + REQUIRE(::equal(input_files[i].path(), targets[i].path())); } } } -- GitLab