Verified Commit cbe7df06 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

ADM_register_job.c: Fix asserts and cleanup

parent dab9e834
Loading
Loading
Loading
Loading
+135 −60
Original line number Diff line number Diff line
@@ -25,7 +25,6 @@
#include <stdlib.h>
#include <stdio.h>
#include <admire.h>
#include <assert.h>
#include "common.h"

#define NJOB_NODES   50
@@ -42,82 +41,158 @@ main(int argc, char* argv[]) {
        exit(EXIT_FAILURE);
    }

    int exit_status = EXIT_SUCCESS;
    ADM_server_t server = ADM_server_create("tcp", argv[1]);

    ADM_job_t job;
    ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES);
    assert(job_nodes);
    ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES);
    assert(adhoc_nodes);
    ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS);
    assert(inputs);
    ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS);
    assert(outputs);

    ADM_job_resources_t job_resources =
            ADM_job_resources_create(job_nodes, NJOB_NODES);
    assert(job_resources);

    ADM_adhoc_resources_t adhoc_resources =
            ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES);
    assert(adhoc_resources);

    ADM_adhoc_context_t ctx = ADM_adhoc_context_create(
            ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources,
    int exit_status = EXIT_FAILURE;
    ADM_return_t ret = ADM_SUCCESS;
    ADM_server_t server = NULL;

    // adhoc information
    const char* adhoc_name = "adhoc_storage_42";
    ADM_node_t* adhoc_nodes = NULL;
    ADM_adhoc_resources_t adhoc_resources = NULL;
    ADM_adhoc_context_t adhoc_ctx = NULL;
    ADM_storage_t adhoc_storage = NULL;

    // job information
    ADM_node_t* job_nodes = NULL;
    ADM_job_t job = NULL;
    ADM_job_resources_t job_resources = NULL;
    ADM_job_requirements_t reqs = NULL;
    uint64_t slurm_job_id = 42;
    ADM_dataset_t* inputs = NULL;
    ADM_dataset_t* outputs = NULL;

    // Let's prepare all the information required by the API calls.
    // ADM_register_job() often requires an adhoc storage to have been
    // registered onto the system, so let's prepare first the data required
    // to call ADM_register_adhoc_storage():

    // 1. the jobs required by the associated adhoc storage
    adhoc_nodes = prepare_nodes(NADHOC_NODES);

    if(adhoc_nodes == NULL) {
        fprintf(stderr, "Fatal error preparing adhoc nodes\n");
        goto cleanup;
    }

    // 2. the adhoc storage resources
    adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES);

    if(adhoc_resources == NULL) {
        fprintf(stderr, "Fatal error preparing adhoc resources\n");
        goto cleanup;
    }

    // 3. the adhoc storage execution context
    adhoc_ctx = ADM_adhoc_context_create(ADM_ADHOC_MODE_SEPARATE_NEW,
                                         ADM_ADHOC_ACCESS_RDWR, adhoc_resources,
                                         100, false);
    assert(ctx);

    const char* name = "adhoc_storage_42";
    if(adhoc_ctx == NULL) {
        fprintf(stderr, "Fatal error preparing adhoc context\n");
        goto cleanup;
    }


    // All the information required by the ADM_register_adhoc_storage() API is
    // now ready. Let's actually contact the server:

    ADM_storage_t adhoc_storage;
    ADM_return_t ret = ADM_register_adhoc_storage(
            server, name, ADM_STORAGE_GEKKOFS, ctx, &adhoc_storage);
    // 1. Find the server endpoint
    if((server = ADM_server_create("tcp", argv[1])) == NULL) {
        fprintf(stderr, "Fatal error creating server\n");
        goto cleanup;
    }

    if(ret != ADM_SUCCESS) {
        fprintf(stderr,
                "ADM_register_adhoc_storage() remote procedure not completed "
                "successfully: %s\n",
    // 2. Register the adhoc storage
    if(ADM_register_adhoc_storage(server, adhoc_name, ADM_STORAGE_GEKKOFS,
                                  adhoc_ctx, &adhoc_storage) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        exit_status = EXIT_FAILURE;
        goto cleanup;
    }

    ADM_job_requirements_t reqs = ADM_job_requirements_create(
            inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage);
    assert(reqs);

    uint64_t slurm_job_id = 42;
    ret = ADM_register_job(server, job_resources, reqs, slurm_job_id, &job);
    // Now that we have an existing adhoc storage registered into the
    // system, let's prepare all the information for actually registering the
    // job with ADM_register_job():

    if(ret != ADM_SUCCESS) {
        fprintf(stderr,
                "ADM_register_job() remote procedure not completed "
                "successfully: %s\n",
                ADM_strerror(ret));
        exit_status = EXIT_FAILURE;
    // 1. the job's resources
    job_nodes = prepare_nodes(NJOB_NODES);

    if(job_nodes == NULL) {
        fprintf(stderr, "Fatal error preparing job nodes\n");
        goto cleanup;
    }

    fprintf(stdout, "ADM_register_job() remote procedure completed "
                    "successfully\n");
    job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES);

cleanup:
    for(int i = 0; i < NINPUTS; ++i) {
        ADM_dataset_destroy(inputs[i]);
    if(job_resources == NULL) {
        fprintf(stderr, "Fatal error preparing job resources\n");
        goto cleanup;
    }

    for(int i = 0; i < NOUTPUTS; ++i) {
        ADM_dataset_destroy(outputs[i]);
    // 2. the job's requirements
    inputs = prepare_datasets("input-dataset-%d", NINPUTS);

    if(inputs == NULL) {
        fprintf(stderr, "Fatal error preparing input datasets\n");
        goto cleanup;
    }

    ADM_storage_destroy(adhoc_storage);
    outputs = prepare_datasets("output-dataset-%d", NOUTPUTS);

    ADM_adhoc_context_destroy(ctx);
    if(outputs == NULL) {
        fprintf(stderr, "Fatal error preparing output datasets\n");
        goto cleanup;
    }

    ADM_job_requirements_destroy(reqs);
    if((reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS,
                                           adhoc_storage)) == NULL) {
        fprintf(stderr, "ADM_job_requirements_create() failed");
        goto cleanup;
    }


    // All the information required by the ADM_register_job() API is now ready.
    // Let's actually contact the server:
    if(ADM_register_job(server, job_resources, reqs, slurm_job_id, &job) !=
       ADM_SUCCESS) {
        fprintf(stderr, "ADM_register_job() failed: %s\n", ADM_strerror(ret));
        goto cleanup;
    }

    // At this point, the job can execute...
    exit_status = EXIT_SUCCESS;

    ADM_remove_job(server, job);
    // When the job finishes, it is necessary to notify the server
    if((ret = ADM_remove_job(server, job)) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_remove_job() failed: %s\n", ADM_strerror(ret));
        job = NULL;
        exit_status = EXIT_FAILURE;
        // intentionally fall through...
    }

    // It is also nice to remove the adhoc storage instance once it's no
    // longer needed
    if((ret = ADM_remove_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) {
        fprintf(stderr, "ADM_remove_adhoc_storage() failed: %s\n",
                ADM_strerror(ret));
        adhoc_storage = NULL;
        exit_status = EXIT_FAILURE;
        // intentionally fall through...
    }

cleanup:
    ADM_server_destroy(server);

    ADM_job_requirements_destroy(reqs);
    destroy_datasets(outputs, NOUTPUTS);
    destroy_datasets(inputs, NINPUTS);
    ADM_job_resources_destroy(job_resources);
    destroy_nodes(job_nodes, NJOB_NODES);

    ADM_adhoc_context_destroy(adhoc_ctx);
    ADM_adhoc_resources_destroy(adhoc_resources);
    destroy_nodes(adhoc_nodes, NADHOC_NODES);

    exit(exit_status);
}
+24 −0
Original line number Diff line number Diff line
@@ -27,6 +27,22 @@ prepare_nodes(size_t n) {
    return nodes;
}

void
destroy_nodes(ADM_node_t nodes[], size_t n) {

    if(!nodes) {
        return;
    }

    for(size_t i = 0; i < n; ++i) {
        if(nodes[i]) {
            ADM_node_destroy(nodes[i]);
        }
    }

    free(nodes);
}

ADM_dataset_t*
prepare_datasets(const char* pattern, size_t n) {

@@ -52,6 +68,10 @@ prepare_datasets(const char* pattern, size_t n) {
void
destroy_datasets(ADM_dataset_t datasets[], size_t n) {

    if(!datasets) {
        return;
    }

    for(size_t i = 0; i < n; ++i) {
        if(datasets[i]) {
            ADM_dataset_destroy(datasets[i]);
@@ -83,6 +103,10 @@ prepare_qos_limits(size_t n) {
void
destroy_qos_limits(ADM_qos_limit_t* limits, size_t n) {

    if(!limits) {
        return;
    }

    for(size_t i = 0; i < n; ++i) {
        if(limits[i]) {
            ADM_qos_limit_destroy(limits[i]);
+3 −0
Original line number Diff line number Diff line
@@ -6,6 +6,9 @@
ADM_node_t*
prepare_nodes(size_t n);

void
destroy_nodes(ADM_node_t nodes[], size_t n);

ADM_dataset_t*
prepare_datasets(const char* pattern, size_t n);