Commit 5ad4dcd1 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

plugins/slurm: Refactor scord_register_job()

Also, move job registration into `scord` to `slurm_spank_user_init` so
that it works both in `srun` and `sbatch`/`salloc`.
parent 04c0017e
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@ configure_file(defaults.h.in defaults.h @ONLY)

target_sources(
  slurm-plugin PRIVATE slurmadmcli.c ${CMAKE_CURRENT_BINARY_DIR}/defaults.h
                       utils.c utils.h
)

target_include_directories(
+170 −87
Original line number Diff line number Diff line
@@ -33,6 +33,7 @@

#include <scord/scord.h>
#include "defaults.h"
#include "utils.h"

/**
 * Slurm SPANK plugin to handle the ADMIRE adhoc storage CLI. Options are
@@ -263,119 +264,161 @@ process_config(int ac, char** av, scord_plugin_config_t* cfg) {
}

static int
scord_register_job(const char* scord_proto, const char* scord_addr,
                   const char* nodelist, uint32_t jobid) {
    int rc = 0;
scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist,
                   uint32_t jobid) {

    ADM_server_t scord_server;
    scord_server = ADM_server_create(scord_proto, scord_addr);
    if(!scord_server) {
        slurm_error("slurmadmcli: scord server creation failed");
        rc = -1;
        goto end;
    int rc = 0;
    int nnodes = 0;

    ADM_server_t scord_server = NULL;
    ADM_node_t* nodes = NULL;
    ADM_job_resources_t job_resources = NULL;
    ADM_adhoc_resources_t adhoc_resources = NULL;
    ADM_adhoc_context_t adhoc_ctx = NULL;
    ADM_adhoc_storage_t adhoc_storage = NULL;
    ADM_job_requirements_t scord_reqs = NULL;
    ADM_job_t scord_job = NULL;
    char* adhoc_path = NULL;

    /* First determine the node on which to launch scord-ctl (typically the
     * first node of the allocation)  */
    ADM_node_t ctl_node = scord_nodelist_get_node(nodelist, 0);
    cfg.scordctl_info.addr = margo_address_create(
            cfg.scordctl_info.proto, ADM_node_get_hostname(ctl_node),
            cfg.scordctl_info.port);

    if(!cfg.scordctl_info.addr) {
        slurm_error("%s: failed to compute address scordctl server",
                    plugin_name);
        return -1;
    }

    /* list of job nodes */
    hostlist_t hl = slurm_hostlist_create(nodelist);
    if(!hl) {
        slurm_error("slurmadmcli: slurm_hostlist creation failed");
    slurm_debug("%s: %s: scord_info:", plugin_name, __func__);
    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
                cfg.scord_info.addr);
    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
                cfg.scord_info.proto);
    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
                cfg.scord_info.port);

    slurm_debug("%s: %s: scordctl_info:", plugin_name, __func__);
    slurm_debug("%s: %s:   addr: \"%s\",", plugin_name, __func__,
                cfg.scordctl_info.addr);
    slurm_debug("%s: %s:   proto: \"%s\",", plugin_name, __func__,
                cfg.scordctl_info.proto);
    slurm_debug("%s: %s:   port: %d,", plugin_name, __func__,
                cfg.scordctl_info.port);

    /* Register the job with the scord server */
    scord_server = ADM_server_create(cfg.scord_info.proto, cfg.scord_info.addr);
    if(!scord_server) {
        slurm_error("%s: scord server creation failed", plugin_name);
        rc = -1;
        goto end;
    }

    int nnodes = slurm_hostlist_count(hl);
    nnodes = scord_nodelist_get_nodecount(nodelist);
    if(nnodes <= 0) {
        slurm_error("slurmadmcli: wrong slurm_hostlist count");
        slurm_error("%s: wrong scord_nodelist count", plugin_name);
        rc = -1;
        goto end;
    }

    ADM_node_t* nodes = reallocarray(NULL, nnodes, sizeof(ADM_node_t));
    nodes = scord_nodelist_get_nodes(nodelist);
    if(!nodes) {
        slurm_error("slurmadmcli: out of memory");
        slurm_error("%s: wrong scord_nodelist_nodes", plugin_name);
        rc = -1;
        goto end;
    }

    size_t i = 0;
    char* nodename;
    while((nodename = slurm_hostlist_shift(hl))) {
        nodes[i] = ADM_node_create(nodename, ADM_NODE_REGULAR);
        if(!nodes[i]) {
            slurm_error("slurmadmcli: scord node creation failed");
            rc = -1;
            goto end;
        }
        i++;
    }

    ADM_job_resources_t job_resources;
    job_resources = ADM_job_resources_create(nodes, nnodes);
    if(!job_resources) {
        slurm_error("slurmadmcli: job_resources creation failed");
        slurm_error("%s: job_resources creation failed", plugin_name);
        rc = -1;
        goto end;
    }

    /* take the ADHOC_NNODES first nodes for the adhoc */
    ADM_adhoc_resources_t adhoc_resources;
    adhoc_resources = ADM_adhoc_resources_create(
            nodes, adhoc_nnodes < nnodes ? adhoc_nnodes : nnodes);
    if(!adhoc_resources) {
        slurm_error("slurmadmcli: adhoc_resources creation failed");
        slurm_error("%s: adhoc_resources creation failed", plugin_name);
        rc = -1;
        goto end;
    }

    ADM_adhoc_context_t adhoc_ctx;
    adhoc_ctx = ADM_adhoc_context_create(
            NULL, adhoc_mode, ADM_ADHOC_ACCESS_RDWR, adhoc_walltime, false);
    adhoc_ctx = ADM_adhoc_context_create(cfg.scordctl_info.addr, adhoc_mode,
                                         ADM_ADHOC_ACCESS_RDWR, adhoc_walltime,
                                         false);
    if(!adhoc_ctx) {
        slurm_error("slurmadmcli: adhoc_context creation failed");
        slurm_error("%s: adhoc_context creation failed", plugin_name);
        rc = -1;
        goto end;
    }

    ADM_adhoc_storage_t adhoc_storage;
    if(ADM_register_adhoc_storage(
               scord_server, "mystorage", ADM_ADHOC_STORAGE_GEKKOFS, adhoc_ctx,
               adhoc_resources, &adhoc_storage) != ADM_SUCCESS) {
        slurm_error("slurmadmcli: adhoc_storage registration failed");
        slurm_error("%s: adhoc_storage registration failed", plugin_name);
        rc = -1;
        goto end;
    }

    /* no inputs or outputs */
    ADM_job_requirements_t scord_reqs;
    scord_reqs = ADM_job_requirements_create(NULL, 0, NULL, 0, adhoc_storage);
    if(!scord_reqs) {
        slurm_error("slurmadmcli: scord job_requirements creation");
        slurm_error("%s: scord job_requirements creation", plugin_name);
        rc = -1;
        goto end;
    }

    ADM_job_t scord_job;
    if(ADM_register_job(scord_server, job_resources, scord_reqs, jobid,
                        &scord_job) != ADM_SUCCESS) {
        slurm_error("slurmadmcli: scord job registration failed");
        slurm_error("%s: scord job registration failed", plugin_name);
        rc = -1;
        goto end;
    }

    if(ADM_deploy_adhoc_storage(scord_server, adhoc_storage, NULL) !=
    if(ADM_deploy_adhoc_storage(scord_server, adhoc_storage, &adhoc_path) !=
       ADM_SUCCESS) {
        slurm_error("slurmadmcli: adhoc storage deployment failed");
        slurm_error("%s: adhoc storage deployment failed", plugin_name);
        rc = -1;
        goto end;
    }

end:
    slurm_hostlist_destroy(hl);
    ADM_adhoc_resources_destroy(adhoc_resources);
    if(adhoc_path) {
        free(adhoc_path);
    }

    if(scord_job) {
        ADM_remove_job(scord_server, scord_job);
    }

    if(scord_reqs) {
        ADM_job_requirements_destroy(scord_reqs);
    }

    if(adhoc_storage) {
        ADM_adhoc_storage_destroy(adhoc_storage);
    }

    if(adhoc_ctx) {
        ADM_adhoc_context_destroy(adhoc_ctx);
    }

    if(adhoc_resources) {
        ADM_adhoc_resources_destroy(adhoc_resources);
    }

    if(job_resources) {
        ADM_job_resources_destroy(job_resources);
    }

    if(scord_server) {
        ADM_server_destroy(scord_server);
    }

    return rc;
}

