Commit 6152a5f2 authored by Ramon Nou's avatar Ramon Nou
Browse files

Stage-Out

parent 01ac95c7
Loading
Loading
Loading
Loading
Loading
+341 −66
Original line number Diff line number Diff line
@@ -38,6 +38,12 @@
#include "defaults.h"
#include "utils.h"

#include <dirent.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

/**
 * Slurm SPANK plugin to handle the ADMIRE adhoc storage CLI. Options are
 * forwarded to scord on srun, salloc and sbatch. See the struct spank_option
@@ -436,7 +442,16 @@ process_config(int ac, char** av, scord_plugin_config_t* cfg) {

    return 0;
}

/*
 * File-scope handles populated by scord_register_job() and later consumed
 * (and released) by scord_unregister_job() when the job is torn down.
 * They were previously locals of scord_register_job().
 *
 * NOTE(review): sharing them through globals presumably relies on both
 * functions running in the same process -- confirm against the SPANK
 * contexts in which each callback is invoked.
 * NOTE(review): these have external linkage; consider `static` if nothing
 * outside this translation unit needs them -- TODO confirm.
 */
ADM_server_t scord_server = NULL;
ADM_node_t* nodes = NULL;
ADM_job_resources_t job_resources = NULL;
ADM_adhoc_resources_t adhoc_resources = NULL;
ADM_adhoc_context_t adhoc_ctx = NULL;
ADM_adhoc_storage_t adhoc_storage = NULL;
ADM_job_requirements_t scord_reqs = NULL;
ADM_job_t scord_job = NULL;
ADM_transfer_t transfer = NULL;
char* adhoc_path = NULL;
static int
scord_register_job(spank_t sp, scord_plugin_config_t cfg,
                   scord_nodelist_t nodelist, uint32_t jobid) {
@@ -444,7 +459,7 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
    int rc = 0;
    int nnodes = 0;

    ADM_server_t scord_server = NULL;
    /* ADM_server_t scord_server = NULL;
     ADM_node_t* nodes = NULL;
     ADM_job_resources_t job_resources = NULL;
     ADM_adhoc_resources_t adhoc_resources = NULL;
@@ -453,7 +468,7 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
     ADM_job_requirements_t scord_reqs = NULL;
     ADM_job_t scord_job = NULL;
     ADM_transfer_t transfer = NULL;
    char* adhoc_path = NULL;
     char* adhoc_path = NULL; */

    /* First determine the node on which to launch scord-ctl (typically the
     * first node of the allocation)  */
@@ -586,7 +601,8 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
        case ADM_ADHOC_STORAGE_GEKKOFS:
            spank_setenv(sp, "ADHOC_TYPE", "gekkofs", 1);

            spank_setenv(sp, "LIBGKFS_HOSTS_FILE", "/tmp/gekkofs/gkfs_hosts.txt", 1);
            spank_setenv(sp, "LIBGKFS_HOSTS_FILE",
                         "/tmp/gekkofs/gkfs_hosts.txt", 1);
            break;
        case ADM_ADHOC_STORAGE_EXPAND:
            spank_setenv(sp, "ADHOC_TYPE", "expand", 1);
@@ -597,34 +613,38 @@ scord_register_job(spank_t sp, scord_plugin_config_t cfg,
        case ADM_ADHOC_STORAGE_HERCULES:
            spank_setenv(sp, "ADHOC_TYPE", "hercules", 1);
            break;
        
    }
    spank_setenv(sp, "ADHOC_PATH", adhoc_path, 1);


    if(input_datasets_count > 0) {
        // divide input_datasets into sources and targets
    ADM_dataset_t * sources = malloc((input_datasets_count) * sizeof(ADM_dataset_t));
    ADM_dataset_t * targets = malloc((input_datasets_count) * sizeof(ADM_dataset_t));;
        ADM_dataset_t* sources =
                malloc((input_datasets_count) * sizeof(ADM_dataset_t));
        ADM_dataset_t* targets =
                malloc((input_datasets_count) * sizeof(ADM_dataset_t));
        ;

        for(unsigned int i = 0; i < input_datasets_count; i++) {
            // ADM_dataset_route_list_t r_inputs;
            sources[i] = scord_reqs->r_inputs->l_routes[i].d_src;
            targets[i] = scord_reqs->r_inputs->l_routes[i].d_dst;
        }
        // Unfortunately, we have to sleep here; otherwise cargo will not find
        // the instance up.
        sleep(5);

    if (ADM_transfer_datasets(scord_server, scord_job, sources,
                                input_datasets_count,
                                targets,
                                input_datasets_count, 0, 0,
    ADM_MAPPING_ONE_TO_ONE, &transfer)!=
       ADM_SUCCESS) {
        if(ADM_transfer_datasets(
                   scord_server, scord_job, sources, input_datasets_count,
                   targets, input_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
                   &transfer, true) != ADM_SUCCESS) {
            slurm_error("%s: adhoc storage transfer failed", plugin_name);
            rc = -1;
            goto end;
        }
    }


end:
    if(adhoc_path) {
    /*   if(adhoc_path) {
           free(adhoc_path);
       }

@@ -655,7 +675,7 @@ end:
       if(scord_server) {
           ADM_server_destroy(scord_server);
       }

   */
    return rc;
}

@@ -821,3 +841,258 @@ cleanup:

    return ec;
}


/**
 * Recursively delete everything found under a directory, leaving the
 * directory itself in place.
 *
 * Errors on individual entries are ignored: the function is best-effort and
 * simply moves on to the next entry.
 *
 * @param[in] path Directory whose content must be removed. A NULL pointer or
 *                 a path that cannot be opened as a directory is a no-op.
 */
void
remove_dir_content(const char* path) {

    if(path == NULL) {
        return;
    }

    DIR* dr = opendir(path);
    if(dr == NULL) {
        return;
    }

    const size_t path_len = strlen(path);
    struct dirent* de;

    while((de = readdir(dr)) != NULL) {

        if(!strcmp(de->d_name, ".") || !strcmp(de->d_name, "..")) {
            continue;
        }

        /* Build "<path>/<name>" in a buffer sized for this entry so that long
         * paths can neither overflow nor be silently truncated. */
        const size_t fname_len = path_len + strlen(de->d_name) + 2;
        char* fname = malloc(fname_len);
        if(fname == NULL) {
            break;
        }
        snprintf(fname, fname_len, "%s/%s", path, de->d_name);

        /* lstat() rather than stat(): a symbolic link to a directory must be
         * unlinked itself, not followed and have its target emptied. */
        struct stat statbuf;
        if(lstat(fname, &statbuf) == 0) {
            if(S_ISDIR(statbuf.st_mode)) {
                /* Names handed to unlinkat() are resolved relative to the
                 * open directory, so the bare entry name is used here; the
                 * full path is only needed for the recursive call. */
                if(unlinkat(dirfd(dr), de->d_name, AT_REMOVEDIR) != 0) {
                    /* Most likely not empty: empty it first, then retry. */
                    remove_dir_content(fname);
                    (void) unlinkat(dirfd(dr), de->d_name, AT_REMOVEDIR);
                }
            } else {
                (void) unlinkat(dirfd(dr), de->d_name, 0);
            }
        }

        free(fname);
    }

    closedir(dr);
}


/**
 * Tear down the state created when the job was registered with scord:
 * stage out the job's output datasets, remove the adhoc storage, wipe the
 * local adhoc directory, remove the job and release every file-scope handle.
 *
 * If the stage-out transfer cannot be started or fails, the adhoc storage and
 * the job are NOT removed; only the local handles are released.
 *
 * @param[in] sp       SPANK handle (currently unused).
 * @param[in] cfg      Plugin configuration. It is received by value, so the
 *                     addresses computed here are not visible to the caller.
 * @param[in] nodelist Nodes of the allocation; node 0 is used to compute the
 *                     scord-ctl and Cargo addresses.
 * @param[in] jobid    Slurm job id (currently unused).
 * @return 0 on success, -1 if any step failed.
 */
static int
scord_unregister_job(spank_t sp, scord_plugin_config_t cfg,
                     scord_nodelist_t nodelist, uint32_t jobid) {
    (void) sp;
    (void) jobid;
    int rc = 0;
    ADM_dataset_t* sources = NULL;
    ADM_dataset_t* targets = NULL;

    /* First determine the node on which to launch scord-ctl (typically the
     * first node of the allocation)  */
    ADM_node_t ctl_node = scord_nodelist_get_node(nodelist, 0);
    cfg.scordctl_info.addr = margo_address_create(
            cfg.scordctl_info.proto, ADM_node_get_hostname(ctl_node),
            cfg.scordctl_info.port);

    if(!cfg.scordctl_info.addr) {
        slurm_error("%s: failed to compute address scordctl server",
                    plugin_name);
        return -1;
    }

    /* The Cargo master will also typically reside on the first node of the
     * allocation */
    /* NOTE(review): the addresses created here live only in this by-value
     * copy of cfg and are never released -- confirm who owns them. */
    cfg.cargo_info.addr = margo_address_create(cfg.cargo_info.proto,
                                               ADM_node_get_hostname(ctl_node),
                                               cfg.cargo_info.port);

    slurm_debug("%s: %s: scord_info:", plugin_name, __func__);
    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
                cfg.scord_info.addr);
    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
                cfg.scord_info.proto);
    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
                cfg.scord_info.port);

    slurm_debug("%s: %s: scordctl_info:", plugin_name, __func__);
    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
                cfg.scordctl_info.addr);
    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
                cfg.scordctl_info.proto);
    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
                cfg.scordctl_info.port);
    slurm_debug("%s: %s: cargo_info:", plugin_name, __func__);
    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
                cfg.cargo_info.addr);
    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
                cfg.cargo_info.proto);
    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
                cfg.cargo_info.port);

    /* Step 1: stage-out. Split the output routes into parallel source and
     * target arrays, as expected by ADM_transfer_datasets(). */
    if(output_datasets_count > 0) {

        /* The handles are file-scope state filled in at registration time;
         * refuse to dereference them if registration never completed. */
        if(!scord_server || !scord_job || !scord_reqs ||
           !scord_reqs->r_outputs) {
            slurm_error("%s: no registered job state available for stage-out",
                        plugin_name);
            rc = -1;
            goto end;
        }

        sources = calloc(output_datasets_count, sizeof(*sources));
        targets = calloc(output_datasets_count, sizeof(*targets));

        if(!sources || !targets) {
            slurm_error("%s: out of memory while preparing stage-out",
                        plugin_name);
            rc = -1;
            goto end;
        }

        for(unsigned int i = 0; i < output_datasets_count; i++) {
            sources[i] = scord_reqs->r_outputs->l_routes[i].d_src;
            targets[i] = scord_reqs->r_outputs->l_routes[i].d_dst;
        }

        if(ADM_transfer_datasets(
                   scord_server, scord_job, sources, output_datasets_count,
                   targets, output_datasets_count, 0, 0, ADM_MAPPING_ONE_TO_ONE,
                   &transfer, true) != ADM_SUCCESS) {
            slurm_error("%s: adhoc storage transfer failed", plugin_name);
            rc = -1;
            goto end;
        }
    }

    /* Step 2: remove the adhoc storage */
    if(scord_server && adhoc_storage &&
       ADM_remove_adhoc_storage(scord_server, adhoc_storage) != ADM_SUCCESS) {
        slurm_error("%s: adhoc storage removal failed", plugin_name);
        rc = -1;
    }

    /* Step 3: remove all the files (this should be done on all the nodes..
     * TODO) */
    if(adhoc_path) {
        remove_dir_content(adhoc_path);
    }

    /* Step 4: remove the job */
    if(scord_server && scord_job &&
       ADM_remove_job(scord_server, scord_job) != ADM_SUCCESS) {
        slurm_error("%s: job removal failed", plugin_name);
        rc = -1;
    }

