Commit dcbae549 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Initial support for I/O tasks

parent 9d77f618
Loading
Loading
Loading
Loading
+15 −1
Original line number Diff line number Diff line
@@ -23,7 +23,8 @@

bin_PROGRAMS = \
	app \
	job_registration
	job_registration \
	task_submission

app_SOURCES = \
	app.c
@@ -49,3 +50,16 @@ job_registration_CPPFLAGS = \

job_registration_LDADD = \
	$(top_builddir)/lib/libnorns.la


task_submission_SOURCES = \
	task_submission.c

task_submission_CFLAGS = \
	-std=gnu99 -Wall -Wextra

task_submission_CPPFLAGS = \
	-I$(top_srcdir)/include

task_submission_LDADD = \
	$(top_builddir)/lib/libnorns.la
+2 −2
Original line number Diff line number Diff line
@@ -31,14 +31,14 @@

void print_iotd(struct norns_iotd* iotdp){
    fprintf(stdout, "iotd -> struct nornds_iotd {\n");
    fprintf(stdout, "  ni_tid = %d;\n", iotdp->ni_tid);
    fprintf(stdout, "  io_taskid = %d;\n", iotdp->io_taskid);
    fprintf(stdout, "};\n");
}


int main() {
    struct norns_iotd iotd = {
        .ni_tid = 0,
        .io_taskid = 0,
    };

    fprintf(stdout, "calling norns_transfer(&iotd)\n");
+3 −3
Original line number Diff line number Diff line
@@ -74,7 +74,7 @@ int main(int argc, char* argv[]) {
        size_t n = strlen(str_mount);

        backends[i]->b_mount = strndup(str_mount, n);
        backends[i]->b_type = NORNS_LOCAL_NVML;
        backends[i]->b_type = NORNS_BACKEND_LOCAL_NVML;
        backends[i]->b_quota = 1024;
    }

@@ -93,8 +93,8 @@ int main(int argc, char* argv[]) {
    }

    // register processes with access to this job
    for(int i=0; i<1; ++i) {
        if((rv = norns_add_process(&cred, 42, (pid_t) i, (gid_t) i)) != NORNS_SUCCESS) {
    for(int i=0; i<5; ++i) {
        if((rv = norns_add_process(&cred, 42, (uid_t) i, (gid_t) i, (pid_t) i)) != NORNS_SUCCESS) {
            fprintf(stderr, "norns_add_process failed: %s\n", norns_strerror(rv));
        }
    }
+141 −0
Original line number Diff line number Diff line
/* * Copyright (C) 2017 Barcelona Supercomputing Center
 *                    Centro Nacional de Supercomputacion
 *
 * This file is part of the Data Scheduler, a daemon for tracking and managing
 * requests for asynchronous data transfer in a hierarchical storage environment.
 *
 * See AUTHORS file in the top level directory for information
 * regarding developers and contributors.
 *
 * The Data Scheduler is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * Data Scheduler is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with Data Scheduler.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <string.h>

#include <norns.h>

const char* ex_hosts[5] = {
    "node-00",
    "node-01",
    "node-02",
    "node-03",
    "node-04",
};

int main(int argc, char* argv[]) {

    (void) argc;
    (void) argv;
    struct norns_cred cred;

    // create job descriptor from example data
    // 1. fill in hostnames
    int num_hosts1 = 3;
    const char** hosts1 = NORNS_PLIST_ALLOC(const char*, num_hosts1);

    for(int i=0; i<num_hosts1; ++i) {
        hosts1[i] = ex_hosts[i];
    }

    int num_hosts2 = 2;
    const char** hosts2 = NORNS_PLIST_ALLOC(const char*, num_hosts2);

    for(int i=0; i<num_hosts2; ++i) {
        hosts2[i] = ex_hosts[num_hosts1+i];
    }

    // 2. declare which backends the job is authorized to use
    // and provide info on them
    int num_backends = 3;
    struct norns_backend** backends = NORNS_PLIST_ALLOC(struct norns_backend*, num_backends); 

    for(int i=0; i<num_backends; ++i) {

        backends[i] = NORNS_ALLOC(sizeof(struct norns_backend));

        char str_mount[50];

        snprintf(str_mount, sizeof(str_mount), "/mnt/a-%d", i);
        size_t n = strlen(str_mount);

        backends[i]->b_mount = strndup(str_mount, n);
        backends[i]->b_type = NORNS_BACKEND_LOCAL_NVML;
        backends[i]->b_quota = 1024;
    }

    struct norns_job job = {
        .jb_hosts = hosts1,
        .jb_nhosts = num_hosts1,
        .jb_backends = backends,
        .jb_nbackends = num_backends
    };

    int rv;

    // register a job with ID 42
    if((rv = norns_register_job(&cred, 42, &job)) != NORNS_SUCCESS) {
            fprintf(stderr, "ERROR: norns_register_job failed: %s\n", norns_strerror(rv));
    }

    // register processes with access to this job
    for(int i=0; i<5; ++i) {
        if((rv = norns_add_process(&cred, 42, (uid_t) i, (gid_t) i, (pid_t) i)) != NORNS_SUCCESS) {
            fprintf(stderr, "norns_add_process failed: %s\n", norns_strerror(rv));
        }
    }


    // submit a task
    struct norns_iotd task = {
        .io_taskid = 0,
        .io_optype = 28,
        .io_src = {
            .in_type = NORNS_BACKEND_LOCAL_NVML,
            .in_path = {
//                .p_hostname = "node42",
                .p_hostname = NULL,
                .p_datapath = "nvm://foobar.bin",
            },
        },
        .io_dst = {
            .out_type = NORNS_BACKEND_LUSTRE,
            .out_path = {
                .p_hostname = "node42",
                .p_datapath = "nvm://baz.bin",
            }
        }
    };

    if((rv = norns_transfer(&task)) != NORNS_SUCCESS) {
        fprintf(stderr, "ERROR: norns_transfer failed: %s\n", norns_strerror(rv));
    }
    else {
        fprintf(stdout, "Task submitted (ID: %d)\n", task.io_taskid);
    }


    // unregister the job
    if((rv = norns_unregister_job(&cred, 42)) != NORNS_SUCCESS) {
        fprintf(stderr, "norns_unregister_job failed: %s\n", norns_strerror(rv));
    }

    NORNS_PLIST_FREE(hosts1);
    NORNS_PLIST_FREE(hosts2);
    NORNS_PLIST_FREE(backends);

}
+88 −20
Original line number Diff line number Diff line
@@ -42,27 +42,95 @@ struct norns_cred {
    gid_t cr_gid;    /* GID of the process */
};

/* Data resource descriptor  */
struct norns_resource {
    const char* r_hostname;     /* hostname */
    const char* r_path;         /* path to "data" (i.e. file or directory) */
    uint32_t    r_type;         /* type of resource */
struct norns_membuf {
    void*  b_addr;      /* memory address */
    size_t b_size;      /* memory size */
};

struct norns_path {
    const char* p_hostname;     /* hostname (NULL if local) */
    const char* p_datapath;     /* path to "data" (i.e. file or directory) */
};

/* I/O task descriptor */
struct norns_iotd {
    uint32_t            ni_tid;     /* task identifier */
/* Input data resource descriptor  */
struct norns_data_in {

    // options:
    // - read from local nvm and write to lustre
    // - read from lustre and write to local nvm
    // - read from remote nvm and write to local nvm
    // - read from local nvm and write to remote nvm
    // - read from process memory and write to local nvm
    // - read from process memory and write to lustre 
    // - echofs: "read" from lustre into echofs
    // - echofs: "write" from echofs to lustre
    //
    //
    // - NEXTGenIO input resources: 
    // 1.   local nvm       nvm://path/to/dir/[file]                DAX-NVML
    // 2.   local tmpfs     tmpfs://path/to/dir/[file]              DAX-NVML
    // 3.   lustre          lustre://path/to/dir/[file]             POSIX
    // 4.   remote nvm      nvm@hostname://path/to/dir/[file]       DAX-NVML+RDMA/TCP
    // 5.   echofs          echofs://path/to/dir/[file]             CUSTOM
    // 6.   process memory  [pointer + size]                        MEMORY
    //
    // - NEXTGenIO output resources:
    // 1.   local nvm       nvm://path/to/dir/[file]                DAX-NVML
    // 2.   local tmpfs     tmpfs://path/to/dir/[file]              DAX-NVML
    // 3.   lustre (path)   lustre://path/to/dir/[file]             POSIX
    // 4.   remote nvm      nvm@hostname:://path/to/dir/[file]      DAX-NVML+RDMA/TCP
    // 5.   echofs          echofs://path/to/dir/[file]             CUSTOM

    uint32_t    in_type;         /* type of resource */
    union {
        struct norns_membuf __in_buffer;
        struct norns_path __in_path;
    } __in_location;

#define in_buffer __in_location.__in_buffer
#define in_path __in_location.__in_path
};

    struct norns_resource ni_src;   /* data source */
    struct norns_resource ni_dst;   /* data destination */
    uint32_t            ni_type;    /* operation to be performed */
    struct norns_cred*  ni_auth;   /* process credentials (NULL if unprivileged) */
/* Output data resource descriptor  */
struct norns_data_out {

    // options:
    // - read from local nvm and write to lustre
    // - read from lustre and write to local nvm
    // - read from remote nvm and write to local nvm
    // - read from local nvm and write to remote nvm
    // - read from process memory and write to local nvm
    // - read from process memory and write to lustre 
    // - echofs: "read" from lustre into echofs
    // - echofs: "write" from echofs to lustre
    //
    //
    // - input resources: 
    // 1.   local nvm       nvm://path/to/dir/[file]
    // 2.   lustre          lustre://path/to/dir/[file]
    // 3.   remote nvm      nvm@hostname://path/to/dir/[file]
    // 4.   echofs          echofs://path/to/dir/[file]
    // 5.   process memory  [pointer + size]
    //
    // - output resources:
    // 1.   local nvm (path)    nvm://foobar
    // 2.   lustre (path)       lustre://foobar
    // 3.   remote nvm          nvm@hostname:://foobar
    // 4.   echofs              echofs://foobar

    uint32_t    out_type;         /* type of resource */
    struct norns_path out_path;
};

    /* Internal members. */
    pid_t       __pid;      /* pid of the process that made the request */
    uint32_t    __jobid;    /* job id of the process that made the request (XXX Slurm dependent)*/

/* I/O task descriptor */
struct norns_iotd {
    uint32_t            io_taskid;     /* task identifier */
    uint32_t            io_optype;    /* operation to be performed */

    struct norns_data_in io_src;   /* data source */
    struct norns_data_out io_dst;   /* data destination */
//    struct norns_cred*  ni_auth;   /* process credentials (NULL if unprivileged) */
};


@@ -130,9 +198,9 @@ int norns_error(struct norns_iotd* iotdp) __THROW;
/* Storage backend descriptor */
struct norns_backend {
    int         b_type;
    const char* b_prefix; /* prefix ID for this backend (e.g. nvm01, tmpfs02, ...) */
    const char* b_mount; /* mount point */
    size_t      b_quota; /* backend capacity (in megabytes) allocated to the job */

    size_t      b_quota; /* backend capacity (in megabytes) allocated to the job for writing */
};

#define NORNS_ALLOC(size)       \
@@ -165,7 +233,7 @@ struct norns_backend {
struct norns_job {
    const char**            jb_hosts;  /* NULL-terminated list of hostnames participating in the job */
    size_t                  jb_nhosts; /* entries in hostname list */
    struct norns_backend**  jb_backends; /* NULL-terminated list of storage backends the job will use */
    struct norns_backend**  jb_backends; /* NULL-terminated list of storage backends the job is allowed to use */
    size_t                  jb_nbackends; /* entries in backend list */
};

@@ -184,10 +252,10 @@ int norns_update_job(struct norns_cred* auth, uint32_t jobid, struct norns_job*
int norns_unregister_job(struct norns_cred* auth, uint32_t jobid);

/* Add a process to a registered batch job */
int norns_add_process(struct norns_cred* auth, uint32_t jobid, pid_t pid, gid_t gid);
int norns_add_process(struct norns_cred* auth, uint32_t jobid, uid_t uid, gid_t gid, pid_t pid);

/* Remove a process from a registered batch job */
int norns_remove_process(struct norns_cred* auth, uint32_t jobid, pid_t pid, gid_t gid);
int norns_remove_process(struct norns_cred* auth, uint32_t jobid, uid_t uid, gid_t gid, pid_t pid);


char* norns_strerror(int errnum);
Loading