@@ -398,17 +441,55 @@ slurm_spank_init(spank_t sp, int ac, char** av) {
    return rc == ESPANK_SUCCESS ? 0 : -1;
}


/**
 * Called in local context only after all options have been processed.
 * This is called after the job ID and step IDs are available. This happens in
 * `srun` after the allocation is made, but before tasks are launched.
 *
 * ┌-----------------------┐
 * | Command | Context     |
 * ├---------|-------------┤
 * | srun    | S_CTX_LOCAL |
 * └-----------------------┘
 *
 * Available in the following contexts:
 *  S_CTX_LOCAL (srun)
 */
int
slurm_spank_local_user_init(spank_t sp, int ac, char** av) {

    (void) sp;
    (void) ac;
    (void) av;

    if(!scord_flag)
    return 0;
}

/**
 * Called after privileges are temporarily dropped. (remote context only)
 *
 * ┌------------------------┐
 * | Command | Context      |
 * ├---------|--------------┤
 * | srun    | S_CTX_REMOTE |
 * | salloc  | S_CTX_REMOTE |
 * | sbatch  | S_CTX_REMOTE |
 * └------------------------┘
 *
 * Available in the following contexts:
 *  S_CTX_REMOTE (slurmstepd)
 */
int
slurm_spank_user_init(spank_t sp, int ac, char** av) {

    (void) sp;
    (void) ac;
    (void) av;

    if(!scord_flag) {
        return 0;
    }

    const char* scord_addr = SCORD_SERVER_DEFAULT;
    const char* scord_proto = SCORD_PROTO_DEFAULT;
    const char* scordctl_bin = SCORDCTL_PROG_DEFAULT;
    scord_plugin_config_t cfg = default_cfg;

    if(process_config(ac, av, &cfg) != 0) {
@@ -418,47 +499,49 @@ slurm_spank_local_user_init(spank_t sp, int ac, char** av) {
    /* get job id */
    spank_err_t rc;
    uint32_t jobid;
    char sjobid[INT32_STR_LEN];

    if((rc = spank_get_item(sp, S_JOB_ID, &jobid)) != ESPANK_SUCCESS) {
        slurm_error("slurmadmcli: failed to get jobid: %s", spank_strerror(rc));
        slurm_error("%s: failed to get jobid: %s", plugin_name,
                    spank_strerror(rc));
        return -1;
    }
    if(snprintf(sjobid, INT32_STR_LEN, "%" PRIu32, jobid) < 0) {
        slurm_error("slurmadmcli: failed to convert jobid");

    slurm_debug("%s: %s: job id: %d", plugin_name, __func__, jobid);

    /* list of job nodes */
    hostlist_t hostlist = get_slurm_hostlist(sp);
    if(!hostlist) {
        slurm_error("%s: failed to retrieve hostlist", plugin_name);
        return -1;
    }

    /* get list of nodes. /!\ at this point env SLURM_NODELIST is
       set, but not SLURM_JOB_NODELIST! */
    const char* nodelist = getenv("SLURM_NODELIST");
    char buf[256];
    slurm_hostlist_ranged_string(hostlist, sizeof(buf), buf);
    slurm_debug("%s: %s: hostlist: %s", plugin_name, __func__, buf);

    scord_nodelist_t nodelist = scord_nodelist_create(hostlist);

    int ec;

    if(!nodelist) {
        slurm_error("slurmadmcli: failed to get node list");
        return -1;
        slurm_error("%s: failed to create nodelist", plugin_name);
        ec = -1;
        goto cleanup;
    }

    if((ec = scord_register_job(cfg, nodelist, jobid)) != 0) {
        slurm_error("%s: failed to register job with scord", plugin_name);
        ec = -1;
        goto cleanup;
    }

    /* launch one scord-ctl task on one node in the allocation, let Slurm decide
     * which */
    pid_t pid;
    if((pid = fork()) < 0) {
        slurm_error("slurmadmcli: failed to start scord-ctl: %s",
                    strerror(errno));
        return -1;
    } else if(pid == 0) {
        char* argv[] = {"srun",
                        "-N1",
                        "-n1",
                        "--overlap",
                        "--cpu-bind=none",
                        "--jobid",
                        sjobid,
                        (char* const) scordctl_bin,
                        NULL};
        execvp(argv[0], argv);
        slurm_error("slurmadmcli: failed to srun scord-ctl: %s",
                    strerror(errno));
        return 0;
cleanup:
    if(cfg.scordctl_info.addr) {
        free((void*) cfg.scordctl_info.addr);
    }

    return scord_register_job(scord_proto, scord_addr, nodelist, jobid);
    scord_nodelist_destroy(nodelist);
    slurm_hostlist_destroy(hostlist);

    return ec;
}

plugins/slurm/utils.c

0 → 100644
+246 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Inria, France.
 * Copyright 2023, Barcelona Supercomputing Center (BSC), Spain.
 * All rights reserved.
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of scord.
 *
 * scord is free software: you can redistribute it and/or modify
 * it under the terms of the Lesser GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * scord is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the Lesser GNU General Public License
 * along with scord.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 *****************************************************************************/

#include <string.h>
#include <slurm/slurm.h>
#include <netdb.h>
#include <arpa/inet.h>
#include "utils.h"

extern const char plugin_name[];

hostlist_t
get_slurm_hostlist(spank_t sp) {

    /* get list of nodes. /!\ at this point env SLURM_NODELIST is
       set, but not SLURM_JOB_NODELIST! */

    char* nodelist = NULL;

    spank_context_t sctx = spank_context();

    if(sctx != S_CTX_LOCAL && sctx != S_CTX_ALLOCATOR && sctx != S_CTX_REMOTE) {
        return NULL;
    }

    if(sctx == S_CTX_LOCAL || sctx == S_CTX_ALLOCATOR) {
        nodelist = getenv("SLURM_NODELIST");

        if(!nodelist) {
            slurm_error("%s: failed to get SLURM_NODELIST", plugin_name);
            return NULL;
        }
    } else {

        spank_err_t ec = ESPANK_SUCCESS;
        int size = 256;
        char* buffer = malloc(sizeof(char) * size);

        ec = spank_getenv(sp, "SLURM_NODELIST", buffer, size);

        if(ec != ESPANK_SUCCESS) {
            slurm_error("%s: failed to get SLURM_NODELIST: %s", plugin_name,
                        spank_strerror(ec));
            return NULL;
        }

        nodelist = buffer;
    }

    slurm_debug("%s: SLURM_NODELIST=%s", plugin_name, nodelist);

    hostlist_t hl = NULL;
    hl = slurm_hostlist_create(nodelist);

    if(!hl) {
        slurm_error("%s: slurm_hostlist_create() failed", plugin_name);
        return NULL;
    }

    return hl;
}

scord_nodelist_t
scord_nodelist_create(hostlist_t hostlist) {

    ADM_node_t* nodes = NULL;
    char* host = NULL;

    /* get number of nodes */
    int n = slurm_hostlist_count(hostlist);
    if(n <= 0) {
        slurm_error("%s: slurm_hostlist_count() failed", plugin_name);
        goto error;
    }

    /* allocate array of ADM_node_t */
    nodes = calloc(n, sizeof(ADM_node_t));
    if(!nodes) {
        slurm_error("%s: calloc() failed", plugin_name);
        goto error;
    }

    /* fill array of ADM_node_t */
    for(int i = 0; i < n; i++) {
        host = slurm_hostlist_shift(hostlist);
        if(!host) {
            slurm_error("%s: slurm_hostlist_shift() failed", plugin_name);
            goto error;
        }

        nodes[i] = ADM_node_create(host, ADM_NODE_REGULAR);

        if(!nodes[i]) {
            slurm_error("%s: ADM_node_create() failed", plugin_name);
            goto error;
        }
    }

    scord_nodelist_t nodelist = calloc(1, sizeof(struct scord_nodelist));

    if(!nodelist) {
        slurm_error("%s: calloc() failed", plugin_name);
        goto error;
    }

    nodelist->nodes = nodes;
    nodelist->nnodes = n;

    return nodelist;

error:
    if(nodes) {
        for(int i = 0; i < n; i++) {
            if(nodes[i]) {
                ADM_node_destroy(nodes[i]);
            }
        }
        free(nodes);
    }

    return NULL;
}

int
scord_nodelist_get_nodecount(scord_nodelist_t nodelist) {
    return nodelist ? (int) nodelist->nnodes : -1;
}

ADM_node_t*
scord_nodelist_get_nodes(scord_nodelist_t nodelist) {
    if(!nodelist) {
        return NULL;
    }
    return nodelist->nodes;
}

ADM_node_t
scord_nodelist_get_node(scord_nodelist_t nodelist, int index) {
    if(!nodelist || index < 0 || index >= nodelist->nnodes) {
        return NULL;
    }
    return nodelist->nodes[index];
}

void
scord_nodelist_destroy(scord_nodelist_t nodelist) {
    if(nodelist) {
        if(nodelist->nodes) {
            for(ssize_t i = 0; i < nodelist->nnodes; i++) {
                if(nodelist->nodes[i]) {
                    ADM_node_destroy(nodelist->nodes[i]);
                }
            }
            free(nodelist->nodes);
        }
        free(nodelist);
    }
}

int
resolve_host(const char* hostname, char* buffer) {

    struct addrinfo hints, *result;
    int rv;

    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_DGRAM;

    if((rv = getaddrinfo(hostname, NULL, &hints, &result)) != 0) {
        return rv;
    }

    // we only return the first AF_INET address
    for(struct addrinfo* rp = result; rp != NULL; rp = rp->ai_next) {
        switch(rp->ai_family) {
            case AF_INET:
                inet_ntop(AF_INET,
                          &((struct sockaddr_in*) rp->ai_addr)->sin_addr,
                          buffer, INET_ADDRSTRLEN);
                freeaddrinfo(result);
                return 0;

            default:
                continue;
        }
    }

    freeaddrinfo(result);
    return EAI_NONAME;
}

const char*
margo_address_create(const char* protocol, const char* hostname, int port) {

    const char sep[] = "://";

    if(!protocol) {
        return strndup(hostname, strlen(hostname));
    }

    if(!hostname) {
        return NULL;
    }

    int rv;
    char buffer[INET_ADDRSTRLEN];
    if((rv = resolve_host(hostname, buffer)) != 0) {
        slurm_error("%s: resolve_host() failed: %s", plugin_name,
                    gai_strerror(rv));
        return NULL;
    }

    size_t n = snprintf(NULL, 0, "%s%s%s:%d", protocol, sep, buffer, port);
    char* addr = malloc(n + 1);

    if(!addr) {
        return NULL;
    }

    snprintf(addr, n + 1, "%s%s%s:%d", protocol, sep, buffer, port);
    return addr;
}

plugins/slurm/utils.h

0 → 100644
+59 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2022-2023, Inria, France.
 * Copyright 2023, Barcelona Supercomputing Center (BSC), Spain.
 * All rights reserved.
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of scord.
 *
 * scord is free software: you can redistribute it and/or modify
 * it under the terms of the Lesser GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * scord is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the Lesser GNU General Public License
 * along with scord.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: LGPL-3.0-or-later
 *****************************************************************************/

#ifndef SCORD_SLURM_PLUGIN_UTILS_H
#define SCORD_SLURM_PLUGIN_UTILS_H

#include <slurm/spank.h>
#include <scord/types.h>

hostlist_t
get_slurm_hostlist(spank_t sp);

typedef struct scord_nodelist {
    ADM_node_t* nodes;
    ssize_t nnodes;
}* scord_nodelist_t;

scord_nodelist_t
scord_nodelist_create(hostlist_t hostlist);

int
scord_nodelist_get_nodecount(scord_nodelist_t nodelist);

ADM_node_t*
scord_nodelist_get_nodes(scord_nodelist_t nodelist);

ADM_node_t
scord_nodelist_get_node(scord_nodelist_t nodelist, int index);

void
scord_nodelist_destroy(scord_nodelist_t nodelist);

const char*
margo_address_create(const char* protocol, const char* hostname, int port);

#endif // SCORD_SLURM_PLUGIN_UTILS_H