Commit 92a82b22 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add signal capture and graceful shutdown

parent fe8db74d
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -42,6 +42,7 @@ src_urd_CXXFLAGS = \
	-std=gnu++11 -Wall -Wextra

src_urd_CPPFLAGS = 				\
	-DSPDLOG_ENABLE_SYSLOG		\
	@BOOST_CPPFLAGS@			\
	-I$(top_srcdir)/include		\
	-I$(top_srcdir)/src
+4 −0
Original line number Diff line number Diff line
@@ -96,6 +96,10 @@ public:
        m_ios.run();
    }

    void stop() {
        m_ios.stop();
    }

private:
    void start_accept(){
        /* start an asynchronous accept: the call to async_accept returns immediately, 
+3 −2
Original line number Diff line number Diff line
@@ -36,18 +36,19 @@ public:
    logger(const std::string& ident, const std::string& type) {

        try {
            spdlog::set_async_mode(queue_size);
//            spdlog::set_async_mode(queue_size);

            if(type == "stdout") {
                m_internal_logger = spdlog::stdout_logger_mt(ident);
            }
#ifdef SPDLOG_ENABLE_SYSLOG 
            else if(type == "syslog") {
                m_internal_logger = spd::syslog_logger("syslog", "ident", LOG_PID);
                m_internal_logger = spdlog::syslog_logger("syslog", ident, LOG_PID);
            }
#endif
            else {
                // FIXME: add custom exceptions here!
                std::cerr << "Unknown logger type: '" << type << "'\n";
                abort();
            }

+39 −213
Original line number Diff line number Diff line
@@ -23,7 +23,6 @@
//

#include <stdio.h>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
@@ -38,10 +37,9 @@
#include <tbb/tbb.h>
#include <signal.h>
#include <string.h>
#include <vector>
#include <ctime>

#include <boost/asio.hpp>
#include <boost/atomic.hpp>
#include <functional>


@@ -49,21 +47,12 @@
#include <norns.h>

#include "ipc-listener.hpp"
#include "signal-listener.hpp"
#include "ctpl.h" 
#include "logger.hpp"
#include "urd.hpp"

#if 0 
const char* RUNNING_DIR = "/tmp";
const char* SOCKET_FILE = "/tmp/urd.socket";  
const char* DAEMON_LOCK_FILE = "/tmp/urd.lock";
const char* LOG_FILE = "/tmp/urd.log";
const int WAKEUP_PERIODIC_TIME = 5;
const int N_THREADS_IN_POOL = 3;
const int MAX_CLIENTS_SUPPORTED = 20;

ev_periodic every_few_seconds;

struct task{
	pid_t pid;
	uint64_t taskId;
@@ -87,22 +76,6 @@ tbb::concurrent_queue<task> jobs_priority_1;
tbb::concurrent_hash_map<pid_t, std::list<task_finished>> tasks_finished_map;
#endif

void signal_handler(int sig){
	switch(sig) {
		case SIGHUP: 
			//log_message(LOG_FILE, "Hangup signal catched");
			break;
		case SIGTERM:
			//log_message(LOG_FILE, "Terminate signal catched");
			//log_message(LOG_FILE, "Shutting down urd");
			exit(EXIT_SUCCESS);
			break;
		default:
			//log_message(LOG_FILE, "Unknown signal received");
			break;
		}	
}

void urd::daemonize() {
	/*
	 * --- Daemonize structure ---
@@ -180,7 +153,7 @@ void urd::daemonize() {
	
	/* Check if daemon already exists:
	 * First instance of the daemon will lock the file so that other
	 * instances understand that an instnace is already running. 
	 * instances understand that an instance is already running. 
	 */

	int lfp;
@@ -214,188 +187,11 @@ void urd::daemonize() {
	signal(SIGTSTP, SIG_IGN); /* ignore tty signals */
	signal(SIGTTOU, SIG_IGN);
	signal(SIGTTIN, SIG_IGN);
	signal(SIGHUP, signal_handler); /* catch hangup signal */
	signal(SIGTERM, signal_handler); /* catch kill signal */

}

#if 0
int set_non_block(int fd){
	/* Add O_NONBLOCK to the file descriptor */
	int flags;
	flags = fcntl(fd, F_GETFL);
	flags |= O_NONBLOCK;
	return fcntl(fd, F_SETFL, flags);
}

int unix_socket_init(sockaddr_un* socket_un, const char* sock_path){
	int fd;

    struct stat stbuf;

    /* if socket exists from a previous run, remove it */
    if (stat(sock_path, &stbuf) == 0) {
        if (unlink(sock_path) < 0){
            log_message(LOG_FILE, "unlink socket failed");
            perror("unlink socket");
            exit(EXIT_FAILURE);		 
        }
    }

	/* Setup unix socket listener */
	fd = socket(AF_UNIX, SOCK_STREAM, 0);	
	if (fd == -1){
		log_message(LOG_FILE, "echo server socket failed");
		perror("echo server socket");
		exit(EXIT_FAILURE);
	}

	/* Set it non-blocking */
	if (set_non_block(fd) == -1){
		log_message(LOG_FILE, "echo server socket nonbloc failed");
		perror("echo server socket nonbloc");
		exit(EXIT_FAILURE);
	}

	/* Set it as Unix socket */
	socket_un->sun_family = AF_UNIX;
	strncpy(socket_un->sun_path, sock_path, sizeof(socket_un->sun_path));

	return fd;
}


int urd::server_init(sock_ev_serv *serv, const char *sock_path){
	log_message(LOG_FILE, "initializing server...");
	serv->max_clients = MAX_CLIENTS_SUPPORTED;
	serv->fd = unix_socket_init(&serv->socket, sock_path);
	serv->socket_len = sizeof(serv->socket.sun_family) + strnlen(serv->socket.sun_path, sizeof(serv->socket.sun_path));
	if(bind(serv->fd, (sockaddr*) &serv->socket, serv->socket_len) == -1){
		perror("echo server bind");
		exit(EXIT_FAILURE);
	}
	if(listen(serv->fd, serv->max_clients) < -1){
		perror("listen");
		exit(EXIT_FAILURE);
	}
	return 0;
}


/*static*/ void urd::read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents){
	/* to-do: receive data */
	/* A client has become readable */
	log_message(LOG_FILE, "Inside read_cb");
	(void)revents;
	(void)loop;

	norns_iotd t;
	ssize_t read;

	if(EV_ERROR & revents)
    {
    	log_message(LOG_FILE, "got invalid event in read_cb");
      	perror("got invalid event");
      	return;
    }

    //receive message from client socket
    read = recv(watcher->fd, &t, sizeof(t), 0);

    if(read < 0){
    	log_message(LOG_FILE, "read error");
    }

    if(read == 0){
    	//stop and free watcher if client socket is closing
    	ev_io_stop(loop, watcher);
    	free(watcher);
    	log_message(LOG_FILE, "client close socket");
    	server.current_clients = server.current_clients -1;
    	return;
    } else {
    	log_message(LOG_FILE, "we have received this message: ");
    	log_message(LOG_FILE, (std::to_string(t.ni_tid)).c_str());
    }
    
}


