Commit fa560be0 authored by Ramon Nou's avatar Ramon Nou
Browse files

Reorder transfer op()()/progress(index) to seq them

parent f3d96593
Loading
Loading
Loading
Loading
Loading
+28 −18
Original line number Diff line number Diff line
@@ -52,8 +52,8 @@ make_communicator(const mpi::communicator& comm, const mpi::group& group,
}

void
update_state(int rank, std::uint64_t tid, std::uint32_t seqno,
             std::string name, cargo::transfer_state st, float bw,
update_state(int rank, std::uint64_t tid, std::uint32_t seqno, std::string name,
             cargo::transfer_state st, float bw,
             std::optional<cargo::error_code> ec = std::nullopt) {

    mpi::communicator world;
@@ -115,11 +115,30 @@ worker::run() {
            auto op = I->second.first.get();
            int index = I->second.second;
            if(op) {
                if(index == -1) {
                    // operation not started
                    // Print error message
                    update_state(op->source(), op->tid(), op->seqno(),
                                 op->output_path(), transfer_state::running,
                                 -1.0f);
                    cargo::error_code ec = (*op)();
                    if(ec != cargo::error_code::transfer_in_progress) {
                        update_state(op->source(), op->tid(), op->seqno(),
                                     op->output_path(), transfer_state::failed,
                                     -1.0f, ec);
                        I = m_ops.erase(I);
                        break;
                    }

                    index = 0;
                }
                // Operation in progress
                index = op->progress(index);
                if(index == -1) {
                    // operation finished
                    cargo::error_code ec = op->progress();
                    update_state(op->source(), op->tid(), op->seqno(), op->output_path(),
                    update_state(op->source(), op->tid(), op->seqno(),
                                 op->output_path(),
                                 ec ? transfer_state::failed
                                    : transfer_state::completed,
                                 0.0f, ec);
@@ -129,8 +148,9 @@ worker::run() {
                } else {
                    // update only if BW is set
                    if(op->bw() > 0.0f) {
                        update_state(op->source(), op->tid(), op->seqno(), op->output_path(),
                                     transfer_state::running, op->bw());
                        update_state(op->source(), op->tid(), op->seqno(),
                                     op->output_path(), transfer_state::running,
                                     op->bw());
                    }
                    I->second.second = index;
                    ++I;
@@ -166,7 +186,7 @@ worker::run() {
                                          t, workers, m.input_path(),
                                          m.output_path(), m_block_size,
                                          m.i_type(), m.o_type()),
                                  0)));
                                  -1)));

                const auto op =
                        m_ops[make_pair(m.input_path(), m.output_path())]
@@ -174,16 +194,8 @@ worker::run() {

                op->set_comm(msg->source(), m.tid(), m.seqno(), t);

                update_state(op->source(), op->tid(), op->seqno(), op->output_path(),
                             transfer_state::running, -1.0f);
                // Different scenarios read -> write | write -> read

                cargo::error_code ec = (*op)();
                if(ec != cargo::error_code::transfer_in_progress) {
                    update_state(op->source(), op->tid(), op->seqno(), op->output_path(),
                                 transfer_state::failed, -1.0f, ec);
                    m_ops.erase(make_pair(m.input_path(), m.output_path()));
                }
                update_state(op->source(), op->tid(), op->seqno(),
                             op->output_path(), transfer_state::pending, -1.0f);
                break;
            }

@@ -199,8 +211,6 @@ worker::run() {
                        LOGGER_INFO("Operation non existent", msg->source(), m);
                    }
                }


                break;
            }