diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 3e426476b705372b5eefb20f82409e1f7d26c81c..0b82c04a3f05512e2c23586bdcaad9ee7d8abcb6 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -24,9 +24,10 @@ add_library(cargo SHARED) -target_sources(cargo PRIVATE cargo.hpp fmt_formatters.hpp libcargo.cpp) +target_sources(cargo PRIVATE cargo.hpp fmt_formatters.hpp cargo/error.hpp + libcargo.cpp error.cpp) -list(APPEND public_headers "cargo.hpp") +list(APPEND public_headers "cargo.hpp;cargo/error.hpp") list(APPEND public_headers "fmt_formatters.hpp") target_include_directories( @@ -36,7 +37,15 @@ target_include_directories( set_target_properties(cargo PROPERTIES PUBLIC_HEADER "${public_headers}") -target_link_libraries(cargo PRIVATE logger::logger fmt::fmt thallium) +target_link_libraries(cargo PRIVATE + logger::logger + fmt::fmt + thallium + MPI::MPI_CXX + Boost::serialization + Boost::mpi + net::rpc_client +) ## Install library + targets ################################################### diff --git a/lib/cargo.hpp b/lib/cargo.hpp index c1d4ee86964f85d990145375e91ee4e44940bc75..cd0f04dbd802ae692ff35365b66bae89616b2177 100644 --- a/lib/cargo.hpp +++ b/lib/cargo.hpp @@ -28,11 +28,12 @@ #include #include +#include +#include namespace cargo { using transfer_id = std::uint64_t; -using error_code = std::int32_t; /** * A Cargo server @@ -75,8 +76,8 @@ public: template void serialize(Archive& ar) { - ar& m_path; - ar& m_type; + ar & m_path; + ar & m_type; } private: @@ -85,28 +86,111 @@ private: }; +/** + * The status of a Cargo transfer + */ +enum class transfer_state { pending, running, completed, failed }; + +class transfer_status; + /** * A transfer handler */ class transfer { -public: - transfer() noexcept = default; - explicit transfer(transfer_id id) noexcept; + friend transfer + transfer_datasets(const server& srv, const std::vector& sources, + const std::vector& targets); + + explicit transfer(transfer_id id, server srv) noexcept; [[nodiscard]] 
transfer_id id() const noexcept; - template - void - serialize(Archive& ar) { - ar& m_id; - } +public: + /** + * Get the current status of the associated transfer. + * + * @return A `transfer_status` object containing detailed information about + * the transfer status. + */ + [[nodiscard]] transfer_status + status() const; + + /** + * Wait for the associated transfer to complete. + * + * @return A `transfer_status` object containing detailed information about + * the transfer status. + */ + [[nodiscard]] transfer_status + wait() const; + + /** + * Wait for the associated transfer to complete or for a timeout to occur. + * @param timeout The maximum amount of time to wait for the transfer to + * complete. + * @return A `transfer_status` object containing detailed information about + * the transfer status. + */ + [[nodiscard]] transfer_status + wait_for(const std::chrono::nanoseconds& timeout) const; private: transfer_id m_id; + server m_srv; }; +/** + * Detailed status information for a transfer + */ +class transfer_status { + + friend transfer_status + transfer::status() const; + + transfer_status(transfer_state status, error_code error) noexcept; + +public: + /** + * Get the current status of the associated transfer. + * + * @return A `transfer_state` enum value representing the current status. + */ + [[nodiscard]] transfer_state + state() const noexcept; + + /** + * Check whether the transfer has completed. + * + * @return true if the transfer has completed, false otherwise. + */ + [[nodiscard]] bool + done() const noexcept; + + /** + * Check whether the transfer has failed. + * + * @return true if the transfer has failed, false otherwise. + */ + [[nodiscard]] bool + failed() const noexcept; + + /** + * Retrieve the error code associated with a failed transfer. + * + * @return An error code describing a transfer failure or + * `error_code::success` if the transfer succeeded. 
+ * If the transfer has not yet completed, + * `error_code::transfer_in_progress` is returned. + */ + [[nodiscard]] error_code + error() const; + +private: + transfer_state m_state; + error_code m_error; +}; /** * Request the transfer of a dataset collection. @@ -116,10 +200,24 @@ private: * @param targets The output datasets that should be generated. * @return A transfer */ -cargo::transfer +transfer transfer_datasets(const server& srv, const std::vector& sources, const std::vector& targets); +/** + * Request the transfer of a single dataset. + * This function is a convenience wrapper around the previous one. + * It takes a single source and a single target dataset. + * + * @param srv The Cargo server that should execute the transfer. + * @param source The input dataset that should be transferred. + * @param target The output dataset that should be generated. + * @return A transfer + */ +transfer +transfer_dataset(const server& srv, const dataset& source, + const dataset& target); + } // namespace cargo #endif // CARGO_HPP diff --git a/lib/cargo/error.hpp b/lib/cargo/error.hpp new file mode 100644 index 0000000000000000000000000000000000000000..fe9ee8689706948ca2fb0d6f323ad6cc18cd8724 --- /dev/null +++ b/lib/cargo/error.hpp @@ -0,0 +1,120 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. 
+ * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see <https://www.gnu.org/licenses/>. + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_ERROR_HPP +#define CARGO_ERROR_HPP + +#include +#include + +namespace cargo { + +enum class error_category : std::uint32_t { + generic_error = 0, + system_error = 1, + mpi_error = 2, +}; + +class error_code { + + enum class error_value : std::uint32_t { + success = 0, + snafu = 1, + not_implemented = 2, + no_such_transfer = 3, + transfer_in_progress = 4, + }; + +public: + static const error_code success; + static const error_code snafu; + static const error_code not_implemented; + static const error_code no_such_transfer; + static const error_code transfer_in_progress; + + constexpr error_code() : error_code(error_value::success) {} + constexpr explicit error_code(error_value v) + : m_category(error_category::generic_error), m_value(v) {} + constexpr error_code(error_category c, std::uint32_t v) + : m_category(c), m_value(static_cast(v)) {} + + constexpr explicit operator bool() const { + return m_value != error_value::success; + } + + [[nodiscard]] constexpr error_category + category() const { + return m_category; + } + + [[nodiscard]] constexpr std::uint32_t + value() const { + return static_cast(m_value); + } + + [[nodiscard]] std::string_view + name() const; + + [[nodiscard]] std::string + message() const; + + template + void + serialize(Archive&& ar, std::uint32_t version) { + (void) version; + ar & m_category; + ar & m_value; + } + +private: + error_category m_category; + error_value m_value; +}; + +constexpr error_code error_code::success{error_value::success};
+constexpr error_code error_code::snafu{error_value::snafu}; +constexpr error_code error_code::not_implemented{error_value::not_implemented}; +constexpr error_code error_code::no_such_transfer{ + error_value::no_such_transfer}; +constexpr error_code error_code::transfer_in_progress{ + error_value::transfer_in_progress}; + +static constexpr cargo::error_code +make_system_error(std::uint32_t ec) { + return cargo::error_code{cargo::error_category::system_error, ec}; +} + +static constexpr cargo::error_code +make_mpi_error(std::uint32_t ec) { + return cargo::error_code{cargo::error_category::mpi_error, ec}; +} + +constexpr bool +operator==(const error_code& lhs, const error_code& rhs) { + return lhs.category() == rhs.category() && lhs.value() == rhs.value(); +} + +} // namespace cargo + +#endif // CARGO_ERROR_HPP diff --git a/lib/error.cpp b/lib/error.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9987a4572d91fade361b61a74902c22beb74d56f --- /dev/null +++ b/lib/error.cpp @@ -0,0 +1,320 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see <https://www.gnu.org/licenses/>.
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "cargo/error.hpp" + +// clang-format off +#define EXPAND(s) case s: return #s +// clang-format on + +constexpr std::string_view +errno_name(int ec) { + switch(ec) { + EXPAND(EPERM); + EXPAND(ENOENT); + EXPAND(ESRCH); + EXPAND(EINTR); + EXPAND(EIO); + EXPAND(ENXIO); + EXPAND(E2BIG); + EXPAND(ENOEXEC); + EXPAND(EBADF); + EXPAND(ECHILD); + EXPAND(EAGAIN); + EXPAND(ENOMEM); + EXPAND(EACCES); + EXPAND(EFAULT); + EXPAND(ENOTBLK); + EXPAND(EBUSY); + EXPAND(EEXIST); + EXPAND(EXDEV); + EXPAND(ENODEV); + EXPAND(ENOTDIR); + EXPAND(EISDIR); + EXPAND(EINVAL); + EXPAND(ENFILE); + EXPAND(EMFILE); + EXPAND(ENOTTY); + EXPAND(ETXTBSY); + EXPAND(EFBIG); + EXPAND(ENOSPC); + EXPAND(ESPIPE); + EXPAND(EROFS); + EXPAND(EMLINK); + EXPAND(EPIPE); + EXPAND(EDOM); + EXPAND(ERANGE); + EXPAND(EDEADLK); + EXPAND(ENAMETOOLONG); + EXPAND(ENOLCK); + EXPAND(ENOSYS); + EXPAND(ENOTEMPTY); + EXPAND(ELOOP); + // EXPAND(EWOULDBLOCK); + EXPAND(ENOMSG); + EXPAND(EIDRM); + EXPAND(ECHRNG); + EXPAND(EL2NSYNC); + EXPAND(EL3HLT); + EXPAND(EL3RST); + EXPAND(ELNRNG); + EXPAND(EUNATCH); + EXPAND(ENOCSI); + EXPAND(EL2HLT); + EXPAND(EBADE); + EXPAND(EBADR); + EXPAND(EXFULL); + EXPAND(ENOANO); + EXPAND(EBADRQC); + EXPAND(EBADSLT); + // EXPAND(EDEADLOCK); + EXPAND(EBFONT); + EXPAND(ENOSTR); + EXPAND(ENODATA); + EXPAND(ETIME); + EXPAND(ENOSR); + EXPAND(ENONET); + EXPAND(ENOPKG); + EXPAND(EREMOTE); + EXPAND(ENOLINK); + EXPAND(EADV); + EXPAND(ESRMNT); + EXPAND(ECOMM); + EXPAND(EPROTO); + EXPAND(EMULTIHOP); + EXPAND(EDOTDOT); + EXPAND(EBADMSG); + EXPAND(EOVERFLOW); + EXPAND(ENOTUNIQ); + EXPAND(EBADFD); + EXPAND(EREMCHG); + EXPAND(ELIBACC); + EXPAND(ELIBBAD); + EXPAND(ELIBSCN); + EXPAND(ELIBMAX); + EXPAND(ELIBEXEC); + EXPAND(EILSEQ); + EXPAND(ERESTART); + EXPAND(ESTRPIPE); + EXPAND(EUSERS); + EXPAND(ENOTSOCK); + EXPAND(EDESTADDRREQ); + EXPAND(EMSGSIZE); + 
EXPAND(EPROTOTYPE); + EXPAND(ENOPROTOOPT); + EXPAND(EPROTONOSUPPORT); + EXPAND(ESOCKTNOSUPPORT); + EXPAND(EOPNOTSUPP); + EXPAND(EPFNOSUPPORT); + EXPAND(EAFNOSUPPORT); + EXPAND(EADDRINUSE); + EXPAND(EADDRNOTAVAIL); + EXPAND(ENETDOWN); + EXPAND(ENETUNREACH); + EXPAND(ENETRESET); + EXPAND(ECONNABORTED); + EXPAND(ECONNRESET); + EXPAND(ENOBUFS); + EXPAND(EISCONN); + EXPAND(ENOTCONN); + EXPAND(ESHUTDOWN); + EXPAND(ETOOMANYREFS); + EXPAND(ETIMEDOUT); + EXPAND(ECONNREFUSED); + EXPAND(EHOSTDOWN); + EXPAND(EHOSTUNREACH); + EXPAND(EALREADY); + EXPAND(EINPROGRESS); + EXPAND(ESTALE); + EXPAND(EUCLEAN); + EXPAND(ENOTNAM); + EXPAND(ENAVAIL); + EXPAND(EISNAM); + EXPAND(EREMOTEIO); + EXPAND(EDQUOT); + EXPAND(ENOMEDIUM); + EXPAND(EMEDIUMTYPE); + EXPAND(ECANCELED); + EXPAND(ENOKEY); + EXPAND(EKEYEXPIRED); + EXPAND(EKEYREVOKED); + EXPAND(EKEYREJECTED); + EXPAND(EOWNERDEAD); + EXPAND(ENOTRECOVERABLE); + EXPAND(ERFKILL); + EXPAND(EHWPOISON); + default: + return "EUNKNOWN"; + } +} + +constexpr std::string_view +mpi_error_name(int ec) { + + switch(ec) { + EXPAND(MPI_SUCCESS); + EXPAND(MPI_ERR_BUFFER); + EXPAND(MPI_ERR_COUNT); + EXPAND(MPI_ERR_TYPE); + EXPAND(MPI_ERR_TAG); + EXPAND(MPI_ERR_COMM); + EXPAND(MPI_ERR_RANK); + EXPAND(MPI_ERR_REQUEST); + EXPAND(MPI_ERR_ROOT); + EXPAND(MPI_ERR_GROUP); + EXPAND(MPI_ERR_OP); + EXPAND(MPI_ERR_TOPOLOGY); + EXPAND(MPI_ERR_DIMS); + EXPAND(MPI_ERR_ARG); + EXPAND(MPI_ERR_UNKNOWN); + EXPAND(MPI_ERR_TRUNCATE); + EXPAND(MPI_ERR_OTHER); + EXPAND(MPI_ERR_INTERN); + EXPAND(MPI_ERR_IN_STATUS); + EXPAND(MPI_ERR_PENDING); + EXPAND(MPI_ERR_ACCESS); + EXPAND(MPI_ERR_AMODE); + EXPAND(MPI_ERR_ASSERT); + EXPAND(MPI_ERR_BAD_FILE); + EXPAND(MPI_ERR_BASE); + EXPAND(MPI_ERR_CONVERSION); + EXPAND(MPI_ERR_DISP); + EXPAND(MPI_ERR_DUP_DATAREP); + EXPAND(MPI_ERR_FILE_EXISTS); + EXPAND(MPI_ERR_FILE_IN_USE); + EXPAND(MPI_ERR_FILE); + EXPAND(MPI_ERR_INFO_KEY); + EXPAND(MPI_ERR_INFO_NOKEY); + EXPAND(MPI_ERR_INFO_VALUE); + EXPAND(MPI_ERR_INFO); + EXPAND(MPI_ERR_IO); + 
EXPAND(MPI_ERR_KEYVAL); + EXPAND(MPI_ERR_LOCKTYPE); + EXPAND(MPI_ERR_NAME); + EXPAND(MPI_ERR_NO_MEM); + EXPAND(MPI_ERR_NOT_SAME); + EXPAND(MPI_ERR_NO_SPACE); + EXPAND(MPI_ERR_NO_SUCH_FILE); + EXPAND(MPI_ERR_PORT); + EXPAND(MPI_ERR_QUOTA); + EXPAND(MPI_ERR_READ_ONLY); + EXPAND(MPI_ERR_RMA_CONFLICT); + EXPAND(MPI_ERR_RMA_SYNC); + EXPAND(MPI_ERR_SERVICE); + EXPAND(MPI_ERR_SIZE); + EXPAND(MPI_ERR_SPAWN); + EXPAND(MPI_ERR_UNSUPPORTED_DATAREP); + EXPAND(MPI_ERR_UNSUPPORTED_OPERATION); + EXPAND(MPI_ERR_WIN); + EXPAND(MPI_T_ERR_MEMORY); + EXPAND(MPI_T_ERR_NOT_INITIALIZED); + EXPAND(MPI_T_ERR_CANNOT_INIT); + EXPAND(MPI_T_ERR_INVALID_INDEX); + EXPAND(MPI_T_ERR_INVALID_ITEM); + EXPAND(MPI_T_ERR_INVALID_HANDLE); + EXPAND(MPI_T_ERR_OUT_OF_HANDLES); + EXPAND(MPI_T_ERR_OUT_OF_SESSIONS); + EXPAND(MPI_T_ERR_INVALID_SESSION); + EXPAND(MPI_T_ERR_CVAR_SET_NOT_NOW); + EXPAND(MPI_T_ERR_CVAR_SET_NEVER); + EXPAND(MPI_T_ERR_PVAR_NO_STARTSTOP); + EXPAND(MPI_T_ERR_PVAR_NO_WRITE); + EXPAND(MPI_T_ERR_PVAR_NO_ATOMIC); + EXPAND(MPI_ERR_RMA_RANGE); + EXPAND(MPI_ERR_RMA_ATTACH); + EXPAND(MPI_ERR_RMA_FLAVOR); + EXPAND(MPI_ERR_RMA_SHARED); + EXPAND(MPI_T_ERR_INVALID); + EXPAND(MPI_T_ERR_INVALID_NAME); + default: + return "MPI_ERR_UNKNOWN"; + } +} + +namespace cargo { + +[[nodiscard]] std::string_view +error_code::name() const { + + switch(m_category) { + case error_category::generic_error: + break; + case error_category::system_error: + return errno_name(static_cast(m_value)); + case error_category::mpi_error: + return mpi_error_name(static_cast(m_value)); + default: + return "CARGO_UNKNOWN_ERROR"; + } + + switch(m_value) { + case error_value::success: + return "CARGO_SUCCESS"; + case error_value::snafu: + return "CARGO_SNAFU"; + case error_value::not_implemented: + return "CARGO_NOT_IMPLEMENTED"; + case error_value::no_such_transfer: + return "CARGO_NO_SUCH_TRANSFER"; + case error_value::transfer_in_progress: + return "CARGO_TRANSFER_IN_PROGRESS"; + default: + return "CARGO_UNKNOWN_ERROR"; + } +}; 
+ +[[nodiscard]] std::string +error_code::message() const { + + switch(m_category) { + case error_category::generic_error: + switch(m_value) { + case error_value::success: + return "success"; + case error_value::snafu: + return "snafu"; + case error_value::not_implemented: + return "not implemented"; + case error_value::no_such_transfer: + return "no such transfer"; + case error_value::transfer_in_progress: + return "transfer in progress"; + default: + return "unknown error"; + } + case error_category::system_error: { + std::error_code std_ec = + std::make_error_code(static_cast(m_value)); + return std_ec.message(); + } + case error_category::mpi_error: + return boost::mpi::error_string(static_cast(m_value)); + default: + return "unknown error category"; + } +} + +} // namespace cargo \ No newline at end of file diff --git a/lib/fmt_formatters.hpp b/lib/fmt_formatters.hpp index 57934dc018f1a44e2f84109bb3822460cb5b0648..dbcb9aa03856f697c0aa9b7a4cfbb308244ced3f 100644 --- a/lib/fmt_formatters.hpp +++ b/lib/fmt_formatters.hpp @@ -27,7 +27,9 @@ #include #include +#include #include +#include "cargo/error.hpp" namespace cargo { @@ -64,9 +66,52 @@ struct fmt::formatter : formatter { template auto format(const cargo::transfer& tx, FormatContext& ctx) const { - const auto str = fmt::format("{{id: {}}}", tx.id()); + const auto str = fmt::format("{{tid: {}}}", tx.id()); return formatter::format(str, ctx); } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const cargo::error_code& ec, FormatContext& ctx) const { + return formatter::format(ec.name(), ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. 
+ template + auto + format(const cargo::transfer_state& s, FormatContext& ctx) const { + switch(s) { + case cargo::transfer_state::pending: + return formatter::format("pending", ctx); + case cargo::transfer_state::running: + return formatter::format("running", ctx); + case cargo::transfer_state::completed: + return formatter::format("completed", ctx); + case cargo::transfer_state::failed: + return formatter::format("failed", ctx); + default: + return formatter::format("unknown", ctx); + } + } +}; + +template +struct fmt::formatter> : formatter { + // parse is inherited from formatter. + template + auto + format(const std::optional& v, FormatContext& ctx) const { + return formatter::format( + v ? fmt::format("{}", v.value()) : "none", ctx); + } +}; + + #endif // CARGO_FMT_FORMATTERS_HPP diff --git a/lib/libcargo.cpp b/lib/libcargo.cpp index 3a9181ec942f9c54db3ab8e9fd5ee04c5d7703c2..93eed114e0bb2848bd6b933259e1bd9e5432a8cf 100644 --- a/lib/libcargo.cpp +++ b/lib/libcargo.cpp @@ -24,23 +24,26 @@ #include #include -#include -#include -#include +#include #include #include +#include +#include +#include +#include +#include +#include using namespace std::literals; namespace cargo { -struct remote_procedure { - static std::uint64_t - new_id() { - static std::atomic_uint64_t current_id; - return current_id++; - } -}; +using generic_response = proto::generic_response; +template +using response_with_value = proto::response_with_value; +using response_with_id = proto::response_with_id; + +#define RPC_NAME() (__FUNCTION__) server::server(std::string address) noexcept : m_address(std::move(address)) { @@ -74,39 +77,141 @@ dataset::supports_parallel_transfer() const noexcept { return m_type == dataset::type::parallel; } -transfer::transfer(transfer_id id) noexcept : m_id(id) {} +transfer::transfer(transfer_id id, server srv) noexcept + : m_id(id), m_srv(std::move(srv)) {} [[nodiscard]] transfer_id transfer::id() const noexcept { return m_id; } -cargo::transfer +transfer_status 
+transfer::status() const { + + using proto::status_response; + + network::client rpc_client{m_srv.protocol()}; + const auto rpc = + network::rpc_info::create("transfer_status", m_srv.address()); + using response_type = status_response; + + if(const auto lookup_rv = rpc_client.lookup(m_srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{tid: {}}}", rpc, m_id); + + if(const auto call_rv = endp.call(rpc.name(), m_id); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + const auto& [s, ec] = resp.value(); + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); + + if(resp.error_code()) { + throw std::runtime_error( + fmt::format("rpc call failed: {}", resp.error_code())); + } + + return transfer_status{s, ec.value_or(error_code::success)}; + } + } + + throw std::runtime_error("rpc lookup failed"); +} + +transfer_status::transfer_status(transfer_state status, + error_code error) noexcept + : m_state(status), m_error(error) {} + +transfer_state +transfer_status::state() const noexcept { + return m_state; +} + +bool +transfer_status::done() const noexcept { + return m_state == transfer_state::completed; +} + +bool +transfer_status::failed() const noexcept { + return m_state == transfer_state::failed; +} + +error_code +transfer_status::error() const { + switch(m_state) { + case transfer_state::pending: + [[fallthrough]]; + case transfer_state::running: + return error_code::transfer_in_progress; + default: + return m_error; + } +} + +transfer transfer_datasets(const server& srv, const std::vector& sources, const std::vector& targets) { - thallium::engine engine(srv.protocol(), THALLIUM_CLIENT_MODE); - thallium::remote_procedure transfer_datasets = - engine.define("transfer_datasets"); - thallium::endpoint endp = engine.lookup(srv.address()); + network::client rpc_client{srv.protocol()}; + const auto rpc = 
network::rpc_info::create(RPC_NAME(), srv.address()); + + if(const auto lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc {:<} body: {{sources: {}, targets: {}}}", rpc, sources, + targets); + + if(const auto call_rv = endp.call(rpc.name(), sources, targets); + call_rv.has_value()) { - const auto rpc_id = remote_procedure::new_id(); + const response_with_id resp{call_rv.value()}; - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{sources: {}, targets: {}}}", - rpc_id, std::quoted(__FUNCTION__), - std::quoted(std::string{engine.self()}), sources, targets); + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); - cargo::error_code ec = transfer_datasets.on(endp)(sources, targets); - const auto tx = cargo::transfer{42}; - const auto op_id = 42; + if(resp.error_code()) { + throw std::runtime_error( + fmt::format("rpc call failed: {}", resp.error_code())); + } - LOGGER_INFO("rpc id: {} name: {} from: {} <= " - "body: {{retval: {}, transfer: {}}} [op_id: {}]", - rpc_id, std::quoted(__FUNCTION__), - std::quoted(std::string{endp}), ec, tx, op_id); + return transfer{resp.value(), srv}; + } + } + + throw std::runtime_error("rpc lookup failed"); +} + +transfer_status +transfer::wait() const { + // wait for the transfer to complete + auto s = status(); + + while(!s.done() && !s.failed()) { + s = wait_for(150ms); + } + + return s; +} + +transfer_status +transfer::wait_for(const std::chrono::nanoseconds& timeout) const { + std::this_thread::sleep_for(timeout); + return status(); +} - return tx; +transfer +transfer_dataset(const server& srv, const dataset& source, + const dataset& target) { + return transfer_datasets(srv, std::vector{source}, + std::vector{target}); } } // namespace cargo \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 
ca9ff8c7802e686aadd46564e98f18b2746c0631..ab210a2d862dcd506bb8143e4a035d636a8eb0ac 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -34,11 +34,27 @@ target_sources( PRIVATE cargo.cpp master.cpp master.hpp - worker.cpp - worker.hpp + worker/memory.hpp + worker/mpio_read.cpp + worker/mpio_read.hpp + worker/mpio_write.hpp + worker/mpio_write.cpp + worker/ops.cpp + worker/ops.hpp + worker/sequential.cpp + worker/sequential.hpp + worker/worker.cpp + worker/worker.hpp env.hpp mpioxx.hpp - message.hpp + parallel_request.cpp + parallel_request.hpp + request_manager.cpp + request_manager.hpp + shared_mutex.hpp + proto/rpc/response.hpp + proto/mpi/message.hpp + boost_serialization_std_optional.hpp ) target_include_directories( diff --git a/src/boost_serialization_std_optional.hpp b/src/boost_serialization_std_optional.hpp new file mode 100644 index 0000000000000000000000000000000000000000..dc936e39b1972598d4495c98d0a71f1090ac2e04 --- /dev/null +++ b/src/boost_serialization_std_optional.hpp @@ -0,0 +1,89 @@ +/////////1/////////2/////////3/////////4/////////5/////////6/////////7/////////8 +// optional.hpp - non-intrusive serialization of optional types +// +// copyright (c) 2019 Samuel Debionne, ESRF +// +// Use, modification and distribution is subject to the Boost Software +// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +// See http://www.boost.org for updates, documentation, and revision history. 
+// +// Widely inspired from boost::optional serialization +// + +#ifndef CARGO_BOOST_SERIALIZATION_STD_OPTIONAL_HPP +#define CARGO_BOOST_SERIALIZATION_STD_OPTIONAL_HPP + +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +// function specializations must be defined in the appropriate +// namespace - boost::serialization +namespace boost { +namespace serialization { + +template +void +save(Archive& ar, const std::optional& t, const unsigned int /*version*/ +) { + // It is an inherent limitation to the serialization of optional.hpp + // that the underlying type must be either a pointer or must have a + // default constructor. It's possible that this could change sometime + // in the future, but for now, one will have to work around it. This can + // be done by serializing the optional<T> as optional<T *> +#if !defined(BOOST_NO_CXX11_HDR_TYPE_TRAITS) + BOOST_STATIC_ASSERT( + boost::serialization::detail::is_default_constructible::value || + boost::is_pointer::value); +#endif + const bool tflag = t.has_value(); + ar << boost::serialization::make_nvp("has_value", tflag); + if(tflag) { + ar << boost::serialization::make_nvp("value", *t); + } +} + +template +void +load(Archive& ar, std::optional& t, const unsigned int version) { + + (void) version; + + bool tflag; + ar >> boost::serialization::make_nvp("has_value", tflag); + if(!tflag) { + t.reset(); + return; + } + + if(!t.has_value()) + t = T(); + ar >> boost::serialization::make_nvp("value", *t); +} + +template +void +serialize(Archive& ar, std::optional& t, const unsigned int version) { + boost::serialization::split_free(ar, t, version); +} + +template +struct version> { + BOOST_STATIC_CONSTANT(int, value = 1); +}; + +} // namespace serialization +} // namespace boost + +#endif // CARGO_BOOST_SERIALIZATION_STD_OPTIONAL_HPP diff --git a/src/cargo.cpp b/src/cargo.cpp index 889a1d509884e76646619c70b278cc768d9af5f5..a0f3a0cc7164e603cc5131943a7dd998cafa9c5c 100644 ---
a/src/cargo.cpp +++ b/src/cargo.cpp @@ -35,7 +35,7 @@ #include #include "master.hpp" -#include "worker.hpp" +#include "worker/worker.hpp" #include "env.hpp" namespace fs = std::filesystem; @@ -63,8 +63,8 @@ parse_command_line(int argc, char* argv[]) { // force logging messages to file app.add_option("-o,--output", cfg.output_file, - "Write any output to FILENAME rather than sending it to the " - "console") + "Write any output to FILENAME. rather than sending it " + "to the console.") ->option_text("FILENAME"); app.add_option("-l,--listen", cfg.address, @@ -92,6 +92,12 @@ parse_command_line(int argc, char* argv[]) { } } +std::filesystem::path +get_process_output_file(std::filesystem::path base) { + base += fmt::format(".{}", ::getpid()); + return base; +} + } // namespace int @@ -104,20 +110,25 @@ main(int argc, char* argv[]) { mpi::communicator world; try { - if(world.rank() == 0) { - - master_server srv{cfg.progname, cfg.address, cfg.daemonize, - fs::current_path()}; + if(const auto rank = world.rank(); rank == 0) { + cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize, + fs::current_path()}; if(cfg.output_file) { srv.configure_logger(logger::logger_type::file, - *cfg.output_file); + get_process_output_file(*cfg.output_file)); } return srv.run(); - } else { - worker(); + + cargo::worker w{cfg.progname, rank}; + + if(cfg.output_file) { + w.set_output_file(get_process_output_file(*cfg.output_file)); + } + + return w.run(); } } catch(const std::exception& ex) { fmt::print(stderr, diff --git a/src/master.cpp b/src/master.cpp index cca97040532efdf59b5571820462b8a1fc615b6d..e274eb2b28b6afa0ecb4fda2c823aee232d6c005 100644 --- a/src/master.cpp +++ b/src/master.cpp @@ -30,57 +30,122 @@ #include #include #include -#include "message.hpp" +#include #include "master.hpp" #include "net/utilities.hpp" #include "net/request.hpp" #include "proto/rpc/response.hpp" +#include "proto/mpi/message.hpp" +#include "parallel_request.hpp" using namespace std::literals; 
+namespace mpi = boost::mpi; namespace { -cargo::transfer_request_message -create_request_message(const cargo::dataset& input, - const cargo::dataset& output) { - - cargo::transfer_type tx_type; +std::tuple +make_message(std::uint64_t tid, std::uint32_t seqno, + const cargo::dataset& input, const cargo::dataset& output) { if(input.supports_parallel_transfer()) { - tx_type = cargo::parallel_read; - } else if(output.supports_parallel_transfer()) { - tx_type = cargo::parallel_write; - } else { - tx_type = cargo::sequential; + return std::make_tuple(static_cast(cargo::tag::pread), + cargo::transfer_message{tid, seqno, input.path(), + output.path()}); + } + + if(output.supports_parallel_transfer()) { + return std::make_tuple(static_cast(cargo::tag::pwrite), + cargo::transfer_message{tid, seqno, input.path(), + output.path()}); } - return cargo::transfer_request_message{input.path(), output.path(), - tx_type}; + return std::make_tuple( + static_cast(cargo::tag::sequential), + cargo::transfer_message{tid, seqno, input.path(), output.path()}); } } // namespace using namespace std::literals; +namespace cargo { + master_server::master_server(std::string name, std::string address, bool daemonize, std::filesystem::path rundir, std::optional pidfile) : server(std::move(name), std::move(address), daemonize, std::move(rundir), std::move(pidfile)), - provider(m_network_engine, 0) { + provider(m_network_engine, 0), + m_mpi_listener_ess(thallium::xstream::create()), + m_mpi_listener_ult(m_mpi_listener_ess->make_thread( + [this]() { mpi_listener_ult(); })) { #define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name provider::define(EXPAND(ping)); provider::define(EXPAND(transfer_datasets)); + provider::define(EXPAND(transfer_status)); #undef EXPAND + + // ESs and ULTs need to be joined before the network engine is + // actually finalized, and ~master_server() is too late for that. 
+ // The push_prefinalize_callback() and push_finalize_callback() functions + // serve this purpose. The former is called before Mercury is finalized, + // while the latter is called in between that and Argobots finalization. + m_network_engine.push_finalize_callback([this]() { + m_mpi_listener_ult->join(); + m_mpi_listener_ult = thallium::managed{}; + m_mpi_listener_ess->join(); + m_mpi_listener_ess = thallium::managed{}; + }); } -#define RPC_NAME() ("ADM_"s + __FUNCTION__) +master_server::~master_server() {} void -master_server::ping(const network::request& req) { +master_server::mpi_listener_ult() { + + mpi::communicator world; + + while(!m_shutting_down) { + + auto msg = world.iprobe(); + + if(!msg) { + thallium::thread::self().sleep(m_network_engine, 150); + continue; + } + + switch(static_cast(msg->tag())) { + case tag::status: { + status_message m; + world.recv(msg->source(), msg->tag(), m); + LOGGER_INFO("msg => from: {} body: {{payload: {}}}", + msg->source(), m); + + m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, + m.state(), m.error_code()); + break; + } + + default: + LOGGER_WARN("msg => from: {} body: {{Unexpected tag: {}}}", + msg->source(), msg->tag()); + break; + } + } + + // shutting down, notify all workers + for(int rank = 1; rank < world.size(); ++rank) { + LOGGER_INFO("msg <= to: {} body: {{shutdown}}", rank); + world.send(static_cast(rank), static_cast(tag::shutdown)); + } +} +#define RPC_NAME() (__FUNCTION__) + +void +master_server::ping(const network::request& req) { using network::get_address; using network::rpc_info; using proto::generic_response; @@ -89,7 +154,7 @@ master_server::ping(const network::request& req) { LOGGER_INFO("rpc {:>} body: {{}}", rpc); - const auto resp = generic_response{rpc.id(), cargo::error_code{0}}; + const auto resp = generic_response{rpc.id(), error_code::success}; LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); @@ -98,40 +163,75 @@ master_server::ping(const 
network::request& req) { void master_server::transfer_datasets(const network::request& req, - const std::vector& sources, - const std::vector& targets) { - + const std::vector& sources, + const std::vector& targets) { using network::get_address; using network::rpc_info; using proto::generic_response; + using proto::response_with_id; + mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{sources: {}, targets: {}}}", rpc, sources, targets); - const auto resp = generic_response{rpc.id(), cargo::error_code{0}}; - - assert(sources.size() == targets.size()); - - boost::mpi::communicator world; - for(auto i = 0u; i < sources.size(); ++i) { - - const auto& input_path = sources[i].path(); - const auto& output_path = targets[i].path(); + m_request_manager.create(sources.size(), world.size() - 1) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Failed to create request: {}", ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + req.respond(generic_response{rpc.id(), ec}); + }) + .map([&](auto&& r) { + assert(sources.size() == targets.size()); + + for(auto i = 0u; i < sources.size(); ++i) { + const auto& s = sources[i]; + const auto& d = targets[i]; + + for(std::size_t rank = 1; rank <= r.nworkers(); ++rank) { + const auto [t, m] = make_message(r.tid(), i, s, d); + LOGGER_INFO("msg <= to: {} body: {}", rank, m); + world.send(static_cast(rank), t, m); + } + } + + LOGGER_INFO("rpc {:<} body: {{retval: {}, tid: {}}}", rpc, + error_code::success, r.tid()); + req.respond(response_with_id{rpc.id(), error_code::success, + r.tid()}); + }); +} - const auto m = ::create_request_message(sources[i], targets[i]); +void +master_server::transfer_status(const network::request& req, std::uint64_t tid) { - for(int rank = 1; rank < world.size(); ++rank) { - world.send(rank, static_cast(cargo::message_tags::transfer), - m); - } - } + using network::get_address; + using network::rpc_info; + using proto::generic_response; + 
using proto::status_response; - cargo::transfer tx{42}; + using response_type = + status_response; - LOGGER_INFO("rpc {:<} body: {{retval: {}, transfer: {}}}", rpc, - resp.error_code(), tx); + mpi::communicator world; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - req.respond(resp); + LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); + + m_request_manager.lookup(tid) + .or_else([&](auto&& ec) { + LOGGER_ERROR("Failed to lookup request: {}", ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + req.respond(generic_response{rpc.id(), ec}); + }) + .map([&](auto&& rs) { + LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc, + error_code::success, rs); + req.respond( + response_type{rpc.id(), error_code::success, + std::make_pair(rs.state(), rs.error())}); + }); } + +} // namespace cargo diff --git a/src/master.hpp b/src/master.hpp index 93acbae6cb86c9608169b0c83bd5231a66167bd0..77ca0fd4236ba88c3fe474c59813a6e080fe76d6 100644 --- a/src/master.hpp +++ b/src/master.hpp @@ -27,6 +27,9 @@ #include "net/server.hpp" #include "cargo.hpp" +#include "request_manager.hpp" + +namespace cargo { class master_server : public network::server, public network::provider { @@ -35,7 +38,12 @@ public: std::filesystem::path rundir, std::optional pidfile = {}); + ~master_server(); + private: + void + mpi_listener_ult(); + void ping(const network::request& req); @@ -43,13 +51,19 @@ private: transfer_datasets(const network::request& req, const std::vector& sources, const std::vector& targets); -}; -namespace config { -struct settings; -} // namespace config + void + transfer_status(const network::request& req, std::uint64_t tid); + +private: + // Dedicated execution stream for the MPI listener ULT + thallium::managed m_mpi_listener_ess; + // ULT for the MPI listener + thallium::managed m_mpi_listener_ult; + // Request manager + request_manager m_request_manager; +}; -void -master(const config::settings& cfg); +} // namespace cargo #endif // 
CARGO_MASTER_HPP diff --git a/src/message.hpp b/src/message.hpp deleted file mode 100644 index 1e61fdd19a020aa4fcd1cd141b94950eac1df092..0000000000000000000000000000000000000000 --- a/src/message.hpp +++ /dev/null @@ -1,135 +0,0 @@ -/****************************************************************************** - * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain - * - * This software was partially supported by the EuroHPC-funded project ADMIRE - * (Project ID: 956748, https://www.admire-eurohpc.eu). - * - * This file is part of Cargo. - * - * Cargo is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Cargo is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Cargo. If not, see . 
- * - * SPDX-License-Identifier: GPL-3.0-or-later - *****************************************************************************/ - -#ifndef CARGO_MESSAGE_HPP -#define CARGO_MESSAGE_HPP - -#include -#include -#include -#include - -namespace cargo { - -enum transfer_type { parallel_read, parallel_write, sequential }; -enum class message_tags { transfer, status, shutdown }; - -class transfer_request_message { - - friend class boost::serialization::access; - -public: - transfer_request_message() = default; - - transfer_request_message(const std::filesystem::path& input_path, - const std::filesystem::path& output_path, - transfer_type type) - : m_input_path(input_path), m_output_path(output_path), m_type(type) {} - - std::filesystem::path - input_path() const { - return m_input_path; - } - - std::filesystem::path - output_path() const { - return m_output_path; - } - - transfer_type - type() const { - return m_type; - } - -private: - template - void - serialize(Archive& ar, const unsigned int version) { - (void) version; - - ar& m_input_path; - ar& m_output_path; - ar& m_type; - } - - std::string m_input_path; - std::string m_output_path; - transfer_type m_type; -}; - -class transfer_status_message { - - friend class boost::serialization::access; - -public: - transfer_status_message() = default; - - explicit transfer_status_message(std::uint64_t transfer_id) - : m_transfer_id(transfer_id) {} - - std::uint64_t - transfer_id() const { - return m_transfer_id; - } - -private: - template - void - serialize(Archive& ar, const unsigned int version) { - (void) version; - - ar& m_transfer_id; - } - - std::uint64_t m_transfer_id{}; -}; - -} // namespace cargo - -template <> -struct fmt::formatter - : formatter { - // parse is inherited from formatter. 
- template - auto - format(const cargo::transfer_request_message& r, FormatContext& ctx) const { - const auto str = fmt::format("{{input_path: {}, output_path: {}}}", - r.input_path(), r.output_path()); - return formatter::format(str, ctx); - } -}; - -template <> -struct fmt::formatter - : formatter { - // parse is inherited from formatter. - template - auto - format(const cargo::transfer_status_message& s, FormatContext& ctx) const { - const auto str = fmt::format("{{transfer_id: {}}}", s.transfer_id()); - return formatter::format(str, ctx); - } -}; - -#endif // CARGO_MESSAGE_HPP diff --git a/src/mpioxx.hpp b/src/mpioxx.hpp index 915479aff763315ff479777adc7c63c7dfbc88ca..35183220c61de077271ff8872245e41bdbee0185 100644 --- a/src/mpioxx.hpp +++ b/src/mpioxx.hpp @@ -30,11 +30,39 @@ #include #include #include +#include // very simple RAII wrappers for some MPI types + utility functions namespace mpioxx { +class io_error : public std::exception { + +public: + io_error(std::string_view fun, int ec) : m_fun(fun), m_error_code(ec) {} + + [[nodiscard]] std::uint32_t + error_code() const noexcept { + return m_error_code; + } + + [[nodiscard]] const char* + what() const noexcept override { + m_message.assign(boost::mpi::error_string(m_error_code)); + return m_message.c_str(); + } + + [[nodiscard]] std::string_view + where() const noexcept { + return m_fun; + } + +private: + mutable std::string m_message; + std::string_view m_fun; + int m_error_code; +}; + using offset = MPI_Offset; enum file_open_mode : int { @@ -86,6 +114,7 @@ operator^=(file_open_mode& a, file_open_mode b) { } class file { + public: explicit file(const MPI_File& file) : m_file(file) {} @@ -109,9 +138,7 @@ public: MPI_File_open(comm, filepath.c_str(), static_cast(mode), MPI_INFO_NULL, &result); ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_open() failed: {}", - boost::mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); + throw io_error("MPI_File_open", ec); } return file{result}; @@ -120,20 
+147,16 @@ public: void close() { if(const auto ec = MPI_File_close(&m_file); ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_close() failed: {}", - boost::mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); + throw io_error("MPI_File_close", ec); } } - offset + [[nodiscard]] offset size() const { MPI_Offset result; if(const auto ec = MPI_File_get_size(m_file, &result); ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_get_size() failed: {}", - boost::mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); + throw io_error("MPI_File_get_size", ec); } return result; } diff --git a/src/net/serialization.hpp b/src/net/serialization.hpp index 3939fd7948b081a3a7f3fc7fda51d324d47be139..1aef87d991f09d06b1f2c224f5eba60507dd07bd 100644 --- a/src/net/serialization.hpp +++ b/src/net/serialization.hpp @@ -34,6 +34,7 @@ #include #include #include +#include // Cereal does not serialize std::filesystem::path's by default #include diff --git a/src/net/server.cpp b/src/net/server.cpp index 1bca7a03f317861121735f4aa7ba5bc55a9eb3ae..ac9c297fb2dac4df70588b14adba4998989e53f5 100644 --- a/src/net/server.cpp +++ b/src/net/server.cpp @@ -379,6 +379,7 @@ server::teardown_and_exit() { void server::shutdown() { + m_shutting_down = true; m_network_engine.finalize(); } diff --git a/src/net/server.hpp b/src/net/server.hpp index 5d5e66c605c3f29952e5f6188bc1ea5c146e8e24..9d9855345f9bcbe9ba23754f256bff8af45013d1 100644 --- a/src/net/server.hpp +++ b/src/net/server.hpp @@ -110,6 +110,7 @@ private: protected: thallium::engine m_network_engine; + std::atomic m_shutting_down; private: signal_listener m_signal_listener; diff --git a/src/parallel_request.cpp b/src/parallel_request.cpp new file mode 100644 index 0000000000000000000000000000000000000000..15cbeabecbd86a87d7d1616883ef93bc47da19e9 --- /dev/null +++ b/src/parallel_request.cpp @@ -0,0 +1,81 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing 
Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include "cargo.hpp" +#include "parallel_request.hpp" + +namespace cargo { + +parallel_request::parallel_request(std::uint64_t tid, std::size_t nfiles, + std::size_t nworkers) + : m_tid(tid), m_nfiles(nfiles), m_nworkers(nworkers) {} + +[[nodiscard]] std::uint64_t +parallel_request::tid() const { + return m_tid; +} + +[[nodiscard]] std::size_t +parallel_request::nfiles() const { + return m_nfiles; +} + +[[nodiscard]] std::size_t +parallel_request::nworkers() const { + return m_nworkers; +} + +request_status::request_status(part_status s) + : m_state(s.state()), m_error_code(s.error()) {} + +request_status::request_status(transfer_state s, std::optional ec) + : m_state(s), m_error_code(ec) {} + +transfer_state +request_status::state() const { + return m_state; +} + +std::optional +request_status::error() const { + return m_error_code; +} + +transfer_state +part_status::state() const { + return m_state; +} + +std::optional +part_status::error() const { + return m_error_code; +} + +void +part_status::update(transfer_state s, std::optional ec) 
noexcept { + m_state = s; + m_error_code = ec; +} + +} // namespace cargo \ No newline at end of file diff --git a/src/parallel_request.hpp b/src/parallel_request.hpp new file mode 100644 index 0000000000000000000000000000000000000000..d2e78d9a1fbe6a30620217d4a170e1ee925e9ad8 --- /dev/null +++ b/src/parallel_request.hpp @@ -0,0 +1,130 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_PARALLEL_REQUEST_HPP +#define CARGO_PARALLEL_REQUEST_HPP + +#include +#include +#include +#include + +namespace cargo { + +class dataset; + +class parallel_request { + +public: + parallel_request(std::uint64_t id, std::size_t nfiles, + std::size_t nworkers); + + [[nodiscard]] std::uint64_t + tid() const; + + [[nodiscard]] std::size_t + nfiles() const; + + [[nodiscard]] std::size_t + nworkers() const; + +private: + /** Unique identifier for the request */ + std::uint64_t m_tid; + /** Number of files to be processed by the request */ + std::size_t m_nfiles; + /** Number of workers to be used for the request */ + std::size_t m_nworkers; +}; + +/** + * The status of a single file part. + */ +class part_status { +public: + part_status() = default; + + [[nodiscard]] transfer_state + state() const; + + [[nodiscard]] std::optional + error() const; + + void + update(transfer_state s, std::optional ec) noexcept; + +private: + transfer_state m_state{transfer_state::pending}; + std::optional m_error_code{}; +}; + +class request_status { +public: + request_status() = default; + explicit request_status(transfer_state s, + std::optional ec = {}); + explicit request_status(part_status s); + + [[nodiscard]] transfer_state + state() const; + + [[nodiscard]] std::optional + error() const; + +private: + transfer_state m_state{transfer_state::pending}; + std::optional m_error_code{}; +}; + +} // namespace cargo + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. 
+ template + auto + format(const cargo::request_status& s, FormatContext& ctx) const { + + const auto state_name = [](auto&& s) { + switch(s.state()) { + case cargo::transfer_state::pending: + return "pending"; + case cargo::transfer_state::running: + return "running"; + case cargo::transfer_state::completed: + return "completed"; + case cargo::transfer_state::failed: + return "failed"; + default: + return "unknown"; + } + }; + + const auto str = fmt::format("{{state: {}, error_code: {}}}", + state_name(s), s.error()); + return formatter::format(str, ctx); + } +}; + +#endif // CARGO_PARALLEL_REQUEST_HPP diff --git a/src/posix_file/posix_file/file.hpp b/src/posix_file/posix_file/file.hpp index 6fa2a7edf172d85e1d588440f595e51a3675130e..f94bd0fc1b2e6be59c7d9e74ff5d2332381b4a3c 100644 --- a/src/posix_file/posix_file/file.hpp +++ b/src/posix_file/posix_file/file.hpp @@ -120,6 +120,35 @@ public: } }; +class io_error : public std::exception { + +public: + io_error(std::string_view fun, int ec) : m_fun(fun), m_error_code(ec) {} + + [[nodiscard]] std::uint32_t + error_code() const noexcept { + return m_error_code; + } + + [[nodiscard]] const char* + what() const noexcept override { + m_message.assign( + std::make_error_code(static_cast(m_error_code)) + .message()); + return m_message.c_str(); + } + + [[nodiscard]] std::string_view + where() const noexcept { + return m_fun; + } + +private: + mutable std::string m_message; + std::string_view m_fun; + int m_error_code; +}; + class file { public: @@ -149,34 +178,29 @@ public: return std::filesystem::remove(m_path); } - tl::expected - fallocate(int mode, offset offset, std::size_t len) const noexcept { + void + fallocate(int mode, offset offset, std::size_t len) const { if(!m_handle) { - return tl::make_unexpected( - std::error_code{EBADF, std::generic_category()}); + throw io_error("posix_file::file::fallocate", EBADF); } int ret = ::fallocate(m_handle.native(), mode, offset, static_cast(len)); if(ret == -1) { - return 
tl::make_unexpected( - std::error_code{errno, std::generic_category()}); + throw io_error("posix_file::file::fallocate", errno); } - - return {}; } template - tl::expected - pread(MemoryBuffer&& buf, offset offset, std::size_t size) const noexcept { + std::size_t + pread(MemoryBuffer&& buf, offset offset, std::size_t size) const { assert(buf.size() >= size); if(!m_handle) { - return tl::make_unexpected( - std::error_code{EBADF, std::generic_category()}); + throw io_error("posix_file::file::pread", EBADF); } std::size_t bytes_read = 0; @@ -199,8 +223,7 @@ public: } // Some other error condition, report - return tl::make_unexpected( - std::error_code{errno, std::generic_category()}); + throw io_error("posix_file::file::pread", errno); } bytes_read += n; @@ -211,14 +234,13 @@ public: } template - tl::expected - pwrite(MemoryBuffer&& buf, offset offset, std::size_t size) const noexcept { + std::size_t + pwrite(MemoryBuffer&& buf, offset offset, std::size_t size) const { assert(buf.size() >= size); if(!m_handle) { - return tl::make_unexpected( - std::error_code{EBADF, std::generic_category()}); + throw io_error("posix_file::file::pwrite", EBADF); } std::size_t bytes_written = 0; @@ -236,8 +258,7 @@ public: } // Some other error condition, report - return tl::make_unexpected( - std::error_code{errno, std::generic_category()}); + throw io_error("posix_file::file::pwrite", errno); } bytes_written += n; @@ -252,20 +273,19 @@ protected: file_handle m_handle; }; -static inline tl::expected +static inline file open(const std::filesystem::path& filepath, int flags, ::mode_t mode = 0) { int fd = ::open(filepath.c_str(), flags, mode); if(fd == -1) { - return tl::make_unexpected( - std::error_code{errno, std::generic_category()}); + throw io_error("posix_file::open", errno); } return file{filepath, fd}; } -static inline tl::expected +static inline file create(const std::filesystem::path& filepath, int flags, ::mode_t mode) { return open(filepath, O_CREAT | flags, mode); } diff 
--git a/src/posix_file/posix_file/types.hpp b/src/posix_file/posix_file/types.hpp index b6cf67019a9cc38e77deab8e33a255264d2d8cc5..126edcd575d64702b61450734355cdecbfccdec1 100644 --- a/src/posix_file/posix_file/types.hpp +++ b/src/posix_file/posix_file/types.hpp @@ -26,6 +26,8 @@ #ifndef POSIX_FILE_TYPES_HPP #define POSIX_FILE_TYPES_HPP +#include + namespace posix_file { using offset = std::size_t; diff --git a/src/proto/mpi/message.hpp b/src/proto/mpi/message.hpp new file mode 100644 index 0000000000000000000000000000000000000000..bc8c5a1a88ced96ca1020e619f86177354900b7d --- /dev/null +++ b/src/proto/mpi/message.hpp @@ -0,0 +1,200 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_PROTO_MPI_MESSAGE_HPP +#define CARGO_PROTO_MPI_MESSAGE_HPP + +#include +#include +#include +#include +#include +#include "cargo.hpp" +#include "boost_serialization_std_optional.hpp" + +namespace cargo { + +enum class tag : int { pread, pwrite, sequential, status, shutdown }; + +class transfer_message { + + friend class boost::serialization::access; + +public: + transfer_message() = default; + + transfer_message(std::uint64_t tid, std::uint32_t seqno, + std::string input_path, std::string output_path) + : m_tid(tid), m_seqno(seqno), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)) {} + + [[nodiscard]] std::uint64_t + tid() const { + return m_tid; + } + + [[nodiscard]] std::uint32_t + seqno() const { + return m_seqno; + } + + [[nodiscard]] const std::string& + input_path() const { + return m_input_path; + } + + [[nodiscard]] const std::string& + output_path() const { + return m_output_path; + } + +private: + template + void + serialize(Archive& ar, const unsigned int version) { + (void) version; + + ar & m_tid; + ar & m_seqno; + ar & m_input_path; + ar & m_output_path; + } + + std::uint64_t m_tid{}; + std::uint32_t m_seqno{}; + std::string m_input_path; + std::string m_output_path; +}; + +class status_message { + + friend class boost::serialization::access; + +public: + status_message() = default; + + status_message(std::uint64_t tid, std::uint32_t seqno, + cargo::transfer_state state, + std::optional error_code = std::nullopt) + : m_tid(tid), m_seqno(seqno), m_state(state), m_error_code(error_code) { + } + + [[nodiscard]] std::uint64_t + tid() const { + return m_tid; + } + + [[nodiscard]] std::uint32_t + seqno() const { + return m_seqno; + } + + [[nodiscard]] cargo::transfer_state + state() const { + return m_state; + } + + [[nodiscard]] std::optional + error_code() const { + return 
m_error_code; + } + +private: + template + void + serialize(Archive& ar, const unsigned int version) { + (void) version; + + ar & m_tid; + ar & m_seqno; + ar & m_state; + ar & m_error_code; + } + + std::uint64_t m_tid{}; + std::uint32_t m_seqno{}; + cargo::transfer_state m_state{}; + std::optional m_error_code{}; +}; + +class shutdown_message { + + friend class boost::serialization::access; + +public: + shutdown_message() = default; + + template + void + serialize(Archive& ar, const unsigned int version) { + (void) ar; + (void) version; + } +}; + +} // namespace cargo + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const cargo::transfer_message& r, FormatContext& ctx) const { + const auto str = fmt::format( + "{{tid: {}, seqno: {}, input_path: {}, output_path: {}}}", + r.tid(), r.seqno(), r.input_path(), r.output_path()); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const cargo::status_message& s, FormatContext& ctx) const { + const auto str = + s.error_code() + ? fmt::format("{{tid: {}, seqno: {}, state: {}, " + "error_code: {}}}", + s.tid(), s.seqno(), s.state(), + *s.error_code()) + : fmt::format("{{tid: {}, seqno: {}, state: {}}}", + s.tid(), s.seqno(), s.state()); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. 
+ template + auto + format(const cargo::shutdown_message& s, FormatContext& ctx) const { + (void) s; + return formatter::format("{{shutdown}}", ctx); + } +}; + +#endif // CARGO_PROTO_MPI_MESSAGE_HPP diff --git a/src/proto/rpc/response.hpp b/src/proto/rpc/response.hpp index 2cc833b1ce5fd156777e0279902d866da1105bb8..5aa487c3fd0d5b4eed8e94a6be1954087837eb35 100644 --- a/src/proto/rpc/response.hpp +++ b/src/proto/rpc/response.hpp @@ -27,8 +27,9 @@ #include #include +#include -namespace proto { +namespace cargo::proto { template class generic_response { @@ -36,9 +37,12 @@ class generic_response { public: constexpr generic_response() noexcept = default; - constexpr generic_response(std::uint64_t op_id, Error&& ec) noexcept + constexpr generic_response(std::uint64_t op_id, const Error& ec) noexcept : m_op_id(op_id), m_error_code(ec) {} + constexpr generic_response(std::uint64_t op_id, Error&& ec) noexcept + : m_op_id(op_id), m_error_code(std::move(ec)) {} + [[nodiscard]] constexpr std::uint64_t op_id() const noexcept { return m_op_id; @@ -61,16 +65,21 @@ private: Error m_error_code{}; }; -template +template class response_with_value : public generic_response { public: constexpr response_with_value() noexcept = default; - constexpr response_with_value(std::uint64_t op_id, Error&& ec, + constexpr response_with_value(std::uint64_t op_id, const Error& ec, std::optional value) noexcept : generic_response(op_id, ec), m_value(std::move(value)) {} + constexpr response_with_value(std::uint64_t op_id, Error&& ec, + std::optional value) noexcept + : generic_response(op_id, std::move(ec)), + m_value(std::move(value)) {} + constexpr auto value() const noexcept { return m_value.value(); @@ -92,8 +101,13 @@ private: }; template -using response_with_id = response_with_value; +using response_with_id = response_with_value; + + +template +using status_response = + response_with_value>, Error>; -} // namespace proto +} // namespace cargo::proto #endif // CARGO_PROTO_RPC_RESPONSE_HPP diff 
--git a/src/request_manager.cpp b/src/request_manager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f39dea9237db15086596fa03c7bd5cb9da6a6415 --- /dev/null +++ b/src/request_manager.cpp @@ -0,0 +1,116 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include "cargo.hpp" +#include "cargo/error.hpp" +#include "parallel_request.hpp" +#include "request_manager.hpp" + +#include +#include "logger/logger.hpp" + +namespace {} // namespace + +namespace cargo { + +tl::expected +request_manager::create(std::size_t nfiles, std::size_t nworkers) { + + std::uint64_t tid = current_tid++; + abt::unique_lock lock(m_mutex); + + if(const auto it = m_requests.find(tid); it == m_requests.end()) { + + const auto& [it_req, inserted] = m_requests.emplace( + tid, std::vector{ + nfiles, std::vector{nworkers}}); + + if(!inserted) { + LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); + return tl::make_unexpected(error_code::snafu); + } + } + + return parallel_request{tid, nfiles, nworkers}; +} + +error_code +request_manager::update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, + transfer_state s, std::optional ec) { + + abt::unique_lock lock(m_mutex); + + if(const auto it = m_requests.find(tid); it != m_requests.end()) { + assert(seqno < it->second.size()); + assert(wid < it->second[seqno].size()); + it->second[seqno][wid].update(s, ec); + return error_code::success; + } + + LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); + return error_code::no_such_transfer; +} + +tl::expected +request_manager::lookup(std::uint64_t tid) { + + abt::shared_lock lock(m_mutex); + + if(const auto it = m_requests.find(tid); it != m_requests.end()) { + + const auto& file_statuses = it->second; + + for(const auto& fs : file_statuses) { + for(const auto& ps : fs) { + + if(ps.state() == transfer_state::completed) { + continue; + } + + return request_status{ps}; + } + } + + return request_status{transfer_state::completed}; + } + + LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); + return tl::make_unexpected(error_code::no_such_transfer); +} + +error_code +request_manager::remove(std::uint64_t 
tid) { + + abt::unique_lock lock(m_mutex); + + if(const auto it = m_requests.find(tid); it != m_requests.end()) { + m_requests.erase(it); + return error_code::success; + } + + LOGGER_ERROR("{}: Request {} not found", __FUNCTION__, tid); + return error_code::no_such_transfer; +} + +} // namespace cargo \ No newline at end of file diff --git a/src/request_manager.hpp b/src/request_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..ba26cee5765708b7dceebdee2cf32e757eeb4377 --- /dev/null +++ b/src/request_manager.hpp @@ -0,0 +1,79 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_REQUEST_MANAGER_HPP +#define CARGO_REQUEST_MANAGER_HPP + +#include +#include +#include "parallel_request.hpp" +#include "shared_mutex.hpp" + +namespace cargo { + +class dataset; + +/** + * A manager for transfer requests. + * + * A single transfer request may involve `N` files and each file may + * be served by `W` MPI workers.
Thus, the manager keeps a map of request IDs + * to a vector of `N` `file_status`es, where each element is in turn also + * a vector with `W` `part_status` values, one for each worker in charge of + * processing a particular file region. + * + * For example: + * request 42 -> file_status[0] -> worker [0] -> pending + * worker [1] -> pending + * -> file_status[1] -> worker [0] -> complete + * worker [1] -> complete + * worker [2] -> running + */ +class request_manager { + + using file_status = std::vector; + +public: + tl::expected + create(std::size_t nfiles, std::size_t nworkers); + + error_code + update(std::uint64_t tid, std::uint32_t seqno, std::size_t wid, + transfer_state s, std::optional ec = std::nullopt); + + tl::expected + lookup(std::uint64_t tid); + + error_code + remove(std::uint64_t tid); + +private: + std::atomic current_tid = 0; + mutable abt::shared_mutex m_mutex; + std::unordered_map> m_requests; +}; + +} // namespace cargo + +#endif // CARGO_REQUEST_MANAGER_HPP diff --git a/src/shared_mutex.hpp b/src/shared_mutex.hpp new file mode 100644 index 0000000000000000000000000000000000000000..bbe4f78bb53fdbafe6315606c1cd303c690be59d --- /dev/null +++ b/src/shared_mutex.hpp @@ -0,0 +1,339 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + + +#include +#include +#include +#include + +#ifndef CARGO_ABT_SHARED_MUTEX_HPP +#define CARGO_ABT_SHARED_MUTEX_HPP + +namespace cargo::abt { + +#define ABT_RWLOCK_ASSERT(__expr) \ + { \ + if(const auto ret = (__expr); ret != ABT_SUCCESS) { \ + size_t n; \ + ABT_error_get_str(ret, NULL, &n); \ + std::vector tmp; \ + tmp.reserve(n + 1); \ + ABT_error_get_str(ret, tmp.data(), &n); \ + \ + throw std::runtime_error(fmt::format("{} failed: {} in {}:{}", \ + __FUNCTION__, tmp.data(), \ + ret, __FILE__, __LINE__)); \ + } \ + } + +class shared_mutex { +public: + explicit shared_mutex() { + ABT_RWLOCK_ASSERT(ABT_rwlock_create(&m_lock)); + } + + ~shared_mutex() noexcept { + ABT_rwlock_free(&m_lock); + } + + // copy constructor and copy assignment operator are disabled + shared_mutex(const shared_mutex&) = delete; + + shared_mutex(shared_mutex&& rhs) noexcept { + m_lock = rhs.m_lock; + rhs.m_lock = ABT_RWLOCK_NULL; + } + + shared_mutex& + operator=(const shared_mutex&) = delete; + + shared_mutex& + operator=(shared_mutex&& other) noexcept { + + if(this == &other) { + return *this; + } + + [[maybe_unused]] const auto ret = ABT_rwlock_free(&m_lock); + assert(ret == ABT_SUCCESS); + m_lock = other.m_lock; + other.m_lock = ABT_RWLOCK_NULL; + + return *this; + } + + + // Exclusive ownership + + void + lock() { + ABT_RWLOCK_ASSERT(ABT_rwlock_wrlock(m_lock)); + } + + void + unlock() { + ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock)); + } + + // Shared ownership + + void + lock_shared() { + ABT_RWLOCK_ASSERT(ABT_rwlock_rdlock(m_lock)); + } + + void + unlock_shared() { + ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock)); + } + +private: + ABT_rwlock m_lock = ABT_RWLOCK_NULL; +}; + +#undef 
/**
 * A movable wrapper that owns an exclusive lock on a mutex, modelled on
 * `std::unique_lock`.
 *
 * `Mutex` only needs to provide `lock()` and `unlock()`.
 *
 * Misuse is reported with `std::system_error` carrying a `std::errc` value
 * in the generic category (the category `std::errc` belongs to).
 */
