diff --git a/include/norns/norns_error.h b/include/norns/norns_error.h index d9d5b830686403b94a642fa685bcee390ec9e939..bbb23dfe5fad69e5317ebe313ab971e1f3f35bcf 100644 --- a/include/norns/norns_error.h +++ b/include/norns/norns_error.h @@ -63,11 +63,13 @@ extern "C" { /* errors about namespaces */ #define NORNS_ENAMESPACEEXISTS -30 #define NORNS_ENOSUCHNAMESPACE -31 +#define NORNS_ENAMESPACENOTEMPTY -32 /* errors about tasks */ #define NORNS_ETASKEXISTS -40 #define NORNS_ENOSUCHTASK -41 #define NORNS_ETOOMANYTASKS -42 +#define NORNS_ETASKSPENDING -43 /* task status */ #define NORNS_EPENDING -100 diff --git a/include/norns/nornsctl_types.h b/include/norns/nornsctl_types.h index 07b8fb9cbc3f7dba88458eb175b5d364e38b32c5..f5aa133155e7ae213b5aa0b4d48c22c73a00568c 100644 --- a/include/norns/nornsctl_types.h +++ b/include/norns/nornsctl_types.h @@ -40,9 +40,10 @@ typedef uint32_t nornsctl_backend_flags_t; /* Administrative command IDs valid for nornsctl_send_command() */ typedef enum { - NORNSCTL_COMMAND_PING = 1000, - NORNSCTL_COMMAND_PAUSE_ACCEPT, - NORNSCTL_COMMAND_RESUME_ACCEPT, + NORNSCTL_CMD_PING = 1000, + NORNSCTL_CMD_PAUSE_LISTEN, + NORNSCTL_CMD_RESUME_LISTEN, + NORNSCTL_CMD_SHUTDOWN, } nornsctl_command_t; diff --git a/lib/communication.c b/lib/communication.c index fc6359cf1f84a02b323ba96137ce7b51b8c89eba..59bfc984f518586cc341e927db8e6a9f332bf068 100644 --- a/lib/communication.c +++ b/lib/communication.c @@ -98,7 +98,7 @@ send_control_command_request(nornsctl_command_t cmd, void* args) { return NORNS_ESNAFU; } - return NORNS_SUCCESS; + return resp.r_error_code; } diff --git a/lib/errors.c b/lib/errors.c index 91b407ef66aa7d6f26113297780733a8efe935af..8c1b274277bf3c16a75f33a1f3b46b2f9eed6b59 100644 --- a/lib/errors.c +++ b/lib/errors.c @@ -53,11 +53,13 @@ const char* const norns_errlist[NORNS_ERRMAX + 1] = { /* backend errors */ [ERR_REMAP(NORNS_ENAMESPACEEXISTS)] = "Namespace already exists", [ERR_REMAP(NORNS_ENOSUCHNAMESPACE)] = "Namespace does not exist", + [ERR_REMAP(NORNS_ENAMESPACENOTEMPTY)] = "Namespace is not empty", /* task errors */ [ERR_REMAP(NORNS_ETASKEXISTS)] = "Task already exists", [ERR_REMAP(NORNS_ENOSUCHTASK)] = "Task does not exist", [ERR_REMAP(NORNS_ETOOMANYTASKS)] = "Too many pending tasks", + [ERR_REMAP(NORNS_ETASKSPENDING)] = "There are still pending tasks", /* misc errors */ [ERR_REMAP(NORNS_ENOTSUPPORTED)] = "Not supported", diff --git a/src/Makefile.am b/src/Makefile.am index 62512391affbb9a297796463810688a144999a88..c939cdcaa1efd7b435d9c68a15b661f872bd1a8b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -215,23 +215,24 @@ config/defaults.cpp: Makefile echo "namespace norns {"; \ echo "namespace config {"; \ echo "namespace defaults {"; \ - echo " const char* progname = \"urd\";"; \ - echo " const bool daemonize = true;"; \ - echo " const bool use_syslog = false;"; \ - echo " const bfs::path log_file = boost::filesystem::path();"; \ + echo " const char* progname = \"urd\";"; \ + echo " const bool daemonize = true;"; \ + echo " const bool use_syslog = false;"; \ + echo " const bfs::path log_file = boost::filesystem::path();"; \ echo " const uint32_t log_file_max_size = static_cast(16*1024*1024);"; \ echo ""; \ \ - echo " const bool dry_run = false;"; \ + echo " const bool dry_run = false;"; \ + echo " const uint32_t dry_run_duration = 100;"; \ \ - echo " const char* global_socket = \"$(localstatedir)/global.socket.2\";"; \ - echo " const char* control_socket = \"$(localstatedir)/control.socket.2\";"; \ - echo " const in_port_t remote_port = 42000;"; \ - echo " const char* pidfile = \"$(localstatedir)/urd.pid\";"; \ + echo " const char* global_socket = \"$(localstatedir)/global.socket.2\";"; \ + echo " const char* control_socket = \"$(localstatedir)/control.socket.2\";"; \ + echo " const in_port_t remote_port = 42000;"; \ + echo " const char* pidfile = \"$(localstatedir)/urd.pid\";"; \ \ - echo " const uint32_t workers_in_pool = std::thread::hardware_concurrency();"; \ - echo " const uint32_t backlog_size = 128;"; \ - echo " const char* config_file = \"$(sysconfdir)/norns.conf\";"; \ + echo " const uint32_t workers_in_pool = std::thread::hardware_concurrency();"; \ + echo " const uint32_t backlog_size = 128;"; \ + echo " const char* config_file = \"$(sysconfdir)/norns.conf\";"; \ echo "} // namespace defaults"; \ echo "} // namespace config"; \ echo "} // namespace norns"; \ diff --git a/src/api/request.cpp b/src/api/request.cpp index ead59182b01c3df4a6719b6cd5a592a822b75c46..c0328523dcd34c84381b4b6fc726b6f88c0e8b2f 100644 --- a/src/api/request.cpp +++ b/src/api/request.cpp @@ -79,12 +79,14 @@ norns::command_type decode_command(::google::protobuf::uint32 type) { using norns::command_type; switch(type) { - case NORNSCTL_COMMAND_PING: + case NORNSCTL_CMD_PING: return command_type::ping; - case NORNSCTL_COMMAND_PAUSE_ACCEPT: - return command_type::pause_accept; - case NORNSCTL_COMMAND_RESUME_ACCEPT: - return command_type::resume_accept; + case NORNSCTL_CMD_PAUSE_LISTEN: + return command_type::pause_listen; + case NORNSCTL_CMD_RESUME_LISTEN: + return command_type::resume_listen; + case NORNSCTL_CMD_SHUTDOWN: + return command_type::shutdown; default: return command_type::unknown; } @@ -427,10 +429,12 @@ std::string command_request::to_string() const { switch(this->get<0>()) { case command_type::ping: return "PING"; - case command_type::pause_accept: - return "PAUSE_ACCEPT"; - case command_type::resume_accept: - return "RESUME_ACCEPT"; + case command_type::pause_listen: + return "PAUSE_LISTEN"; + case command_type::resume_listen: + return "RESUME_LISTEN"; + case command_type::shutdown: + return "SHUTDOWN"; default: return "UNKNOWN"; } diff --git a/src/backends/backend-base.hpp b/src/backends/backend-base.hpp index b6a2ab7aa9b2c6aa7d78fdce40f74dd4750a6b46..cbe7ee6bf006d6bb7aebcab91f1e83e4e3014ad0 100644 --- a/src/backends/backend-base.hpp +++ b/src/backends/backend-base.hpp @@ -60,6 +60,7 @@ public: virtual ~backend() {}; virtual bool is_tracked() const = 0; + virtual bool is_empty() const = 0; virtual bfs::path mount() const = 0; virtual uint32_t quota() const = 0; diff --git a/src/backends/lustre-fs.cpp b/src/backends/lustre-fs.cpp index 97e55549a162d7ef36fa71d120daa120fa82a594..5e614ae5818b26b56ba6a0ed91554b6dc3a8db39 100644 --- a/src/backends/lustre-fs.cpp +++ b/src/backends/lustre-fs.cpp @@ -43,6 +43,11 @@ lustre::is_tracked() const { return m_track; } +bool +lustre::is_empty() const { + return false; +} + bfs::path lustre::mount() const { return m_mount; } diff --git a/src/backends/lustre-fs.hpp b/src/backends/lustre-fs.hpp index 2c0b7c3c530667d11bfa8bd034a26f337e3aaf02..7489f89fd73066f87d5e45527ff257227714b93d 100644 --- a/src/backends/lustre-fs.hpp +++ b/src/backends/lustre-fs.hpp @@ -44,6 +44,7 @@ public: lustre(bool track, const bfs::path& mount, uint32_t quota); bool is_tracked() const override final; + bool is_empty() const override final; bfs::path mount() const override final; uint32_t quota() const override final; diff --git a/src/backends/nvml-dax.cpp b/src/backends/nvml-dax.cpp index 461bcf8c79da4ff2c0685f7799aa477197b12eaa..6b08824ab3e06935190b94d0cd6d3906f3d3e3a8 100644 --- a/src/backends/nvml-dax.cpp +++ b/src/backends/nvml-dax.cpp @@ -43,6 +43,11 @@ nvml_dax::is_tracked() const { return m_track; } +bool +nvml_dax::is_empty() const { + return false; +} + bfs::path nvml_dax::mount() const { return m_mount; } diff --git a/src/backends/nvml-dax.hpp b/src/backends/nvml-dax.hpp index 6ccf3cd7ed1fbc390c82072c366f9b74ece1ef38..76b0cbe2cea3f604bba0a618e83df54b2047d274 100644 --- a/src/backends/nvml-dax.hpp +++ b/src/backends/nvml-dax.hpp @@ -43,6 +43,7 @@ public: nvml_dax(bool track, const bfs::path& mount, uint32_t quota); bool is_tracked() const override final; + bool is_empty() const override final; bfs::path mount() const override final; uint32_t quota() const override final; diff --git a/src/backends/posix-fs.cpp b/src/backends/posix-fs.cpp index 6e105d3d99e3c5ac7326d43466dadc8372fbdd54..0db04a6a4e94b4d3ffe33dbda541661f2fb37594 100644 --- a/src/backends/posix-fs.cpp +++ b/src/backends/posix-fs.cpp @@ -66,6 +66,12 @@ posix_filesystem::is_tracked() const { return m_track; } +bool +posix_filesystem::is_empty() const { + return (bfs::recursive_directory_iterator(m_mount) == + bfs::recursive_directory_iterator()); +} + bfs::path posix_filesystem::mount() const { return m_mount; } diff --git a/src/backends/posix-fs.hpp b/src/backends/posix-fs.hpp index fe0ea480b68f119e700d71137aa0fa376a6f4e8b..23cdbb597a4b89b1a946a5c0d55a9db49789bc83 100644 --- a/src/backends/posix-fs.hpp +++ b/src/backends/posix-fs.hpp @@ -44,6 +44,7 @@ public: posix_filesystem(bool track, const bfs::path& mount, uint32_t quota); bool is_tracked() const override final; + bool is_empty() const override final; bfs::path mount() const override final; uint32_t quota() const override final; diff --git a/src/backends/process-memory.cpp b/src/backends/process-memory.cpp index 2ad5c8948d1781610bf114f98ba173429bc1a2c8..f2a05b2c1c8a7b69b18f414e120f69639b2215e4 100644 --- a/src/backends/process-memory.cpp +++ b/src/backends/process-memory.cpp @@ -40,6 +40,10 @@ process_memory::is_tracked() const { return false; } +bool +process_memory::is_empty() const { + return false; +} bfs::path process_memory::mount() const { return ""; diff --git a/src/backends/process-memory.hpp b/src/backends/process-memory.hpp index cd08a8b64e93ef874ae61fb5033384c725a94f1d..d3727afc26a5c69bff41c0ab69bb20e8e3d2e2a9 100644 --- a/src/backends/process-memory.hpp +++ b/src/backends/process-memory.hpp @@ -45,6 +45,7 @@ public: process_memory(); bool is_tracked() const override final; + bool is_empty() const override final; bfs::path mount() const override final; uint32_t quota() const override final; diff --git a/src/backends/remote-backend.cpp b/src/backends/remote-backend.cpp index e5d11ac8ddd2489f2054952f8ff7a6eb93644bfd..d5ae203cd6eca86e2985561827ccdacfb8cefdcf 100644 --- a/src/backends/remote-backend.cpp +++ b/src/backends/remote-backend.cpp @@ -40,6 +40,11 @@ remote_backend::is_tracked() const { return false; } +bool +remote_backend::is_empty() const { + return false; +} + bfs::path remote_backend::mount() const { return ""; } diff --git a/src/backends/remote-backend.hpp b/src/backends/remote-backend.hpp index 1f4e52be0dd9ff0ac648aeb438de46513c4fe3f7..3e944bc32cab4362b4f1c00de3d8707752c2cec0 100644 --- a/src/backends/remote-backend.hpp +++ b/src/backends/remote-backend.hpp @@ -44,6 +44,7 @@ public: remote_backend(); bool is_tracked() const override final; + bool is_empty() const override final; bfs::path mount() const override final; uint32_t quota() const override final; diff --git a/src/common/types.cpp b/src/common/types.cpp index f105e4c0867f0e6a5177c0c4de4a22aa29a6b606..2edaa8d8a43bbc865a61bbf0a0615209ff3d02a6 100644 --- a/src/common/types.cpp +++ b/src/common/types.cpp @@ -95,12 +95,16 @@ std::string to_string(urd_error ecode) { return "NORNS_ENAMESPACEEXISTS"; case urd_error::no_such_namespace: return "NORNS_ENOSUCHNAMESPACE"; + case urd_error::namespace_not_empty: + return "NORNS_ENAMESPACENOTEMPTY"; case urd_error::task_exists: return "NORNS_ETASKEXISTS"; case urd_error::no_such_task: return "NORNS_ENOSUCHTASK"; case urd_error::too_many_tasks: return "NORNS_ETOOMANYTASKS"; + case urd_error::tasks_pending: + return "NORNS_ETASKSPENDING"; case urd_error::accept_paused: return "NORNS_EACCEPTPAUSED"; default: diff --git a/src/common/types.hpp b/src/common/types.hpp index d2f55fe67955a2ed65888ff9d19e0cb3b288b6a6..098ad00343bd4778de75923220478aad2bb492d3 100644 --- a/src/common/types.hpp +++ b/src/common/types.hpp @@ -64,8 +64,9 @@ enum class backend_type { enum class command_type { ping, - pause_accept, - resume_accept, + pause_listen, + resume_listen, + shutdown, unknown }; @@ -98,11 +99,13 @@ enum class urd_error : norns_error_t { /* errors about backends */ namespace_exists = NORNS_ENAMESPACEEXISTS, no_such_namespace = NORNS_ENOSUCHNAMESPACE, + namespace_not_empty = NORNS_ENAMESPACENOTEMPTY, /* errors about tasks */ task_exists = NORNS_ETASKEXISTS, no_such_task = NORNS_ENOSUCHTASK, too_many_tasks = NORNS_ETOOMANYTASKS, + tasks_pending = NORNS_ETASKSPENDING, }; namespace utils { diff --git a/src/config/defaults.hpp b/src/config/defaults.hpp index e63fe6614a9906e0500d09e392183d3190b246fc..41de6fb18ac9c5e260e694bfdae967c1bdf09621 100644 --- a/src/config/defaults.hpp +++ b/src/config/defaults.hpp @@ -44,6 +44,7 @@ namespace defaults { extern const bfs::path log_file; extern const uint32_t log_file_max_size; extern const bool dry_run; + extern const uint32_t dry_run_duration; extern const char* global_socket; extern const char* control_socket; extern const in_port_t remote_port; diff --git a/src/config/settings.cpp b/src/config/settings.cpp index a7af16a21e84a7962dd1a2d448bd490de8c60793..b176f2644ba5cb7d4cc549ee992418258e626f2c 100644 --- a/src/config/settings.cpp +++ b/src/config/settings.cpp @@ -47,7 +47,8 @@ namespace config { settings::settings() { } settings::settings(const std::string& progname, bool daemonize, bool use_syslog, const bfs::path& log_file, const uint32_t log_file_max_size, - bool dry_run, const bfs::path& global_socket, + bool dry_run, uint32_t dry_run_duration, + const bfs::path& global_socket, const bfs::path& control_socket, uint32_t remote_port, const bfs::path& pidfile, uint32_t workers, uint32_t backlog_size, const bfs::path& cfgfile, @@ -58,6 +59,7 @@ settings::settings(const std::string& progname, bool daemonize, bool use_syslog, m_log_file(log_file), m_log_file_max_size(log_file_max_size), m_dry_run(dry_run), + m_dry_run_duration(dry_run_duration), m_global_socket(global_socket), m_control_socket(control_socket), m_remote_port(remote_port), @@ -74,6 +76,7 @@ void settings::load_defaults() { m_log_file = defaults::log_file; m_log_file_max_size = defaults::log_file_max_size; m_dry_run = defaults::dry_run; + m_dry_run_duration = defaults::dry_run_duration; m_global_socket = defaults::global_socket; m_control_socket = defaults::control_socket; m_remote_port = defaults::remote_port; @@ -106,6 +109,7 @@ void settings::load_from_file(const bfs::path& filename) { } m_dry_run = gsettings.get_as(keywords::dry_run); + m_dry_run_duration = defaults::dry_run_duration; m_global_socket = gsettings.get_as(keywords::global_socket); m_control_socket = gsettings.get_as(keywords::control_socket); m_remote_port = gsettings.get_as(keywords::remote_port); @@ -136,6 +140,7 @@ std::string settings::to_string() const { " m_log_file: " + m_log_file.string() + ",\n" + " m_log_file_max_size: " + std::to_string(m_log_file_max_size) + ",\n" + " m_dry_run: " + (m_dry_run ? "true" : "false") + ",\n" + + " m_dry_run_duration: " + std::to_string(m_dry_run_duration) + ",\n" + " m_global_socket: " + m_global_socket.string() + ",\n" + " m_control_socket: " + m_control_socket.string() + ",\n" + " m_remote_port: " + std::to_string(m_remote_port) + ",\n" + @@ -172,6 +177,10 @@ bool& settings::dry_run() { return m_dry_run; } +uint32_t& settings::dry_run_duration() { + return m_dry_run_duration; +} + bfs::path& settings::global_socket() { return m_global_socket; } diff --git a/src/config/settings.hpp b/src/config/settings.hpp index 955dc50c0cdb036871fea951686a339eeb14db16..64370af47cd443936a11df17be10bf75a0d829fa 100644 --- a/src/config/settings.hpp +++ b/src/config/settings.hpp @@ -86,7 +86,8 @@ struct settings { settings(); settings(const std::string& progname, bool daemonize, bool use_syslog, const bfs::path& log_file, const uint32_t log_file_max_size, - bool dry_run, const bfs::path& global_socket, + bool dry_run, uint32_t dry_run_duration, + const bfs::path& global_socket, const bfs::path& control_socket, uint32_t remote_port, const bfs::path& pidfile, uint32_t workers, uint32_t backlog_size, const bfs::path& cfgfile, @@ -101,6 +102,7 @@ struct settings { bfs::path& log_file(); uint32_t& log_file_max_size(); bool& dry_run(); + uint32_t& dry_run_duration(); bfs::path& global_socket(); bfs::path& control_socket(); in_port_t& remote_port(); @@ -116,6 +118,7 @@ struct settings { bfs::path m_log_file = defaults::log_file; uint32_t m_log_file_max_size = defaults::log_file_max_size; bool m_dry_run = defaults::dry_run; + uint32_t m_dry_run_duration = defaults::dry_run_duration; bfs::path m_global_socket = defaults::global_socket; bfs::path m_control_socket = defaults::control_socket; in_port_t m_remote_port = defaults::remote_port; diff --git a/src/io/task-info.cpp b/src/io/task-info.cpp index d18472d7b582f0b7532ccec369cba03e3fd991be..11d80de3bcf3ced878fe366f858f8e9a795c620d 100644 --- a/src/io/task-info.cpp +++ b/src/io/task-info.cpp @@ -160,5 +160,17 @@ task_info::record_transfer(std::size_t bytes, double usecs) { LOGGER_DEBUG("[{}] {}({}, {}) => {}", m_id, __FUNCTION__, bytes, usecs, m_bandwidth); } +boost::shared_lock +task_info::lock_shared() const { + boost::shared_lock lock(m_mutex); + return lock; +} + +boost::unique_lock +task_info::lock_unique() const { + boost::unique_lock lock(m_mutex); + return lock; +} + } // namespace io } // namespace norns diff --git a/src/io/task-info.hpp b/src/io/task-info.hpp index f4edfb01a27e400e619f26ddb4ce8f31ca0d73cd..c59dedbfa16c6da0251aa6e7efd81234d19af8fb 100644 --- a/src/io/task-info.hpp +++ b/src/io/task-info.hpp @@ -71,6 +71,12 @@ struct task_info { task_stats stats() const; + boost::shared_lock + lock_shared() const; + + boost::unique_lock + lock_unique() const; + mutable boost::shared_mutex m_mutex; // task id and type diff --git a/src/io/task-manager.cpp b/src/io/task-manager.cpp index 0187c9f1aeb3269596baf53c39a40b5fe70afbb4..0651d5daa576346ceca60f8d5fe12f9736321fa0 100644 --- a/src/io/task-manager.cpp +++ b/src/io/task-manager.cpp @@ -38,9 +38,13 @@ namespace norns { namespace io { -task_manager::task_manager(uint32_t nrunners, uint32_t backlog_size, bool dry_run) : +task_manager::task_manager(uint32_t nrunners, + uint32_t backlog_size, + bool dry_run, + uint32_t dry_run_duration) : m_backlog_size(backlog_size), m_dry_run(dry_run), + m_dry_run_duration(dry_run_duration), m_runners(nrunners) {} bool @@ -278,7 +282,8 @@ task_manager::create_task(iotask_type type, const auth::credentials& auth, } m_runners.submit_and_forget( - io::task(std::move(task_info_ptr))); + io::task(std::move(task_info_ptr), + m_dry_run_duration)); break; } diff --git a/src/io/task-manager.hpp b/src/io/task-manager.hpp index dadc5684c99ef3bc5ed984aefb873c7c97babc06..5529ef0efe64b3be2ffeb4de4c8d49c5dd9d3821 100644 --- a/src/io/task-manager.hpp +++ b/src/io/task-manager.hpp @@ -62,13 +62,18 @@ struct task_manager { } }; + using key_type = iotask_id; + using value_type = std::shared_ptr; using backend_ptr = std::shared_ptr; using resource_info_ptr = std::shared_ptr; using resource_ptr = std::shared_ptr; using transferor_ptr = std::shared_ptr; using ReturnType = std::tuple>; - task_manager(uint32_t nrunners, uint32_t backlog_size, bool dry_run); + task_manager(uint32_t nrunners, + uint32_t backlog_size, + bool dry_run, + uint32_t dry_run_duration); bool register_transfer_plugin(const data::resource_type t1, @@ -89,6 +94,17 @@ struct task_manager { std::shared_ptr find(iotask_id) const; + template + std::size_t + count_if(UnaryPredicate&& p) { + boost::unique_lock lock(m_mutex); + return std::count_if(m_task_info.begin(), + m_task_info.end(), + [&](const std::pair& kv) { + return p(kv.second); + }); + } + io::global_stats global_stats() const; @@ -100,6 +116,7 @@ private: iotask_id m_id_base = 0; const uint32_t m_backlog_size; bool m_dry_run; + uint32_t m_dry_run_duration; std::unordered_map> m_task_info; std::unordered_map, boost::circular_buffer, pair_hash> m_bandwidth_backlog; diff --git a/src/io/task.cpp b/src/io/task.cpp index a80e7bb14ac10906f8315ea590bc5e11d67c288c..c4ed40ad7e78e0a2f54186e9fc1d401775a4da5d 100644 --- a/src/io/task.cpp +++ b/src/io/task.cpp @@ -204,31 +204,6 @@ task::operator()() { std::make_error_code(static_cast(ec.value()))); } -///////////////////////////////////////////////////////////////////////////////// -// specializations for noop tasks -///////////////////////////////////////////////////////////////////////////////// -template<> -void -task::operator()() { - - const auto tid = m_task_info->id(); - - LOGGER_WARN("[{}] Starting noop I/O task", tid); - - usleep(100); - - m_task_info->update_status(task_status::running); - - LOGGER_WARN("[{}] noop I/O task \"running\"", tid); - - usleep(100); - - m_task_info->update_status(task_status::finished); - - LOGGER_WARN("[{}] noop I/O task completed successfully", tid); -} - - ///////////////////////////////////////////////////////////////////////////////// // specializations for unknown tasks ///////////////////////////////////////////////////////////////////////////////// diff --git a/src/io/task.hpp b/src/io/task.hpp index b525410d68b3b882ecc15f76cacc1dd83ce2365e..cd02e3a4e4219af77b1043ee4c2abdfa450f4508 100644 --- a/src/io/task.hpp +++ b/src/io/task.hpp @@ -67,6 +67,42 @@ struct task { const transferor_ptr m_transferor; }; +///////////////////////////////////////////////////////////////////////////////// +// specializations for noop tasks +///////////////////////////////////////////////////////////////////////////////// +template <> +struct task { + + using task_info_ptr = std::shared_ptr; + + task(const task_info_ptr&& task_info, uint32_t sleep_duration) + : m_task_info(std::move(task_info)), + m_sleep_duration(sleep_duration) { } + + void operator()() { + const auto tid = m_task_info->id(); + + LOGGER_WARN("[{}] Starting noop I/O task", tid); + + LOGGER_DEBUG("[{}] Sleep for {} usecs", tid, m_sleep_duration); + usleep(m_sleep_duration); + + m_task_info->update_status(task_status::running); + + LOGGER_WARN("[{}] noop I/O task \"running\"", tid); + + LOGGER_DEBUG("[{}] Sleep for {} usecs", tid, m_sleep_duration); + usleep(m_sleep_duration); + + m_task_info->update_status(task_status::finished); + + LOGGER_WARN("[{}] noop I/O task completed successfully", tid); + } + + const task_info_ptr m_task_info; + const uint32_t m_sleep_duration; +}; + } // namespace io } // namespace norns diff --git a/src/main.cpp b/src/main.cpp index 780184238410bac6c1a7c8491fa64a854205cf47..90a2d3b5b5f4d9351f20403a4ea1f47cb07e811a 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -29,13 +29,29 @@ #include #include #include - #include "config.hpp" #include "urd.hpp" namespace bfs = boost::filesystem; namespace bpo = boost::program_options; +namespace { + +void +option_dependency(const boost::program_options::variables_map& vm, + const std::string& for_what, + const std::string& required_option) { + + if(vm.count(for_what) && !vm[for_what].defaulted()) { + if(vm.count(required_option) == 0 || vm[required_option].defaulted()) { + throw std::logic_error(std::string("Option '") + for_what + + "' requires option '" + required_option + "'."); + } + } +} + +} + int main(int argc, char* argv[]){ norns::config::settings cfg; @@ -43,20 +59,27 @@ int main(int argc, char* argv[]){ bool run_in_foreground = !cfg.daemonize(); bool dry_run = cfg.dry_run(); + uint32_t dry_run_duration = cfg.dry_run_duration(); // declare a group of options that will be allowed only on the command line bpo::options_description generic("Allowed options"); generic.add_options() - (",f", bpo::bool_switch(&run_in_foreground), "foreground operation") // check how to do flags - ("dry-run,d", bpo::bool_switch(&dry_run), "don't actually execute tasks") // check how to do flags - ("version,v", "print version string") - ("help,h", "produce help message") + (",f", + bpo::bool_switch(&run_in_foreground), + "foreground operation") + ("dry-run,d", + bpo::value()->value_name("N")->implicit_value(100), + "don't actually execute tasks, but wait N microseconds per task if an argument is provided") + ("version,v", + "print version string") + ("help,h", + "produce help message") ; // declare a group of options that will be allowed in a config file bpo::variables_map vm; bpo::store(bpo::parse_command_line(argc, argv, generic), vm); - bpo::notify(vm); + bpo::notify(vm); if (vm.count("help")) { std::cout << generic << "\n"; @@ -76,9 +99,15 @@ int main(int argc, char* argv[]){ exit(EXIT_FAILURE); } + if(vm.count("dry-run")) { + dry_run = true; + dry_run_duration = vm["dry-run"].as(); + } + // override settings from file with command-line arguments cfg.daemonize() = !run_in_foreground; cfg.dry_run() = dry_run; + cfg.dry_run_duration() = dry_run_duration; norns::urd daemon; daemon.configure(cfg); diff --git a/src/namespaces/namespace-manager.hpp b/src/namespaces/namespace-manager.hpp index 2d60230cadf95eec5d56df8099118ca6ef913525..e29b88d19a08be684c8fca127e8acd96bc1441e0 100644 --- a/src/namespaces/namespace-manager.hpp +++ b/src/namespaces/namespace-manager.hpp @@ -126,6 +126,22 @@ struct namespace_manager { return std::make_tuple(all_found, v); } + template + std::size_t + count_if(UnaryPredicate&& p) const { + + using kv_type = std::pair>; + + return std::count_if( + m_namespaces.begin(), + m_namespaces.end(), + [&](const kv_type& kv) { + return p(kv.second); + }); + } + + private: std::unordered_map> m_namespaces; diff --git a/src/urd.cpp b/src/urd.cpp index ce597554b768ab3b561fa798b75e3bf4f7eb6d00..eee815add95172a66220fb6df46a03b0e27f61a2 100644 --- a/src/urd.cpp +++ b/src/urd.cpp @@ -585,16 +585,27 @@ urd::command_handler(const request_ptr base_request) { switch(request->get<0>()) { case command_type::ping: break; // nothing special to do here - case command_type::pause_accept: - if(!m_is_paused) { - m_is_paused = true; - } + case command_type::pause_listen: + pause_listening(); + break; + case command_type::resume_listen: + resume_listening(); break; - case command_type::resume_accept: - if(m_is_paused) { - m_is_paused = false; + case command_type::shutdown: + { + LOGGER_WARN("Shutdown requested!"); + pause_listening(); + + const auto rv = check_shutdown(); + resp->set_error_code(rv); + + if(rv != urd_error::success) { + resume_listening(); + break; } + shutdown(); break; + } case command_type::unknown: resp->set_error_code(urd_error::bad_args); break; @@ -628,16 +639,12 @@ void urd::signal_handler(int signum){ case SIGINT: LOGGER_WARN("A signal (SIGINT) occurred."); - if(m_api_listener) { - m_api_listener->stop(); - } + shutdown(); break; case SIGTERM: LOGGER_WARN("A signal (SIGTERM) occurred."); - if(m_api_listener) { - m_api_listener->stop(); - } + shutdown(); break; case SIGHUP: @@ -841,7 +848,8 @@ void urd::init_task_manager() { try { m_task_mgr = std::make_unique(m_settings->workers_in_pool(), m_settings->backlog_size(), - m_settings->dry_run()); + m_settings->dry_run(), + m_settings->dry_run_duration()); } catch(const std::exception& e) { LOGGER_ERROR("Failed to create the task manager. This should " @@ -1003,7 +1011,8 @@ void urd::print_configuration() { LOGGER_INFO(" - log file: none"); } - LOGGER_INFO(" - dry run?: {}", (m_settings->dry_run() ? "yes" : "no")); + LOGGER_INFO(" - dry run?: {} [duration: {} microseconds]", + (m_settings->dry_run() ? "yes" : "no"), m_settings->dry_run_duration()); LOGGER_INFO(" - pidfile: {}", m_settings->pidfile()); LOGGER_INFO(" - control socket: {}", m_settings->control_socket()); LOGGER_INFO(" - global socket: {}", m_settings->global_socket()); @@ -1023,6 +1032,54 @@ void urd::print_farewell() { LOGGER_INFO("{}", fsep); } +void urd::pause_listening() { + bool expected = false; + while(!m_is_paused.compare_exchange_weak(expected, true) && !expected); + + LOGGER_WARN("Daemon locked: incoming requests will be rejected"); +} + +void urd::resume_listening() { + bool expected = true; + while(!m_is_paused.compare_exchange_weak(expected, false) && expected); + + LOGGER_WARN("Daemon unlocked: incoming requests will be processed"); +} + +urd_error urd::check_shutdown() { + // - if there are active tasks (i.e. pending or running), we let + // the client know by returning urd_error::tasks_pending + const auto task_is_active = + [](const std::shared_ptr& ti) { + // make sure no modifications can happen + // to the task metadata while we examine it + const auto lock = ti->lock_shared(); + return (ti->status() == io::task_status::pending || + ti->status() == io::task_status::running); + }; + + if(m_task_mgr->count_if(task_is_active) != 0) { + return urd_error::tasks_pending; + } + + // - if there are no active tasks but non-empty tracked backends + // remain, we return urd_error::namespace_not_empty + const auto tracked_namespace_not_empty = + [](const std::shared_ptr& b) { + // no need to filter out the process_memory and remote backends, + // since they will never be tracked + return (b->is_tracked() && !b->is_empty()); + }; + + boost::shared_lock lock(m_namespace_mgr_mutex); + if(m_namespace_mgr->count_if(tracked_namespace_not_empty) != 0) { + return urd_error::namespace_not_empty; + } + + // - otherwise, we return urd_error::success + return urd_error::success; +} + int urd::run() { // initialize logging facilities @@ -1099,4 +1156,10 @@ void urd::teardown() { } } +void urd::shutdown() { + if(m_api_listener) { + m_api_listener->stop(); + } +} + } // namespace norns diff --git a/src/urd.hpp b/src/urd.hpp index 7bd997c9d10f57212609e2f5ad697c149410b91e..8943dad06df4be5db6c2c6933919021c76977535 100644 --- a/src/urd.hpp +++ b/src/urd.hpp @@ -80,6 +80,7 @@ public: void configure(const config::settings& settings); config::settings get_configuration() const; int run(); + void shutdown(); void teardown(); private: @@ -122,6 +123,11 @@ private: bool track, const bfs::path& mount, uint32_t quota); + void pause_listening(); + void resume_listening(); + urd_error check_shutdown(); + + private: std::atomic m_is_paused; diff --git a/tests/api-send-command.cpp b/tests/api-send-command.cpp index 2dc013328654905729ed2ed100fb87717e0cb55f..c5b564684b5c889f8d03e2c0d9c968fcd9919275 100644 --- a/tests/api-send-command.cpp +++ b/tests/api-send-command.cpp @@ -35,7 +35,8 @@ SCENARIO("send control commands to urd", "[api::nornsctl_send_command]") { test_env env( fake_daemon_cfg { - true /* dry_run? */ + true, /* dry_run? */ + 500000 /* dry_run_duration */ } ); @@ -43,9 +44,9 @@ SCENARIO("send control commands to urd", "[api::nornsctl_send_command]") { bfs::path src_mnt; std::tie(std::ignore, src_mnt) = env.create_namespace(nsid0, "mnt/tmp0", 16384); - WHEN("a NORNSCTL_COMMAND_PAUSE_ACCEPT command is sent") { + WHEN("a NORNSCTL_CMD_PAUSE_LISTEN command is sent") { - norns_error_t rv = nornsctl_send_command(NORNSCTL_COMMAND_PAUSE_ACCEPT, NULL); + norns_error_t rv = nornsctl_send_command(NORNSCTL_CMD_PAUSE_LISTEN, NULL); THEN("nornsctl_send_command() returns NORNS_SUCCESS") { REQUIRE(rv == NORNS_SUCCESS); @@ -60,7 +61,7 @@ SCENARIO("send control commands to urd", "[api::nornsctl_send_command]") { REQUIRE(rv == NORNS_EACCEPTPAUSED); AND_THEN("nornsctl_send_command() returns NORNS_SUCCESS and norns_submit() succeeds") { - norns_error_t rv = nornsctl_send_command(NORNSCTL_COMMAND_RESUME_ACCEPT, NULL); + norns_error_t rv = nornsctl_send_command(NORNSCTL_CMD_RESUME_LISTEN, NULL); REQUIRE(rv == NORNS_SUCCESS); rv = norns_submit(&task); @@ -68,25 +69,101 @@ SCENARIO("send control commands to urd", "[api::nornsctl_send_command]") { REQUIRE(task.t_id == 1); } } + + AND_WHEN("further NORNSCTL_CMD_PAUSE_LISTEN commands are sent") { + + rv = nornsctl_send_command(NORNSCTL_CMD_PAUSE_LISTEN, NULL); + + THEN("nornsctl_send_command() returns NORNS_SUCCESS") { + REQUIRE(rv == NORNS_SUCCESS); + } + } + } + } + + WHEN("a NORNSCTL_CMD_RESUME_LISTEN command is sent") { + + norns_error_t rv = nornsctl_send_command(NORNSCTL_CMD_RESUME_LISTEN, NULL); + + THEN("nornsctl_send_command() returns NORNS_SUCCESS") { + REQUIRE(rv == NORNS_SUCCESS); + + AND_WHEN("further NORNSCTL_CMD_RESUME_LISTEN commands are sent") { + rv = nornsctl_send_command(NORNSCTL_CMD_RESUME_LISTEN, NULL); + + THEN("nornsctl_send_command() returns NORNS_SUCCESS") { + REQUIRE(rv == NORNS_SUCCESS); + } + } } } - WHEN("a NORNSCTL_COMMAND_RESUME_ACCEPT command is sent") { + WHEN("a NORNSCTL_CMD_PING command is sent") { - norns_error_t rv = nornsctl_send_command(NORNSCTL_COMMAND_RESUME_ACCEPT, NULL); + norns_error_t rv = nornsctl_send_command(NORNSCTL_CMD_PING, NULL); THEN("nornsctl_send_command() returns NORNS_SUCCESS") { REQUIRE(rv == NORNS_SUCCESS); } } - WHEN("a NORNSCTL_COMMAND_PING command is sent") { +#ifndef USE_REAL_DAEMON + WHEN("a NORNSCTL_CMD_SHUTDOWN command is sent and there are no pending tasks") { - norns_error_t rv = nornsctl_send_command(NORNSCTL_COMMAND_PING, NULL); + norns_error_t rv = nornsctl_send_command(NORNSCTL_CMD_SHUTDOWN, NULL); THEN("nornsctl_send_command() returns NORNS_SUCCESS") { REQUIRE(rv == NORNS_SUCCESS); } } +#endif + + WHEN("a NORNSCTL_CMD_SHUTDOWN command is sent and there are pending tasks") { + + const size_t ntasks = 10; + norns_iotask_t tasks[ntasks]; + + const auto submit_task = [&] { + + norns_iotask_t task = + NORNS_IOTASK(NORNS_IOTASK_COPY, + NORNS_MEMORY_REGION((void*)0xdeadbeef, 43), + NORNS_LOCAL_PATH(nsid0, "foobar")); + auto rv = norns_submit(&task); + + return task; + }; + + for(size_t i=0; i