Commit 784ca5cf authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Complex responses can now be sent to clients

parent 8db63787
Loading
Loading
Loading
Loading
+10 −2
Original line number Diff line number Diff line
@@ -71,7 +71,6 @@ int main(int argc, char* argv[]) {
    }

    struct norns_job job = {
        .jb_jobid = 42,
        .jb_hosts = hosts,
        .jb_nhosts = num_hosts,
        .jb_backends = backends,
@@ -81,8 +80,17 @@ int main(int argc, char* argv[]) {

    struct norns_cred cred;

    norns_register_job(&cred, &job);
    int rv;

    // try to register a duplicate jobid
    for(int i=0; i<2; ++i) {
        if((rv = norns_register_job(&cred, 42, &job)) != NORNS_SUCCESS) {
            fprintf(stderr, "norns_register_job failed!\n");
        }
        else {
            fprintf(stdout, "norns_register_job succeded!\n");
        }
    }

    NORNS_PLIST_FREE(hosts);
    NORNS_PLIST_FREE(backends);
+10 −4
Original line number Diff line number Diff line
@@ -37,6 +37,8 @@ __BEGIN_DECLS
#define NORNS_ENOMEM            -2
#define NORNS_ECONNFAILED       -3
#define NORNS_ERPCSENDFAILED    -4
#define NORNS_ERPCRECVFAILED    -5
#define NORNS_EJOBEXISTS        -6

typedef uint32_t jobid_t;

@@ -175,7 +177,6 @@ struct norns_backend {

/* Batch job descriptor */
struct norns_job {
    uint32_t                jb_jobid; /* desired job ID (for later requests) */
    const char**            jb_hosts;  /* NULL-terminated list of hostnames participating in the job */
    size_t                  jb_nhosts; /* entries in hostname list */
    struct norns_backend**  jb_backends; /* NULL-terminated list of storage backends the job will use */
@@ -187,14 +188,19 @@ struct norns_job {
int norns_command(struct norns_cred* auth);

/* Register and describe a batch job */
int norns_register_job(struct norns_cred* auth, struct norns_job* job);
int norns_register_job(struct norns_cred* auth, uint32_t jobid, struct norns_job* job);

/* Update the description of an existing batch job */
int norns_update_job(struct norns_cred* auth, struct norns_job* job);
int norns_update_job(struct norns_cred* auth, uint32_t jobid, struct norns_job* job);

/* Remove the description of a batch job */
int norns_remove_job(struct norns_cred* auth, struct norns_job* job);
int norns_remove_job(struct norns_cred* auth, uint32_t jobid, struct norns_job* job);

/* Add a process to a registered batch job */
int norns_add_process(struct norns_cred* auth, uint32_t jobid, pid_t pid);

/* Remove a process from a registered batch job */
int norns_remove_process(struct norns_cred* auth, uint32_t jobid, pid_t pid);


__END_DECLS
+116 −7
Original line number Diff line number Diff line
@@ -134,6 +134,32 @@ int norns_transfer(struct norns_iotd* iotdp) {
    return 0;
}

ssize_t recv_data(int conn, void* data, size_t size) {

    size_t brecvd = 0; // bytes received
    size_t bleft = size; // bytes left to receive
    ssize_t n = 0;

	while(brecvd < size) {
		n = read(conn, data + brecvd, bleft);

		fprintf(stdout, "read %zd\n", n);

		if(n == -1 || n == 0) {
		    if(errno == EINTR) {
		        continue;
            }
			break;
		}

        brecvd += n;
        bleft -= n;
	}

	return (n == -1 ? n : (ssize_t) brecvd);
}


ssize_t send_data(int conn, const void* data, size_t size) {

	size_t bsent = 0;	// bytes sent
@@ -147,6 +173,9 @@ ssize_t send_data(int conn, const void* data, size_t size){
		n = write(conn, data + bsent, bleft);

		if(n == -1) {
		    if(errno == EINTR) {
		        continue;
            }
			break;
		}

@@ -157,30 +186,90 @@ ssize_t send_data(int conn, const void* data, size_t size){
	return (n == -1 ? n : (ssize_t) bsent);
}

static void print_hex(void* buffer, size_t bytes) {

    unsigned char* p = (unsigned char*) buffer;

    fprintf(stdout, "<< ");

    for(size_t i = 0; i < bytes; ++i) {
        fprintf(stdout, "%02x ", (int) p[i]);
    }

    fprintf(stdout, " >>\n");
}


static int send_message(int conn, const void* msg, size_t msg_size) {

    // transform the message size into network order and send it 
    // before the actual data

    uint64_t prefix = htonll(msg_size);

    assert(sizeof(prefix) == NORNS_RPC_HEADER_LENGTH); 

	if(send_data(conn, &prefix, sizeof(prefix)) < 0) {
        return NORNS_ERPCSENDFAILED;
        return -1;
    }

	if(send_data(conn, msg, msg_size) < 0) {
        return NORNS_ERPCSENDFAILED;
        return -1;
    }

    return NORNS_SUCCESS;
    return 0;
}

static int recv_message(int conn, void** msg, size_t* msg_size) {

    // first of all read the message prefix and decode it 
    // so that we know how much data to receive
    uint64_t prefix = 0;

    if(recv_data(conn, &prefix, sizeof(prefix)) < 0) {
        goto recv_error;
    }

    print_hex(&prefix, sizeof(prefix));

    size_t expected_size = ntohll(prefix);

    if(expected_size == 0) {
        goto recv_error;
    }

    void* buffer = malloc(expected_size);

    if(buffer == NULL) {
        goto recv_error;
    }

    if(recv_data(conn, buffer, expected_size) < 0) {
        free(buffer);
        goto recv_error;
    }

    print_hex(buffer, expected_size);

    *msg = buffer;
    *msg_size = expected_size;

    return 0;

recv_error:
    *msg = NULL;
    *msg_size = 0;

    return -1;
}

/* Register and describe a batch job */
int norns_register_job(struct norns_cred* auth, struct norns_job* job) {
int norns_register_job(struct norns_cred* auth, uint32_t jobid, struct norns_job* job) {

    (void) auth;

    int rv;

    if(job->jb_nhosts <= 0 || job->jb_nbackends <= 0) {
        return NORNS_EBADPARAMS;
    }
@@ -188,7 +277,7 @@ int norns_register_job(struct norns_cred* auth, struct norns_job* job) {
    // first of all, build the request body so that 
    // we can compute the final size
    Norns__Rpc__Request__Job jobmsg = NORNS__RPC__REQUEST__JOB__INIT;
    jobmsg.id = job->jb_jobid;
    jobmsg.id = jobid;
    jobmsg.n_hosts = job->jb_nhosts; // save number of repeated hosts
    jobmsg.hosts = malloc(jobmsg.n_hosts*sizeof(char*));

@@ -271,8 +360,28 @@ int norns_register_job(struct norns_cred* auth, struct norns_job* job) {
	    return NORNS_ERPCSENDFAILED;
    }

    // wait for the daemon's response
    void* resp_buf;
    size_t resp_len;

    if(recv_message(conn, &resp_buf, &resp_len) < 0) {
        return NORNS_ERPCRECVFAILED;
    }

    close(conn);

    return NORNS_SUCCESS;
    Norns__Rpc__Response* resp = 
        norns__rpc__response__unpack(NULL, resp_len, resp_buf);

    if(resp == NULL) {
        return NORNS_ERPCRECVFAILED;
    }

    rv = resp->code;

    free(resp_buf);
    norns__rpc__response__free_unpacked(resp, NULL);

    return rv;
}
+10 −5
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ message Request {
    // oneof would be better here, but the version of protoc provided by
    // CentOS 7 does not support it yet
    enum Type {
        START_IOTASK = 1;
        SUBMIT_IOTASK = 1;
        REGISTER_JOB = 2;
    }

@@ -29,15 +29,20 @@ message Request {
        }

        repeated Backend backends = 3;

    //    optional int32 a=3;
    //    optional int32 b=4;
    }
    optional Job job = 3;
}

message Response {
    required uint64 id = 1;
    enum Type {
        SUBMIT_IOTASK = 1;
        REGISTER_JOB = 2;
    }

    required Type type = 1;

    // most responses only need to return an error code
    required uint32 code = 2;
}

src/Makefile.am

0 → 100644
+110 −0
Original line number Diff line number Diff line
# Copyright (C) 2017 Barcelona Supercomputing Center
#                    Centro Nacional de Supercomputacion
#
# This file is part of the Data Scheduler, a daemon for tracking and managing
# requests for asynchronous data transfer in a hierarchical storage environment.
#
# See AUTHORS file in the top level directory for information
# regarding developers and contributors.
#
# The Data Scheduler is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Data Scheduler is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Data Scheduler.  If not, see <http://www.gnu.org/licenses/>.
#

ACLOCAL_AMFLAGS = -I m4

bin_PROGRAMS = urd

MOSTLYCLEANFILES = \
	defaults.cpp \
	messages.pb.cc \
	messages.pb.h

urd_SOURCES = \
	$(top_srcdir)/rpc/norns-rpc.h \
	ipc-listener.hpp \
	backends.cpp \
	backends.hpp \
	defaults.hpp \
	nvml-dax.cpp \
	nvml-dax.hpp \
	main.cpp	 \
	messages.hpp \
	posix-fs.cpp \
	posix-fs.hpp \
	response-base.hpp \
	responses.cpp \
	responses.hpp \
	request-base.cpp \
	request-base.hpp \
	requests.hpp \
	requests.cpp \
	settings.cpp \
	settings.hpp \
	urd.cpp	\
	urd.hpp	\
	utils.cpp \
	utils.hpp

nodist_urd_SOURCES = \
	defaults.cpp \
	messages.pb.cc \
	messages.pb.h

urd_CXXFLAGS = \
	@TBB_CFLAGS@ \
	-std=gnu++11 -Wall -Wextra

urd_CPPFLAGS = \
	-DSPDLOG_ENABLE_SYSLOG \
	@BOOST_CPPFLAGS@ \
	-I$(top_srcdir)/include \
	-I$(top_srcdir)/src	\
	-I$(top_srcdir)/rpc	\
	-I$(top_builddir)/rpc

urd_LDFLAGS = 				\
	@TBB_LIBS@	  				\
	@BOOST_LDFLAGS@				\
    @BOOST_SYSTEM_LIB@			\
    @BOOST_ASIO_LIB@			\
    @BOOST_PROGRAM_OPTIONS_LIB@	\
	@PROTOBUF_LIBS@ \
	-pthread

urd_LDADD = \
	-lboost_system \
	-lboost_thread

BUILT_SOURCES = \
	defaults.hpp \
	messages.pb.cc \
	messages.pb.h

defaults.cpp: Makefile
	@( echo "/* This file autogenerated by Makefile */"; \
	   echo "#include \"defaults.hpp\""; \
	   echo ""; \
	   echo "namespace defaults {"; \
	   echo "    const char* progname           = \"urd\";"; \
	   echo "    const bool  daemonize          = true;"; \
	   echo "    const char* running_dir        = \"/tmp\";"; \
	   echo "    const char* ipc_sockfile       = \"/tmp/urd.socket\";"; \
	   echo "    const char* daemon_pidfile     = \"/tmp/urd.pid\";"; \
	   echo "    const uint32_t workers_in_pool = std::thread::hardware_concurrency();"; \
	   echo "    const char* config_file        = \"$(sysconfdir)/norns.conf\";"; \
	   echo "} // namespace defaults"; \
	 ) > $@

%.pb.cc %.pb.h: $(top_srcdir)/rpc/%.proto
	$(PROTOC) --proto_path=$(top_srcdir)/rpc --cpp_out=$(builddir) $^
Loading