template <typename Mutex>
class unique_lock {
public:
    using mutex_type = Mutex;

    /// Create a lock that is not associated with any mutex.
    unique_lock() noexcept = default;

    /// Associate with `m` and acquire it (blocking).
    explicit unique_lock(mutex_type& m) : m_device(std::addressof(m)) {
        lock();
    }

    /// Release the mutex if it is still owned.
    ~unique_lock() {
        if(m_owns) {
            unlock();
        }
    }

    unique_lock(const unique_lock&) = delete;
    unique_lock&
    operator=(const unique_lock&) = delete;

    /// Take over the mutex (and its ownership state) from `u`.
    unique_lock(unique_lock&& u) noexcept
        : m_device(std::exchange(u.m_device, nullptr)),
          m_owns(std::exchange(u.m_owns, false)) {}

    /**
     * Release the currently owned mutex (if any) and take over the state of
     * `u`. Self-assignment leaves the lock untouched.
     */
    unique_lock&
    operator=(unique_lock&& u) noexcept {
        if(this != std::addressof(u)) {
            if(m_owns) {
                unlock();
            }
            m_device = std::exchange(u.m_device, nullptr);
            m_owns = std::exchange(u.m_owns, false);
        }

        return *this;
    }

    /**
     * Acquire the associated mutex.
     *
     * @throws std::system_error with `operation_not_permitted` if there is
     * no associated mutex, or `resource_deadlock_would_occur` if the mutex
     * is already owned by this lock.
     */
    void
    lock() {
        if(!m_device) {
            throw std::system_error(
                    std::make_error_code(std::errc::operation_not_permitted));
        }

        if(m_owns) {
            throw std::system_error(std::make_error_code(
                    std::errc::resource_deadlock_would_occur));
        }

        m_device->lock();
        m_owns = true;
    }

