Commit 4aaff69b authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Urd daemon can now listen to requests from different sources

Added support for the API listener to receive requests from
different endpoints: local sockets for user/control requests and
remote sockets for requests from other daemons. Signals are also
managed by the API listener.
parent 0453a331
Loading
Loading
Loading
Loading
+3 −1
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ libnorns_la_SOURCES = \
	norns.c	\
	daemon-communication.c \
	daemon-communication.h \
	defaults.h \
	errors.c \
	requests.c \
	requests.h \
@@ -84,7 +85,8 @@ defaults.c: Makefile
	@( echo "/* This file was autogenerated by Makefile */"; \
	   echo "#include \"defaults.h\""; \
	   echo ""; \
	   echo "const char* norns_api_sockfile = \"/tmp/urd.socket\";"; \
	   echo "const char* norns_api_global_socket = \"$(localstatedir)/urd/global.socket.2\";"; \
	   echo "const char* norns_api_control_socket = \"$(localstatedir)/urd/control.socket.2\";"; \
	) > $@

%.pb-c.c %.pb-c.h: $(top_srcdir)/rpc/%.proto
+1 −1
Original line number Diff line number Diff line
@@ -324,7 +324,7 @@ connect_to_daemon(void) {
	}

	server.sun_family = AF_UNIX;
	strncpy(server.sun_path, norns_api_sockfile, sizeof(server.sun_path));
	strncpy(server.sun_path, norns_api_global_socket, sizeof(server.sun_path));
	server.sun_path[sizeof(server.sun_path)-1] = '\0';

	if (connect(sfd, (struct sockaddr *) &server, sizeof(struct sockaddr_un)) < 0) {
+2 −1
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@
#ifndef __DEFAULTS_H__
#define  __DEFAULTS_H__

extern const char* norns_api_sockfile;
extern const char* norns_api_global_socket;
extern const char* norns_api_control_socket;

#endif /* __DEFAULTS_H__ */
+10 −3
Original line number Diff line number Diff line
@@ -128,24 +128,31 @@ liburd_aux_la_LDFLAGS = \
	-pthread

BUILT_SOURCES = \
	defaults.hpp \
	defaults.cpp \
	messages.pb.cc \
	messages.pb.h

defaults.cpp: Makefile
	@( echo "/* This file autogenerated by Makefile */"; \
	   echo "#include <thread>"; \
	   echo "#include \"defaults.hpp\""; \
	   echo ""; \
	   echo "namespace norns {"; \
	   echo "namespace defaults {"; \
	   echo "    const char* progname           = \"urd\";"; \
	   echo "    const bool  daemonize          = true;"; \
	   echo "    const bool  use_syslog         = false;"; \
	   echo "    const char* running_dir        = \"/tmp\";"; \
	   echo "    const char* ipc_sockfile       = \"/tmp/urd.socket\";"; \
	   echo "    const char* daemon_pidfile     = \"/tmp/urd.pid\";"; \
\
	   echo "    const char* global_socket      = \"$(localstatedir)/urd/global.socket.2\";"; \
	   echo "    const char* control_socket     = \"$(localstatedir)/urd/control.socket.2\";"; \
	   echo "    const in_port_t remote_port    = 42000;"; \
	   echo "    const char* pidfile            = \"$(localstatedir)/urd/urd.pid\";"; \
\
	   echo "    const uint32_t workers_in_pool = std::thread::hardware_concurrency();"; \
	   echo "    const char* config_file        = \"$(sysconfdir)/norns.conf\";"; \
	   echo "} // namespace defaults"; \
	   echo "} // namespace norns"; \
	 ) > $@

%.pb.cc %.pb.h: $(top_srcdir)/rpc/%.proto
+57 −135
Original line number Diff line number Diff line
@@ -28,155 +28,83 @@
#ifndef __API_LISTENER_HPP__
#define __API_LISTENER_HPP__

#include <thread>
#include <boost/asio.hpp>
#include <boost/filesystem.hpp>

#include "api/dispatch-table.hpp"

namespace norns {
namespace api {
#include "api/local-endpoint.hpp"
#include "api/remote-endpoint.hpp"
#include "api/signal-listener.hpp"

namespace ba = boost::asio;
namespace bfs = boost::filesystem;

namespace norns {
namespace api {

/* simple lister for an AF_UNIX socket that accepts requests asynchronously and
 * invokes a callback with a fixed-length payload */
template <typename Message>
using dispatcher = dispatch_table<
class listener {

    using Dispatcher = dispatch_table<
        typename Message::key_type, 
        typename Message::key_hash_type, 
        std::unique_ptr<typename Message::response_type>,
        std::unique_ptr<typename Message::request_type>
            >;

/* helper class for managing communication sessions with a client */
template <typename Message>
class session : public std::enable_shared_from_this<session<Message>> {

    using Input = std::unique_ptr<typename Message::request_type>;
    using Output = std::unique_ptr<typename Message::response_type>;
    using MessageKey = typename Message::key_type;

public:
    session(ba::local::stream_protocol::socket socket, std::shared_ptr<dispatcher<Message>> callbacks)
        : m_socket(std::move(socket)),
          m_callbacks(callbacks) {}
    explicit listener() :
        m_ios(),
        m_msg_handlers(std::make_shared<Dispatcher>()), 
        m_signal_listener(m_ios) { }

    ~session() {
//        std::cerr << "session dying\n";
    }

    void start(){
        do_read_request();
    ~listener() {
        Message::cleanup();
    }

private:
    void do_read_request(){

        auto self(std::enable_shared_from_this<session<Message>>::shared_from_this());

        std::size_t header_length = m_message.expected_length(Message::header);

        // read the request header and use the information provided in it
        // to read the request body
        ba::async_read(m_socket,
                ba::buffer(m_message.buffer(Message::header), header_length),
                [this, self](boost::system::error_code ec, std::size_t length) {
    void run() {

                    if(!ec && m_message.decode_header(length)) {
                        //FIXME: check what happens if the caller never
                        //sends a body... are we leaking?
                        do_read_request_body();
                    }
                });
        for(auto& endp : m_local_endpoints) {
            endp->do_accept();
        }

    void do_read_request_body() {

        auto self(std::enable_shared_from_this<session<Message>>::shared_from_this());

        std::size_t body_length = m_message.expected_length(Message::body);

        if(body_length != 0) {
            ba::async_read(m_socket,
                    ba::buffer(m_message.buffer(Message::body), body_length),
                    [this, self](boost::system::error_code ec, std::size_t length) {

                        if(!ec) {
                            Input req = m_message.decode_body(length);

                            Output resp = m_callbacks->run(req->type(), std::move(req));

                            assert(resp != nullptr);

                            m_message.clear();

                            if(m_message.encode_response(std::move(resp))) {
                                do_write_response();
        for(auto& endp : m_remote_endpoints) {
            endp->do_accept();
        }
                        }
                    });
        }
    }

    void do_write_response() {

        std::vector<ba::const_buffer> buffers;
        buffers.push_back(ba::buffer(m_message.buffer(Message::header)));
        buffers.push_back(ba::buffer(m_message.buffer(Message::body)));
        m_signal_listener.do_accept();

        //Message::print_hex(m_message.buffer(Message::header));
        //Message::print_hex(m_message.buffer(Message::body));

        auto self(std::enable_shared_from_this<session<Message>>::shared_from_this());

        ba::async_write(m_socket, buffers,
            [this, self](boost::system::error_code ec, std::size_t /*length*/){

//                std::cerr << "Writing done!\n";

                if(!ec){
                    //do_read_request();
                }
            });
        m_ios.run();
    }

    ba::local::stream_protocol::socket  m_socket;
    Message                             m_message;

    std::shared_ptr<dispatcher<Message>> m_callbacks;
};


/* simple lister for an AF_UNIX socket that accepts requests asynchronously and
 * invokes a callback with a fixed-length payload */
template <typename Message>
class listener {

    using MessageKey = typename Message::key_type;

public:
    listener(const bfs::path& socket_file)
        : m_acceptor(m_ios, ba::local::stream_protocol::endpoint(socket_file.string())),
          m_socket(m_ios),
          m_callbacks(std::make_shared<dispatcher<Message>>()) {
        do_accept();
    void stop() {
        m_ios.stop();
    }

    ~listener() {
        //std::cerr << "Called!\n";
        Message::cleanup();
    template <typename Callable>
    void register_callback(MessageKey k, Callable&& func) {
        m_msg_handlers->add(k, std::forward<Callable>(func));
    }

    void run() {
        m_ios.run();
    /* register a socket endpoint from which to accept local requests */
    void register_endpoint(const bfs::path& sockfile) {
        m_local_endpoints.emplace_back(
                std::make_shared<local_endpoint<Message>>(sockfile, m_ios, m_msg_handlers));
    }

    void stop() {
        m_ios.stop();
    /* register a socket endpoint from which to accept remote requests */
    void register_endpoint(short port) {
        m_remote_endpoints.emplace_back(
                std::make_shared<remote_endpoint<Message>>(port, m_ios, m_msg_handlers));
    }

    template <typename Callable>
    void register_callback(MessageKey k, Callable&& func) {
        m_callbacks->add(k, std::forward<Callable>(func));
    template <typename... Args>
    void set_signal_handler(signal_listener::SignalHandler handler, Args... signums) {
        m_signal_listener.set_handler(handler, std::forward<Args>(signums)...);
    }

    static void cleanup() {
@@ -184,26 +112,20 @@ public:
    }

private:
    void do_accept() {
        /* start an asynchronous accept: the call to async_accept returns immediately, 
         * and we use a lambda function as the handler */
        m_acceptor.async_accept(m_socket,
            [this](const boost::system::error_code& ec) {
                if(!ec) {
                    std::make_shared<session<Message>>(
                        std::move(m_socket),
                        m_callbacks)->start();
                }
    /*! Main io_service */
    boost::asio::io_service                m_ios;

                do_accept();
            });
    }
    /*! Dispatcher of message handlers */
    std::shared_ptr<Dispatcher> m_msg_handlers;

    boost::asio::io_service                m_ios;
    ba::local::stream_protocol::acceptor   m_acceptor;
    ba::local::stream_protocol::socket     m_socket;
    /*! Dispatcher of signal listener */
    signal_listener m_signal_listener;

    /*! Local endpoints */
    std::vector<std::shared_ptr<local_endpoint<Message>>>  m_local_endpoints;

    std::shared_ptr<dispatcher<Message>> m_callbacks;
    /*! Remote endpoints */
    std::vector<std::shared_ptr<remote_endpoint<Message>>> m_remote_endpoints;
};

} // namespace api
Loading