Commit 0ad1779a authored by Arnau Bago Castro's avatar Arnau Bago Castro
Browse files

socket working with struct

parent 98f86de9
Loading
Loading
Loading
Loading
+6 −7
Original line number Diff line number Diff line
@@ -21,14 +21,13 @@
 * along with Data Scheduler.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <libnorn.h>
#include <stdio.h>

#include <libnorn.h>

int main() {
    printf("Hello, World! I'm the app \n");
   init();
   push_job();
   finit();
    if( push_job() < 0 ){
    	printf("Error with push job \n");
    }
    return 0;
}
+3 −1
Original line number Diff line number Diff line
@@ -28,7 +28,9 @@
extern "C" {
#endif

void initialize();
void init(void);
void finit(void);
int push_job();

#ifdef __cplusplus
}; // extern "C"
+33 −19
Original line number Diff line number Diff line
@@ -32,15 +32,18 @@
#include <unistd.h>
#include <errno.h>

#define SOCKET_NAME "/tmp/dloom.socket" 
const char* SOCKET_FILE = "/tmp/dloom.socket";

/* Global variables */
int sock;
char buff[1024] = "hola";
char buff[10] = "hola";

/* Function declaration */

void init();
/* Specify init and finit function as constructor and destructor */

__attribute__((constructor)) static void init(void);
__attribute__((destructor)) static void finit(void); 

