Commit fb8e248c authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Clients can now register and remove backends

parent badf021e
Loading
Loading
Loading
Loading
+12 −3
Original line number Diff line number Diff line
@@ -248,16 +248,16 @@ int norns_ping() __THROW;

/* Storage backend descriptor */
struct norns_backend {
    int         b_type;
    const char* b_prefix; /* prefix ID for this backend (e.g. nvm01, tmpfs02, ...) */
    int         b_type;
    const char* b_mount; /* mount point */
    size_t      b_quota; /* backend capacity (in megabytes) allocated to the job for writing */
};

#define NORNS_BACKEND_INIT(type, prefix, mount, quota) \
#define NORNS_BACKEND_INIT(prefix, type, mount, quota) \
{ \
    .b_type = (type), \
    .b_prefix = (prefix), \
    .b_type = (type), \
    .b_mount = (mount), \
    .b_quota = (quota) \
}
@@ -324,6 +324,15 @@ int norns_add_process(struct norns_cred* auth, uint32_t jobid, uid_t uid, gid_t
/* Remove a process from a registered batch job */
int norns_remove_process(struct norns_cred* auth, uint32_t jobid, uid_t uid, gid_t gid, pid_t pid);

/* Register a backend in the local norns server */
int norns_register_backend(struct norns_cred* auth, struct norns_backend* backend);

/* Update an existing backend in the local norns server */
int norns_register_backend(struct norns_cred* auth, struct norns_backend* backend);

/* Unregister a backend from the local norns server */
int norns_unregister_backend(struct norns_cred* auth, const char* prefix);


char* norns_strerror(int errnum);

+4 −1
Original line number Diff line number Diff line
@@ -46,7 +46,10 @@ extern "C" {
#define NORNS_ERPCRECVFAILED    -7
#define NORNS_EJOBEXISTS        -8
#define NORNS_ENOSUCHJOB        -9
#define NORNS_ENOSUCHPROCESS    -10
#define NORNS_EPROCESSEXISTS    -10
#define NORNS_ENOSUCHPROCESS    -11
#define NORNS_EBACKENDEXISTS    -12
#define NORNS_ENOSUCHBACKEND    -13

#ifdef __cplusplus
}
+45 −6
Original line number Diff line number Diff line
@@ -74,6 +74,16 @@ send_request(norns_rpc_type_t type, norns_response_t* resp, ...) {
            break;
        }


        case NORNS_PING:
        {
            if((res = pack_to_buffer(type, &req_buf)) != NORNS_SUCCESS) {
                return res;
            }

            break;
        }

        case NORNS_REGISTER_JOB:
        case NORNS_UPDATE_JOB:
        case NORNS_UNREGISTER_JOB:
@@ -115,9 +125,19 @@ send_request(norns_rpc_type_t type, norns_response_t* resp, ...) {
            break;
        }

        case NORNS_PING:
        case NORNS_REGISTER_BACKEND:
        case NORNS_UPDATE_BACKEND:
        case NORNS_UNREGISTER_BACKEND:
        {
            if((res = pack_to_buffer(type, &req_buf)) != NORNS_SUCCESS) {
            const struct norns_cred* auth =
                va_arg(ap, const struct norns_cred*);
            const char* const prefix =
                va_arg(ap, const char* const);
            const struct norns_backend* backend = 
                va_arg(ap, const struct norns_backend*);

            if((res = pack_to_buffer(type, &req_buf, auth, prefix, backend)) 
                    != NORNS_SUCCESS) {
                return res;
            }

@@ -269,6 +289,23 @@ cleanup_on_error:
#endif
}

int
send_ping_request() {

    int res;
    norns_response_t resp;

    if((res = send_request(NORNS_PING, &resp)) != NORNS_SUCCESS) {
        return res;
    }

    if(resp.r_type != NORNS_PING) {
        return NORNS_ESNAFU;
    }

    return resp.r_status;
}

int 
send_job_request(norns_rpc_type_t type, struct norns_cred* auth, 
                 uint32_t jobid, struct norns_job* job) {
@@ -309,23 +346,25 @@ send_process_request(norns_rpc_type_t type, struct norns_cred* auth,
}

int
send_ping_request() {
send_backend_request(norns_rpc_type_t type, struct norns_cred* auth, 
                     const char* prefix_id, struct norns_backend* backend) {

    int res;
    norns_response_t resp;

    if((res = send_request(NORNS_PING, &resp)) != NORNS_SUCCESS) {
    if((res = send_request(type, &resp, auth, prefix_id, backend)) 
            != NORNS_SUCCESS) {
        return res;
    }

    if(resp.r_type != NORNS_PING) {
    if(resp.r_type != type) {
        return NORNS_ESNAFU;
    }

    return resp.r_status;

}


static int connect_to_daemon(void) {

	struct sockaddr_un server;
+43 −7
Original line number Diff line number Diff line
@@ -25,19 +25,32 @@
#include <sys/types.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>

#include <norns.h>
//#include <norns-rpc.h>

#include "xmalloc.h"
#include "xstring.h"
#include "daemon-communication.h"

static bool
validate_job(struct norns_job* job) {

    return (job != NULL) && (job->jb_hosts != NULL) && (job->jb_nhosts) != 0 &&
       (job->jb_backends != NULL) && (job->jb_nbackends != 0);
    return (job != NULL) && 
           (job->jb_hosts != NULL) && 
           (job->jb_nhosts) != 0 &&
           (job->jb_backends != NULL) && 
           (job->jb_nbackends != 0);
}

static bool
validate_backend(struct norns_backend* backend) {

    return (backend != NULL) && 
           (backend->b_prefix != NULL) &&
           (strncmp(backend->b_prefix, "", 1) != 0) &&
           (backend->b_mount != NULL) &&
           (strncmp(backend->b_mount, "", 1) != 0) &&
           (backend->b_quota > 0);
}

/* Public API */
@@ -52,6 +65,10 @@ norns_transfer(struct norns_iotd* iotdp) {
    return send_transfer_request(iotdp);
}

int
norns_ping() {
    return send_ping_request();
}

/* Register and describe a batch job */
int 
@@ -115,8 +132,27 @@ norns_remove_process(struct norns_cred* auth, uint32_t jobid, uid_t uid,
                                uid, gid, pid);
}

/* Register a backend in the local norns server */
int 
norns_register_backend(struct norns_cred* auth, struct norns_backend* backend) {

    if(auth == NULL || !validate_backend(backend)) {
        return NORNS_EBADARGS;
    }

    const char* const prefix = backend->b_prefix;

    return send_backend_request(NORNS_REGISTER_BACKEND, auth, prefix, backend);
}

/* Unregister a backend from the local norns server */
int 
norns_ping() {
    return send_ping_request();
norns_unregister_backend(struct norns_cred* auth, const char* prefix) {

    if(auth == NULL || prefix == NULL) {
        return NORNS_EBADARGS;
    }

    return send_backend_request(NORNS_UNREGISTER_BACKEND, auth, prefix, NULL);
}
+116 −40
Original line number Diff line number Diff line
@@ -32,6 +32,8 @@

static Norns__Rpc__Request__Task* build_task_msg(const struct norns_iotd* iotdp);
static void free_task_msg(Norns__Rpc__Request__Task* msg);
static Norns__Rpc__Request__Backend* build_backend_msg(const struct norns_backend* backend);
static void free_backend_msg(Norns__Rpc__Request__Backend* msg);
static Norns__Rpc__Request__Job* build_job_msg(const struct norns_job* job);
static void free_job_msg(Norns__Rpc__Request__Job* msg);
static void* build_membuf_msg(const struct norns_data_in* src);
@@ -43,6 +45,8 @@ remap_request(norns_rpc_type_t type) {
    switch(type) {
        case NORNS_SUBMIT_IOTASK:
            return NORNS__RPC__REQUEST__TYPE__SUBMIT_IOTASK;
        case NORNS_PING:
            return NORNS__RPC__REQUEST__TYPE__PING;
        case NORNS_REGISTER_JOB:
            return NORNS__RPC__REQUEST__TYPE__REGISTER_JOB;
        case NORNS_UPDATE_JOB:
@@ -53,8 +57,12 @@ remap_request(norns_rpc_type_t type) {
            return NORNS__RPC__REQUEST__TYPE__ADD_PROCESS;
        case NORNS_REMOVE_PROCESS:
            return NORNS__RPC__REQUEST__TYPE__REMOVE_PROCESS;
        case NORNS_PING:
            return NORNS__RPC__REQUEST__TYPE__PING;
        case NORNS_REGISTER_BACKEND:
            return NORNS__RPC__REQUEST__TYPE__REGISTER_BACKEND;
        case NORNS_UPDATE_BACKEND:
            return NORNS__RPC__REQUEST__TYPE__UPDATE_BACKEND;
        case NORNS_UNREGISTER_BACKEND:
            return NORNS__RPC__REQUEST__TYPE__UNREGISTER_BACKEND;
        default:
            return -1;
    }
@@ -65,6 +73,8 @@ remap_response(int norns_rpc_type) {
    switch(norns_rpc_type) {
        case NORNS__RPC__RESPONSE__TYPE__SUBMIT_IOTASK:
            return NORNS_SUBMIT_IOTASK;
        case NORNS__RPC__RESPONSE__TYPE__PING:
            return NORNS_PING;
        case NORNS__RPC__RESPONSE__TYPE__REGISTER_JOB:
            return NORNS_REGISTER_JOB;
        case NORNS__RPC__RESPONSE__TYPE__UPDATE_JOB:
@@ -75,8 +85,12 @@ remap_response(int norns_rpc_type) {
            return NORNS_ADD_PROCESS;
        case NORNS__RPC__RESPONSE__TYPE__REMOVE_PROCESS:
            return NORNS_REMOVE_PROCESS;
        case NORNS__RPC__RESPONSE__TYPE__PING:
            return NORNS_PING;
        case NORNS__RPC__REQUEST__TYPE__REGISTER_BACKEND:
            return NORNS_REGISTER_BACKEND;
        case NORNS__RPC__REQUEST__TYPE__UPDATE_BACKEND:
            return NORNS_UPDATE_BACKEND;
        case NORNS__RPC__REQUEST__TYPE__UNREGISTER_BACKEND:
            return NORNS_UNREGISTER_BACKEND;
        case NORNS__RPC__RESPONSE__TYPE__BAD_REQUEST:
            // intentionally fall through
        default:
@@ -95,8 +109,6 @@ build_request_msg(norns_rpc_type_t type, va_list ap) {

    norns__rpc__request__init(req_msg);

    //req_msg->type = type;

    switch(type) {
        case NORNS_SUBMIT_IOTASK:
        {
@@ -113,6 +125,15 @@ build_request_msg(norns_rpc_type_t type, va_list ap) {
            break;
        }

        case NORNS_PING:
        {

            if((req_msg->type = remap_request(type)) < 0) {
                goto cleanup_on_error;
            }
            break;
        }

        case NORNS_REGISTER_JOB:
        case NORNS_UPDATE_JOB:
        case NORNS_UNREGISTER_JOB:
@@ -141,6 +162,7 @@ build_request_msg(norns_rpc_type_t type, va_list ap) {

            break;
        }

        case NORNS_ADD_PROCESS:
        case NORNS_REMOVE_PROCESS:
        {
@@ -171,14 +193,44 @@ build_request_msg(norns_rpc_type_t type, va_list ap) {
            break;
        }

        case NORNS_PING:
        case NORNS_REGISTER_BACKEND:
        case NORNS_UPDATE_BACKEND:
        case NORNS_UNREGISTER_BACKEND:
        {
            const struct norns_cred* auth =
                va_arg(ap, const struct norns_cred*);
            const char* const prefix =
                va_arg(ap, const char* const);
            const struct norns_backend* backend = 
                va_arg(ap, const struct norns_backend*);

            (void) auth;

            if((req_msg->type = remap_request(type)) < 0) {
                goto cleanup_on_error;
            }

            if(type == NORNS_UNREGISTER_BACKEND) {
                req_msg->prefix = xstrdup(prefix);

                if(req_msg->prefix == NULL) {
                    goto cleanup_on_error;
                }

                req_msg->backend = NULL;
            }
            else {

                req_msg->prefix = NULL;

                if((req_msg->backend = build_backend_msg(backend)) == NULL) {
                    goto cleanup_on_error;
                }
            }

            break;
        }

    }

    return req_msg;
@@ -195,6 +247,11 @@ void
free_request_msg(Norns__Rpc__Request* req) {
}

void
free_backend_msg(Norns__Rpc__Request__Backend* msg) {
    //TODO
}

void 
free_job_msg(Norns__Rpc__Request__Job* msg) {

@@ -224,6 +281,46 @@ free_job_msg(Norns__Rpc__Request__Job* msg) {
    xfree(msg);
}

Norns__Rpc__Request__Backend*
build_backend_msg(const struct norns_backend* backend) {

    assert(backend != NULL);

    Norns__Rpc__Request__Backend* backendmsg = 
        (Norns__Rpc__Request__Backend*) xmalloc(sizeof(*backendmsg));

    if(backendmsg == NULL) {
        return NULL;
    }

    norns__rpc__request__backend__init(backendmsg);

    backendmsg->prefix = xstrdup(backend->b_prefix);

    if(backendmsg->prefix == NULL) {
        goto error_cleanup;
    }

    backendmsg->type = backend->b_type;
    backendmsg->mount = xstrdup(backend->b_mount);

    if(backendmsg->mount == NULL) {
        goto error_cleanup;
    }

    backendmsg->quota = backend->b_quota;

    return backendmsg;

error_cleanup:

    if(backendmsg != NULL) {
        free_backend_msg(backendmsg);
    }

    return NULL;
}

Norns__Rpc__Request__Job*
build_job_msg(const struct norns_job* job) {

@@ -261,20 +358,28 @@ build_job_msg(const struct norns_job* job) {
    // add backends
    jobmsg->n_backends = job->jb_nbackends;
    jobmsg->backends = 
        xmalloc(job->jb_nbackends*sizeof(Norns__Rpc__Request__Job__Backend*));
        xmalloc(job->jb_nbackends*sizeof(Norns__Rpc__Request__Backend*));
    
    if(jobmsg->backends == NULL){
        goto error_cleanup;
    }

    for(size_t i=0; i<job->jb_nbackends; ++i) {
        jobmsg->backends[i] = xmalloc(sizeof(Norns__Rpc__Request__Job__Backend));

        jobmsg->backends[i] = build_backend_msg(job->jb_backends[i]);

        if(jobmsg->backends[i] == NULL) {
            goto error_cleanup;
        }

        norns__rpc__request__job__backend__init(jobmsg->backends[i]);
        /*
        jobmsg->backends[i] = xmalloc(sizeof(Norns__Rpc__Request__Backend));

        if(jobmsg->backends[i] == NULL) {
            goto error_cleanup;
        }

        norns__rpc__request__backend__init(jobmsg->backends[i]);
        jobmsg->backends[i]->type = job->jb_backends[i]->b_type;
        jobmsg->backends[i]->mount = xstrdup(job->jb_backends[i]->b_mount);

@@ -283,6 +388,7 @@ build_job_msg(const struct norns_job* job) {
        }

        jobmsg->backends[i]->quota = job->jb_backends[i]->b_quota;
        */
    }

    return jobmsg;
@@ -497,36 +603,6 @@ pack_to_buffer(norns_rpc_type_t type, msgbuffer_t* buf, ...) {

    return NORNS_SUCCESS;

#if 0
    va_list ap;
    va_start(ap, buf);

    Norns__Rpc__Request* req = NULL;
    void* req_buf = NULL;
    size_t req_len = 0;

    if((req = build_request_msg2(type, ap)) == NULL) {
        goto cleanup_on_error;
    }

    norns__rpc__request__pack(req, req_buf);

    req_len = norns__rpc__request__get_packed_size(req);
    req_buf = xmalloc_nz(req_len);

    if(req_buf == NULL) {
        goto cleanup_on_error;
    }

    buf->b_data = req_buf;
    buf->b_size = req_len;

    va_end(ap);
    free_request_msg(req);

    return NORNS_SUCCESS;
#endif

cleanup_on_error:
    if(req_msg != NULL) {
        free_request_msg(req_msg);
Loading