    /**
     * Release the associated mutex.
     *
     * @throws std::system_error with `operation_not_permitted` if the mutex
     * is not owned by this lock.
     */
    void
    unlock() {
        if(!m_owns) {
            throw std::system_error(
                    std::make_error_code(std::errc::operation_not_permitted));
        }

        if(m_device) {
            m_device->unlock();
            m_owns = false;
        }
    }

    /// Exchange state with `u`.
    void
    swap(unique_lock& u) noexcept {
        std::swap(m_device, u.m_device);
        std::swap(m_owns, u.m_owns);
    }

    /**
     * Break the association with the mutex WITHOUT unlocking it.
     *
     * @return The previously associated mutex (may be null).
     */
    mutex_type*
    release() noexcept {
        m_owns = false;
        return std::exchange(m_device, nullptr);
    }

    /// @return true if this lock currently owns its mutex.
    bool
    owns_lock() const noexcept {
        return m_owns;
    }

    explicit operator bool() const noexcept {
        return owns_lock();
    }

    /// @return The associated mutex, or null if there is none.
    mutex_type*
    mutex() const noexcept {
        return m_device;
    }

private:
    mutex_type* m_device = nullptr;
    bool m_owns = false;
};

/// Swap overload for unique_lock objects.
/// @relates unique_lock
template <typename Mutex>
inline void
swap(unique_lock<Mutex>& x, unique_lock<Mutex>& y) noexcept {
    x.swap(y);
}
/**
 * A movable wrapper that owns a shared (reader) lock on a mutex, modelled
 * on `std::shared_lock`.
 *
 * `Mutex` only needs to provide `lock_shared()` and `unlock_shared()`.
 *
 * Misuse is reported with `std::system_error` carrying a `std::errc` value
 * in the generic category, with the same codes `unique_lock` uses.
 */
