Verified Commit 528c6ab1 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Add ADM_dataset_list_t and related functions

parent 49b7a3c8
Loading
Loading
Loading
Loading
+69 −0
Original line number Diff line number Diff line
@@ -24,3 +24,72 @@

#include "rpc_types.h"

hg_return_t
hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) {
    hg_return_t ret = HG_SUCCESS;
    ADM_dataset_list_t* list = (ADM_dataset_list_t*) data;
    ADM_dataset_list_t tmp = NULL;

    hg_size_t length = 0;

    switch(hg_proc_get_op(proc)) {

        case HG_ENCODE:
            tmp = *list;
            // write the length of the list
            length = tmp->l_length;
            ret = hg_proc_hg_size_t(proc, &tmp->l_length);

            if(ret != HG_SUCCESS) {
                break;
            }

            // write the list
            for(size_t i = 0; i < length; ++i) {
                ret = hg_proc_adm_dataset(proc, &tmp->l_datasets[i]);

                if(ret != HG_SUCCESS) {
                    break;
                }
            }
            break;

        case HG_DECODE: {

            // find out the length of the list
            ret = hg_proc_hg_size_t(proc, &length);

            if(ret != HG_SUCCESS) {
                break;
            }

            // loop and create list elements
            tmp = (ADM_dataset_list_t) calloc(1, sizeof(struct adm_dataset_list));
            tmp->l_length = length;
            tmp->l_datasets =
                    (adm_dataset*) calloc(length, sizeof(adm_dataset));

            for(size_t i = 0; i < length; ++i) {
                ret = hg_proc_adm_dataset(proc, &tmp->l_datasets[i]);

                if(ret != HG_SUCCESS) {
                    break;
                }
            }

            // return the newly-created list
            *list = tmp;

            break;
        }

        case HG_FREE:
            tmp = *list;
            free(tmp->l_datasets);
            free(tmp);
            ret = HG_SUCCESS;
            break;
    }

    return ret;
}
+23 −9
Original line number Diff line number Diff line
@@ -196,21 +196,35 @@ MERCURY_GEN_STRUCT_PROC(
);
// clang-format on

struct adm_dataset_list {
    /** An array of datasets */
    adm_dataset* l_datasets;
    /** The length of the array */
    size_t l_length;
};

hg_return_t
hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data);


/** The I/O requirements for a job */
typedef struct adm_job_requirements {
    /** An array of input datasets */
    ADM_dataset_t* r_inputs;
    /** The number of datasets in r_inputs */
    size_t r_num_inputs;
    /** A list of output datasets */
    ADM_dataset_t* r_outputs;
    /** The number of datasets in r_outputs */
    size_t r_num_outputs;
    ADM_dataset_list_t r_inputs;
    /** An array of output datasets */
    ADM_dataset_list_t r_outputs;
    /** An optional definition for a specific adhoc storage instance */
    ADM_adhoc_context_t r_adhoc_ctx;
    adm_adhoc_context* r_adhoc_ctx;
} adm_job_requirements;

// TODO: MERCURY_GEN_STRUCT_PROC
// clang-format off
MERCURY_GEN_STRUCT_PROC(
    adm_job_requirements,
        ((ADM_dataset_list_t) (r_inputs))
        ((ADM_dataset_list_t) (r_outputs))
        ((adm_adhoc_context) (r_adhoc_ctx))
);
// clang-format on


/// ADM_register_job
+26 −0
Original line number Diff line number Diff line
@@ -88,6 +88,9 @@ typedef struct adm_dataset* ADM_dataset_t;
/** Information about a dataset */
typedef struct adm_dataset_info* ADM_dataset_info_t;

/** A list of datasets */
typedef struct adm_dataset_list* ADM_dataset_list_t;


/* ----------------------------------------------------- */
/*              Storage tiers                            */
@@ -319,6 +322,29 @@ ADM_dataset_info_create();
ADM_return_t
ADM_dataset_info_destroy(ADM_dataset_info_t dataset_info);

/**
 * Create a dataset list from an array of ADM_DATASETs and its
 * length.
 *
 * @remark Dataset lists need to be freed by calling ADM_dataset_list_destroy().
 *
 * @param[in] datasets The array of datasets.
 * @param[in] len The length of the array.
 * @return A valid ADM_dataset_list_t if successful or NULL in case of
 * failure.
 */
ADM_dataset_list_t
ADM_dataset_list_create(ADM_dataset_t datasets[], size_t len);

/**
 * Destroy a dataset list created by ADM_dataset_list_create().
 *
 * @param[in] list A valid ADM_dataset_list_t
 * @return ADM_SUCCESS or corresponding ADM error code
 */
ADM_return_t
ADM_dataset_list_destroy(ADM_dataset_list_t list);


/* ----------------------------------------------------- */
/*              Storage tiers                            */
+158 −16
Original line number Diff line number Diff line
@@ -103,18 +103,42 @@ ADM_dataset_t
ADM_dataset_create(const char* id) {

    struct adm_dataset* adm_dataset =
            (struct adm_dataset*) malloc(sizeof(struct adm_dataset));
            (struct adm_dataset*) calloc(1, sizeof(struct adm_dataset));

    if(!adm_dataset) {
        LOGGER_ERROR("Could not allocate ADM_dataset_t")
        return NULL;
    }

    adm_dataset->d_id = id;
    if(id) {
        size_t n = strlen(id);
        adm_dataset->d_id = (const char*) calloc(n + 1, sizeof(char));
        strncpy((char*) adm_dataset->d_id, id, n);
    }

    return adm_dataset;
}

