Commit 274052bb authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add class 'task_manager' for managing tasks

Rename handlers:

urd::create_task      -> urd::iotask_create_handler
urd::check_on_task    -> urd::iotask_status_handler
urd::ping_request     -> urd::ping_handler
urd::register_job     -> urd::job_register_handler
urd::update_job       -> urd::job_update_handler
urd::remove_job       -> urd::job_remove_handler
urd::add_process      -> urd::process_add_handler
urd::remove_process   -> urd::process_remove_handler
urd::register_backend -> urd::backend_register_handler
urd::update_backend   -> urd::backend_update_handler
urd::remove_backend   -> urd::backend_remove_handler
urd::unknown_request  -> urd::unknown_request_handler
parent f68f84ce
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -87,6 +87,7 @@ liburd_aux_la_SOURCES = \
	settings.cpp \
	settings.hpp \
	signal-listener.hpp \
	task-manager.hpp \
	thread-pool.hpp \
	thread-pool/thread-pool.hpp \
	thread-pool/thread-pool-queue.hpp \

src/task-manager.hpp

0 → 100644
+50 −0
Original line number Diff line number Diff line
/*************************************************************************
 * Copyright (C) 2017-2018 Barcelona Supercomputing Center               *
 *                         Centro Nacional de Supercomputacion           *
 * All rights reserved.                                                  *
 *                                                                       *
 * This file is part of the NORNS Data Scheduler, a service that allows  *
 * other programs to start, track and manage asynchronous transfers of   *
 * data resources transfers requests between different storage backends. *
 *                                                                       *
 * See AUTHORS file in the top level directory for information           *
 * regarding developers and contributors.                                *
 *                                                                       *
 * The NORNS Data Scheduler is free software: you can redistribute it    *
 * and/or modify it under the terms of the GNU Lesser General Public     *
 * License as published by the Free Software Foundation, either          *
 * version 3 of the License, or (at your option) any later version.      *
 *                                                                       *
 * The NORNS Data Scheduler is distributed in the hope that it will be   *
 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty   *
 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU  *
 * Lesser General Public License for more details.                       *
 *                                                                       *
 * You should have received a copy of the GNU Lesser General             *
 * Public License along with the NORNS Data Scheduler.  If not, see      *
 * <http://www.gnu.org/licenses/>.                                       *
 *************************************************************************/

#ifndef __TASK_MANAGER_HPP__
#define __TASK_MANAGER_HPP__

#include <memory>
#include <unordered_map>

namespace norns {

namespace io {
    struct task_stats;
}


/*! Class for keeping track of registered tasks and their related information. 
 * Since we inherit from unordered_map<T>, all its usual methods, such as 
 * count(), find(), at(), etc... can used */
struct task_manager : 
    public std::unordered_map<norns_tid_t, std::shared_ptr<io::task_stats>> {
};

} // namespace norns

#endif /* __TASK_MANAGER_HPP__ */
+32 −28
Original line number Diff line number Diff line
@@ -57,10 +57,13 @@
#include "resources.hpp"
#include "io-task.hpp"
#include "io-task-stats.hpp"


#include "urd.hpp"
#include "make-unique.hpp"
#include "unique-ptr-cast.hpp"


