diff --git a/lib/cargo.hpp b/lib/cargo.hpp index 92008e08b0dfc2988e69288ecb6bed3b7afa51e8..c1d4ee86964f85d990145375e91ee4e44940bc75 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -59,20 +59,29 @@ private: class dataset { public: + enum class type { posix, parallel }; + dataset() noexcept = default; - explicit dataset(std::string id) noexcept; + + explicit dataset(std::string path, + dataset::type type = dataset::type::posix) noexcept; [[nodiscard]] std::string - id() const noexcept; + path() const noexcept; + + [[nodiscard]] bool + supports_parallel_transfer() const noexcept; template void serialize(Archive& ar) { - ar& m_id; + ar& m_path; + ar& m_type; } private: - std::string m_id; + std::string m_path; + dataset::type m_type = dataset::type::posix; }; diff --git a/lib/fmt_formatters.hpp b/lib/fmt_formatters.hpp index 90e5e78114f8c65d3fc8e35e554174dcd5da0bab..57934dc018f1a44e2f84109bb3822460cb5b0648 100644 --- a/lib/fmt_formatters.hpp +++ b/lib/fmt_formatters.hpp @@ -41,7 +41,7 @@ struct fmt::formatter : formatter { template auto format(const cargo::dataset& d, FormatContext& ctx) const { - const auto str = fmt::format("{{id: {}}}", std::quoted(d.id())); + const auto str = fmt::format("{{path: {}}}", std::quoted(d.path())); return formatter::format(str, ctx); } }; diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 070766c55b4b09bfc817a79ae285c6d187c58fb8..3a9181ec942f9c54db3ab8e9fd5ee04c5d7703c2 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -61,13 +61,19 @@ server::address() const noexcept { return m_address; } -dataset::dataset(std::string id) noexcept : m_id(std::move(id)) {} +dataset::dataset(std::string path, dataset::type type) noexcept + : m_path(std::move(path)), m_type(type) {} std::string -dataset::id() const noexcept { - return m_id; +dataset::path() const noexcept { + return m_path; }; +bool +dataset::supports_parallel_transfer() const noexcept { + return m_type == dataset::type::parallel; +} + transfer::transfer(transfer_id id) noexcept : m_id(id) {} [[nodiscard]] transfer_id diff --git a/src/master.cpp b/src/master.cpp index d4772e24d89b1cbc75b45a47fd43b4de8b8dee8c..38240f45120be61b9e1359ccb8a81762a2ae2ad9 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -41,39 +41,22 @@ get_address(auto&& req) { return req.get_endpoint(); } -std::tuple -split(const std::string& id) { - - constexpr auto delim = "://"sv; - const auto n = id.find(delim); - - if(n == std::string::npos) { - return {std::string{}, id}; - } - - return {id.substr(0, n), id.substr(n + delim.length(), id.length())}; -} - cargo::transfer_request_message create_request_message(const cargo::dataset& input, const cargo::dataset& output) { cargo::transfer_type tx_type; - const auto& [input_prefix, input_path] = split(input.id()); - const auto& [output_prefix, output_path] = split(output.id()); - - // FIXME: id should offer member functions to retrieve the parent - // namespace - if(input_prefix == "lustre") { + if(input.supports_parallel_transfer()) { tx_type = cargo::parallel_read; - } else if(output_prefix == "lustre") { + } else if(output.supports_parallel_transfer()) { tx_type = cargo::parallel_write; } else { tx_type = cargo::sequential; } - return cargo::transfer_request_message{input_path, output_path, tx_type}; + return cargo::transfer_request_message{input.path(), output.path(), + tx_type}; } } // namespace @@ -128,8 +111,8 @@ transfer_datasets(const net::request& req, boost::mpi::communicator world; for(auto i = 0u; i < sources.size(); ++i) { - const auto& input_path = sources[i].id(); - const auto& output_path = targets[i].id(); + const auto& input_path = sources[i].path(); + const auto& output_path = targets[i].path(); const auto m = ::create_request_message(sources[i], targets[i]); diff --git a/tests/common.cpp b/tests/common.cpp index 5cab1ff6357b59e7ba631af693a162c2f698c7af..a6f00367601a568b116951d46a214291362f6582 100644 --- a/tests/common.cpp +++ b/tests/common.cpp @@ -2,11 +2,12 @@ #include "common.hpp" std::vector -prepare_datasets(const std::string& pattern, size_t n) { +prepare_datasets(cargo::dataset::type type, const std::string& pattern, + size_t n) { std::vector datasets; datasets.reserve(n); for(size_t i = 0; i < n; ++i) { - datasets.emplace_back(fmt::format(fmt::runtime(pattern), i)); + datasets.emplace_back(fmt::format(fmt::runtime(pattern), i), type); } return datasets; diff --git a/tests/common.hpp b/tests/common.hpp index a6d6fb0ba290c839b91c01c885134b6b9117faa3..3085ed616d2cb77198d3de442d4b3063a29d88ba 100644 --- a/tests/common.hpp +++ b/tests/common.hpp @@ -88,6 +88,7 @@ public: }; std::vector -prepare_datasets(const std::string& pattern, size_t n); +prepare_datasets(cargo::dataset::type type, const std::string& pattern, + size_t n); #endif // CARGO_TESTS_COMMON_HPP diff --git a/tests/tests.cpp b/tests/tests.cpp index 62ebd037167967640e320549894951049e63d7c1..96bc19efdad4e905b52cb0f68f3e5d93f45230b1 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -189,19 +189,6 @@ equal(const std::filesystem::path& filepath1, return rv; } -std::tuple -split(const std::string& id) { - - constexpr auto delim = "://"sv; - const auto n = id.find(delim); - - if(n == std::string::npos) { - return {std::string{}, id}; - } - - return {id.substr(0, n), id.substr(n + delim.length(), id.length())}; -} - uint32_t catch2_seed; std::string server_address; @@ -218,22 +205,22 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { cargo::server server{server_address}; - const auto sources = - prepare_datasets("lustre://source-dataset-{}", NDATASETS); - const auto targets = prepare_datasets("target-dataset-{}", NDATASETS); + const auto sources = prepare_datasets(cargo::dataset::type::parallel, + "source-dataset-{}", NDATASETS); + const auto targets = prepare_datasets(cargo::dataset::type::posix, + "target-dataset-{}", NDATASETS); static std::vector input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { - const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( - create_temporary_file(filepath, 1000, rng)); + create_temporary_file(d.path(), 1000, rng)); } // ensure there are no dangling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { - std::filesystem::remove(dataset.id()); + std::filesystem::remove(dataset.path()); }); WHEN("Transferring datasets to a POSIX storage system") { @@ -248,10 +235,11 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { THEN("Output datasets are identical to input datasets") { std::vector output_files; - std::transform( - targets.cbegin(), targets.cend(), - std::back_inserter(output_files), - [](auto&& target) { return scoped_file{target.id()}; }); + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(output_files), + [](auto&& target) { + return scoped_file{target.path()}; + }); for(std::size_t i = 0; i < input_files.size(); ++i) { REQUIRE(std::filesystem::exists(output_files[i].path())); @@ -277,23 +265,22 @@ SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { cargo::server server{server_address}; - const auto sources = - prepare_datasets("source-dataset-{}", NDATASETS); - const auto targets = - prepare_datasets("lustre://target-dataset-{}", NDATASETS); + const auto sources = prepare_datasets(cargo::dataset::type::posix, + "source-dataset-{}", NDATASETS); + const auto targets = prepare_datasets(cargo::dataset::type::parallel, + "target-dataset-{}", NDATASETS); static std::vector input_files; input_files.reserve(sources.size()); for(const auto& d : sources) { - const auto& [prefix, filepath] = ::split(d.id()); input_files.emplace_back( - create_temporary_file(d.id(), file_size, rng)); + create_temporary_file(d.path(), file_size, rng)); } // ensure there are no danling output files from another test run std::for_each(targets.begin(), targets.end(), [](auto&& dataset) { - std::filesystem::remove(dataset.id()); + std::filesystem::remove(dataset.path()); }); WHEN("Transferring datasets to a PFS") { @@ -311,14 +298,11 @@ SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { std::transform(targets.cbegin(), targets.cend(), std::back_inserter(output_files), [](auto&& target) { - const auto& [prefix, filepath] = - ::split(target.id()); - return scoped_file{filepath}; + return scoped_file{target.path()}; }); for(std::size_t i = 0; i < input_files.size(); ++i) { - const auto& [prefix, filepath] = ::split(targets[i].id()); - REQUIRE(::equal(input_files[i].path(), filepath)); + REQUIRE(::equal(input_files[i].path(), targets[i].path())); } } }