ADM_dataset_t
ADM_dataset_copy(ADM_dataset_t dst, const ADM_dataset_t src) {

    if(!src || !dst) {
        return NULL;
    }

    // copy all primitive types
    *dst = *src;

    // duplicate copy any pointer types
    if(src->d_id) {
        size_t n = strlen(src->d_id);
        dst->d_id = (const char*) calloc(n + 1, sizeof(char));
        strncpy((char*) dst->d_id, src->d_id, n);
    }

    return dst;
}

ADM_return_t
ADM_dataset_destroy(ADM_dataset_t dataset) {
    ADM_return_t ret = ADM_SUCCESS;
@@ -124,6 +148,10 @@ ADM_dataset_destroy(ADM_dataset_t dataset) {
        return ADM_EBADARGS;
    }

    if(dataset->d_id) {
        free((void*) dataset->d_id);
    }

    free(dataset);
    return ret;
}
@@ -233,6 +261,74 @@ ADM_dataset_info_destroy(ADM_dataset_info_t dataset_info) {
    return ret;
}

ADM_dataset_list_t
ADM_dataset_list_create(ADM_dataset_t datasets[], size_t length) {

    ADM_dataset_list_t p = (ADM_dataset_list_t) malloc(sizeof(*p));

    if(!p) {
        LOGGER_ERROR("Could not allocate ADM_dataset_list_t")
        return NULL;
    }

    const char* error_msg = NULL;

    p->l_length = length;
    p->l_datasets = (struct adm_dataset*) calloc(length, sizeof(adm_dataset));

    if(!p->l_datasets) {
        error_msg = "Could not allocate ADM_dataset_list_t";
        goto cleanup_on_error;
    }

    for(size_t i = 0; i < length; ++i) {
        if(!ADM_dataset_copy(&p->l_datasets[i], datasets[i])) {
            error_msg = "Could not allocate ADM_dataset_list_t";
            goto cleanup_on_error;
        };

        fprintf(stderr, "o: %s -> %s\n", datasets[i]->d_id,
                p->l_datasets[i].d_id);
    }

    return p;

cleanup_on_error:
    if(p->l_datasets) {
        free(p->l_datasets);
    }
    free(p);

    if(error_msg) {
        LOGGER_ERROR(error_msg);
    }

    return NULL;
}

ADM_return_t
ADM_dataset_list_destroy(ADM_dataset_list_t list) {
    ADM_return_t ret = ADM_SUCCESS;

    if(!list) {
        LOGGER_ERROR("Invalid ADM_pfs_context_t")
        return ADM_EBADARGS;
    }

    // We cannot call ADM_dataset_destroy here because adm_datasets
    // are stored as a consecutive array in memory. Thus, we free
    // the dataset ids themselves and then the array.
    if(list->l_datasets) {
        for(size_t i = 0; i < list->l_length; ++i) {
            free((void*) list->l_datasets[i].d_id);
        }
        free(list->l_datasets);
    }

    free(list);
    return ret;
}

ADM_storage_t
ADM_storage_create(const char* id, ADM_storage_type_t type, void* ctx) {

@@ -426,24 +522,62 @@ ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len,
        return NULL;
    }

    adm_job_reqs->r_inputs = inputs;
    adm_job_reqs->r_num_inputs = inputs_len;
    adm_job_reqs->r_outputs = outputs;
    adm_job_reqs->r_num_outputs = outputs_len;
    ADM_dataset_list_t inputs_list = NULL;
    ADM_dataset_list_t outputs_list = NULL;
    const char* error_msg = NULL;

    inputs_list = ADM_dataset_list_create(inputs, inputs_len);

    if(!inputs_list) {
        error_msg = "Could not allocate ADM_job_requirements_t";
        goto cleanup_on_error;
    }

    outputs_list = ADM_dataset_list_create(outputs, outputs_len);

    if(!outputs_list) {
        error_msg = "Could not allocate ADM_job_requirements_t";
        goto cleanup_on_error;
    }

    adm_job_reqs->r_inputs = inputs_list;
    adm_job_reqs->r_outputs = outputs_list;

    if(!storage) {
        adm_job_reqs->r_adhoc_ctx = NULL;
        return adm_job_reqs;
    }

    if(storage) {
    if(storage->s_type != ADM_STORAGE_GEKKOFS &&
       storage->s_type != ADM_STORAGE_DATACLAY &&
       storage->s_type != ADM_STORAGE_EXPAND &&
       storage->s_type != ADM_STORAGE_HERCULES) {
            LOGGER_ERROR("Invalid adhoc_storage")
            return NULL;
        error_msg = "Invalid adhoc_storage argument";
        goto cleanup_on_error;
    }
    adm_job_reqs->r_adhoc_ctx = storage->s_adhoc_ctx;
    }

    return adm_job_reqs;

cleanup_on_error:

    if(outputs_list) {
        ADM_dataset_list_destroy(outputs_list);
    }

    if(inputs_list) {
        ADM_dataset_list_destroy(inputs_list);
    }

    if(adm_job_reqs) {
        ADM_job_requirements_destroy(adm_job_reqs);
    }

    if(error_msg) {
        LOGGER_ERROR(error_msg);
    }

    return NULL;
}

ADM_return_t
@@ -455,6 +589,14 @@ ADM_job_requirements_destroy(ADM_job_requirements_t reqs) {
        return ADM_EBADARGS;
    }

    if(reqs->r_inputs) {
        ADM_dataset_list_destroy(reqs->r_inputs);
    }

    if(reqs->r_outputs) {
        ADM_dataset_list_destroy(reqs->r_outputs);
    }

    free(reqs);
    return ret;
}