/*inline static*/ urd::sock_ev_client urd::client_new(int fd){
    //FIXME BUG: returning reference from local variable
	sock_ev_client client;
	client.fd = fd;
	set_non_block(client.fd);
	//ev_io_init(&client.io, client_cb, client.fd, EV_READ);
	log_message(LOG_FILE, "client_new registered");
	return client;
}

/*static*/ void urd::accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents){
	/*
	 * Callback for accepting clients.
	 */
	(void)watcher;
	(void)revents;
	log_message(LOG_FILE, "Unix stream socket has become readable");

	struct sockaddr_un client_addr;
	socklen_t client_len = sizeof(client_addr);
	int client_sd;
	struct ev_io *w_client = (struct ev_io*) malloc (sizeof(struct ev_io));

	if(EV_ERROR & revents){
		log_message(LOG_FILE, "got invalide event");
		return;
	}

	/* Accept client request */
	client_sd = accept(server.fd, (struct sockaddr *) &client_addr, &client_len);

	if(client_sd < 0){
		log_message(LOG_FILE, "accept error");
		return;
	}

	/* Increment number of clients connected */
	server.current_clients = server.current_clients + 1;

	log_message(LOG_FILE, "succesfully connected with client.");

	/* Initialize and start watcher to read client requests */

	ev_io_init(w_client, read_cb, client_sd, EV_READ);
	ev_io_start(loop, w_client);
}
#endif

