Loading src/master.cpp +51 −1 Original line number Diff line number Diff line Loading @@ -96,6 +96,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); provider::define(EXPAND(bw_control)); provider::define(EXPAND(transfer_statuses)); #undef EXPAND Loading Loading @@ -135,7 +136,8 @@ master_server::mpi_listener_ult() { msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, m.state(), m.bw(), m.error_code()); m.name(), m.state(), m.bw(), m.error_code()); break; } Loading Loading @@ -367,4 +369,52 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { }); } void master_server::transfer_statuses(const network::request& req, std::uint64_t tid) { using network::get_address; using network::rpc_info; using proto::generic_response; using proto::statuses_response; using response_type = statuses_response<std::string, cargo::transfer_state, float, cargo::error_code>; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); // Get all the statuses of the associated transfer. returns a vector // of transfer_status objects m_request_manager.lookup_all(tid) .or_else([&](auto&& ec) { LOGGER_ERROR("Failed to lookup request: {}", ec); LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(generic_response{rpc.id(), ec}); }) .map([&](auto&& rs) { // We get a vector of request_status objects, we need to // convert them to a vector of tuples with the same // informations std::vector<std::tuple<std::string, cargo::transfer_state, float, std::optional<cargo::error_code>>> v{}; for(auto& r : rs) { v.push_back(std::make_tuple(r.name(), r.state(), r.bw(), r.error().value())); LOGGER_INFO( "rpc {:<} body: {{retval: {}, name: {}, status: {}}}", rpc, error_code::success, r.name(), r.state()); } // Generate a response type with the vector of tuples and // respond req.respond(response_type{rpc.id(), error_code::success, v}); }); } } // namespace cargo src/master.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -58,6 +58,9 @@ private: void transfer_status(const network::request& req, std::uint64_t tid); void transfer_statuses(const network::request& req, std::uint64_t tid); // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void Loading src/parallel_request.cpp +14 −4 Original line number Diff line number Diff line Loading @@ -47,17 +47,22 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} : m_name(s.name()), m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} request_status::request_status(transfer_state s, float bw, request_status::request_status(std::string name, transfer_state s, float bw, std::optional<error_code> ec) : m_state(s), m_bw(bw), m_error_code(ec) {} : m_name(name), m_state(s), m_bw(bw), m_error_code(ec) {} transfer_state request_status::state() const { return m_state; } std::string request_status::name() const { return m_name; } std::optional<error_code> request_status::error() const { return m_error_code; Loading @@ -67,6 +72,10 @@ float request_status::bw() const { return m_bw; } std::string part_status::name() const { return m_name; } transfer_state part_status::state() const { Loading @@ -84,8 +93,9 @@ part_status::error() const { } void part_status::update(transfer_state s, float bw, part_status::update(std::string name, transfer_state s, float bw, std::optional<error_code> ec) noexcept { m_name = std::move(name); m_state = s; m_bw = bw; m_error_code = ec; Loading src/parallel_request.hpp +10 −2 Original line number Diff line number Diff line Loading @@ -65,6 +65,9 @@ class part_status { public: part_status() = default; [[nodiscard]] std::string name() const; [[nodiscard]] transfer_state state() const; Loading @@ -75,9 +78,10 @@ public: bw() const; void update(transfer_state s, float bw, std::optional<error_code> ec) noexcept; update(std::string name, transfer_state s, float bw, std::optional<error_code> ec) noexcept; private: std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional<error_code> m_error_code{}; Loading @@ -86,10 +90,13 @@ private: class request_status { public: request_status() = default; explicit request_status(transfer_state s, float bw, explicit request_status(std::string name, transfer_state s, float bw, std::optional<error_code> ec = {}); explicit request_status(part_status s); [[nodiscard]] std::string name() const; [[nodiscard]] transfer_state state() const; Loading @@ -100,6 +107,7 @@ public: bw() const; private: std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional<error_code> m_error_code{}; Loading src/proto/mpi/message.hpp +9 −2 Original line number Diff line number Diff line Loading @@ -120,10 +120,10 @@ class status_message { public: status_message() = default; status_message(std::uint64_t tid, std::uint32_t seqno, status_message(std::uint64_t tid, std::uint32_t seqno, std::string name, cargo::transfer_state state, float bw, std::optional<cargo::error_code> error_code = std::nullopt) : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), : m_tid(tid), m_seqno(seqno), m_name(name), m_state(state), m_bw(bw), m_error_code(error_code) {} [[nodiscard]] std::uint64_t Loading @@ -136,6 +136,11 @@ public: return m_seqno; } [[nodiscard]] const std::string& name() const { return m_name; } [[nodiscard]] cargo::transfer_state state() const { return m_state; Loading @@ -160,6 +165,7 @@ private: ar& m_tid; ar& m_seqno; ar& m_name; ar& m_state; ar& m_bw; ar& m_error_code; Loading @@ -167,6 +173,7 @@ private: std::uint64_t m_tid{}; std::uint32_t m_seqno{}; std::string m_name{}; cargo::transfer_state m_state{}; float m_bw{}; std::optional<cargo::error_code> m_error_code{}; Loading Loading
src/master.cpp +51 −1 Original line number Diff line number Diff line Loading @@ -96,6 +96,7 @@ master_server::master_server(std::string name, std::string address, provider::define(EXPAND(transfer_datasets)); provider::define(EXPAND(transfer_status)); provider::define(EXPAND(bw_control)); provider::define(EXPAND(transfer_statuses)); #undef EXPAND Loading Loading @@ -135,7 +136,8 @@ master_server::mpi_listener_ult() { msg->source(), m); m_request_manager.update(m.tid(), m.seqno(), msg->source() - 1, m.state(), m.bw(), m.error_code()); m.name(), m.state(), m.bw(), m.error_code()); break; } Loading Loading @@ -367,4 +369,52 @@ master_server::transfer_status(const network::request& req, std::uint64_t tid) { }); } void master_server::transfer_statuses(const network::request& req, std::uint64_t tid) { using network::get_address; using network::rpc_info; using proto::generic_response; using proto::statuses_response; using response_type = statuses_response<std::string, cargo::transfer_state, float, cargo::error_code>; mpi::communicator world; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid); // Get all the statuses of the associated transfer. returns a vector // of transfer_status objects m_request_manager.lookup_all(tid) .or_else([&](auto&& ec) { LOGGER_ERROR("Failed to lookup request: {}", ec); LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(generic_response{rpc.id(), ec}); }) .map([&](auto&& rs) { // We get a vector of request_status objects, we need to // convert them to a vector of tuples with the same // informations std::vector<std::tuple<std::string, cargo::transfer_state, float, std::optional<cargo::error_code>>> v{}; for(auto& r : rs) { v.push_back(std::make_tuple(r.name(), r.state(), r.bw(), r.error().value())); LOGGER_INFO( "rpc {:<} body: {{retval: {}, name: {}, status: {}}}", rpc, error_code::success, r.name(), r.state()); } // Generate a response type with the vector of tuples and // respond req.respond(response_type{rpc.id(), error_code::success, v}); }); } } // namespace cargo
src/master.hpp +3 −0 Original line number Diff line number Diff line Loading @@ -58,6 +58,9 @@ private: void transfer_status(const network::request& req, std::uint64_t tid); void transfer_statuses(const network::request& req, std::uint64_t tid); // Receives a request to increase or decrease BW // -1 faster, 0 , +1 slower void Loading
src/parallel_request.cpp +14 −4 Original line number Diff line number Diff line Loading @@ -47,17 +47,22 @@ parallel_request::nworkers() const { } request_status::request_status(part_status s) : m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} : m_name(s.name()), m_state(s.state()), m_bw(s.bw()), m_error_code(s.error()) {} request_status::request_status(transfer_state s, float bw, request_status::request_status(std::string name, transfer_state s, float bw, std::optional<error_code> ec) : m_state(s), m_bw(bw), m_error_code(ec) {} : m_name(name), m_state(s), m_bw(bw), m_error_code(ec) {} transfer_state request_status::state() const { return m_state; } std::string request_status::name() const { return m_name; } std::optional<error_code> request_status::error() const { return m_error_code; Loading @@ -67,6 +72,10 @@ float request_status::bw() const { return m_bw; } std::string part_status::name() const { return m_name; } transfer_state part_status::state() const { Loading @@ -84,8 +93,9 @@ part_status::error() const { } void part_status::update(transfer_state s, float bw, part_status::update(std::string name, transfer_state s, float bw, std::optional<error_code> ec) noexcept { m_name = std::move(name); m_state = s; m_bw = bw; m_error_code = ec; Loading
src/parallel_request.hpp +10 −2 Original line number Diff line number Diff line Loading @@ -65,6 +65,9 @@ class part_status { public: part_status() = default; [[nodiscard]] std::string name() const; [[nodiscard]] transfer_state state() const; Loading @@ -75,9 +78,10 @@ public: bw() const; void update(transfer_state s, float bw, std::optional<error_code> ec) noexcept; update(std::string name, transfer_state s, float bw, std::optional<error_code> ec) noexcept; private: std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional<error_code> m_error_code{}; Loading @@ -86,10 +90,13 @@ private: class request_status { public: request_status() = default; explicit request_status(transfer_state s, float bw, explicit request_status(std::string name, transfer_state s, float bw, std::optional<error_code> ec = {}); explicit request_status(part_status s); [[nodiscard]] std::string name() const; [[nodiscard]] transfer_state state() const; Loading @@ -100,6 +107,7 @@ public: bw() const; private: std::string m_name; transfer_state m_state{transfer_state::pending}; float m_bw; std::optional<error_code> m_error_code{}; Loading
src/proto/mpi/message.hpp +9 −2 Original line number Diff line number Diff line Loading @@ -120,10 +120,10 @@ class status_message { public: status_message() = default; status_message(std::uint64_t tid, std::uint32_t seqno, status_message(std::uint64_t tid, std::uint32_t seqno, std::string name, cargo::transfer_state state, float bw, std::optional<cargo::error_code> error_code = std::nullopt) : m_tid(tid), m_seqno(seqno), m_state(state), m_bw(bw), : m_tid(tid), m_seqno(seqno), m_name(name), m_state(state), m_bw(bw), m_error_code(error_code) {} [[nodiscard]] std::uint64_t Loading @@ -136,6 +136,11 @@ public: return m_seqno; } [[nodiscard]] const std::string& name() const { return m_name; } [[nodiscard]] cargo::transfer_state state() const { return m_state; Loading @@ -160,6 +165,7 @@ private: ar& m_tid; ar& m_seqno; ar& m_name; ar& m_state; ar& m_bw; ar& m_error_code; Loading @@ -167,6 +173,7 @@ private: std::uint64_t m_tid{}; std::uint32_t m_seqno{}; std::string m_name{}; cargo::transfer_state m_state{}; float m_bw{}; std::optional<cargo::error_code> m_error_code{}; Loading