struct task{
	pid_t pid;
@@ -49,46 +52,57 @@ struct task{
};


void init(){
void init(void){
	/*
	 * 
	 */	

	
	printf("Executing this when the library is loaded\n");
	struct sockaddr_un server;
	struct task t;
	t.pid = 3;
	t.taskId = 4;

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock < 0) {
		perror("opening stream socket");
		exit(1);
		exit(EXIT_FAILURE);
	}

	server.sun_family = AF_UNIX;
	strcpy(server.sun_path, SOCKET_NAME);
	strncpy(server.sun_path, SOCKET_FILE, sizeof(server.sun_path));
	/*if (strncpy(server.sun_path, SOCKET_FILE, sizeof(server.sun_path)) < 0){
		perror("strncpy");
		exit(EXIT_FAILURE);
	}*/

	if (connect(sock, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) {
        if (close(sock) < 0)
        	exit(1);
        if (close(sock) < 0){
        	exit(EXIT_FAILURE);
        }
        perror("connecting stream socket");
        exit(1);
        exit(EXIT_FAILURE);
    }

}

void finit(){
void finit(void){
	/*
	 * 
	 */
	printf("Executing this when the library is unloaded\n");
	close(sock);
}

void push_job(){
int push_job(){
	/*
	 * 
	 * return -1 on error
	 */
	if (write(sock, &buff, sizeof(buff)) < 0)
	
	struct task t;
	t.pid = 1234;
	t.taskId = 4321;
	t.filePath = "/tmp/something";

	if (write(sock, &t, sizeof(t)) < 0){
        perror("writing on stream socket");
        return -1;
	}
	return 0;
}
+159 −79
Original line number Diff line number Diff line
@@ -38,16 +38,21 @@
#include <tbb/tbb.h>
#include <signal.h>
#include <ev.h>
#include <string>
#include <string.h>
#include <vector>
#include <ctime>

#include "ctpl.h" 

#define RUNNING_DIR "/tmp"  
#define SOCKET_FILE "/tmp/dloom.socket"  
#define DAEMON_LOCK_FILE "/tmp/dloom.lock"
#define LOG_FILE "/tmp/dloom.log"
const char* RUNNING_DIR = "/tmp";
const char* SOCKET_FILE = "/tmp/dloom.socket";  
const char* DAEMON_LOCK_FILE = "/tmp/dloom.lock";
const char* LOG_FILE = "/tmp/dloom.log";
const int WAKEUP_PERIODIC_TIME = 5;
const int N_THREADS_IN_POOL = 3;
const int MAX_CLIENTS_SUPPORTED = 20;

ev_periodic every_few_seconds;

extern "C" {
	struct foo {
@@ -70,9 +75,9 @@ struct task_finished{
};

enum job_type{
	enqueue_task,
	check_task_status, 
	check_all_tasks
	ENQUEUE_TASK,
	CHECK_TASK_STATUS,
	CHECK_ALL_TASKS
};

struct sock_ev_client;
@@ -82,7 +87,8 @@ struct sock_ev_serv {
	int fd;
	struct sockaddr_un socket;
	int socket_len;
	std::vector<sock_ev_client> clients;
	int max_clients;
	int current_clients;
};

struct sock_ev_client {
@@ -92,11 +98,15 @@ struct sock_ev_client {
	struct sock_ev_serv *server;
};


sock_ev_serv server;
tbb::concurrent_queue<task> jobs_priority_1;
tbb::concurrent_hash_map<pid_t, std::list<task_finished>> tasks_finished_map;

void log_message(const char filename[], const char message[]){
	/*
	 * In a future log messages will not be redirected to a file but to the syslog.
	 * We can use fprintf, systemd will redirect to syslog automatically.  
	 */
	time_t now = time(0);
	char* dt = ctime(&now);
	FILE *logfile;
@@ -119,7 +129,8 @@ void signal_handler(int sig){
			break;
		case SIGTERM:
			log_message(LOG_FILE, "Terminate signal catched");
			exit(0);
			log_message(LOG_FILE, "Shutting down dloom");
			exit(EXIT_SUCCESS);
			break;
		default:
			log_message(LOG_FILE, "Unknown signal received");
@@ -148,27 +159,32 @@ void daemonize() {
	pid_t pid, sid;

	/* Check if this is already a daemon */ 
	if (getpid() == 1) return;
	if (getpid() == 1){
		return;
	}
	
	/* Fork off the parent process */
	pid = fork();

	/* Fork error */
	if (pid < 0) {
		log_message(LOG_FILE, "[daemonize] fork failed.");
		perror("Fork");
		exit(1);
		exit(EXIT_FAILURE);
	}

	/* Parent exits */
	if (pid > 0) {
		exit(0);
		exit(EXIT_SUCCESS);
	}

	/* Obtain new process group */
	sid = setsid();
	if (sid < 0) {
		/* Log failure */
		throw std::exception();
		log_message(LOG_FILE, "[daemonize] setsid failed.");
		perror("Setsid");
		exit(EXIT_FAILURE);
	}

	/* Close all descriptors */
@@ -180,16 +196,25 @@ void daemonize() {
	/* Handle standard IO */

	i=open("/dev/null", O_RDWR); /* open stdin */
	dup(i); /* stdout */
	dup(i); /* stderr */
	if(-1 == dup(i)){ /* stdout */
		log_message(LOG_FILE, "[daemonize] dup[1] failed.");
		perror("dup");
		exit(EXIT_FAILURE);
	}
	if(-1 == dup(i)){ /* stderr */
		log_message(LOG_FILE, "[daemonize] dup[2] failed.");
		perror("dup");
		exit(EXIT_FAILURE);
	}

	/* Change the file mode mask */
	umask(027); /* file creation mode to 750 */

	/* Change the current working directory */
	if ((chdir(RUNNING_DIR)) < 0) {
		log_message(LOG_FILE, "[daemonize] chdir failed.");
		perror("Chdir");
		exit(1);
		exit(EXIT_FAILURE);
	}
	
	/* Check if daemon already exists:
@@ -200,18 +225,28 @@ void daemonize() {
	int lfp;
	lfp=open(DAEMON_LOCK_FILE, O_RDWR|O_CREAT, 0640);
	if(lfp<0){
		log_message(LOG_FILE, "[daemonize] can not open daemon lock file");
		perror("Can not open daemon lock file");
		exit(1);
		exit(EXIT_FAILURE);
	} 
	if(lockf(lfp, F_TLOCK, 0)<0){
		log_message(LOG_FILE, "[daemonize] another instance of this daemon already running");
		perror("Another instance of this daemon already running");
		exit(1);
		exit(EXIT_FAILURE);
	}

	/* record pid in lockfile */
	char str[10];
	sprintf(str, "%d\n", getpid());
	write(lfp, str, strlen(str));
	size_t err_snprintf;
	err_snprintf = snprintf(str, sizeof(str), "%d\n", getpid());
	if(err_snprintf >= sizeof(str)){
		log_message(LOG_FILE, "[daemonize] snprintf failed");
	}
	size_t err_write;
	err_write = write(lfp, str, strnlen(str, sizeof(str)));
	if(err_write != strnlen(str, sizeof(str))){
		log_message(LOG_FILE, "[daemonize] write failed");
	}

	/* Manage signals */
	signal(SIGCHLD,SIG_IGN); /* ignore child */
@@ -231,9 +266,10 @@ int set_non_block(int fd){
	return fcntl(fd, F_SETFL, flags);
}

int unix_socket_init(sockaddr_un* socket_un, char* sock_path){
int unix_socket_init(sockaddr_un* socket_un, const char* sock_path){
	int fd;
	if (unlink(sock_path) < 0){
		log_message(LOG_FILE, "unlink socket failed");
		perror("unlink socket");
		exit(EXIT_FAILURE);		 
	}
@@ -241,49 +277,77 @@ int unix_socket_init(sockaddr_un* socket_un, char* sock_path){
	/* Setup unix socket listener */
	fd = socket(AF_UNIX, SOCK_STREAM, 0);	
	if (fd == -1){
		log_message(LOG_FILE, "echo server socket failed");
		perror("echo server socket");
		exit(EXIT_FAILURE);
	}

	/* Set it non-blocking */
	if (set_non_block(fd) == -1){
		log_message(LOG_FILE, "echo server socket nonbloc failed");
		perror("echo server socket nonbloc");
		exit(EXIT_FAILURE);
	}

	/* Set it as Unix socket */
	socket_un->sun_family = AF_UNIX;
	strcpy(socket_un->sun_path, sock_path);
	strncpy(socket_un->sun_path, sock_path, sizeof(socket_un->sun_path));

	return fd;
}

int server_init(sock_ev_serv *server, char *sock_path, int max_queue){
	server->fd = unix_socket_init(&server->socket, sock_path);
	server->socket_len = sizeof(server->socket.sun_family) + strlen(server->socket.sun_path);
	if(bind(server->fd, (sockaddr*) &server->socket, server->socket_len) == -1){

int server_init(sock_ev_serv *serv, const char *sock_path){
	log_message(LOG_FILE, "initializing server...");
	serv->max_clients = MAX_CLIENTS_SUPPORTED;
	serv->fd = unix_socket_init(&serv->socket, sock_path);
	serv->socket_len = sizeof(serv->socket.sun_family) + strnlen(serv->socket.sun_path, sizeof(serv->socket.sun_path));
	if(bind(serv->fd, (sockaddr*) &serv->socket, serv->socket_len) == -1){
		perror("echo server bind");
		exit(EXIT_FAILURE);
	}
	if(listen(server->fd, max_queue) == -1){
	if(listen(serv->fd, serv->max_clients) < -1){
		perror("listen");
		exit(EXIT_FAILURE);
	}
	return 0;
}

static void not_blocked(EV_P_ ev_periodic *w, int revents){
	log_message(LOG_FILE, "Not blocked");
}

static void client_cb(EV_P_ ev_io *w, int revents){
static void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents){
	/* to-do: receive data */
	/* A client has become readable */
	std::string buff;
	struct sock_ev_client* client = (struct sock_ev_client*) w;
	int n = recv(client->fd, &buff, sizeof(buff), 0);
	if(n<=0){
	log_message(LOG_FILE, "Inside read_cb");
	(void)revents;
	(void)loop;

	foo t;
	ssize_t read;

	if(EV_ERROR & revents)
    {
    	log_message(LOG_FILE, "got invalid event in read_cb");
      	perror("got invalid event");
      	return;
    }

    //receive message from client socket
    read = recv(watcher->fd, &t, sizeof(t), 0);

    if(read < 0){
    	log_message(LOG_FILE, "read error");
    }

    if(read == 0){
    	//stop and free watcher if client socket is closing
    	ev_io_stop(loop, watcher);
    	free(watcher);
    	log_message(LOG_FILE, "client close socket");
    	server.current_clients = server.current_clients -1;
    	return;
    } else {
    	log_message(LOG_FILE, "we have received this message: ");
    	log_message(LOG_FILE, (std::to_string(t.taskId)).c_str());
    }

}
@@ -292,60 +356,71 @@ inline static sock_ev_client client_new(int fd){
	sock_ev_client client;
	client.fd = fd;
	set_non_block(client.fd);
	ev_io_init(&client.io, client_cb, client.fd, EV_READ);

	//ev_io_init(&client.io, client_cb, client.fd, EV_READ);
	log_message(LOG_FILE, "client_new registered");
	return client;
}

static void server_cb(EV_P_ ev_io *w, int revents){
static void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents){
	/*
	 * Callback for accepting clients.
	 */
	(void)watcher;
	(void)revents;
	log_message(LOG_FILE, "Unix stream socket has become readable");
	int client_fd;
	sock_ev_client client; 

	/* ev_io is the first member, watcher 'w' has the addres of the start of the sock_ev_serv */
	sock_ev_serv *server = (struct sock_ev_serv*) w;
	struct sockaddr_un client_addr;
	socklen_t client_len = sizeof(client_addr);
	int client_sd;
	struct ev_io *w_client = (struct ev_io*) malloc (sizeof(struct ev_io));

	while(1){
		client_fd = accept(server->fd, NULL, NULL);
		if( client_fd == -1){
			if( errno != EAGAIN && errno != EWOULDBLOCK ){
				log_message(LOG_FILE, "accept() failed");
				perror("accept()");
				exit(EXIT_FAILURE);
			}
			break;
	if(EV_ERROR & revents){
		log_message(LOG_FILE, "got invalide event");
		return;
	}
		log_message(LOG_FILE, "accepted a client");
		client = client_new(client_fd);
		client.server = server;
		server->clients.push_back(client);
		client.index = server->clients.size()-1;
		ev_io_start(EV_A_ &client.io);

	/* Accept client request */
	client_sd = accept(server.fd, (struct sockaddr *) &client_addr, &client_len);

	if(client_sd < 0){
		log_message(LOG_FILE, "accept error");
		return;
	}

	/* Increment number of clients connected */
	server.current_clients = server.current_clients + 1;

	log_message(LOG_FILE, "succesfully connected with client.");

	/* Initialize and start watcher to read client requests */

	ev_io_init(w_client, read_cb, client_sd, EV_READ);
	ev_io_start(loop, w_client);
}

void communication_thread(int id){
	int max_queue = 128;
	sock_ev_serv server;
	ev_periodic every_few_seconds;
	EV_P = ev_default_loop(0);
	(void)id;
	
	/* create unix socket in non-blocking fashion */
	server_init(&server, SOCKET_FILE, max_queue);
	/* loop is set to default. If different config is needed we need to change this. */
	struct ev_loop *loop = ev_default_loop(0);

	/* Wake up periodically */
	ev_periodic_init(&every_few_seconds, not_blocked, 0, 5, 0);
	ev_periodic_start(EV_A_ &every_few_seconds);
	/* init server and socket */
	server_init(&server, SOCKET_FILE);

	/* Get notified whenever the socket is ready to read */
	ev_io_init(&server.io, server_cb, server.fd, EV_READ);
	ev_io_start(EV_A_ &server.io);
	/* Initialize and start a watcher to accept client requests */
	ev_io_init(&server.io, accept_cb, server.fd, EV_READ);
	ev_io_start(loop, &server.io);

	log_message(LOG_FILE, "unix-socket-echo starting...");
	
	while(1){
		ev_loop(EV_A_ 0);
	}
	
	/* Only reached if the loop is manually exited */
	log_message(LOG_FILE, "loop manually exited");
	close(server.fd);
	exit(EXIT_SUCCESS);
}

void push_jobs(ctpl::thread_pool &p){
@@ -353,6 +428,11 @@ void push_jobs(ctpl::thread_pool &p){
	 * 1. If p.n_idle > 0 (probably sleep between checks)
	 * 2. Push job
	 */
	(void) p;
	while(1){
		//log_message(LOG_FILE, "[main_thread] not blocked");
		sleep(5);
	};
	/*
	while(1){
		task t;
@@ -379,7 +459,7 @@ void infinite_loop() {
	 *	Start infinite loop
	 */

	ctpl::thread_pool p(3);
	ctpl::thread_pool p(N_THREADS_IN_POOL);
	p.push(communication_thread);
	push_jobs(p);
}