From e013f22aba4a42feb9109a1ae360f16c144c26e6 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Thu, 9 Nov 2023 08:15:32 +0100 Subject: [PATCH] Sequential fix --- CMakeLists.txt | 2 +- src/worker/sequential.cpp | 7 +++++-- src/worker/sequential.hpp | 1 + src/worker/worker.cpp | 37 ++++++++++++++++++------------------- 4 files changed, 25 insertions(+), 22 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index fe8e8d4..46c54b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,7 +30,7 @@ cmake_minimum_required(VERSION 3.19) project( cargo - VERSION 0.3.0 + VERSION 0.3.1 LANGUAGES C CXX ) diff --git a/src/worker/sequential.cpp b/src/worker/sequential.cpp index 83d3da5..40f5e4c 100644 --- a/src/worker/sequential.cpp +++ b/src/worker/sequential.cpp @@ -31,17 +31,20 @@ namespace cargo { cargo::error_code seq_operation::operator()() { LOGGER_CRITICAL("{}: to be implemented", __FUNCTION__); + m_status = cargo::error_code::not_implemented; return cargo::error_code::not_implemented; } cargo::error_code seq_operation::progress() const { - return error_code::success; + return m_status; } int seq_operation::progress(int ongoing_index) { - return ++ongoing_index; + ongoing_index++; + m_status = cargo::error_code::not_implemented; + return -1; } } // namespace cargo diff --git a/src/worker/sequential.hpp b/src/worker/sequential.hpp index f5d54da..2595142 100644 --- a/src/worker/sequential.hpp +++ b/src/worker/sequential.hpp @@ -51,6 +51,7 @@ private: mpi::communicator m_comm; std::filesystem::path m_input_path; std::filesystem::path m_output_path; + cargo::error_code m_status; }; } // namespace cargo diff --git a/src/worker/worker.cpp b/src/worker/worker.cpp index b8f218f..56e2143 100644 --- a/src/worker/worker.cpp +++ b/src/worker/worker.cpp @@ -109,30 +109,29 @@ worker::run() { // FIXME: sleep time should be configurable // Progress through all transfers - + auto I = m_ops.begin(); auto IE = m_ops.end(); - if (I != IE) { + if(I != IE) { auto op = I->second.first.get(); int index = I->second.second; if(op) { - if(op->t() == tag::pread or op->t() == tag::pwrite) { - index = op->progress(index); - if(index == -1) { - // operation finished - cargo::error_code ec = op->progress(); - update_state(op->source(), op->tid(), op->seqno(), - ec ? transfer_state::failed - : transfer_state::completed, - 0.0f, ec); - - // Transfer finished - I = m_ops.erase(I); - } else { - update_state(op->source(), op->tid(), op->seqno(), - transfer_state::running, op->bw()); - I->second.second = index; - } + + index = op->progress(index); + if(index == -1) { + // operation finished + cargo::error_code ec = op->progress(); + update_state(op->source(), op->tid(), op->seqno(), + ec ? transfer_state::failed + : transfer_state::completed, + 0.0f, ec); + + // Transfer finished + I = m_ops.erase(I); + } else { + update_state(op->source(), op->tid(), op->seqno(), + transfer_state::running, op->bw()); + I->second.second = index; } } } -- GitLab