Commit fdea4289 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Replace libev with boost::asio

parent dcf780bd
Loading
Loading
Loading
Loading
+11 −4
Original line number Diff line number Diff line
@@ -30,7 +30,10 @@ DIST_SUBDIRS = lib examples
bin_PROGRAMS = src/urd

src_urd_SOURCES = 			\
	src/urd.cpp
	src/main.cpp			\
	src/ipc-listener.hpp	\
	src/urd.cpp				\
	src/urd.hpp

src_urd_CXXFLAGS = 				\
	@TBB_CFLAGS@ 				\
@@ -41,5 +44,9 @@ src_urd_CPPFLAGS = \
	-I$(top_srcdir)/include

src_urd_LDFLAGS = \
	@TBB_LIBS@
	@LIBEV_LIBS@
	@TBB_LIBS@	  \
	@LIBEV_LIBS@  \
	-pthread

src_urd_LDADD = 	\
	-lboost_system
+21 −4
Original line number Diff line number Diff line
@@ -21,20 +21,37 @@
 * along with Data Scheduler.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <norns.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>


int main() {
    printf("Hello, World! I'm the app \n");
#include <norns.h>

void print_iotd(struct norns_iotd* iotdp){
    fprintf(stdout, "iotd -> struct nornds_iotd {\n");
    fprintf(stdout, "  ni_tid = %d;\n", iotdp->ni_tid);
    fprintf(stdout, "};\n");
}


int main() {
    struct norns_iotd iotd = {
        .ni_tid = 0,
    };

    fprintf(stdout, "calling norns_transfer(&iotd)\n");
    print_iotd(&iotd);

    if(norns_transfer(&iotd) != 0) {
    	printf("Error with push job \n");
    	fprintf(stderr, "norns_transfer error: %s \n", strerror(errno));
    	exit(EXIT_FAILURE);
    }

    fprintf(stdout, "norns_transfer() succeeded!\n");
    fprintf(stdout, "output from submission:\n");
    print_iotd(&iotd);

    return 0;
}
+24 −5
Original line number Diff line number Diff line
@@ -45,22 +45,41 @@ enum {
    NORNS_MOVE
};


/* I/O task status descriptor */
struct norns_iotst {
    int ni_status;  /* current status */
};

/* Status codes */
enum {
    NORNS_WAITING,
    NORNS_INPROGRESS,
    NORNS_COMPLETE
};


void norns_init() __THROW;

int norns_getconf() __THROW;

/* Enqueue an asynchronous I/O task */
int norns_transfer(struct norns_iotd* iotdp) __THROW __nonnull((1));
int norns_transfer(struct norns_iotd* iotdp) __THROW;

/* Try to cancel an asynchronous I/O task associated with iotdp */
ssize_t norns_cancel(struct norns_iotd* iotdp) __THROW __nonnull((1));
ssize_t norns_cancel(struct norns_iotd* iotdp) __THROW;

/* Retrieve return status associated with iotdp */
ssize_t norns_return(struct norns_iotd* iotdp) __THROW __nonnull((1));
ssize_t norns_return(struct norns_iotd* iotdp) __THROW;

/* Retrieve current status associated with iotdp */
ssize_t norns_progress(struct norns_iotd* iotdp) __THROW __nonnull((1));
ssize_t norns_progress(struct norns_iotd* iotdp, struct norns_iotst* statp) __THROW;

/* Retrieve status associated with all current tasks */
ssize_t norns_progress_all(struct norns_iotst** statp) __THROW;

/* Retrieve error status associated with iotdp */
int norns_error(struct norns_iotd* iotdp) __THROW __nonnull((1));
int norns_error(struct norns_iotd* iotdp) __THROW;

__END_DECLS

+64 −9
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@
#include <stdlib.h>
#include <stdint.h>
#include <unistd.h>
#include <errno.h>

#include <norns.h>

@@ -47,6 +48,7 @@ __attribute__((destructor)) static void __norns_finit(void);

void __norns_init(){

#if 0
	struct sockaddr_un server;

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
@@ -66,9 +68,31 @@ void __norns_init(){
        perror("connecting stream socket");
        exit(EXIT_FAILURE);
    }
#endif

}

static int connect_to_daemon(void){

	struct sockaddr_un server;

	int sfd = socket(AF_UNIX, SOCK_STREAM, 0);

	if (sfd < 0) {
	    return -1;
	}

	server.sun_family = AF_UNIX;
	strncpy(server.sun_path, SOCKET_FILE, sizeof(server.sun_path));
	server.sun_path[sizeof(server.sun_path)-1] = '\0';

	if (connect(sfd, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) {
	    return -1;
    }

    return sfd;
}

void __norns_finit(void){
	/*
	 * 
@@ -78,18 +102,49 @@ void __norns_finit(void){
}

int norns_transfer(struct norns_iotd* iotdp) {
	/*
	 * return -1 on error
	 */
    struct norns_iotd* iotd_copy = (struct norns_iotd*) 
        malloc(sizeof(*iotd_copy));

	struct norns_iotd t;
	t.ni_tid = 1234;
	t.ni_ibid = 4321;
    if(iotd_copy == NULL){
        errno = ENOMEM;
        return -1;
    }

    memcpy(iotd_copy, iotdp, sizeof(*iotd_copy));

    int sfd = connect_to_daemon();

    if(sfd == -1){
        // errno should have been correctly set by connect_to_daemon()
        return -1;
    }
	
	if (write(sock, &t, sizeof(t)) < 0){
	// connection established, send the request descriptor to the daemon and 
	// wait for a response
	if (write(sfd, iotd_copy, sizeof(*iotd_copy)) < 0){
        perror("writing on stream socket");
    }
    close(sock);

    struct norns_iotd response;

    size_t nbytes = 0;

    while(nbytes < sizeof(response)){
        ssize_t n = 0; 
        
        if((n = read(sfd, ((void*)&response)+n, sizeof(response))) == -1 ){
            return -1;
        }

        nbytes += n;
    }

    // copy data from response to user-provided structure
    memcpy(iotdp, &response, sizeof(response));

    free(iotd_copy);

    close(sfd);

    return 0;
}

src/ipc-listener.hpp

0 → 100644
+119 −0
Original line number Diff line number Diff line
//
// Copyright (C) 2017 Barcelona Supercomputing Center
//                    Centro Nacional de Supercomputacion
//
// This file is part of the Data Scheduler, a daemon for tracking and managing
// requests for asynchronous data transfer in a hierarchical storage environment.
//
// See AUTHORS file in the top level directory for information
// regarding developers and contributors.
//
// The Data Scheduler is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Data Scheduler is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Data Scheduler.  If not, see <http://www.gnu.org/licenses/>.
//

#ifndef __IPC_LISTENER_HPP__
#define __IPC_LISTENER_HPP__

#include <boost/asio.hpp>
#include <norns.h>

namespace ba = boost::asio;

/* simple lister for an AF_UNIX socket that accepts requests asynchronously and
 * invokes a callback with a fixed-length payload */
template <typename Payload>
class ipc_listener {

    typedef std::function<void(Payload*)> callback_t;

    /* helper class for managing communication sessions with a client */
    template <typename T>
    class session : public std::enable_shared_from_this<session<T>> {

        static const int32_t max_buffer_length = sizeof(T);

    public:
        session(ba::local::stream_protocol::socket socket, callback_t callback)
            : m_socket(std::move(socket)),
              m_callback(callback) {}

        void start(){
            do_read();
        }

    private:
        void do_read(){

            auto self(std::enable_shared_from_this<session<T>>::shared_from_this());

            m_socket.async_read_some(boost::asio::buffer(m_data, max_buffer_length),
                [this, self](boost::system::error_code ec, std::size_t length){
                    if(!ec){
                        T* payload = (T*) &m_data;
                        m_callback(payload);
                        do_write(length);
                    }
                });
        }

        void do_write(std::size_t length){

            auto self(std::enable_shared_from_this<session<T>>::shared_from_this());

            ba::async_write(m_socket, boost::asio::buffer(m_data, length),
                [this, self](boost::system::error_code ec, std::size_t /*length*/){
                    if(!ec){
                        do_read();
                    }
                });
        }

        ba::local::stream_protocol::socket m_socket;
        callback_t                         m_callback;
        char                               m_data[max_buffer_length];
    };

public:
    ipc_listener(const char* socket_file, callback_t callback) 
        : m_acceptor(m_ios, ba::local::stream_protocol::endpoint(socket_file)),
          m_socket(m_ios),
          m_callback(callback) {
        start_accept();
    }

    void run() {
        m_ios.run();
    }

private:
    void start_accept(){
        /* start an asynchronous accept: the call to async_accept returns immediately, 
         * and we use a lambda function as the handler */
        m_acceptor.async_accept(m_socket,
            [this](const boost::system::error_code& ec){
                if(!ec){
                    std::make_shared<session<Payload>>(std::move(m_socket), m_callback)->start();
                }

                start_accept();
            });
    }

    boost::asio::io_service                 m_ios;
    ba::local::stream_protocol::acceptor    m_acceptor;
    ba::local::stream_protocol::socket      m_socket;
    callback_t                              m_callback;
};

#endif /* __IPC_LISTENER_HPP__ */
Loading