Commit 3278a2fa authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add bandwidth monitoring to remote pull task

parent 95c268b3
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -87,7 +87,9 @@ task<iotask_type::copy>::operator()() {
        return;
    }

    LOGGER_WARN("[{}] I/O task completed successfully", tid);
    LOGGER_WARN("[{}] I/O task completed successfully [{} MiB/s]", 
                tid, m_task_info->bandwidth());

    m_task_info->update_status(task_status::finished, urd_error::success, 
                    std::make_error_code(static_cast<std::errc>(ec.value())));
}
+3 −1
Original line number Diff line number Diff line
@@ -87,7 +87,9 @@ task<iotask_type::move>::operator()() {
        return;
    }

    LOGGER_WARN("[{}] I/O task completed successfully", tid);
    LOGGER_WARN("[{}] I/O task completed successfully [{} MiB/s]", 
                tid, m_task_info->bandwidth());

    m_task_info->update_status(task_status::finished, urd_error::success, 
                    std::make_error_code(static_cast<std::errc>(ec.value())));
}
+8 −1
Original line number Diff line number Diff line
@@ -93,7 +93,14 @@ task<iotask_type::remote_transfer>::operator()() {
        return;
    }

    if(m_task_info->is_remote()) {
        LOGGER_WARN("[{}] I/O task completed successfully", tid);
    }
    else {
        LOGGER_WARN("[{}] I/O task completed successfully [{} MiB/s]", 
                    tid, m_task_info->bandwidth());
    }

    m_task_info->update_status(task_status::finished, urd_error::success, 
                    std::make_error_code(static_cast<std::errc>(ec.value())));
}
+2 −2
Original line number Diff line number Diff line
@@ -251,7 +251,7 @@ local_path_to_remote_resource_transferor::transfer(
        task_info->record_transfer(input_buffer.size(), 
                                   resp.at(0).elapsed_time());

        LOGGER_DEBUG("Remote request completed with output "
        LOGGER_DEBUG("Remote pull request completed with output "
                     "{{status: {}, task_error: {}, sys_errnum: {}}} "
                     "({} bytes, {} usecs)",
                    resp.at(0).status(), resp.at(0).task_error(), 
@@ -365,7 +365,7 @@ local_path_to_remote_resource_transferor::accept_transfer(
                static_cast<uint32_t>(task_status::finished),
                static_cast<uint32_t>(urd_error::success),
                0,
                0};
                usecs};

        if(is_collection) {
            std::error_code ec = 
+11 −2
Original line number Diff line number Diff line
@@ -251,6 +251,9 @@ remote_resource_to_local_path_transferor::transfer(
            static_cast<std::errc>(resp2.at(0).sys_errnum()));
    }

    task_info->record_transfer(output_buffer->size(), 
                               resp2.at(0).elapsed_time());

    if(resp.at(0).is_collection()) {
        return ::unpack_archive(tempfile.path(), d_dst.parent()->mount());
    }
@@ -345,6 +348,8 @@ remote_resource_to_local_path_transferor::accept_transfer(
                 remote_buffers.count(),
                 remote_buffers.size());

    auto start = std::chrono::steady_clock::now();

    // N.B. IMPORTANT: we NEED to capture 'input_buffer' by value here so that
    // the mapped_buffer doesn't get released before completion_callback()
    // is called.
@@ -353,15 +358,19 @@ remote_resource_to_local_path_transferor::accept_transfer(
    // FIXME: with C++14 we could simply std::move both into the capture rather
    // than using shared_ptrs :/
    const auto completion_callback =
        [this, tempfile, input_buffer](
        [this, tempfile, input_buffer, start](
                hermes::request<rpc::pull_resource>&& req) { 

        uint32_t usecs = 
            std::chrono::duration_cast<std::chrono::microseconds>(
                std::chrono::steady_clock::now() - start).count();

        // default response
        rpc::pull_resource::output out(
                static_cast<uint32_t>(task_status::finished),
                static_cast<uint32_t>(urd_error::success),
                0,
                0);
                usecs);

        //TODO: hermes offers no way to check for an error yet
        LOGGER_DEBUG("Push completed");