Verified Commit 9c80402d authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add `transfer_status` RPC to `master_server`

parent e3c05787
Loading
Loading
Loading
Loading
+28 −0
Original line number Diff line number Diff line
@@ -83,6 +83,7 @@ master_server::master_server(std::string name, std::string address,
#define EXPAND(rpc_name) #rpc_name##s, &master_server::rpc_name
    provider::define(EXPAND(ping));
    provider::define(EXPAND(transfer_datasets));
    provider::define(EXPAND(transfer_status));

#undef EXPAND

@@ -204,4 +205,31 @@ master_server::transfer_datasets(const network::request& req,
            });
}

void
master_server::transfer_status(const network::request& req, std::uint64_t tid) {

    using network::get_address;
    using network::rpc_info;
    using proto::generic_response;
    using proto::response_with_value;

    mpi::communicator world;
    const auto rpc = rpc_info::create(RPC_NAME(), get_address(req));

    LOGGER_INFO("rpc {:>} body: {{tid: {}}}", rpc, tid);

    m_request_manager.lookup(tid)
            .or_else([&](auto&& ec) {
                LOGGER_ERROR("Failed to lookup request: {}", ec);
                LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec);
                req.respond(generic_response{rpc.id(), ec});
            })
            .map([&](auto&& s) {
                LOGGER_INFO("rpc {:<} body: {{retval: {}, status: {}}}", rpc,
                            error_code::success, s);
                req.respond(
                        response_with_value{rpc.id(), error_code::success, s});
            });
}

} // namespace cargo
+3 −0
Original line number Diff line number Diff line
@@ -52,6 +52,9 @@ private:
                      const std::vector<cargo::dataset>& sources,
                      const std::vector<cargo::dataset>& targets);

    void
    transfer_status(const network::request& req, std::uint64_t tid);

private:
    // Dedicated execution stream for the MPI listener ULT
    thallium::managed<thallium::xstream> m_mpi_listener_ess;