namespace norns {

pid_t urd::daemonize() {
@@ -203,7 +206,7 @@ norns_error_t urd::validate_iotask_args(norns_op_t op,
    return NORNS_SUCCESS;
}

response_ptr urd::create_task(const request_ptr base_request) {
response_ptr urd::iotask_create_handler(const request_ptr base_request) {


    // downcast the generic request to the concrete implementation
@@ -274,8 +277,8 @@ response_ptr urd::create_task(const request_ptr base_request) {
            {
                boost::unique_lock<boost::shared_mutex> lock(m_task_manager_mutex);

                if(m_task_manager.count(tid) == 0) {
                    auto it = m_task_manager.emplace(tid, 
                if(m_task_manager->count(tid) == 0) {
                    auto it = m_task_manager->emplace(tid, 
                            std::make_shared<io::task_stats>(io::task_status::pending));
                    stats_record = it.first->second;
                }
@@ -304,7 +307,7 @@ response_ptr urd::create_task(const request_ptr base_request) {
    return resp;
}

response_ptr urd::check_on_task(const request_ptr base_request) const {
response_ptr urd::iotask_status_handler(const request_ptr base_request) const {

    response_ptr resp;
    norns_error_t rv = NORNS_SUCCESS;
@@ -317,9 +320,9 @@ response_ptr urd::check_on_task(const request_ptr base_request) const {
    {
        boost::shared_lock<boost::shared_mutex> lock(m_task_manager_mutex);

        const auto& it = m_task_manager.find(request->get<0>());
        const auto& it = m_task_manager->find(request->get<0>());

        if(it != m_task_manager.end()) {
        if(it != m_task_manager->end()) {
            stats_ptr = it->second;
        }
        else {
@@ -341,7 +344,7 @@ response_ptr urd::check_on_task(const request_ptr base_request) const {
}


response_ptr urd::ping_request(const request_ptr /*base_request*/) {
response_ptr urd::ping_handler(const request_ptr /*base_request*/) {
    response_ptr resp = std::make_unique<api::ping_response>();

    resp->set_error_code(NORNS_SUCCESS);
@@ -351,7 +354,7 @@ response_ptr urd::ping_request(const request_ptr /*base_request*/) {
}


response_ptr urd::register_job(const request_ptr base_request) {
response_ptr urd::job_register_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::job_register_response>();

@@ -377,7 +380,7 @@ response_ptr urd::register_job(const request_ptr base_request) {
    return resp;
}

response_ptr urd::update_job(const request_ptr base_request) {
response_ptr urd::job_update_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::job_update_response>();

@@ -405,7 +408,7 @@ response_ptr urd::update_job(const request_ptr base_request) {
    return resp;
}

response_ptr urd::remove_job(const request_ptr base_request) {
response_ptr urd::job_remove_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::job_unregister_response>();

@@ -432,7 +435,7 @@ response_ptr urd::remove_job(const request_ptr base_request) {
    return resp;
}

response_ptr urd::add_process(const request_ptr base_request) {
response_ptr urd::process_add_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::process_register_response>();

@@ -462,7 +465,7 @@ log_and_return:
    return resp;
}

response_ptr urd::remove_process(const request_ptr base_request) {
response_ptr urd::process_remove_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::process_unregister_response>();

@@ -495,7 +498,7 @@ log_and_return:
    return resp;
}

response_ptr urd::register_backend(const request_ptr base_request) {
response_ptr urd::backend_register_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::backend_register_response>();

@@ -532,7 +535,7 @@ response_ptr urd::register_backend(const request_ptr base_request) {
}

/* XXX not supported yet
response_ptr urd::update_backend(const request_ptr base_request) {
response_ptr urd::backend_update_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::backend_update_response>();

@@ -546,7 +549,7 @@ log_and_return:
    return resp;
}*/

response_ptr urd::remove_backend(const request_ptr base_request) {
response_ptr urd::backend_remove_handler(const request_ptr base_request) {

    response_ptr resp = std::make_unique<api::backend_unregister_response>();

@@ -573,7 +576,7 @@ response_ptr urd::remove_backend(const request_ptr base_request) {
    return resp;
}

response_ptr urd::unknown_request(const request_ptr /*base_request*/) {
response_ptr urd::unknown_request_handler(const request_ptr /*base_request*/) {
    response_ptr resp = std::make_unique<api::bad_request_response>();

    resp->set_error_code(NORNS_EBADREQUEST);
@@ -666,54 +669,55 @@ int urd::run() {
    /* user-level functionalities */
    m_api_listener->register_callback(
            api::request_type::iotask_create,
            std::bind(&urd::create_task, this, std::placeholders::_1));
            std::bind(&urd::iotask_create_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::iotask_status,
            std::bind(&urd::check_on_task, this, std::placeholders::_1));
            std::bind(&urd::iotask_status_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::ping,
            std::bind(&urd::ping_request, this, std::placeholders::_1));
            std::bind(&urd::ping_handler, this, std::placeholders::_1));

    /* admin-level functionalities */
    m_api_listener->register_callback(
            api::request_type::job_register,
            std::bind(&urd::register_job, this, std::placeholders::_1));
            std::bind(&urd::job_register_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::job_update,
            std::bind(&urd::update_job, this, std::placeholders::_1));
            std::bind(&urd::job_update_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::job_unregister,
            std::bind(&urd::remove_job, this, std::placeholders::_1));
            std::bind(&urd::job_remove_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::process_register,
            std::bind(&urd::add_process, this, std::placeholders::_1));
            std::bind(&urd::process_add_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::process_unregister,
            std::bind(&urd::remove_process, this, std::placeholders::_1));
            std::bind(&urd::process_remove_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::backend_register,
            std::bind(&urd::register_backend, this, std::placeholders::_1));
            std::bind(&urd::backend_register_handler, this, std::placeholders::_1));

/*    m_api_listener->register_callback(
            api::request_type::backend_update,
            std::bind(&urd::update_backend, this, std::placeholders::_1));*/
            std::bind(&urd::backend_update_handler, this, std::placeholders::_1));*/

    m_api_listener->register_callback(
            api::request_type::backend_unregister,
            std::bind(&urd::remove_backend, this, std::placeholders::_1));
            std::bind(&urd::backend_remove_handler, this, std::placeholders::_1));

    m_api_listener->register_callback(
            api::request_type::bad_request,
            std::bind(&urd::unknown_request, this, std::placeholders::_1));
            std::bind(&urd::unknown_request_handler, this, std::placeholders::_1));

    m_backends = std::make_unique<backend_manager>();
    m_task_manager = std::make_unique<task_manager>();

    // restore the umask
    umask(old_mask);
+17 −14
Original line number Diff line number Diff line
@@ -41,6 +41,8 @@

#include "job.hpp"

#include "task-manager.hpp"



namespace norns {
@@ -69,6 +71,7 @@ namespace io {
}



class urd {

public:
@@ -84,18 +87,18 @@ private:
                                       resource_info_ptr src_info, 
                                       resource_info_ptr dst_info) const;

    response_ptr create_task(const request_ptr req);
    response_ptr check_on_task(const request_ptr req) const;
    response_ptr ping_request(const request_ptr req);
    response_ptr register_job(const request_ptr req);
    response_ptr update_job(const request_ptr req);
    response_ptr remove_job(const request_ptr req);
    response_ptr add_process(const request_ptr req);
    response_ptr remove_process(const request_ptr req);
    response_ptr register_backend(const request_ptr req);
    response_ptr update_backend(const request_ptr req);
    response_ptr remove_backend(const request_ptr req);
    response_ptr unknown_request(const request_ptr req);
    response_ptr iotask_create_handler(const request_ptr req);
    response_ptr iotask_status_handler(const request_ptr req) const;
    response_ptr ping_handler(const request_ptr req);
    response_ptr job_register_handler(const request_ptr req);
    response_ptr job_update_handler(const request_ptr req);
    response_ptr job_remove_handler(const request_ptr req);
    response_ptr process_add_handler(const request_ptr req);
    response_ptr process_remove_handler(const request_ptr req);
    response_ptr backend_register_handler(const request_ptr req);
    response_ptr backend_update_handler(const request_ptr req);
    response_ptr backend_remove_handler(const request_ptr req);
    response_ptr unknown_request_handler(const request_ptr req);

private:
    signal_listener_ptr                    m_signal_listener;
@@ -114,7 +117,7 @@ private:
    std::unordered_map<uint32_t, std::shared_ptr<job>>    m_jobs;
    boost::shared_mutex                         m_jobs_mutex;

    std::unordered_map<norns_tid_t, std::shared_ptr<io::task_stats>> m_task_manager;
    std::unique_ptr<task_manager> m_task_manager;
    mutable boost::shared_mutex   m_task_manager_mutex;
};

+1 −1
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ void fake_daemon::run() {
        return;
    }

    config_settings settings = {
    norns::config_settings settings = {
        "test_urd", /* progname */
        false, /* daemonize */
        true, /* use syslog */