Verified Commit e3c05787 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

End workers if master is asked to shut down

parent 28e1ece9
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -135,6 +135,12 @@ master_server::mpi_listener_ult() {
                break;
        }
    }

    // shutting down, notify all workers
    for(int rank = 1; rank < world.size(); ++rank) {
        LOGGER_INFO("msg <= to: {} body: {{shutdown}}", rank);
        world.send(static_cast<int>(rank), static_cast<int>(tag::shutdown));
    }
}

#define RPC_NAME() ("ADM_"s + __FUNCTION__)
+27 −1
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@

namespace cargo {

enum class tag : int { pread, pwrite, sequential, status };
enum class tag : int { pread, pwrite, sequential, status, shutdown };

class transfer_message {

@@ -127,6 +127,21 @@ private:
    cargo::error_code m_error_code{};
};

class shutdown_message {

    friend class boost::serialization::access;

public:
    shutdown_message() = default;

    template <typename Archive>
    void
    serialize(Archive& ar, const unsigned int version) {
        (void) ar;
        (void) version;
    }
};

} // namespace cargo

template <>
@@ -154,4 +169,15 @@ struct fmt::formatter<cargo::status_message> : formatter<std::string_view> {
    }
};

template <>
struct fmt::formatter<cargo::shutdown_message> : formatter<std::string_view> {
    // parse is inherited from formatter<string_view>.
    template <typename FormatContext>
    auto
    format(const cargo::shutdown_message& s, FormatContext& ctx) const {
        (void) s;
        return formatter<std::string_view>::format("{{shutdown}}", ctx);
    }
};

#endif // CARGO_PROTO_MPI_MESSAGE_HPP
+7 −0
Original line number Diff line number Diff line
@@ -124,6 +124,13 @@ worker::run() {
                break;
            }

            case tag::shutdown:
                LOGGER_INFO("msg => from: {} body: {{shutdown}}",
                            msg->source());
                world.recv(msg->source(), msg->tag());
                done = true;
                break;

            default:
                LOGGER_WARN("[{}] Unexpected message tag: {}", msg->source(),
                            msg->tag());