template <typename Mutex>
class shared_lock {
public:
    using mutex_type = Mutex;

    // Shared locking

    /// Create a lock that is not associated with any mutex.
    shared_lock() noexcept = default;

    /// Associate with `m` and acquire it in shared mode (blocking).
    explicit shared_lock(mutex_type& m) : m_device(std::addressof(m)) {
        m.lock_shared();
        m_owns = true;
    }

    /// Release the shared lock if it is still owned.
    ~shared_lock() {
        if(m_owns) {
            m_device->unlock_shared();
        }
    }

    shared_lock(shared_lock const&) = delete;
    shared_lock&
    operator=(shared_lock const&) = delete;

    /// Take over the mutex (and its ownership state) from `sl`.
    shared_lock(shared_lock&& sl) noexcept : shared_lock() {
        swap(sl);
    }

    /// Release the currently owned lock (if any) and take over `sl`.
    shared_lock&
    operator=(shared_lock&& sl) noexcept {
        // the temporary ends up holding the previous state of *this and
        // releases it on destruction
        shared_lock(std::move(sl)).swap(*this);
        return *this;
    }

    /**
     * Acquire the associated mutex in shared mode.
     *
     * @throws std::system_error with `operation_not_permitted` if there is
     * no associated mutex, or `resource_deadlock_would_occur` if the mutex
     * is already owned by this lock.
     */
    void
    lock() {
        lockable();
        m_device->lock_shared();
        m_owns = true;
    }

