diff --git a/include/norns/norns_error.h b/include/norns/norns_error.h
index cc3f3b46a9e6dbb64c03a80d88983874c36e436d..e42061b1fe6908faaabfda5c283f062f2df25f13 100644
--- a/include/norns/norns_error.h
+++ b/include/norns/norns_error.h
@@ -73,10 +73,11 @@ extern "C" {
#define NORNS_ETASKSPENDING -43
/* task status */
-#define NORNS_EPENDING -100
-#define NORNS_EINPROGRESS -101
-#define NORNS_EFINISHED -102
-#define NORNS_EFINISHEDWERROR -103
+#define NORNS_EUNDEFINED -100
+#define NORNS_EPENDING -101
+#define NORNS_EINPROGRESS -102
+#define NORNS_EFINISHED -103
+#define NORNS_EFINISHEDWERROR -104
/* errors resources */
#define NORNS_ERESOURCEEXISTS -110
diff --git a/include/norns/norns_types.h b/include/norns/norns_types.h
index ee2736a193149e7ac4bb027a677a8cb0fa217e43..dc7972553641fde6546e58c4e5843a80afd83229 100644
--- a/include/norns/norns_types.h
+++ b/include/norns/norns_types.h
@@ -123,14 +123,6 @@ typedef enum {
NORNS_IOTASK_REMOVE = 0x3
} norns_op_t;
-/* Descriptor for an I/O task */
-typedef struct {
- norns_tid_t t_id; /* task identifier */
- norns_op_t t_op; /* operation to be performed */
- norns_resource_t t_src; /* source resource */
- norns_resource_t t_dst; /* destination resource */
-} norns_iotask_t;
-
/* I/O task status descriptor */
typedef struct {
norns_status_t st_status; /* task current status */
@@ -140,7 +132,16 @@ typedef struct {
size_t st_total; /* total bytes in task */
} norns_stat_t;
+/* Descriptor for an I/O task */
+typedef struct {
+ norns_tid_t t_id; /* task identifier */
+ norns_op_t t_op; /* operation to be performed */
+ norns_resource_t t_src; /* source resource */
+ norns_resource_t t_dst; /* destination resource */
+ /* Internal members */
+ norns_stat_t __t_status; /* cached task status */
+} norns_iotask_t;
/* Additional administrative types */
typedef enum {
diff --git a/lib/libnorns.c b/lib/libnorns.c
index 61a84628fae6442b06447381538e731b4c47fdd0..3f7aad2ad0d785ae94b8422aef59b61b0e44ff30 100644
--- a/lib/libnorns.c
+++ b/lib/libnorns.c
@@ -215,6 +215,14 @@ norns_error(norns_iotask_t* task, norns_stat_t* stats) {
return NORNS_EBADARGS;
}
+ // we might already have the task status cached in the task descriptor if
+ // the user called norns_wait() and the task completed
+ if(task->__t_status.st_status == NORNS_EFINISHED ||
+ task->__t_status.st_status == NORNS_EFINISHEDWERROR) {
+ *stats = task->__t_status;
+ return NORNS_SUCCESS;
+ }
+
return send_status_request(task, stats);
}
@@ -251,6 +259,10 @@ norns_wait(norns_iotask_t* task,
if(stats.st_status == NORNS_EFINISHED ||
stats.st_status == NORNS_EFINISHEDWERROR) {
+ // given that the task finished, we can save its completion status
+ // so that future calls to norns_error() can retrieve it without
+ // having to contact the server
+ task->__t_status = stats;
return NORNS_SUCCESS;
}
diff --git a/lib/libnornsctl.c b/lib/libnornsctl.c
index b3810de178c788e4930efc13d1e27130c2fc12fd..fe5191a6d2147e2e1f81a25cb11f760c73e2243c 100644
--- a/lib/libnornsctl.c
+++ b/lib/libnornsctl.c
@@ -402,6 +402,14 @@ nornsctl_error(norns_iotask_t* task,
return NORNS_EBADARGS;
}
+ // we might already have the task status cached in the task descriptor if
+ // the user called norns_wait() and the task completed
+ if(task->__t_status.st_status == NORNS_EFINISHED ||
+ task->__t_status.st_status == NORNS_EFINISHEDWERROR) {
+ *stats = task->__t_status;
+ return NORNS_SUCCESS;
+ }
+
return send_status_request(task, stats);
}
@@ -437,6 +445,10 @@ nornsctl_wait(norns_iotask_t* task,
if(stats.st_status == NORNS_EFINISHED ||
stats.st_status == NORNS_EFINISHEDWERROR) {
+ // given that the task finished, we can save its completion status
+ // so that future calls to nornsctl_error() can retrieve it without
+ // having to contact the server
+ task->__t_status = stats;
return NORNS_SUCCESS;
}
diff --git a/src/api/response.cpp b/src/api/response.cpp
index fbcfcbbe35561f0af2c1fe1f52b19a7ed2711ba7..fc2694116186fa1f4f0f6c733c7105672521465b 100644
--- a/src/api/response.cpp
+++ b/src/api/response.cpp
@@ -75,6 +75,8 @@ norns::rpc::Response_Type encode(norns::api::response_type type) {
using norns::io::task_status;
switch(status) {
+ case task_status::undefined:
+ return NORNS_EUNDEFINED;
case task_status::pending:
return NORNS_EPENDING;
case task_status::running:
diff --git a/src/io/task-info.cpp b/src/io/task-info.cpp
index 0d394ec186ab7154b26157a9472b7f2f2fd81da1..457315d310b5e73d0d2e54c890ee7421a8e828e0 100644
--- a/src/io/task-info.cpp
+++ b/src/io/task-info.cpp
@@ -25,6 +25,7 @@
* . *
*************************************************************************/
+#include
#include
#include
#include "task-stats.hpp"
@@ -124,6 +125,15 @@ task_info::set_context(const boost::any& ctx) {
m_ctx = ctx;
}
+void
+task_info::clear_context() {
+#if BOOST_VERSION <= 105500
+ m_ctx = boost::any();
+#else
+ m_ctx.clear();
+#endif
+}
+
task_status
task_info::status() const {
boost::shared_lock lock(m_mutex);
diff --git a/src/io/task-info.hpp b/src/io/task-info.hpp
index 0f260b8e4b3bcd9eef78fc9dab60f01db333c20a..92808063dfdc11d70dd44eeefb41804f4a56c87c 100644
--- a/src/io/task-info.hpp
+++ b/src/io/task-info.hpp
@@ -88,6 +88,9 @@ struct task_info {
void
set_context(const boost::any& ctx);
+ void
+ clear_context();
+
task_status
status() const;
diff --git a/src/io/task-manager.cpp b/src/io/task-manager.cpp
index c92b8ded442d00b4c030e662f60ea2d2f35b6fed..3650e8655789ab739b2f9fce61039bd4ad8c7cdd 100644
--- a/src/io/task-manager.cpp
+++ b/src/io/task-manager.cpp
@@ -136,6 +136,7 @@ task_manager::register_transfer_plugin(const data::resource_type t1,
/// }
+// XXX unused
std::tuple>
task_manager::create_task(iotask_type type,
const auth::credentials& auth,
@@ -467,48 +468,50 @@ task_manager::create_remote_initiated_task(iotask_type task_type,
urd_error
-task_manager::enqueue_task(io::generic_task&& t) {
+task_manager::enqueue_task(io::generic_task&& tsk) {
+
+ auto self(std::enable_shared_from_this::shared_from_this());
// helper lambda to register the completion of tasks so that we can keep track
// of the consumed bandwidth by each task
// N.B: we use capture-by-value here so that the task_info_ptr is valid when
// the callback is invoked.
- const auto completion_callback = [this, t]() {
- assert(t.info()->status() == task_status::finished ||
- t.info()->status() == task_status::finished_with_error);
+ const auto completion_callback = [self, tsk]() {
+ assert(tsk.info()->status() == task_status::finished ||
+ tsk.info()->status() == task_status::finished_with_error);
LOGGER_DEBUG("Task {} finished [{} MiB/s]",
- t.info()->id(), t.info()->bandwidth());
+ tsk.info()->id(), tsk.info()->bandwidth());
- auto bw = t.info()->bandwidth();
+ auto bw = tsk.info()->bandwidth();
// bw might be nan if the task did not finish correctly
if(!std::isnan(bw)) {
- const auto key = std::make_pair(t.info()->src_rinfo()->nsid(),
- t.info()->dst_rinfo()->nsid());
+ const auto key = std::make_pair(tsk.info()->src_rinfo()->nsid(),
+ tsk.info()->dst_rinfo()->nsid());
- if(!m_bandwidth_backlog.count(key)) {
- m_bandwidth_backlog.emplace(key,
- boost::circular_buffer(m_backlog_size));
+ if(!self->m_bandwidth_backlog.count(key)) {
+ self->m_bandwidth_backlog.emplace(key,
+ boost::circular_buffer(self->m_backlog_size));
}
- m_bandwidth_backlog.at(key).push_back(bw);
+ self->m_bandwidth_backlog.at(key).push_back(bw);
}
};
- switch(t.m_type) {
+ switch(tsk.m_type) {
case iotask_type::remove:
case iotask_type::noop:
{
- m_runners.submit_and_forget(t);
+ m_runners.submit_and_forget(tsk);
break;
}
case iotask_type::copy:
case iotask_type::move:
{
- m_runners.submit_with_epilog_and_forget(t, completion_callback);
+ m_runners.submit_with_epilog_and_forget(tsk, completion_callback);
break;
}
@@ -519,6 +522,8 @@ task_manager::enqueue_task(io::generic_task&& t) {
return urd_error::success;
}
+// XXX we could return the iterator here so that it can be reused for erase()
+// later
std::shared_ptr
task_manager::find(iotask_id tid) const {
@@ -533,6 +538,21 @@ task_manager::find(iotask_id tid) const {
return nullptr;
}
+bool
+task_manager::erase(iotask_id tid) {
+ boost::unique_lock lock(m_mutex);
+
+ const auto it = m_task_info.find(tid);
+
+ if(it == m_task_info.end()) {
+ return false;
+ }
+
+ m_task_info.erase(it);
+
+ return true;
+}
+
io::global_stats
task_manager::global_stats() const {
diff --git a/src/io/task-manager.hpp b/src/io/task-manager.hpp
index 03bce143709da92f0198f032fa59a15df4685612..3b9fc50b77a943b91e523be443fb0006da4f9875 100644
--- a/src/io/task-manager.hpp
+++ b/src/io/task-manager.hpp
@@ -112,6 +112,9 @@ struct task_manager : public std::enable_shared_from_this {
std::shared_ptr
find(iotask_id) const;
+ bool
+ erase(iotask_id);
+
template
std::size_t
count_if(UnaryPredicate&& p) {
diff --git a/src/io/transferors/local-path-to-remote-resource.cpp b/src/io/transferors/local-path-to-remote-resource.cpp
index 8399caa406f079c29f48d9690b475e2d8fab5abc..16f1fe02a936c20496f28b9b2e4826ed5dd5e151 100644
--- a/src/io/transferors/local-path-to-remote-resource.cpp
+++ b/src/io/transferors/local-path-to-remote-resource.cpp
@@ -362,7 +362,7 @@ local_path_to_remote_resource_transferor::accept_transfer(
// the mapped_buffer doesn't get released before completion_callback()
// is called.
const auto completion_callback =
- [this, is_collection, tempfile, d_dst, output_buffer, start](
+ [this, is_collection, tempfile, d_dst, output_buffer, start, task_info](
hermes::request&& req) {
// LOGGER_CRITICAL("completion_callback invoked: {}",
@@ -399,9 +399,6 @@ local_path_to_remote_resource_transferor::accept_transfer(
goto respond;
}
- LOGGER_DEBUG("Archive {} extracted into {}",
- tempfile->path(), d_dst.parent()->mount());
-
goto respond;
}
@@ -413,6 +410,8 @@ respond:
m_network_service->respond(
std::move(req), out);
}
+
+ task_info->clear_context();
};
// LOGGER_CRITICAL("async_pull posted: {}",
diff --git a/src/io/transferors/remote-resource-to-local-path.cpp b/src/io/transferors/remote-resource-to-local-path.cpp
index 07eba07e89320c6d0fc54c7540a5bc459d82b8c9..49888ebc916546b062730bca1f96c1c200f56022 100644
--- a/src/io/transferors/remote-resource-to-local-path.cpp
+++ b/src/io/transferors/remote-resource-to-local-path.cpp
@@ -359,7 +359,7 @@ remote_resource_to_local_path_transferor::accept_transfer(
// FIXME: with C++14 we could simply std::move both into the capture rather
// than using shared_ptrs :/
const auto completion_callback =
- [this, tempfile, input_buffer, start](
+ [this, tempfile, input_buffer, start, task_info](
hermes::request&& req) {
uint32_t usecs =
@@ -381,6 +381,8 @@ remote_resource_to_local_path_transferor::accept_transfer(
std::move(req),
out);
}
+
+ task_info->clear_context();
};
m_network_service->async_push(local_buffers,
diff --git a/src/logger.hpp b/src/logger.hpp
index 7a04567306a7ae9420ca1eeb0de912ed44c8f566..3b10bdf0c9aaea9fcaf3f83617081f8acd40382e 100644
--- a/src/logger.hpp
+++ b/src/logger.hpp
@@ -336,6 +336,11 @@ private:
#endif
#define HERMES_DEBUG4(...) // LOGGER_DEBUG(__VA_ARGS__)
+#ifdef HERMES_DEBUG_FLUSH
+#undef HERMES_DEBUG_FLUSH
+#endif
+#define HERMES_DEBUG_FLUSH() LOGGER_FLUSH()
+
#ifdef HERMES_ERROR
#undef HERMES_ERROR
#endif
diff --git a/src/rpcs.hpp b/src/rpcs.hpp
index d487c743de943e47b920866bb6d96d4d646848b2..96faea4805f9e1cf74a7a2c183d98516a0873cf5 100644
--- a/src/rpcs.hpp
+++ b/src/rpcs.hpp
@@ -328,7 +328,6 @@ struct push_resource {
m_out_resource_name.c_str()};
}
-
private:
std::string m_in_address;
std::string m_in_nsid;
diff --git a/src/urd.cpp b/src/urd.cpp
index 73bf73d0de65ac9bccea154d0a9b99f25de4236f..2a3f362acf4ec8cec79ccb0cfd001141a448bacd 100644
--- a/src/urd.cpp
+++ b/src/urd.cpp
@@ -333,12 +333,15 @@ log_and_return:
}
-response_ptr urd::iotask_status_handler(const request_ptr base_request) const {
+response_ptr
+urd::iotask_status_handler(const request_ptr base_request) const {
auto resp = std::make_unique();
// downcast the generic request to the concrete implementation
- auto request = utils::static_unique_ptr_cast(std::move(base_request));
+ auto request =
+ utils::static_unique_ptr_cast(
+ std::move(base_request));
auto task_info_ptr = m_task_mgr->find(request->get<0>());
@@ -348,12 +351,19 @@ response_ptr urd::iotask_status_handler(const request_ptr base_request) const {
// stats provides a thread-safe view of a task status
// (locking is done internally)
resp->set<0>(task_info_ptr->stats());
+
+ if(task_info_ptr->status() == io::task_status::finished ||
+ task_info_ptr->status() == io::task_status::finished_with_error) {
+ m_task_mgr->erase(request->get<0>());
+ }
+
}
else {
resp->set_error_code(urd_error::no_such_task);
}
- LOGGER_INFO("IOTASK_STATUS({}) = {}", request->to_string(), resp->to_string());
+ LOGGER_INFO("IOTASK_STATUS({}) = {}",
+ request->to_string(), resp->to_string());
return std::move(resp);
}
diff --git a/src/urd.hpp b/src/urd.hpp
index bb2973ec7ee161ec094cfd6c5dd0d4fb4e568f2f..e45325cdfcb1ef2463930c4989b91caf94c0f214 100644
--- a/src/urd.hpp
+++ b/src/urd.hpp
@@ -158,7 +158,7 @@ private:
std::unordered_map> m_jobs;
boost::shared_mutex m_jobs_mutex;
- std::unique_ptr m_task_mgr;
+ std::shared_ptr m_task_mgr;
mutable boost::shared_mutex m_task_mgr_mutex;
};