From e14e29a632d1b73536b9b05270e8645600c85c07 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Sat, 16 Mar 2019 14:06:34 +0100 Subject: [PATCH] Remove task descriptors when tasks complete Calling norns(ctl)?_error() to query the status of a running task will release its descriptor in the server if the task already completed (NORNS_EFINISHED or NORNS_EFINISHEDWERROR). Since norns(ctl)?_wait() internally queries the task status to check whether a task has completed, we cache the retrieved task_status into the iotask_t user descriptor so that it can be retrieved by a subsequent norns(ctl)?_error() call. --- include/norns/norns_error.h | 9 ++-- include/norns/norns_types.h | 17 ++++--- lib/libnorns.c | 12 +++++ lib/libnornsctl.c | 12 +++++ src/api/response.cpp | 2 + src/io/task-info.cpp | 10 ++++ src/io/task-info.hpp | 3 ++ src/io/task-manager.cpp | 50 +++++++++++++------ src/io/task-manager.hpp | 3 ++ .../local-path-to-remote-resource.cpp | 7 ++- .../remote-resource-to-local-path.cpp | 4 +- src/logger.hpp | 5 ++ src/rpcs.hpp | 1 - src/urd.cpp | 16 ++++-- src/urd.hpp | 2 +- 15 files changed, 116 insertions(+), 37 deletions(-) diff --git a/include/norns/norns_error.h b/include/norns/norns_error.h index cc3f3b4..e42061b 100644 --- a/include/norns/norns_error.h +++ b/include/norns/norns_error.h @@ -73,10 +73,11 @@ extern "C" { #define NORNS_ETASKSPENDING -43 /* task status */ -#define NORNS_EPENDING -100 -#define NORNS_EINPROGRESS -101 -#define NORNS_EFINISHED -102 -#define NORNS_EFINISHEDWERROR -103 +#define NORNS_EUNDEFINED -100 +#define NORNS_EPENDING -101 +#define NORNS_EINPROGRESS -102 +#define NORNS_EFINISHED -103 +#define NORNS_EFINISHEDWERROR -104 /* errors resources */ #define NORNS_ERESOURCEEXISTS -110 diff --git a/include/norns/norns_types.h b/include/norns/norns_types.h index ee2736a..dc79725 100644 --- a/include/norns/norns_types.h +++ b/include/norns/norns_types.h @@ -123,14 +123,6 @@ typedef enum { NORNS_IOTASK_REMOVE = 0x3 } norns_op_t; -/* Descriptor for an I/O task */ -typedef struct { - norns_tid_t t_id; /* task identifier */ - norns_op_t t_op; /* operation to be performed */ - norns_resource_t t_src; /* source resource */ - norns_resource_t t_dst; /* destination resource */ -} norns_iotask_t; - /* I/O task status descriptor */ typedef struct { norns_status_t st_status; /* task current status */ @@ -140,7 +132,16 @@ typedef struct { size_t st_total; /* total bytes in task */ } norns_stat_t; +/* Descriptor for an I/O task */ +typedef struct { + norns_tid_t t_id; /* task identifier */ + norns_op_t t_op; /* operation to be performed */ + norns_resource_t t_src; /* source resource */ + norns_resource_t t_dst; /* destination resource */ + /* Internal members */ + norns_stat_t __t_status; /* cached task status */ +} norns_iotask_t; /* Additional administrative types */ typedef enum { diff --git a/lib/libnorns.c b/lib/libnorns.c index 61a8462..3f7aad2 100644 --- a/lib/libnorns.c +++ b/lib/libnorns.c @@ -215,6 +215,14 @@ norns_error(norns_iotask_t* task, norns_stat_t* stats) { return NORNS_EBADARGS; } + // we might already have the task status cached in the task descriptor if + // the user called norns_wait() and the task completed + if(task->__t_status.st_status == NORNS_EFINISHED || + task->__t_status.st_status == NORNS_EFINISHEDWERROR) { + *stats = task->__t_status; + return NORNS_SUCCESS; + } + return send_status_request(task, stats); } @@ -251,6 +259,10 @@ norns_wait(norns_iotask_t* task, if(stats.st_status == NORNS_EFINISHED || stats.st_status == NORNS_EFINISHEDWERROR) { + // given that the task finished, we can save its completion status + // so that future calls to norns_error() can retrieve it without + // having to contact the server + task->__t_status = stats; return NORNS_SUCCESS; } diff --git a/lib/libnornsctl.c b/lib/libnornsctl.c index b3810de..fe5191a 100644 --- a/lib/libnornsctl.c +++ b/lib/libnornsctl.c @@ -402,6 +402,14 @@ nornsctl_error(norns_iotask_t* task, return NORNS_EBADARGS; } + // we might already have the task status cached in the task descriptor if + // the user called norns_wait() and the task completed + if(task->__t_status.st_status == NORNS_EFINISHED || + task->__t_status.st_status == NORNS_EFINISHEDWERROR) { + *stats = task->__t_status; + return NORNS_SUCCESS; + } + return send_status_request(task, stats); } @@ -437,6 +445,10 @@ nornsctl_wait(norns_iotask_t* task, if(stats.st_status == NORNS_EFINISHED || stats.st_status == NORNS_EFINISHEDWERROR) { + // given that the task finished, we can save its completion status + // so that future calls to nornsctl_error() can retrieve it without + // having to contact the server + task->__t_status = stats; return NORNS_SUCCESS; } diff --git a/src/api/response.cpp b/src/api/response.cpp index fbcfcbb..fc26941 100644 --- a/src/api/response.cpp +++ b/src/api/response.cpp @@ -75,6 +75,8 @@ norns::rpc::Response_Type encode(norns::api::response_type type) { using norns::io::task_status; switch(status) { + case task_status::undefined: + return NORNS_EUNDEFINED; case task_status::pending: return NORNS_EPENDING; case task_status::running: diff --git a/src/io/task-info.cpp b/src/io/task-info.cpp index 0d394ec..457315d 100644 --- a/src/io/task-info.cpp +++ b/src/io/task-info.cpp @@ -25,6 +25,7 @@ * . * *************************************************************************/ +#include #include #include #include "task-stats.hpp" @@ -124,6 +125,15 @@ task_info::set_context(const boost::any& ctx) { m_ctx = ctx; } +void +task_info::clear_context() { +#if BOOST_VERSION <= 105500 + m_ctx = boost::any(); +#else + m_ctx.clear(); +#endif +} + task_status task_info::status() const { boost::shared_lock lock(m_mutex); diff --git a/src/io/task-info.hpp b/src/io/task-info.hpp index 0f260b8..9280806 100644 --- a/src/io/task-info.hpp +++ b/src/io/task-info.hpp @@ -88,6 +88,9 @@ struct task_info { void set_context(const boost::any& ctx); + void + clear_context(); + task_status status() const; diff --git a/src/io/task-manager.cpp b/src/io/task-manager.cpp index c92b8de..3650e86 100644 --- a/src/io/task-manager.cpp +++ b/src/io/task-manager.cpp @@ -136,6 +136,7 @@ task_manager::register_transfer_plugin(const data::resource_type t1, /// } +// XXX unused std::tuple> task_manager::create_task(iotask_type type, const auth::credentials& auth, @@ -467,48 +468,50 @@ task_manager::create_remote_initiated_task(iotask_type task_type, urd_error -task_manager::enqueue_task(io::generic_task&& t) { +task_manager::enqueue_task(io::generic_task&& tsk) { + + auto self(std::enable_shared_from_this::shared_from_this()); // helper lambda to register the completion of tasks so that we can keep track // of the consumed bandwidth by each task // N.B: we use capture-by-value here so that the task_info_ptr is valid when // the callback is invoked. - const auto completion_callback = [this, t]() { - assert(t.info()->status() == task_status::finished || - t.info()->status() == task_status::finished_with_error); + const auto completion_callback = [self, tsk]() { + assert(tsk.info()->status() == task_status::finished || + tsk.info()->status() == task_status::finished_with_error); LOGGER_DEBUG("Task {} finished [{} MiB/s]", - t.info()->id(), t.info()->bandwidth()); + tsk.info()->id(), tsk.info()->bandwidth()); - auto bw = t.info()->bandwidth(); + auto bw = tsk.info()->bandwidth(); // bw might be nan if the task did not finish correctly if(!std::isnan(bw)) { - const auto key = std::make_pair(t.info()->src_rinfo()->nsid(), - t.info()->dst_rinfo()->nsid()); + const auto key = std::make_pair(tsk.info()->src_rinfo()->nsid(), + tsk.info()->dst_rinfo()->nsid()); - if(!m_bandwidth_backlog.count(key)) { - m_bandwidth_backlog.emplace(key, - boost::circular_buffer(m_backlog_size)); + if(!self->m_bandwidth_backlog.count(key)) { + self->m_bandwidth_backlog.emplace(key, + boost::circular_buffer(self->m_backlog_size)); } - m_bandwidth_backlog.at(key).push_back(bw); + self->m_bandwidth_backlog.at(key).push_back(bw); } }; - switch(t.m_type) { + switch(tsk.m_type) { case iotask_type::remove: case iotask_type::noop: { - m_runners.submit_and_forget(t); + m_runners.submit_and_forget(tsk); break; } case iotask_type::copy: case iotask_type::move: { - m_runners.submit_with_epilog_and_forget(t, completion_callback); + m_runners.submit_with_epilog_and_forget(tsk, completion_callback); break; } @@ -519,6 +522,8 @@ task_manager::enqueue_task(io::generic_task&& t) { return urd_error::success; } +// XXX we could return the iterator here so that it can be reused for erase() +// later std::shared_ptr task_manager::find(iotask_id tid) const { @@ -533,6 +538,21 @@ task_manager::find(iotask_id tid) const { return nullptr; } +bool +task_manager::erase(iotask_id tid) { + boost::unique_lock lock(m_mutex); + + const auto it = m_task_info.find(tid); + + if(it == m_task_info.end()) { + return false; + } + + m_task_info.erase(it); + + return true; +} + io::global_stats task_manager::global_stats() const { diff --git a/src/io/task-manager.hpp b/src/io/task-manager.hpp index 03bce14..3b9fc50 100644 --- a/src/io/task-manager.hpp +++ b/src/io/task-manager.hpp @@ -112,6 +112,9 @@ struct task_manager : public std::enable_shared_from_this { std::shared_ptr find(iotask_id) const; + bool + erase(iotask_id); + template std::size_t count_if(UnaryPredicate&& p) { diff --git a/src/io/transferors/local-path-to-remote-resource.cpp b/src/io/transferors/local-path-to-remote-resource.cpp index 8399caa..16f1fe0 100644 --- a/src/io/transferors/local-path-to-remote-resource.cpp +++ b/src/io/transferors/local-path-to-remote-resource.cpp @@ -362,7 +362,7 @@ local_path_to_remote_resource_transferor::accept_transfer( // the mapped_buffer doesn't get released before completion_callback() // is called. const auto completion_callback = - [this, is_collection, tempfile, d_dst, output_buffer, start]( + [this, is_collection, tempfile, d_dst, output_buffer, start, task_info]( hermes::request&& req) { // LOGGER_CRITICAL("completion_callback invoked: {}", @@ -399,9 +399,6 @@ local_path_to_remote_resource_transferor::accept_transfer( goto respond; } - LOGGER_DEBUG("Archive {} extracted into {}", - tempfile->path(), d_dst.parent()->mount()); - goto respond; } @@ -413,6 +410,8 @@ respond: m_network_service->respond( std::move(req), out); } + + task_info->clear_context(); }; // LOGGER_CRITICAL("async_pull posted: {}", diff --git a/src/io/transferors/remote-resource-to-local-path.cpp b/src/io/transferors/remote-resource-to-local-path.cpp index 07eba07..49888eb 100644 --- a/src/io/transferors/remote-resource-to-local-path.cpp +++ b/src/io/transferors/remote-resource-to-local-path.cpp @@ -359,7 +359,7 @@ remote_resource_to_local_path_transferor::accept_transfer( // FIXME: with C++14 we could simply std::move both into the capture rather // than using shared_ptrs :/ const auto completion_callback = - [this, tempfile, input_buffer, start]( + [this, tempfile, input_buffer, start, task_info]( hermes::request&& req) { uint32_t usecs = @@ -381,6 +381,8 @@ remote_resource_to_local_path_transferor::accept_transfer( std::move(req), out); } + + task_info->clear_context(); }; m_network_service->async_push(local_buffers, diff --git a/src/logger.hpp b/src/logger.hpp index 7a04567..3b10bdf 100644 --- a/src/logger.hpp +++ b/src/logger.hpp @@ -336,6 +336,11 @@ private: #endif #define HERMES_DEBUG4(...) // LOGGER_DEBUG(__VA_ARGS__) +#ifdef HERMES_DEBUG_FLUSH +#undef HERMES_DEBUG_FLUSH +#endif +#define HERMES_DEBUG_FLUSH() LOGGER_FLUSH() + #ifdef HERMES_ERROR #undef HERMES_ERROR #endif diff --git a/src/rpcs.hpp b/src/rpcs.hpp index d487c74..96faea4 100644 --- a/src/rpcs.hpp +++ b/src/rpcs.hpp @@ -328,7 +328,6 @@ struct push_resource { m_out_resource_name.c_str()}; } - private: std::string m_in_address; std::string m_in_nsid; diff --git a/src/urd.cpp b/src/urd.cpp index 73bf73d..2a3f362 100644 --- a/src/urd.cpp +++ b/src/urd.cpp @@ -333,12 +333,15 @@ log_and_return: } -response_ptr urd::iotask_status_handler(const request_ptr base_request) const { +response_ptr +urd::iotask_status_handler(const request_ptr base_request) const { auto resp = std::make_unique(); // downcast the generic request to the concrete implementation - auto request = utils::static_unique_ptr_cast(std::move(base_request)); + auto request = + utils::static_unique_ptr_cast( + std::move(base_request)); auto task_info_ptr = m_task_mgr->find(request->get<0>()); @@ -348,12 +351,19 @@ response_ptr urd::iotask_status_handler(const request_ptr base_request) const { // stats provides a thread-safe view of a task status // (locking is done internally) resp->set<0>(task_info_ptr->stats()); + + if(task_info_ptr->status() == io::task_status::finished || + task_info_ptr->status() == io::task_status::finished_with_error) { + m_task_mgr->erase(request->get<0>()); + } + } else { resp->set_error_code(urd_error::no_such_task); } - LOGGER_INFO("IOTASK_STATUS({}) = {}", request->to_string(), resp->to_string()); + LOGGER_INFO("IOTASK_STATUS({}) = {}", + request->to_string(), resp->to_string()); return std::move(resp); } diff --git a/src/urd.hpp b/src/urd.hpp index bb2973e..e45325c 100644 --- a/src/urd.hpp +++ b/src/urd.hpp @@ -158,7 +158,7 @@ private: std::unordered_map> m_jobs; boost::shared_mutex m_jobs_mutex; - std::unique_ptr m_task_mgr; + std::shared_ptr m_task_mgr; mutable boost::shared_mutex m_task_mgr_mutex; }; -- GitLab