    /**
     * Release the shared lock.
     *
     * @throws std::system_error with `operation_not_permitted` if the mutex
     * is not owned by this lock.
     */
    void
    unlock() {
        if(!m_owns) {
            throw std::system_error(
                    std::make_error_code(std::errc::operation_not_permitted));
        }
        m_device->unlock_shared();
        m_owns = false;
    }

    // Setters

    /// Exchange state with `u`.
    void
    swap(shared_lock& u) noexcept {
        std::swap(m_device, u.m_device);
        std::swap(m_owns, u.m_owns);
    }

    /**
     * Break the association with the mutex WITHOUT unlocking it.
     *
     * @return The previously associated mutex (may be null).
     */
    mutex_type*
    release() noexcept {
        m_owns = false;
        return std::exchange(m_device, nullptr);
    }

    // Getters

    /// @return true if this lock currently owns its mutex.
    bool
    owns_lock() const noexcept {
        return m_owns;
    }

    explicit operator bool() const noexcept {
        return m_owns;
    }

    /// @return The associated mutex, or null if there is none.
    mutex_type*
    mutex() const noexcept {
        return m_device;
    }

private:
    /// Check the preconditions of `lock()`, throwing if they do not hold.
    void
    lockable() const {
        if(m_device == nullptr) {
            throw std::system_error(
                    std::make_error_code(std::errc::operation_not_permitted));
        }
        if(m_owns) {
            throw std::system_error(std::make_error_code(
                    std::errc::resource_deadlock_would_occur));
        }
    }