#if 0
void urd::communication_thread(int id){
	(void)id;
//	signal(SIGHUP, signal_handler); /* catch hangup signal */
//	signal(SIGTERM, signal_handler); /* catch kill signal */

	/* loop is set to default. If different config is needed we need to change this. */
	struct ev_loop *loop = ev_default_loop(0);

	/* init server and socket */
	server_init(&server, SOCKET_FILE);

	/* Initialize and start a watcher to accept client requests */
	ev_io_init(&server.io, accept_cb, server.fd, EV_READ);
	ev_io_start(loop, &server.io);

	log_message(LOG_FILE, "unix-socket-echo starting...");
	
	while(1){
		ev_loop(EV_A_ 0);
}

	/* Only reached if the loop is manually exited */
	log_message(LOG_FILE, "loop manually exited");
	close(server.fd);
	exit(EXIT_SUCCESS);
}
#endif

#if 0
void push_jobs(ctpl::thread_pool &p){
	/*
@@ -443,10 +239,31 @@ void urd::set_configuration(const config_settings& settings) {
    m_settings = std::make_shared<config_settings>(settings);
}

void urd::signal_handler(int signum){

    switch(signum) {

        case SIGTERM:
            m_logger->info(" A signal(SIGTERM) occurred.");

            m_ipc_listener->stop();
            ::unlink(m_settings->m_daemon_pidfile.c_str());
            break;

        case SIGHUP:
            m_logger->info(" A signal(SIGHUP) occurred.");
            break;
    }
}

void urd::run() {

    // initialize logging facilities
    if(m_settings->m_daemonize) {
        m_logger = std::shared_ptr<logger>(new logger(m_settings->m_progname, "syslog"));
    } else{
        m_logger = std::shared_ptr<logger>(new logger(m_settings->m_progname, "stdout"));
    }

	m_logger->info("===========================");
	m_logger->info("== Starting Urd daemon   ==");
@@ -461,7 +278,16 @@ void urd::run() {
	m_logger->info("    internal storage: {}", m_settings->m_storage_path);
	m_logger->info("    internal storage capacity: {}", m_settings->m_storage_capacity);

    //daemonize();
    if(m_settings->m_daemonize) {
        daemonize();
    }

    // signal handlers must be installed AFTER daemonizing
    m_logger->info("* Installing signal handlers...");
    m_signal_listener = std::shared_ptr<signal_listener>(new signal_listener(
                std::bind(&urd::signal_handler, this, std::placeholders::_1)));

    m_signal_listener->run();

    // create worker pool
    m_workers = std::shared_ptr<ctpl::thread_pool>(new ctpl::thread_pool(m_settings->m_workers_in_pool));
+3 −23
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@
#include <ev.h>

#include "settings.hpp"
#include "signal-listener.hpp"
#include "ipc-listener.hpp"
#include "logger.hpp"
#include "ctpl.h"
@@ -37,16 +38,6 @@
class urd {

public:
    /* constants */
//    static constexpr const char* name = "urd";
//    static constexpr const char* RUNNING_DIR = "/tmp";
//    static constexpr const char* SOCKET_FILE = "/tmp/urd.socket";  
//    static constexpr const char* DAEMON_LOCK_FILE = "/tmp/urd.lock";
//    static constexpr const char* LOG_FILE = "/tmp/urd.log";
//    static const int WAKEUP_PERIODIC_TIME = 5;
//    static const int N_THREADS_IN_POOL = 3;
//    static const int MAX_CLIENTS_SUPPORTED = 20;

    /* sample task (to be removed) */
    struct task {
        task(struct norns_iotd* /*iotdp*/){
@@ -65,25 +56,14 @@ public:
    void set_configuration(const config_settings& settings);
    void run();

    

private:
    void daemonize();
    void new_request_handler(struct norns_iotd*);

    void log_message(const char filename[], const char message[]);
//    void signal_handler(int sig);
    int set_non_block(int fd);
    //int unix_socket_init(sockaddr_un* socket_un, const char* sock_path);
    //int server_init(sock_ev_serv *serv, const char *sock_path);

    void communication_thread(int id);
    void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
    static void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);
    void signal_handler(int);

private:

    std::shared_ptr<logger>                          m_logger;
    std::shared_ptr<signal_listener>                 m_signal_listener;
    std::shared_ptr<config_settings>                 m_settings;
    std::shared_ptr<ctpl::thread_pool>               m_workers;
    std::shared_ptr<ipc_listener<struct norns_iotd>> m_ipc_listener;