diff --git a/include/norns/norns_error.h b/include/norns/norns_error.h index cc3f3b46a9e6dbb64c03a80d88983874c36e436d..e42061b1fe6908faaabfda5c283f062f2df25f13 100644 --- a/include/norns/norns_error.h +++ b/include/norns/norns_error.h @@ -73,10 +73,11 @@ extern "C" { #define NORNS_ETASKSPENDING -43 /* task status */ -#define NORNS_EPENDING -100 -#define NORNS_EINPROGRESS -101 -#define NORNS_EFINISHED -102 -#define NORNS_EFINISHEDWERROR -103 +#define NORNS_EUNDEFINED -100 +#define NORNS_EPENDING -101 +#define NORNS_EINPROGRESS -102 +#define NORNS_EFINISHED -103 +#define NORNS_EFINISHEDWERROR -104 /* errors resources */ #define NORNS_ERESOURCEEXISTS -110 diff --git a/include/norns/norns_types.h b/include/norns/norns_types.h index ee2736a193149e7ac4bb027a677a8cb0fa217e43..dc7972553641fde6546e58c4e5843a80afd83229 100644 --- a/include/norns/norns_types.h +++ b/include/norns/norns_types.h @@ -123,14 +123,6 @@ typedef enum { NORNS_IOTASK_REMOVE = 0x3 } norns_op_t; -/* Descriptor for an I/O task */ -typedef struct { - norns_tid_t t_id; /* task identifier */ - norns_op_t t_op; /* operation to be performed */ - norns_resource_t t_src; /* source resource */ - norns_resource_t t_dst; /* destination resource */ -} norns_iotask_t; - /* I/O task status descriptor */ typedef struct { norns_status_t st_status; /* task current status */ @@ -140,7 +132,16 @@ typedef struct { size_t st_total; /* total bytes in task */ } norns_stat_t; +/* Descriptor for an I/O task */ +typedef struct { + norns_tid_t t_id; /* task identifier */ + norns_op_t t_op; /* operation to be performed */ + norns_resource_t t_src; /* source resource */ + norns_resource_t t_dst; /* destination resource */ + /* Internal members */ + norns_stat_t __t_status; /* cached task status */ +} norns_iotask_t; /* Additional administrative types */ typedef enum { diff --git a/lib/libnorns.c b/lib/libnorns.c index 61a84628fae6442b06447381538e731b4c47fdd0..3f7aad2ad0d785ae94b8422aef59b61b0e44ff30 100644 --- a/lib/libnorns.c +++ b/lib/libnorns.c @@ -215,6 +215,14 @@ norns_error(norns_iotask_t* task, norns_stat_t* stats) { return NORNS_EBADARGS; } + // we might already have the task status cached in the task descriptor if + // the user called norns_wait() and the task completed + if(task->__t_status.st_status == NORNS_EFINISHED || + task->__t_status.st_status == NORNS_EFINISHEDWERROR) { + *stats = task->__t_status; + return NORNS_SUCCESS; + } + return send_status_request(task, stats); } @@ -251,6 +259,10 @@ norns_wait(norns_iotask_t* task, if(stats.st_status == NORNS_EFINISHED || stats.st_status == NORNS_EFINISHEDWERROR) { + // given that the task finished, we can save its completion status + // so that future calls to norns_error() can retrieve it without + // having to contact the server + task->__t_status = stats; return NORNS_SUCCESS; } diff --git a/lib/libnornsctl.c b/lib/libnornsctl.c index b3810de178c788e4930efc13d1e27130c2fc12fd..fe5191a6d2147e2e1f81a25cb11f760c73e2243c 100644 --- a/lib/libnornsctl.c +++ b/lib/libnornsctl.c @@ -402,6 +402,14 @@ nornsctl_error(norns_iotask_t* task, return NORNS_EBADARGS; } + // we might already have the task status cached in the task descriptor if + // the user called norns_wait() and the task completed + if(task->__t_status.st_status == NORNS_EFINISHED || + task->__t_status.st_status == NORNS_EFINISHEDWERROR) { + *stats = task->__t_status; + return NORNS_SUCCESS; + } + return send_status_request(task, stats); } @@ -437,6 +445,10 @@ nornsctl_wait(norns_iotask_t* task, if(stats.st_status == NORNS_EFINISHED || stats.st_status == NORNS_EFINISHEDWERROR) { + // given that the task finished, we can save its completion status + // so that future calls to nornsctl_error() can retrieve it without + // having to contact the server + task->__t_status = stats; return NORNS_SUCCESS; } diff --git a/src/api/response.cpp b/src/api/response.cpp index fbcfcbbe35561f0af2c1fe1f52b19a7ed2711ba7..fc2694116186fa1f4f0f6c733c7105672521465b 100644 --- a/src/api/response.cpp +++ b/src/api/response.cpp @@ -75,6 +75,8 @@ norns::rpc::Response_Type encode(norns::api::response_type type) { using norns::io::task_status; switch(status) { + case task_status::undefined: + return NORNS_EUNDEFINED; case task_status::pending: return NORNS_EPENDING; case task_status::running: diff --git a/src/io/task-info.cpp b/src/io/task-info.cpp index 0d394ec186ab7154b26157a9472b7f2f2fd81da1..457315d310b5e73d0d2e54c890ee7421a8e828e0 100644 --- a/src/io/task-info.cpp +++ b/src/io/task-info.cpp @@ -25,6 +25,7 @@ * . * *************************************************************************/ +#include #include #include #include "task-stats.hpp" @@ -124,6 +125,15 @@ task_info::set_context(const boost::any& ctx) { m_ctx = ctx; } +void +task_info::clear_context() { +#if BOOST_VERSION <= 105500 + m_ctx = boost::any(); +#else + m_ctx.clear(); +#endif +} + task_status task_info::status() const { boost::shared_lock lock(m_mutex); diff --git a/src/io/task-info.hpp b/src/io/task-info.hpp index 0f260b8e4b3bcd9eef78fc9dab60f01db333c20a..92808063dfdc11d70dd44eeefb41804f4a56c87c 100644 --- a/src/io/task-info.hpp +++ b/src/io/task-info.hpp @@ -88,6 +88,9 @@ struct task_info { void set_context(const boost::any& ctx); + void + clear_context(); + task_status status() const; diff --git a/src/io/task-manager.cpp b/src/io/task-manager.cpp index c92b8ded442d00b4c030e662f60ea2d2f35b6fed..3650e8655789ab739b2f9fce61039bd4ad8c7cdd 100644 --- a/src/io/task-manager.cpp +++ b/src/io/task-manager.cpp @@ -136,6 +136,7 @@ task_manager::register_transfer_plugin(const data::resource_type t1, /// } +// XXX unused std::tuple> task_manager::create_task(iotask_type type, const auth::credentials& auth, @@ -467,48 +468,50 @@ task_manager::create_remote_initiated_task(iotask_type task_type, urd_error -task_manager::enqueue_task(io::generic_task&& t) { +task_manager::enqueue_task(io::generic_task&& tsk) { + + auto self(std::enable_shared_from_this::shared_from_this()); // helper lambda to register the completion of tasks so that we can keep track // of the consumed bandwidth by each task // N.B: we use capture-by-value here so that the task_info_ptr is valid when // the callback is invoked. - const auto completion_callback = [this, t]() { - assert(t.info()->status() == task_status::finished || - t.info()->status() == task_status::finished_with_error); + const auto completion_callback = [self, tsk]() { + assert(tsk.info()->status() == task_status::finished || + tsk.info()->status() == task_status::finished_with_error); LOGGER_DEBUG("Task {} finished [{} MiB/s]", - t.info()->id(), t.info()->bandwidth()); + tsk.info()->id(), tsk.info()->bandwidth()); - auto bw = t.info()->bandwidth(); + auto bw = tsk.info()->bandwidth(); // bw might be nan if the task did not finish correctly if(!std::isnan(bw)) { - const auto key = std::make_pair(t.info()->src_rinfo()->nsid(), - t.info()->dst_rinfo()->nsid()); + const auto key = std::make_pair(tsk.info()->src_rinfo()->nsid(), + tsk.info()->dst_rinfo()->nsid()); - if(!m_bandwidth_backlog.count(key)) { - m_bandwidth_backlog.emplace(key, - boost::circular_buffer(m_backlog_size)); + if(!self->m_bandwidth_backlog.count(key)) { + self->m_bandwidth_backlog.emplace(key, + boost::circular_buffer(self->m_backlog_size)); } - m_bandwidth_backlog.at(key).push_back(bw); + self->m_bandwidth_backlog.at(key).push_back(bw); } }; - switch(t.m_type) { + switch(tsk.m_type) { case iotask_type::remove: case iotask_type::noop: { - m_runners.submit_and_forget(t); + m_runners.submit_and_forget(tsk); break; } case iotask_type::copy: case iotask_type::move: { - m_runners.submit_with_epilog_and_forget(t, completion_callback); + m_runners.submit_with_epilog_and_forget(tsk, completion_callback); break; } @@ -519,6 +522,8 @@ task_manager::enqueue_task(io::generic_task&& t) { return urd_error::success; } +// XXX we could return the iterator here so that it can be reused for erase() +// later std::shared_ptr task_manager::find(iotask_id tid) const { @@ -533,6 +538,21 @@ task_manager::find(iotask_id tid) const { return nullptr; } +bool +task_manager::erase(iotask_id tid) { + boost::unique_lock lock(m_mutex); + + const auto it = m_task_info.find(tid); + + if(it == m_task_info.end()) { + return false; + } + + m_task_info.erase(it); + + return true; +} + io::global_stats task_manager::global_stats() const { diff --git a/src/io/task-manager.hpp b/src/io/task-manager.hpp index 03bce143709da92f0198f032fa59a15df4685612..3b9fc50b77a943b91e523be443fb0006da4f9875 100644 --- a/src/io/task-manager.hpp +++ b/src/io/task-manager.hpp @@ -112,6 +112,9 @@ struct task_manager : public std::enable_shared_from_this { std::shared_ptr find(iotask_id) const; + bool + erase(iotask_id); + template std::size_t count_if(UnaryPredicate&& p) { diff --git a/src/io/transferors/local-path-to-remote-resource.cpp b/src/io/transferors/local-path-to-remote-resource.cpp index 8399caa406f079c29f48d9690b475e2d8fab5abc..16f1fe02a936c20496f28b9b2e4826ed5dd5e151 100644 --- a/src/io/transferors/local-path-to-remote-resource.cpp +++ b/src/io/transferors/local-path-to-remote-resource.cpp @@ -362,7 +362,7 @@ local_path_to_remote_resource_transferor::accept_transfer( // the mapped_buffer doesn't get released before completion_callback() // is called. const auto completion_callback = - [this, is_collection, tempfile, d_dst, output_buffer, start]( + [this, is_collection, tempfile, d_dst, output_buffer, start, task_info]( hermes::request&& req) { // LOGGER_CRITICAL("completion_callback invoked: {}", @@ -399,9 +399,6 @@ local_path_to_remote_resource_transferor::accept_transfer( goto respond; } - LOGGER_DEBUG("Archive {} extracted into {}", - tempfile->path(), d_dst.parent()->mount()); - goto respond; } @@ -413,6 +410,8 @@ respond: m_network_service->respond( std::move(req), out); } + + task_info->clear_context(); }; // LOGGER_CRITICAL("async_pull posted: {}", diff --git a/src/io/transferors/remote-resource-to-local-path.cpp b/src/io/transferors/remote-resource-to-local-path.cpp index 07eba07e89320c6d0fc54c7540a5bc459d82b8c9..49888ebc916546b062730bca1f96c1c200f56022 100644 --- a/src/io/transferors/remote-resource-to-local-path.cpp +++ b/src/io/transferors/remote-resource-to-local-path.cpp @@ -359,7 +359,7 @@ remote_resource_to_local_path_transferor::accept_transfer( // FIXME: with C++14 we could simply std::move both into the capture rather // than using shared_ptrs :/ const auto completion_callback = - [this, tempfile, input_buffer, start]( + [this, tempfile, input_buffer, start, task_info]( hermes::request&& req) { uint32_t usecs = @@ -381,6 +381,8 @@ remote_resource_to_local_path_transferor::accept_transfer( std::move(req), out); } + + task_info->clear_context(); }; m_network_service->async_push(local_buffers, diff --git a/src/logger.hpp b/src/logger.hpp index 7a04567306a7ae9420ca1eeb0de912ed44c8f566..3b10bdf0c9aaea9fcaf3f83617081f8acd40382e 100644 --- a/src/logger.hpp +++ b/src/logger.hpp @@ -336,6 +336,11 @@ private: #endif #define HERMES_DEBUG4(...) // LOGGER_DEBUG(__VA_ARGS__) +#ifdef HERMES_DEBUG_FLUSH +#undef HERMES_DEBUG_FLUSH +#endif +#define HERMES_DEBUG_FLUSH() LOGGER_FLUSH() + #ifdef HERMES_ERROR #undef HERMES_ERROR #endif diff --git a/src/rpcs.hpp b/src/rpcs.hpp index d487c743de943e47b920866bb6d96d4d646848b2..96faea4805f9e1cf74a7a2c183d98516a0873cf5 100644 --- a/src/rpcs.hpp +++ b/src/rpcs.hpp @@ -328,7 +328,6 @@ struct push_resource { m_out_resource_name.c_str()}; } - private: std::string m_in_address; std::string m_in_nsid; diff --git a/src/urd.cpp b/src/urd.cpp index 73bf73d0de65ac9bccea154d0a9b99f25de4236f..2a3f362acf4ec8cec79ccb0cfd001141a448bacd 100644 --- a/src/urd.cpp +++ b/src/urd.cpp @@ -333,12 +333,15 @@ log_and_return: } -response_ptr urd::iotask_status_handler(const request_ptr base_request) const { +response_ptr +urd::iotask_status_handler(const request_ptr base_request) const { auto resp = std::make_unique(); // downcast the generic request to the concrete implementation - auto request = utils::static_unique_ptr_cast(std::move(base_request)); + auto request = + utils::static_unique_ptr_cast( + std::move(base_request)); auto task_info_ptr = m_task_mgr->find(request->get<0>()); @@ -348,12 +351,19 @@ response_ptr urd::iotask_status_handler(const request_ptr base_request) const { // stats provides a thread-safe view of a task status // (locking is done internally) resp->set<0>(task_info_ptr->stats()); + + if(task_info_ptr->status() == io::task_status::finished || + task_info_ptr->status() == io::task_status::finished_with_error) { + m_task_mgr->erase(request->get<0>()); + } + } else { resp->set_error_code(urd_error::no_such_task); } - LOGGER_INFO("IOTASK_STATUS({}) = {}", request->to_string(), resp->to_string()); + LOGGER_INFO("IOTASK_STATUS({}) = {}", + request->to_string(), resp->to_string()); return std::move(resp); } diff --git a/src/urd.hpp b/src/urd.hpp index bb2973ec7ee161ec094cfd6c5dd0d4fb4e568f2f..e45325cdfcb1ef2463930c4989b91caf94c0f214 100644 --- a/src/urd.hpp +++ b/src/urd.hpp @@ -158,7 +158,7 @@ private: std::unordered_map> m_jobs; boost::shared_mutex m_jobs_mutex; - std::unique_ptr m_task_mgr; + std::shared_ptr m_task_mgr; mutable boost::shared_mutex m_task_mgr_mutex; };