    mutex_type* m_device = nullptr;
    bool m_owns = false;
};

/// Swap overload for shared_lock objects.
/// @relates shared_lock
template <typename Mutex>
void
swap(shared_lock<Mutex>& x, shared_lock<Mutex>& y) noexcept {
    x.swap(y);
}
cargo::abt + +#endif // CARGO_ABT_SHARED_MUTEX_HPP diff --git a/src/worker.cpp b/src/worker.cpp deleted file mode 100644 index c4aebdacac385deeee431f8967aa49f8f15f1621..0000000000000000000000000000000000000000 --- a/src/worker.cpp +++ /dev/null @@ -1,365 +0,0 @@ -/****************************************************************************** - * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain - * - * This software was partially supported by the EuroHPC-funded project ADMIRE - * (Project ID: 956748, https://www.admire-eurohpc.eu). - * - * This file is part of Cargo. - * - * Cargo is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * Cargo is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with Cargo. If not, see . 
- * - * SPDX-License-Identifier: GPL-3.0-or-later - *****************************************************************************/ - -#include -#include -#include -#include -#include -#include -#include -#include -#include "message.hpp" -#include "mpioxx.hpp" - -namespace mpi = boost::mpi; -using namespace std::chrono_literals; - - -namespace { - -// boost MPI doesn't have a communicator constructor that uses -// MPI_Comm_create_group() -mpi::communicator -make_communicator(const mpi::communicator& comm, const mpi::group& group, - int tag) { - MPI_Comm newcomm; - if(const auto ec = MPI_Comm_create_group(comm, group, tag, &newcomm); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_Comm_create_group() failed: {}", - boost::mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } - return mpi::communicator{newcomm, boost::mpi::comm_take_ownership}; -} - - -} // namespace - -using memory_buffer = std::vector; -using buffer_region = std::span; - -void -mpio_read(const mpi::communicator& workers, - const std::filesystem::path& input_path, - const std::filesystem::path& output_path) { - - using posix_file::views::all_of; - using posix_file::views::as_blocks; - using posix_file::views::strided; - - const auto input_file = mpioxx::file::open(workers, input_path, - mpioxx::file_open_mode::rdonly); - - mpioxx::offset file_size = input_file.size(); - std::size_t block_size = 512u; - - // create block type - MPI_Datatype block_type; - MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, &block_type); - MPI_Type_commit(&block_type); - - // compute the number of blocks in the file - int total_blocks = static_cast(file_size / block_size); - - if(file_size % block_size != 0) { - ++total_blocks; - } - - const auto workers_size = workers.size(); - const auto workers_rank = workers.rank(); - - // create file type - MPI_Datatype file_type; - /* - * count: number of blocks in the type - * blocklen: number of elements in each block - * stride: number of elements between start 
of each block - */ - MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, - /* stride: */ workers_size, /* oldtype: */ block_type, - &file_type); - MPI_Type_commit(&file_type); - - MPI_Offset disp = workers_rank * block_size; - MPI_Datatype etype = block_type; - MPI_Datatype filetype = file_type; - - if(const auto ec = MPI_File_set_view(input_file, disp, etype, filetype, - "native", MPI_INFO_NULL); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_set_view() failed: {}", mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } - - // find how many blocks this rank is responsible for - std::size_t blocks_per_rank = total_blocks / workers_size; - - if(int64_t n = total_blocks % workers_size; n != 0 && workers_rank < n) { - ++blocks_per_rank; - } - - // step 1. acquire buffers - memory_buffer buffer; - buffer.resize(blocks_per_rank * block_size); - - std::vector buffer_regions; - buffer_regions.reserve(blocks_per_rank); - - for(std::size_t i = 0; i < blocks_per_rank; ++i) { - buffer_regions.emplace_back(buffer.data() + i * block_size, block_size); - } - - MPI_Datatype datatype = block_type; - - // step2. parallel read data into buffers - if(const auto ec = MPI_File_read_all(input_file, buffer.data(), - static_cast(blocks_per_rank), - datatype, MPI_STATUS_IGNORE); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_read_all() failed: {}", mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } - - // step3. 
POSIX write data - if(const auto rv = - posix_file::create(output_path, O_WRONLY, S_IRUSR | S_IWUSR); - !rv) { - LOGGER_ERROR("posix_file::create({}) failed: {}", output_path, - rv.error().message()); - } else { - - const auto& output_file = rv.value(); - - if(const auto ret = output_file.fallocate(0, 0, file_size); !rv) { - LOGGER_ERROR("posix_file::fallocate({}, {}, {}) failed: {}", 0, 0, - file_size, ret.error().message()); - // TODO : gracefully fail - } - - int index = 0; - for(const auto& file_range : - all_of(posix_file::file{input_path}) | as_blocks(block_size) | - strided(workers_size, workers_rank)) { - assert(buffer_regions[index].size() >= file_range.size()); - const auto ret = - output_file.pwrite(buffer_regions[index], - file_range.offset(), file_range.size()); - - if(!ret) { - LOGGER_ERROR("pwrite() failed: {}", ret.error().message()); - } - - ++index; - } - } -} - -void -mpio_write(const mpi::communicator& workers, - const std::filesystem::path& input_path, - const std::filesystem::path& output_path) { - - using posix_file::views::all_of; - using posix_file::views::as_blocks; - using posix_file::views::strided; - - const auto workers_size = workers.size(); - const auto workers_rank = workers.rank(); - std::size_t block_size = 512u; - std::size_t file_size = std::filesystem::file_size(input_path); - - // compute the number of blocks in the file - int total_blocks = static_cast(file_size / block_size); - - if(file_size % block_size != 0) { - ++total_blocks; - } - - // find how many blocks this rank is responsible for - std::size_t blocks_per_rank = total_blocks / workers_size; - - if(int64_t n = total_blocks % workers_size; n != 0 && workers_rank < n) { - ++blocks_per_rank; - } - - // step 1. 
acquire buffers - memory_buffer buffer; - buffer.resize(blocks_per_rank * block_size); - - std::vector buffer_regions; - buffer_regions.reserve(blocks_per_rank); - - for(std::size_t i = 0; i < blocks_per_rank; ++i) { - buffer_regions.emplace_back(buffer.data() + i * block_size, block_size); - } - - const auto rv = posix_file::open(input_path, O_RDONLY); - - if(!rv) { - LOGGER_ERROR("posix_file::open({}) failed: {} ", input_path, - rv.error().message()); - // TODO : gracefully fail - } - - int index = 0; - std::size_t bytes_per_rank = 0; - - for(const auto& input_file = rv.value(); - const auto& file_range : all_of(input_file) | as_blocks(block_size) | - strided(workers_size, workers_rank)) { - - assert(buffer_regions[index].size() >= file_range.size()); - const auto ret = input_file.pread( - buffer_regions[index], file_range.offset(), file_range.size()); - - if(!ret) { - LOGGER_ERROR("pread() failed: {}", ret.error().message()); - } - - LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", - fmt::join(buffer_regions[index].begin(), - buffer_regions[index].begin() + 10, ""), - fmt::join(buffer_regions[index].end() - 10, - buffer_regions[index].end(), "")); - - bytes_per_rank += ret.value(); - ++index; - } - - // step 2. 
write buffer data in parallel to the PFS - const auto output_file = mpioxx::file::open( - workers, output_path, - mpioxx::file_open_mode::create | mpioxx::file_open_mode::wronly); - - // create block type - MPI_Datatype block_type; - MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, &block_type); - MPI_Type_commit(&block_type); - - // create file type - MPI_Datatype file_type; - - /* - * count: number of blocks in the type - * blocklen: number of `oldtype` elements in each block - * stride: number of `oldtype` elements between start of each block - */ - MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, - /* stride: */ workers_size, /* oldtype: */ block_type, - &file_type); - MPI_Type_commit(&file_type); - - if(const auto ec = MPI_File_set_view(output_file, - /* disp: */ workers_rank * block_size, - /* elementary_type: */ block_type, - file_type, "native", MPI_INFO_NULL); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_set_view() failed: {}", mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } - - // step 3. 
parallel write data from buffers - if(const auto ec = MPI_File_write_all(output_file, buffer.data(), - static_cast(bytes_per_rank), - MPI_BYTE, MPI_STATUS_IGNORE); - ec != MPI_SUCCESS) { - LOGGER_ERROR("MPI_File_write_all() failed: {}", mpi::error_string(ec)); - MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); - } -} - -void -sequential_transfer(const std::filesystem::path& input_path, - const std::filesystem::path& output_path) { - (void) input_path; - (void) output_path; - - LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); -} - -void -worker() { - - // Create a separate communicator only for worker processes - const mpi::communicator world; - const auto ranks_to_exclude = std::array{0}; - const auto workers = - ::make_communicator(world, - world.group().exclude(ranks_to_exclude.begin(), - ranks_to_exclude.end()), - 0); - - LOGGER_INIT(fmt::format("worker_{:03}", world.rank()), - logger::console_color); - - // Initialization finished - LOGGER_INFO("Staging process initialized (world_rank {}, workers_rank: {})", - world.rank(), workers.rank()); - - bool done = false; - - while(!done) { - - auto msg = world.iprobe(); - - if(!msg) { - // FIXME: sleep time should be configurable - std::this_thread::sleep_for(150ms); - continue; - } - - switch(static_cast(msg->tag())) { - case cargo::message_tags::transfer: { - cargo::transfer_request_message m; - world.recv(mpi::any_source, msg->tag(), m); - LOGGER_DEBUG("Transfer request received!: {}", m); - - switch(m.type()) { - case cargo::parallel_read: - ::mpio_read(workers, m.input_path(), m.output_path()); - break; - case cargo::parallel_write: - ::mpio_write(workers, m.input_path(), m.output_path()); - break; - case cargo::sequential: - ::sequential_transfer(m.input_path(), m.output_path()); - break; - } - break; - } - - case cargo::message_tags::status: { - cargo::transfer_status_message m; - world.recv(mpi::any_source, msg->tag(), m); - LOGGER_DEBUG("Transfer status query received!: {}", m); - break; - } - - case 
cargo::message_tags::shutdown: - done = true; - break; - } - } -} diff --git a/src/worker/memory.hpp b/src/worker/memory.hpp new file mode 100644 index 0000000000000000000000000000000000000000..2d424ac73ef64d75888e6b42cf81594fa0fb0f63 --- /dev/null +++ b/src/worker/memory.hpp @@ -0,0 +1,38 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_WORKER_MEMORY_HPP +#define CARGO_WORKER_MEMORY_HPP + +#include +#include + +namespace cargo { + +using memory_buffer = std::vector; +using buffer_region = std::span; + +} // namespace cargo + +#endif // CARGO_WORKER_MEMORY_HPP diff --git a/src/worker/mpio_read.cpp b/src/worker/mpio_read.cpp new file mode 100644 index 0000000000000000000000000000000000000000..dedd28428a80e19e55ec8c116a4430704d0121b1 --- /dev/null +++ b/src/worker/mpio_read.cpp @@ -0,0 +1,154 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "mpio_read.hpp" +#include "mpioxx.hpp" +#include "memory.hpp" + +namespace cargo { + +mpio_read::mpio_read(mpi::communicator workers, + std::filesystem::path input_path, + std::filesystem::path output_path) + : m_workers(std::move(workers)), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)) {} + +cargo::error_code +mpio_read::operator()() const { + + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + try { + const auto input_file = mpioxx::file::open( + m_workers, m_input_path, mpioxx::file_open_mode::rdonly); + + mpioxx::offset file_size = input_file.size(); + std::size_t block_size = 512u; + + // create block type + MPI_Datatype block_type; + MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, + &block_type); + MPI_Type_commit(&block_type); + + // compute the number of blocks in the file + int total_blocks = static_cast(file_size / block_size); + + if(file_size % block_size != 0) { + ++total_blocks; + } + + const auto workers_size = m_workers.size(); + const auto workers_rank = m_workers.rank(); + + // create file type + MPI_Datatype file_type; + /* + * count: number of blocks in the type + * blocklen: number of elements in each block + * stride: number of elements between start of each block + */ + MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, + /* stride: */ workers_size, /* oldtype: */ block_type, + &file_type); + MPI_Type_commit(&file_type); + + MPI_Offset disp = workers_rank * block_size; + MPI_Datatype etype = block_type; + MPI_Datatype filetype = file_type; + + if(const auto ec = MPI_File_set_view(input_file, disp, etype, filetype, + "native", MPI_INFO_NULL); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_File_set_view() failed: {}", + mpi::error_string(ec)); + return make_mpi_error(ec); + } + 
+ // find how many blocks this rank is responsible for + std::size_t blocks_per_rank = total_blocks / workers_size; + + if(int64_t n = total_blocks % workers_size; + n != 0 && workers_rank < n) { + ++blocks_per_rank; + } + + // step 1. acquire buffers + memory_buffer buffer; + buffer.resize(blocks_per_rank * block_size); + + std::vector buffer_regions; + buffer_regions.reserve(blocks_per_rank); + + for(std::size_t i = 0; i < blocks_per_rank; ++i) { + buffer_regions.emplace_back(buffer.data() + i * block_size, + block_size); + } + + MPI_Datatype datatype = block_type; + + // step2. parallel read data into buffers + if(const auto ec = MPI_File_read_all(input_file, buffer.data(), + static_cast(blocks_per_rank), + datatype, MPI_STATUS_IGNORE); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_File_read_all() failed: {}", + mpi::error_string(ec)); + return make_mpi_error(ec); + } + + // step3. POSIX write data + const auto output_file = + posix_file::create(m_output_path, O_WRONLY, S_IRUSR | S_IWUSR); + + output_file.fallocate(0, 0, file_size); + + int index = 0; + for(const auto& file_range : + all_of(posix_file::file{m_input_path}) | as_blocks(block_size) | + strided(workers_size, workers_rank)) { + assert(buffer_regions[index].size() >= file_range.size()); + output_file.pwrite(buffer_regions[index], file_range.offset(), + file_range.size()); + + ++index; + } + } catch(const mpioxx::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + return make_mpi_error(e.error_code()); + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + return make_system_error(e.error_code()); + } catch(const std::exception& e) { + std::cerr << e.what() << '\n'; + } + + return error_code::success; +} + +} // namespace cargo \ No newline at end of file diff --git a/src/worker/mpio_read.hpp b/src/worker/mpio_read.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5ce311dfb1f7efae78b8889acbf163e06505887f --- 
/**
 * Transfer operation that reads the input file collectively with MPI-IO
 * and writes the output file with POSIX I/O.
 */
