Commit 98f86de9 authored by Arnau Bago Castro's avatar Arnau Bago Castro
Browse files

unix socket added to norn

parent bf32803a
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -34,7 +34,9 @@ src_dloom_SOURCES = \

src_dloom_CXXFLAGS = 			\
	@TBB_CFLAGS@ 				\
	@LIBEV_CFLAGS@				\
	-std=gnu++11 -Wall -Wextra

src_dloom_LDFLAGS = \
	@TBB_LIBS@
	@TBB_LIBS@		\
	@LIBEV_LIBS@
+2 −0
Original line number Diff line number Diff line
@@ -47,6 +47,8 @@ AX_CHECK_COMPILE_FLAG([-std=c++11], [CXXFLAGS+=" -std=c++11"], [

# check that Intel's TBB
PKG_CHECK_MODULES([TBB], [tbb])
# check libev
PKG_CHECK_MODULES([LIBEV], [libev])

# Checks for header files.

+3 −1
Original line number Diff line number Diff line
@@ -27,6 +27,8 @@

int main() {
   printf("Hello, World! I'm the app \n");
   initialize();
   init();
   push_job();
   finit();
   return 0;
}
+38 −9
Original line number Diff line number Diff line
@@ -27,8 +27,20 @@
#include <sys/types.h>
#include <stdlib.h>
#include <stdint.h>
#include <ev.h>
#include <sys/fcntl.h>
#include <unistd.h>
#include <errno.h>

#define SOCKET_NAME "dloom_socket" 
#define SOCKET_NAME "/tmp/dloom.socket" 

/* Global variables */
int sock;
char buff[1024] = "hola";

/* Function declaration */

void init();

struct task{
	pid_t pid;
@@ -36,10 +48,14 @@ struct task{
	const char *filePath;
};

void initialize(){
	int sock;

void init(){
	/*
	 * 
	 */

	
	struct sockaddr_un server;
	char buff[1024] = "hola";
	struct task t;
	t.pid = 3;
	t.taskId = 4;
@@ -59,7 +75,20 @@ void initialize(){
        perror("connecting stream socket");
        exit(1);
    }
    if (write(sock, &t, sizeof(t)) < 0)
        perror("writing on stream socket");

}

void finit(){
	/*
	 * 
	 */
	close(sock);
}

void push_job(){
	/*
	 * 
	 */
	if (write(sock, &buff, sizeof(buff)) < 0)
        perror("writing on stream socket");
}
+168 −151
Original line number Diff line number Diff line
@@ -37,13 +37,17 @@
#include <list>
#include <tbb/tbb.h>
#include <signal.h>
#include <ev.h>
#include <string>
#include <vector>
#include <ctime>

#include "ctpl.h" 

#define RUNNING_DIR "/tmp"  
#define SOCKET_FILE "/tmp/dloom.socket"  
#define DAEMON_LOCK_FILE "/tmp/dloom.lock"
#define LOG_FILE "tmp/dloom.log"
#define LOG_FILE "/tmp/dloom.log"

extern "C" {
	struct foo {
@@ -71,15 +75,41 @@ enum job_type{
	check_all_tasks
};

struct sock_ev_client;

struct sock_ev_serv {
	ev_io io;
	int fd;
	struct sockaddr_un socket;
	int socket_len;
	std::vector<sock_ev_client> clients;
};

struct sock_ev_client {
	ev_io io;
	int fd;
	int index;
	struct sock_ev_serv *server;
};


tbb::concurrent_queue<task> jobs_priority_1;
tbb::concurrent_hash_map<pid_t, std::list<task_finished>> tasks_finished_map;

void log_message(const char filename[], const char message[]){
	time_t now = time(0);
	char* dt = ctime(&now);
	FILE *logfile;
	logfile=fopen(filename,"a");
	if(!logfile) return;
	fprintf(logfile,"%s\n",message);
	fclose(logfile);
	logfile=fopen(filename, "a+");
	if(logfile == NULL){
		return;
	}
	if (0 > fprintf(logfile,"%s -- %s", message, dt)){
		perror("log_message, fprintf");
	}
	if( 0 != fclose(logfile)){
		perror("log_message, fclose");
	}
}

void signal_handler(int sig){
@@ -88,9 +118,12 @@ void signal_handler(int sig){
			log_message(LOG_FILE, "Hangup signal catched");
			break;
		case SIGTERM:
		log_message(LOG_FILE,"terminate signal catched");
			log_message(LOG_FILE, "Terminate signal catched");
			exit(0);
			break;
		default:
			log_message(LOG_FILE, "Unknown signal received");
			break;
		}	
}

@@ -108,6 +141,10 @@ void daemonize() {
	 *	Manage signals
	 */

	log_message(LOG_FILE, "***************************");
	log_message(LOG_FILE, "** Starting dloom daemon **");
	log_message(LOG_FILE, "***************************");

	pid_t pid, sid;

	/* Check if this is already a daemon */ 
@@ -124,7 +161,7 @@ void daemonize() {

	/* Parent exits */
	if (pid > 0) {
		return;
		exit(0);
	}

	/* Obtain new process group */
@@ -137,10 +174,7 @@ void daemonize() {
	/* Close all descriptors */
	int i;
	for(i=getdtablesize(); i>=0; --i){
		if(close(i)<0){
			perror("Close descritpors");
			exit(1);
		}	
		close(i);
	} 

	/* Handle standard IO */
@@ -189,154 +223,137 @@ void daemonize() {

}

void comm_thread(int id){
	/*
	 * 1. Initialize socket
	 * 2. Check if incoming rpc or status rpc
	 * 	if (incoming rpc):
	 *		check prioritiy and enqueu
	 *	else if (status rpc) :
	 *		check status and return
	 * 3. Back to 2. 
	 */

	/* Variables necessary for the socket */
	int sock, msgsock, rval, listen_err;
	struct sockaddr_un server;
	char buf[1024];

	/* Create socket */
	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock < 0) {
        perror("opening stream socket");
        exit(1);
int set_non_block(int fd){
	/* Add O_NONBLOCK to the file descriptor */
	int flags;
	flags = fcntl(fd, F_GETFL);
	flags |= O_NONBLOCK;
	return fcntl(fd, F_SETFL, flags);
}

    /* Name and bind the socket using file system name */
    server.sun_family = AF_UNIX;
    strcpy(server.sun_path, SOCKET_NAME);
    if (bind(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un))) {
    	perror("binding stream obert");
    	exit(1);
int unix_socket_init(sockaddr_un* socket_un, char* sock_path){
	int fd;
	if (unlink(sock_path) < 0){
		perror("unlink socket");
		exit(EXIT_FAILURE);		 
	}

    /* Start accepting connections, and start reading on the accepted condition */
    listen_err = listen(sock, 10);
    if (listen_err == -1){
    	perror("listen socket failed");
    	exit(1);
	/* Setup unix socket listener */
	fd = socket(AF_UNIX, SOCK_STREAM, 0);	
	if (fd == -1){
		perror("echo server socket");
		exit(EXIT_FAILURE);
	}

	job_type type;
	uint64_t taskId;
	while(1){

		msgsock = accept(sock, 0, 0);
		if (msgsock == -1)
			perror("accept");
		else do {
	/* Set it non-blocking */
	if (set_non_block(fd) == -1){
		perror("echo server socket nonbloc");
		exit(EXIT_FAILURE);
	}

			struct foo bar;
			memset(&bar, 0, sizeof(bar));
	/* Set it as Unix socket */
	socket_un->sun_family = AF_UNIX;
	strcpy(socket_un->sun_path, sock_path);

			rval = read(msgsock, &bar, sizeof(bar));
			continue;
	return fd;
}

			bzero(buf, sizeof(buf));
			if ((rval = read(msgsock, buf, 1024)) > 0){
				/*
				 * Transform buf to job_type.
				 * Switch through the different cases.
				 */
				task t;
				type = enqueue_task;
				switch(type){
					case enqueue_task:
					{
						// sleep(2);
						// incoming rpc
						// set request id
						jobs_priority_1.push(t);
						// return request id through pipe
						break;
int server_init(sock_ev_serv *server, char *sock_path, int max_queue){
	server->fd = unix_socket_init(&server->socket, sock_path);
	server->socket_len = sizeof(server->socket.sun_family) + strlen(server->socket.sun_path);
	if(bind(server->fd, (sockaddr*) &server->socket, server->socket_len) == -1){
		perror("echo server bind");
		exit(EXIT_FAILURE);
	}
					case check_task_status:
					{
						// return certain job id status
						pid_t pid_thread;
						tbb::concurrent_hash_map<pid_t, std::list<task_finished>>::accessor acc;
						if(tasks_finished_map.find(acc, pid_thread)){
							//key pid_thread is found
							for(auto iterator = acc->second.begin(); iterator !=acc->second.end();){
								if (iterator->taskId == taskId){
									//found the task, should erase it and return it. 
									uint64_t status = iterator->status; 
									iterator = acc->second.erase(iterator); 
									//retornar status

									//break
									break;
								} else {
									 ++iterator;
	if(listen(server->fd, max_queue) == -1){
		perror("listen");
		exit(EXIT_FAILURE);
	}
	return 0;
}
						} else {
							//key pid_thread is not found in map, it may be processing or waiting to be processed

static void not_blocked(EV_P_ ev_periodic *w, int revents){
	log_message(LOG_FILE, "Not blocked");
}
						break;

static void client_cb(EV_P_ ev_io *w, int revents){
	/* to-do: receive data */
	/* A client has become readable */
	std::string buff;
	struct sock_ev_client* client = (struct sock_ev_client*) w;
	int n = recv(client->fd, &buff, sizeof(buff), 0);
	if(n<=0){

	}
					case check_all_tasks:
					{
						// return all jobs from certain pid_t
						pid_t pid_thread;
						tbb::concurrent_hash_map<pid_t, std::list<task_finished>>::accessor acc;
						if(tasks_finished_map.find(acc, pid_thread)){
							//key pid_is found, return all jobs inside
							//while list not empty, pop and return job
							while(!acc->second.empty()){
								task_finished tf = acc->second.front();
								acc->second.pop_front();
								//return tf

}
						} else{
							//no jobs from pid_thread are finished yet.

inline static sock_ev_client client_new(int fd){
	sock_ev_client client;
	client.fd = fd;
	set_non_block(client.fd);
	ev_io_init(&client.io, client_cb, client.fd, EV_READ);

	return client;
}

						break;
static void server_cb(EV_P_ ev_io *w, int revents){
	log_message(LOG_FILE, "Unix stream socket has become readable");
	int client_fd;
	sock_ev_client client; 

	/* ev_io is the first member, watcher 'w' has the addres of the start of the sock_ev_serv */
	sock_ev_serv *server = (struct sock_ev_serv*) w;

	while(1){
		client_fd = accept(server->fd, NULL, NULL);
		if( client_fd == -1){
			if( errno != EAGAIN && errno != EWOULDBLOCK ){
				log_message(LOG_FILE, "accept() failed");
				perror("accept()");
				exit(EXIT_FAILURE);
			}
			break;
		}
		log_message(LOG_FILE, "accepted a client");
		client = client_new(client_fd);
		client.server = server;
		server->clients.push_back(client);
		client.index = server->clients.size()-1;
		ev_io_start(EV_A_ &client.io);
	
			} else if (rval == 0)
				printf("Ending connection\n");
			else
				perror("reading stream message");
		} while (rval > 0);
		close(msgsock);
	}
}

	close(sock);
    unlink(SOCKET_NAME);
void communication_thread(int id){
	int max_queue = 128;
	sock_ev_serv server;
	ev_periodic every_few_seconds;
	EV_P = ev_default_loop(0);

}
	/* create unix socket in non-blocking fashion */
	server_init(&server, SOCKET_FILE, max_queue);

	/* Wake up periodically */
	ev_periodic_init(&every_few_seconds, not_blocked, 0, 5, 0);
	ev_periodic_start(EV_A_ &every_few_seconds);

void serve_job(int id, task t){
	/*
	 * 1. Serve t
	 * 2. Enqueue t to finished queue
	 */
	/* Get notified whenever the socket is ready to read */
	ev_io_init(&server.io, server_cb, server.fd, EV_READ);
	ev_io_start(EV_A_ &server.io);

	//serve
	//enqueue
}
	log_message(LOG_FILE, "unix-socket-echo starting...");
	ev_loop(EV_A_ 0);

	close(server.fd);
}

void push_jobs(ctpl::thread_pool &p){
	/*
	 * 1. If p.n_idle > 0 (probably sleep between checks)
	 * 2. Push job
	 */

	/*
	while(1){
		task t;
		if (p.n_idle() > 0 and jobs_priority_1.unsafe_size() > 0){
@@ -351,7 +368,7 @@ void push_jobs(ctpl::thread_pool &p){
			sleep(4);
		}

	}
	}*/

}

@@ -363,7 +380,7 @@ void infinite_loop() {
	 */

	ctpl::thread_pool p(3);
	p.push(comm_thread);
	p.push(communication_thread);
	push_jobs(p);
}