Commit 65fdc95c authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Integrate thread pool into task_manager

parent 2740244d
Loading
Loading
Loading
Loading
+13 −6
Original line number Diff line number Diff line
@@ -34,19 +34,21 @@
namespace norns {
namespace io {

task_manager::task_manager() {}
task_manager::task_manager(uint32_t nrunners, bool dry_run) :
    m_dry_run(dry_run),
    m_runners(nrunners) {}

boost::optional<task_manager::ReturnType>
task_manager::create() {
task_manager::register_task() {

    iotask_id tid = ++m_id_base;

    if(m_tasks.count(tid) != 0) {
    if(m_task_info.count(tid) != 0) {
        --m_id_base;
        return boost::optional<ReturnType>();
    }

    auto it = m_tasks.emplace(tid, 
    auto it = m_task_info.emplace(tid, 
            std::make_shared<task_stats>(task_status::pending));

    return boost::optional<ReturnType>(
@@ -58,14 +60,19 @@ task_manager::find(iotask_id tid) const {

    std::shared_ptr<task_stats> stats_ptr;

    const auto& it = m_tasks.find(tid);
    const auto& it = m_task_info.find(tid);

    if(it != m_tasks.end()) {
    if(it != m_task_info.end()) {
        stats_ptr = it->second;
    }

    return stats_ptr;
}

void
task_manager::stop_all_tasks() {
    m_runners.stop();
}

} // namespace io
} // namespace norns
+42 −3
Original line number Diff line number Diff line
@@ -31,6 +31,10 @@
#include <memory>
#include <unordered_map>
#include <boost/optional.hpp>
#include "thread-pool.hpp"

#include "task.hpp"
#include "fake-task.hpp"

#include "common.hpp"

@@ -46,16 +50,51 @@ struct task_manager {
    using ReturnType = 
        std::tuple<iotask_id, std::shared_ptr<task_stats>>;

    task_manager();
    task_manager(uint32_t nrunners, bool dry_run);

    template <typename... Args>
    boost::optional<iotask_id>
    create(Args&&... args) {

        auto ret = register_task();

        if(!ret) {
            return boost::none;
        }

        iotask_id tid;
        std::shared_ptr<io::task_stats> stats_record;

        std::tie(tid, stats_record) = *ret;

    boost::optional<ReturnType> create();
        if(!m_dry_run) {
            m_runners.submit_and_forget(
                    io::task(tid, std::forward<Args>(args)...,
                                std::move(stats_record)));
            
            return tid;
        }

        m_runners.submit_and_forget(
                io::fake_task(tid, std::move(stats_record)));

        return tid;
    }

    std::shared_ptr<task_stats>
    find(iotask_id) const;

    void 
    stop_all_tasks();

private:
    boost::optional<ReturnType> register_task();

private:
    iotask_id m_id_base = 0;
    std::unordered_map<iotask_id, std::shared_ptr<task_stats>> m_tasks;
    bool m_dry_run;
    std::unordered_map<iotask_id, std::shared_ptr<task_stats>> m_task_info;
    thread_pool m_runners;

};

+23 −58
Original line number Diff line number Diff line
@@ -255,18 +255,11 @@ response_ptr urd::iotask_create_handler(const request_ptr base_request) {
            goto log_and_return;
        }

        std::shared_ptr<io::task_stats> stats_record;
        const auto ret = m_task_mgr->create(type, bsrc, src_info, bdst, 
                                            dst_info, *creds, 
                                            std::move(tx_ptr)); 

        // register the task in the task manager
        {
            boost::unique_lock<boost::shared_mutex> lock(m_task_mgr_mutex);

            auto ret = m_task_mgr->create();

            if(ret) {
                std::tie(tid, stats_record) = *ret;
            }
            else {
        if(!ret) {
            // this can only happen if we tried to register a task
            // and the TID automatically generated collided with an
            // already running task. 
@@ -278,22 +271,8 @@ response_ptr urd::iotask_create_handler(const request_ptr base_request) {
            rv = urd_error::too_many_tasks;
            goto log_and_return;
        }
        }

        // everything is ok, add the I/O task to the queue
        if(stats_record) {
            if(m_settings->dry_run()) {
                m_workers->submit_and_forget(
                        io::fake_task(tid, stats_record));
            }
            else {
                m_workers->submit_and_forget(
                        io::task(tid, type, bsrc, src_info, bdst, dst_info, 
                                 *creds, std::move(tx_ptr), 
                                 std::move(stats_record)));
            }
        }

        tid = *ret;
        rv = urd_error::success;
    }

@@ -631,19 +610,6 @@ void urd::init_logger() {
    }
}

void urd::init_worker_pool() {
    LOGGER_INFO(" * Creating workers...");

    try {
        m_workers = std::make_unique<thread_pool>(m_settings->workers_in_pool());
    }
    catch(const std::exception& e) {
        LOGGER_ERROR("Failed to create the worker pool. This should "
                     "not happen under normal conditions.");
        exit(EXIT_FAILURE);
    }
}

void urd::init_event_handlers() {

    LOGGER_INFO(" * Creating event listener...");
@@ -812,7 +778,8 @@ void urd::init_task_manager() {
    LOGGER_INFO(" * Creating task manager...");

    try {
        m_task_mgr = std::make_unique<io::task_manager>();
        m_task_mgr = std::make_unique<io::task_manager>(m_settings->workers_in_pool(),
                                                        m_settings->dry_run());
    }
    catch(const std::exception& e) {
        LOGGER_ERROR("Failed to create the task manager. This should "
@@ -989,10 +956,9 @@ int urd::run() {

    LOGGER_INFO("[[ Starting up ]]");

    init_worker_pool();
    init_task_manager();
    init_event_handlers();
    init_namespace_manager();
    init_task_manager();
    load_backend_plugins();
    load_transfer_plugins();
    load_default_namespaces();
@@ -1041,11 +1007,10 @@ void urd::teardown() {
        m_settings.reset();
    }

    //m_workers.reset();
    if(m_workers) {
        LOGGER_INFO("* Stopping worker threads...");
        m_workers->stop();
        m_workers.reset();
    if(m_task_mgr) {
        LOGGER_INFO("* Stopping task manager...");
        m_task_mgr->stop_all_tasks();
        m_task_mgr.reset();
    }
}

+0 −9
Original line number Diff line number Diff line
@@ -37,8 +37,6 @@
#include "logger.hpp"
#include "api.hpp"

#include "io/thread-pool.hpp"

#include "job.hpp"


@@ -48,8 +46,6 @@ namespace norns {


/*! Aliases for convenience */
using thread_pool_ptr = std::unique_ptr<thread_pool>;

using api_listener = api::listener<api::message<api::request, api::response>>;
using api_listener_ptr = std::unique_ptr<api_listener>;

@@ -93,7 +89,6 @@ private:
    void signal_handler(int);

    void init_logger();
    void init_worker_pool();
    void init_event_handlers();
    void init_namespace_manager();
    void init_task_manager();
@@ -130,10 +125,6 @@ private:
    std::shared_ptr<config::settings>                    m_settings;
    std::unique_ptr<io::transferor_registry> m_transferor_registry;

    thread_pool_ptr m_workers;



    api_listener_ptr                                    m_api_listener;

    std::unique_ptr<ns::namespace_manager> m_namespace_mgr;