class mpio_read : public operation {

public:
    /**
     * @param workers The communicator of the worker processes taking part
     * in the transfer.
     * @param input_path The file to read from.
     * @param output_path The file to write to.
     */
    mpio_read(mpi::communicator workers, std::filesystem::path input_path,
              std::filesystem::path output_path);

    /**
     * Run the transfer.
     *
     * @return An error code describing the outcome of the transfer.
     */
    cargo::error_code
    operator()() const final;

private:
    mpi::communicator m_workers;        // worker processes sharing the transfer
    std::filesystem::path m_input_path; // source file
    std::filesystem::path m_output_path; // destination file
};
Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "mpio_write.hpp" +#include "mpioxx.hpp" +#include "memory.hpp" + +namespace cargo { + +cargo::error_code +mpio_write::operator()() const { + + using posix_file::views::all_of; + using posix_file::views::as_blocks; + using posix_file::views::strided; + + try { + + const auto workers_size = m_workers.size(); + const auto workers_rank = m_workers.rank(); + std::size_t block_size = 512u; + std::size_t file_size = std::filesystem::file_size(m_input_path); + + // compute the number of blocks in the file + int total_blocks = static_cast(file_size / block_size); + + if(file_size % block_size != 0) { + ++total_blocks; + } + + // find how many blocks this rank is responsible for + std::size_t blocks_per_rank = total_blocks / workers_size; + + if(int64_t n = total_blocks % workers_size; + n != 0 && workers_rank < n) { + ++blocks_per_rank; + } + + // step 1. 
acquire buffers + memory_buffer buffer; + buffer.resize(blocks_per_rank * block_size); + + std::vector buffer_regions; + buffer_regions.reserve(blocks_per_rank); + + for(std::size_t i = 0; i < blocks_per_rank; ++i) { + buffer_regions.emplace_back(buffer.data() + i * block_size, + block_size); + } + + const auto input_file = posix_file::open(m_input_path, O_RDONLY); + + int index = 0; + std::size_t bytes_per_rank = 0; + + for(const auto& file_range : + all_of(input_file) | as_blocks(block_size) | + strided(workers_size, workers_rank)) { + + assert(buffer_regions[index].size() >= file_range.size()); + const std::size_t n = + input_file.pread(buffer_regions[index], file_range.offset(), + file_range.size()); + + LOGGER_DEBUG("Buffer contents: [\"{}\" ... \"{}\"]", + fmt::join(buffer_regions[index].begin(), + buffer_regions[index].begin() + 10, ""), + fmt::join(buffer_regions[index].end() - 10, + buffer_regions[index].end(), "")); + + bytes_per_rank += n; + ++index; + } + + // step 2. write buffer data in parallel to the PFS + const auto output_file = + mpioxx::file::open(m_workers, m_output_path, + mpioxx::file_open_mode::create | + mpioxx::file_open_mode::wronly); + + // create block type + MPI_Datatype block_type; + MPI_Type_contiguous(static_cast(block_size), MPI_BYTE, + &block_type); + MPI_Type_commit(&block_type); + + // create file type + MPI_Datatype file_type; + + /* + * count: number of blocks in the type + * blocklen: number of `oldtype` elements in each block + * stride: number of `oldtype` elements between start of each block + */ + MPI_Type_vector(/* count: */ total_blocks, /* blocklength: */ 1, + /* stride: */ workers_size, /* oldtype: */ block_type, + &file_type); + MPI_Type_commit(&file_type); + + if(const auto ec = + MPI_File_set_view(output_file, + /* disp: */ workers_rank * block_size, + /* elementary_type: */ block_type, + file_type, "native", MPI_INFO_NULL); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_File_set_view() failed: {}", + 
mpi::error_string(ec)); + return make_mpi_error(ec); + } + + // step 3. parallel write data from buffers + if(const auto ec = MPI_File_write_all(output_file, buffer.data(), + static_cast(bytes_per_rank), + MPI_BYTE, MPI_STATUS_IGNORE); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_File_write_all() failed: {}", + mpi::error_string(ec)); + return make_mpi_error(ec); + } + } catch(const mpioxx::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + return make_mpi_error(e.error_code()); + } catch(const posix_file::io_error& e) { + LOGGER_ERROR("{}() failed: {}", e.where(), e.what()); + return make_system_error(e.error_code()); + } catch(const std::exception& e) { + std::cerr << e.what() << '\n'; + } + + return error_code::success; +} + +} // namespace cargo diff --git a/src/worker/mpio_write.hpp b/src/worker/mpio_write.hpp new file mode 100644 index 0000000000000000000000000000000000000000..69b2ced46e6c3770e599780ee100919ac41e4063 --- /dev/null +++ b/src/worker/mpio_write.hpp @@ -0,0 +1,53 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
/**
 * Transfer operation that reads the input file with POSIX I/O and writes
 * the output file collectively with MPI-IO.
 */