end:
    free(sources);
    free(targets);

    /* Every handle is reset after being released so that a later call cannot
     * touch or free it a second time. */
    free(adhoc_path);
    adhoc_path = NULL;

    if(scord_job) {
        ADM_job_destroy(scord_job);
        scord_job = NULL;
    }

    if(scord_reqs) {
        ADM_job_requirements_destroy(scord_reqs);
        scord_reqs = NULL;
    }

    if(adhoc_storage) {
        ADM_adhoc_storage_destroy(adhoc_storage);
        adhoc_storage = NULL;
    }

    if(adhoc_ctx) {
        ADM_adhoc_context_destroy(adhoc_ctx);
        adhoc_ctx = NULL;
    }

    if(adhoc_resources) {
        ADM_adhoc_resources_destroy(adhoc_resources);
        adhoc_resources = NULL;
    }

    if(job_resources) {
        ADM_job_resources_destroy(job_resources);
        job_resources = NULL;
    }

    if(scord_server) {
        ADM_server_destroy(scord_server);
        scord_server = NULL;
    }

    return rc;
}


/**
 * SPANK exit hook: when ADMIRE options were passed to the job, the node with
 * relative id 0 stages out the job's data and unregisters the job.
 *
 * @param[in] sp SPANK handle.
 * @param[in] ac Number of plugin arguments.
 * @param[in] av Plugin arguments, forwarded to process_config().
 * @return 0 on success or when there is nothing to do, -1 on error.
 */
