Verified Commit bd9011fc authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Refactor and simplify request status propagation

parent 9c80402d
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ target_sources(
          shared_mutex.hpp
          proto/rpc/response.hpp
          proto/mpi/message.hpp
          boost_serialization_std_optional.hpp
)

target_include_directories(
+89 −0
Original line number Diff line number Diff line
/////////1/////////2/////////3/////////4/////////5/////////6/////////7/////////8
// optional.hpp - non-intrusive serialization of optional types
//
// copyright (c) 2019 Samuel Debionne, ESRF
//
// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org for updates, documentation, and revision history.
//
// Widely inspired form boost::optional serialization
//

#ifndef CARGO_BOOST_SERIALIZATION_STD_OPTIONAL_HPP
#define CARGO_BOOST_SERIALIZATION_STD_OPTIONAL_HPP

#include <optional>

#include <boost/config.hpp>

#include <boost/serialization/split_free.hpp>
#include <boost/serialization/level.hpp>
#include <boost/serialization/nvp.hpp>
#include <boost/serialization/version.hpp>
#include <boost/type_traits/is_pointer.hpp>
#include <boost/serialization/detail/stack_constructor.hpp>
#include <boost/serialization/detail/is_default_constructible.hpp>
#include <boost/serialization/force_include.hpp>

// function specializations must be defined in the appropriate
// namespace - boost::serialization
namespace boost {
namespace serialization {

template <class Archive, class T>
void
save(Archive& ar, const std::optional<T>& t, const unsigned int /*version*/
) {
    // It is an inherent limitation to the serialization of optional.hpp
    // that the underlying type must be either a pointer or must have a
    // default constructor.  It's possible that this could change sometime
    // in the future, but for now, one will have to work around it.  This can
    // be done by serialization the optional<T> as optional<T *>
#if !defined(BOOST_NO_CXX11_HDR_TYPE_TRAITS)
    BOOST_STATIC_ASSERT(
            boost::serialization::detail::is_default_constructible<T>::value ||
            boost::is_pointer<T>::value);
#endif
    const bool tflag = t.has_value();
    ar << boost::serialization::make_nvp("has_value", tflag);
    if(tflag) {
        ar << boost::serialization::make_nvp("value", *t);
    }
}

template <class Archive, class T>
void
load(Archive& ar, std::optional<T>& t, const unsigned int version) {

    (void) version;

    bool tflag;
    ar >> boost::serialization::make_nvp("has_value", tflag);
    if(!tflag) {
        t.reset();
        return;
    }

    if(!t.has_value())
        t = T();
    ar >> boost::serialization::make_nvp("value", *t);
}

template <class Archive, class T>
void
serialize(Archive& ar, std::optional<T>& t, const unsigned int version) {
    boost::serialization::split_free(ar, t, version);
}

template <class T>
struct version<std::optional<T>> {
    BOOST_STATIC_CONSTANT(int, value = 1);
};

} // namespace serialization
} // namespace boost

#endif // CARGO_BOOST_SERIALIZATION_STD_OPTIONAL_HPP
+9 −7
Original line number Diff line number Diff line
@@ -124,9 +124,7 @@ master_server::mpi_listener_ult() {
                            msg->source(), m);

                m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1,
                                         m.error_code()
                                                 ? part_status::failed
                                                 : part_status::completed);
                                         m.state(), m.error_code());
                break;
            }

@@ -211,7 +209,10 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
    using proto::response_with_value;
    using proto::status_response;

    using response_type =
            status_response<cargo::transfer_state, cargo::error_code>;

    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));
@@ -224,11 +225,12 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) {
                LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);
                req.respond(generic_response{rpc.id(), ec});
            })
            .map([&](auto&& s) {
            .map([&](auto&& rs) {
                LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc,
                            error_code::success, s);
                            error_code::success, rs);
                req.respond(
                        response_with_value{rpc.id(), error_code::success, s});
                        response_type{rpc.id(), error_code::success,
                                      std::make_pair(rs.state(), rs.error())});
            });
}

+32 −0
Original line number Diff line number Diff line
@@ -46,4 +46,36 @@ parallel_request::nworkers() const {
    return m_nworkers;
}

request_status::request_status(part_status s)
    : m_state(s.state()), m_error_code(s.error()) {}

request_status::request_status(transfer_state s, std::optional<error_code> ec)
    : m_state(s), m_error_code(ec) {}

transfer_state
request_status::state() const {
    return m_state;
}

std::optional<error_code>
request_status::error() const {
    return m_error_code;
}

transfer_state
part_status::state() const {
    return m_state;
}

std::optional<error_code>
part_status::error() const {
    return m_error_code;
}

void
part_status::update(transfer_state s, std::optional<error_code> ec) noexcept {
    m_state = s;
    m_error_code = ec;
}

} // namespace cargo
 No newline at end of file
+68 −2
Original line number Diff line number Diff line
@@ -27,6 +27,8 @@

#include <cstdint>
#include <vector>
#include <optional>
#include <fmt/format.h>

namespace cargo {

@@ -56,9 +58,73 @@ private:
    std::size_t m_nworkers;
};

enum class part_status { pending, running, completed, failed };
enum class request_status { pending, running, completed, failed };
/**
 * The status of a single file part.
 */
class part_status {
public:
    part_status() = default;

    [[nodiscard]] transfer_state
    state() const;

    [[nodiscard]] std::optional<error_code>
    error() const;

    void
    update(transfer_state s, std::optional<error_code> ec) noexcept;

private:
    transfer_state m_state{transfer_state::pending};
    std::optional<error_code> m_error_code{};
};

class request_status {
public:
    request_status() = default;
    explicit request_status(transfer_state s,
                            std::optional<error_code> ec = {});
    explicit request_status(part_status s);

    [[nodiscard]] transfer_state
    state() const;

    [[nodiscard]] std::optional<error_code>
    error() const;

private:
    transfer_state m_state{transfer_state::pending};
    std::optional<error_code> m_error_code{};
};

} // namespace cargo

template <>
struct fmt::formatter<cargo::request_status> : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const cargo::request_status& s, FormatContext& ctx) const {

        const auto state_name = [](auto&& s) {
            switch(s.state()) {
                case cargo::transfer_state::pending:
                    return "pending";
                case cargo::transfer_state::running:
                    return "running";
                case cargo::transfer_state::completed:
                    return "completed";
                case cargo::transfer_state::failed:
                    return "failed";
                default:
                    return "unknown";
            }
        };

        const auto str = fmt::format("{{state: {}, error_code: {}}}",
                                     state_name(s), s.error());
        return formatter<std::string_view>::format(str, ctx);
    }
};

#endif // CARGO_PARALLEL_REQUEST_HPP
Loading