class mpio_write : public operation {

public:
    /**
     * @param workers The communicator of the worker processes taking part
     * in the transfer.
     * @param input_path The file to read from.
     * @param output_path The file to write to.
     */
    mpio_write(mpi::communicator workers, std::filesystem::path input_path,
               std::filesystem::path output_path)
        : m_workers(std::move(workers)), m_input_path(std::move(input_path)),
          m_output_path(std::move(output_path)) {}

    /**
     * Run the transfer.
     *
     * @return An error code describing the outcome of the transfer.
     */
    cargo::error_code
    operator()() const final;

private:
    mpi::communicator m_workers;        // worker processes sharing the transfer
    std::filesystem::path m_input_path; // source file
    std::filesystem::path m_output_path; // destination file
};
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include "ops.hpp" +#include "mpio_read.hpp" +#include "mpio_write.hpp" +#include "sequential.hpp" + +namespace mpi = boost::mpi; + +namespace cargo { + +std::unique_ptr +operation::make_operation(cargo::tag t, mpi::communicator workers, + std::filesystem::path input_path, + std::filesystem::path output_path) { + using cargo::tag; + switch(t) { + case tag::pread: + return std::make_unique(std::move(workers), + std::move(input_path), + std::move(output_path)); + case tag::pwrite: + return std::make_unique(std::move(workers), + std::move(input_path), + std::move(output_path)); + case tag::sequential: + return std::make_unique(std::move(workers), + std::move(input_path), + std::move(output_path)); + default: + return {}; + } +} + +} // namespace cargo diff --git a/src/worker/ops.hpp b/src/worker/ops.hpp new file mode 100644 index 0000000000000000000000000000000000000000..b9a20cfada8a627cd08e87d5bf796e757c9b09e0 --- /dev/null +++ b/src/worker/ops.hpp @@ -0,0 +1,55 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_WORKER_OPS_HPP +#define CARGO_WORKER_OPS_HPP + +#include +#include +#include +#include "proto/mpi/message.hpp" +#include "cargo.hpp" + +namespace cargo { + +/** + * Interface for transfer operations + */ +class operation { + +public: + static std::unique_ptr + make_operation(cargo::tag t, boost::mpi::communicator workers, + std::filesystem::path input_path, + std::filesystem::path output_path); + + virtual ~operation() = default; + + virtual cargo::error_code + operator()() const = 0; +}; + +} // namespace cargo + +#endif // CARGO_WORKER_OPS_HPP diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a1cf8ed9f60707ba676981d84520acfae8810992 --- /dev/null +++ b/src/worker/sequential.cpp @@ -0,0 +1,37 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include "sequential.hpp" + + +namespace cargo { + +cargo::error_code +seq_operation::operator()() const { + LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); + return cargo::error_code::not_implemented; +} + +} // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp new file mode 100644 index 0000000000000000000000000000000000000000..82fa6196cecd80ba4f600e43b3306b34bb194545 --- /dev/null +++ b/src/worker/sequential.hpp @@ -0,0 +1,53 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef CARGO_WORKER_SEQUENTIAL_HPP +#define CARGO_WORKER_SEQUENTIAL_HPP + +#include "ops.hpp" + +namespace mpi = boost::mpi; + +namespace cargo { + +class seq_operation : public operation { + +public: + seq_operation(mpi::communicator comm, std::filesystem::path input_path, + std::filesystem::path output_path) + : m_comm(std::move(comm)), m_input_path(std::move(input_path)), + m_output_path(std::move(output_path)) {} + + cargo::error_code + operator()() const final; + +private: + mpi::communicator m_comm; + std::filesystem::path m_input_path; + std::filesystem::path m_output_path; +}; + +} // namespace cargo + +#endif // CARGO_WORKER_SEQUENTIAL_HPP diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp new file mode 100644 index 0000000000000000000000000000000000000000..167891f2ed23cd3ab80135c567bddfa37785989e --- /dev/null +++ b/src/worker/worker.cpp @@ -0,0 +1,158 @@ +/****************************************************************************** + * Copyright 2022-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of Cargo. + * + * Cargo is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Cargo is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Cargo. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include "ops.hpp" +#include "worker.hpp" +#include "proto/mpi/message.hpp" +#include "fmt_formatters.hpp" + +namespace mpi = boost::mpi; +using namespace std::chrono_literals; + +namespace { + +// boost MPI doesn't have a communicator constructor that uses +// MPI_Comm_create_group() +mpi::communicator +make_communicator(const mpi::communicator& comm, const mpi::group& group, + int tag) { + MPI_Comm newcomm; + if(const auto ec = MPI_Comm_create_group(comm, group, tag, &newcomm); + ec != MPI_SUCCESS) { + LOGGER_ERROR("MPI_Comm_create_group() failed: {}", + boost::mpi::error_string(ec)); + MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE); + } + return mpi::communicator{newcomm, boost::mpi::comm_take_ownership}; +} + +void +update_state(int rank, std::uint64_t tid, std::uint32_t seqno, + cargo::transfer_state st, + std::optional ec = std::nullopt) { + + mpi::communicator world; + const cargo::status_message m{tid, seqno, st, ec}; + LOGGER_INFO("msg <= to: {} body: {{payload: {}}}", rank, m); + world.send(rank, static_cast(cargo::tag::status), m); +} + +} // namespace + +namespace cargo { + +worker::worker(std::string name, int rank) + : m_name(std::move(name)), m_rank(rank) {} + +void +worker::set_output_file(std::filesystem::path output_file) { + m_output_file = std::move(output_file); +} + +int +worker::run() { + + // Create a separate communicator only for worker processes + const mpi::communicator world; + const auto ranks_to_exclude = std::array{0}; + const auto workers = + ::make_communicator(world, + world.group().exclude(ranks_to_exclude.begin(), + ranks_to_exclude.end()), + 0); + + const logger::logger_config cfg{ + fmt::format("{}:{:03}", m_name, world.rank()), + m_output_file ? 
logger::file : logger::console_color, + m_output_file}; + + logger::create_default_logger(cfg); + + const auto greeting = + fmt::format("Starting staging process (pid {})", getpid()); + + LOGGER_INFO("{:=>{}}", "", greeting.size()); + LOGGER_INFO(greeting); + LOGGER_INFO("{:=>{}}", "", greeting.size()); + + bool done = false; + + while(!done) { + + auto msg = world.iprobe(); + + if(!msg) { + // FIXME: sleep time should be configurable + std::this_thread::sleep_for(150ms); + continue; + } + + switch(const auto t = static_cast(msg->tag())) { + case tag::pread: + [[fallthrough]]; + case tag::pwrite: + [[fallthrough]]; + case tag::sequential: { + transfer_message m; + world.recv(msg->source(), msg->tag(), m); + LOGGER_INFO("msg => from: {} body: {}", msg->source(), m); + + const auto op = operation::make_operation( + t, workers, m.input_path(), m.output_path()); + + update_state(msg->source(), m.tid(), m.seqno(), + transfer_state::running); + + cargo::error_code ec = (*op)(); + + update_state(msg->source(), m.tid(), m.seqno(), + ec ? 
transfer_state::failed + : transfer_state::completed, + ec); + break; + } + + case tag::shutdown: + LOGGER_INFO("msg => from: {} body: {{shutdown}}", + msg->source()); + world.recv(msg->source(), msg->tag()); + done = true; + break; + + default: + LOGGER_WARN("[{}] Unexpected message tag: {}", msg->source(), + msg->tag()); + break; + } + } + + return 0; +} + +} // namespace cargo diff --git a/src/worker.hpp b/src/worker/worker.hpp similarity index 79% rename from src/worker.hpp rename to src/worker/worker.hpp index c0a9a9eb38cd533a8736ad7d30890c1d42aaab56..aaeea068e40b0ef7812bb9d9d4dda264262b8257 100644 --- a/src/worker.hpp +++ b/src/worker/worker.hpp @@ -26,7 +26,24 @@ #ifndef CARGO_WORKER_HPP #define CARGO_WORKER_HPP -void -worker(); +namespace cargo { + +class worker { +public: + worker(std::string name, int rank); + + void + set_output_file(std::filesystem::path output_file); + + int + run(); + +private: + std::string m_name; + int m_rank; + std::optional m_output_file; +}; + +} // namespace cargo #endif // CARGO_WORKER_HPP diff --git a/tests/tests.cpp b/tests/tests.cpp index 96bc19efdad4e905b52cb0f68f3e5d93f45230b1..15b682bb5a2e17546aa9dd1ef1934258b86525d6 100644 --- a/tests/tests.cpp +++ b/tests/tests.cpp @@ -24,7 +24,6 @@ #include #include -#include #include #include #include @@ -41,6 +40,17 @@ using namespace std::literals; using namespace std::chrono_literals; +std::ostream& +operator<<(std::ostream& os, const cargo::error_code& ec) { + os << ec.name(); + return os; +} + +CATCH_REGISTER_ENUM(cargo::transfer_state, cargo::transfer_state::pending, + cargo::transfer_state::running, + cargo::transfer_state::completed, + cargo::transfer_state::failed); + struct scoped_file { explicit scoped_file(std::filesystem::path filepath) : m_filepath(std::move(filepath)) {} @@ -226,11 +236,11 @@ SCENARIO("Parallel reads", "[flex_stager][parallel_reads]") { WHEN("Transferring datasets to a POSIX storage system") { const auto tx = cargo::transfer_datasets(server, sources, 
targets); - (void) tx; + // wait for the transfer to complete + auto s = tx.wait(); - // give time for transfers to complete before removing input files - // FIXME: replace with proper status checking for the transfer - std::this_thread::sleep_for(1s); + REQUIRE(s.state() == cargo::transfer_state::completed); + REQUIRE(s.error() == cargo::error_code::success); THEN("Output datasets are identical to input datasets") { @@ -286,11 +296,11 @@ SCENARIO("Parallel writes", "[flex_stager][parallel_writes]") { WHEN("Transferring datasets to a PFS") { const auto tx = cargo::transfer_datasets(server, sources, targets); - (void) tx; + // wait for the transfer to complete + auto s = tx.wait(); - // give time for transfers to complete before removing input files - // FIXME: replace with proper status checking for the transfer - std::this_thread::sleep_for(1s); + REQUIRE(s.state() == cargo::transfer_state::completed); + REQUIRE(s.error() == cargo::error_code::success); THEN("Output datasets are identical to input datasets") {