int
slurm_spank_exit(spank_t sp, int ac, char** av) {

    spank_err_t rc = ESPANK_SUCCESS;

    /* No ADMIRE options were passed to the job, nothing to do here */
    if(!scord_flag) {
        return 0;
    }

    /* Get the relative id of the executing node. Job unregistration is only
     * done by the node with ID 0 */
    uint32_t nodeid;

    if((rc = spank_get_item(sp, S_JOB_NODEID, &nodeid)) != ESPANK_SUCCESS) {
        slurm_error("%s: failed to get node id: %s", plugin_name,
                    spank_strerror(rc));
        return -1;
    }

    slurm_info("%s: %s: node id: %d", plugin_name, __func__, nodeid);

    if(nodeid != 0) {
        return 0;
    }

    scord_plugin_config_t cfg = default_cfg;

    if(process_config(ac, av, &cfg) != 0) {
        return -1;
    }

    /* get job id */
    uint32_t jobid;

    if((rc = spank_get_item(sp, S_JOB_ID, &jobid)) != ESPANK_SUCCESS) {
        slurm_info("%s: failed to get jobid: %s", plugin_name,
                   spank_strerror(rc));
        return -1;
    }

    slurm_info("%s: %s: job id: %d", plugin_name, __func__, jobid);

    /* list of job nodes */
    hostlist_t* hostlist = get_slurm_hostlist(sp);
    if(!hostlist) {
        slurm_info("%s: failed to retrieve hostlist", plugin_name);
        return -1;
    }

    char buf[256];
    slurm_hostlist_ranged_string(hostlist, sizeof(buf), buf);
    slurm_info("%s: %s: hostlist: %s", plugin_name, __func__, buf);

    scord_nodelist_t nodelist = scord_nodelist_create(hostlist);

    /* Always initialized: it is returned on every path through `cleanup`. */
    int ec = 0;

    if(!nodelist) {
        slurm_info("%s: failed to create nodelist", plugin_name);
        ec = -1;
        goto cleanup;
    }

    /* We got here: do the stage-out and clean up, reporting any failure */
    ec = scord_unregister_job(sp, cfg, nodelist, jobid);

cleanup:
    if(cfg.scordctl_info.addr) {
        free((void*) cfg.scordctl_info.addr);
    }

    /* nodelist is NULL when we arrive here from the creation failure above */
    if(nodelist) {
        scord_nodelist_destroy(nodelist);
    }
    slurm_hostlist_destroy(hostlist);

    return ec;
}
 No newline at end of file
