Verified Commit 20dd602e authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Refactor `worker` into `cargo::worker` class

parent c3e1e80e
Loading
Loading
Loading
Loading
+2 −4
Original line number Diff line number Diff line
@@ -104,8 +104,7 @@ main(int argc, char* argv[]) {
    mpi::communicator world;

    try {
        if(world.rank() == 0) {

        if(const auto rank = world.rank(); rank == 0) {
            cargo::master_server srv{cfg.progname, cfg.address, cfg.daemonize,
                                     fs::current_path()};

@@ -115,9 +114,8 @@ main(int argc, char* argv[]) {
            }

            return srv.run();

        } else {
            worker();
            return cargo::worker{rank}.run();
        }
    } catch(const std::exception& ex) {
        fmt::print(stderr,
+22 −14
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@
#include <thread>
#include <posix_file/file.hpp>
#include <posix_file/views.hpp>
#include "worker.hpp"
#include "message.hpp"
#include "mpioxx.hpp"

@@ -57,6 +58,8 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group,

} // namespace

namespace cargo {

using memory_buffer = std::vector<char>;
using buffer_region = std::span<char>;

@@ -299,8 +302,10 @@ sequential_transfer(const std::filesystem::path& input_path,
    LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__);
}

void
worker() {
worker::worker(int rank) : m_rank(rank) {}

int
worker::run() {

    // Create a separate communicator only for worker processes
    const mpi::communicator world;
@@ -330,21 +335,21 @@ worker() {
            continue;
        }

        switch(static_cast<cargo::tag>(msg->tag())) {
            case cargo::tag::transfer: {
                cargo::transfer_request m;
        switch(static_cast<tag>(msg->tag())) {
            case tag::transfer: {
                transfer_request m;
                world.recv(0, msg->tag(), m);
                LOGGER_DEBUG("Transfer request received!: {}", m);

                switch(m.type()) {
                    case cargo::parallel_read:
                        ::mpio_read(workers, m.input_path(), m.output_path());
                    case parallel_read:
                        mpio_read(workers, m.input_path(), m.output_path());
                        break;
                    case cargo::parallel_write:
                        ::mpio_write(workers, m.input_path(), m.output_path());
                    case parallel_write:
                        mpio_write(workers, m.input_path(), m.output_path());
                        break;
                    case cargo::sequential:
                        ::sequential_transfer(m.input_path(), m.output_path());
                    case sequential:
                        sequential_transfer(m.input_path(), m.output_path());
                        break;
                }

@@ -352,9 +357,8 @@ worker() {
                        "Transfer finished! (world_rank {}, workers_rank: {})",
                        world.rank(), workers.rank());

                world.send(msg->source(),
                           static_cast<int>(cargo::tag::status),
                           cargo::transfer_status{m.id()});
                world.send(msg->source(), static_cast<int>(tag::status),
                           transfer_status{m.id()});

                break;
            }
@@ -365,4 +369,8 @@ worker() {
                break;
        }
    }

    return 0;
}

} // namespace cargo
+15 −2
Original line number Diff line number Diff line
@@ -26,7 +26,20 @@
#ifndef CARGO_WORKER_HPP
#define CARGO_WORKER_HPP

void
worker();
namespace cargo {

class worker {
public:

    worker(int rank);

    int
    run();

private:
    int m_rank;
};

} // namespace cargo

#endif // CARGO_WORKER_HPP