+1 −1
Original line number Diff line number Diff line
@@ -7,5 +7,5 @@ EnvironmentFile=%h/.config/cargo/%I.cfg
ExecStart=@CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/slurm/cargoctl start -s ${CARGO_ADDRESS} -H ${CARGO_HOSTS} -n ${CARGO_NUM_NODES}
ExecStop=@CMAKE_INSTALL_FULL_DATADIR@/@PROJECT_NAME@/slurm/cargoctl stop -s ${CARGO_ADDRESS}
Restart=no
PrivateTmp=true
PrivateTmp=false
NoNewPrivileges=true
+32 −2
Original line number Diff line number Diff line
@@ -227,8 +227,8 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
                      ADM_dataset_t sources[], size_t sources_len,
                      ADM_dataset_t targets[], size_t targets_len,
                      ADM_qos_limit_t limits[], size_t limits_len,
                      ADM_transfer_mapping_t mapping,
                      ADM_transfer_t* transfer) {
                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer,
                      bool wait = false) {

    const auto rv = scord::detail::transfer_datasets(
            scord::server{server}, scord::job{job},
@@ -241,6 +241,36 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
    }

    *transfer = static_cast<ADM_transfer_t>(rv.value());
    if(wait) {
        auto rv_wait = scord::detail::query_transfer(
                scord::server{server}, scord::job{job},
                scord::transfer{rv.value()});
        if(!rv_wait) {
            if(rv_wait.error().value() == scord::error_code::no_such_entity)
            {
                return ADM_SUCCESS;
            }
            else
                return rv_wait.error();
        }
        auto status = rv_wait.value().status();

        while(status == scord::transfer_state::type::running or
              status == scord::transfer_state::type::queued) {
            sleep(5);
            rv_wait = scord::detail::query_transfer(
                    scord::server{server}, scord::job{job},
                    scord::transfer{rv.value()});
            if(!rv_wait) {
                if(rv_wait.error().value()  == scord::error_code::no_such_entity) {
                    return ADM_SUCCESS;
                }
                else
                    return rv_wait.error();
            }
            status = rv_wait.value().status();
        }
    }

    return ADM_SUCCESS;
}
+1 −1
Original line number Diff line number Diff line
@@ -249,7 +249,7 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job,
                      ADM_dataset_t sources[], size_t sources_len,
                      ADM_dataset_t targets[], size_t targets_len,
                      ADM_qos_limit_t limits[], size_t limits_len,
                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer);
                      ADM_transfer_mapping_t mapping, ADM_transfer_t* transfer, bool wait);

/**
 * Sets the obtained bw for the transfer operation