Verified Commit 5db5a598 authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

Refactor ADM_storage_t, ADM_adhoc_context_t, and ADM_pfs_context_t

parent 6368b88f
Loading
Loading
Loading
Loading
+91 −18
Original line number Diff line number Diff line
@@ -227,10 +227,41 @@ ADM_dataset_info_create();
ADM_return_t
ADM_dataset_info_destroy(ADM_dataset_info_t dataset_info);

/** A storage tier handle */
typedef struct {
    // TODO: empty for now
} ADM_storage_t;
typedef enum {
    ADM_STORAGE_GEKKOFS,
    ADM_STORAGE_DATACLAY,
    ADM_STORAGE_EXPAND,
    ADM_STORAGE_HERCULES,
    ADM_STORAGE_LUSTRE,
    ADM_STORAGE_GPFS,
} ADM_storage_type_t;

/** A storage tier */
typedef struct adm_storage* ADM_storage_t;

/**
 * Create a ADM_STORAGE to represent a storage tier.
 *
 * @remark ADM_STORAGEs need to be freed by calling ADM_storage_destroy().
 *
 * @param[in] id An identifier for the storage tier
 * @param[in] type The type for the storage tier being created.
 * @param[in] ctx Some specific context information for the storage tier or
 * NULL if none is required. For instance, an adhoc storage system may find it
 * useful to provide an ADM_adhoc_context_t describing the instance.
 * @return A valid ADM_STORAGE if successful, or NULL in case of failure.
 */
ADM_storage_t
ADM_storage_create(const char* id, ADM_storage_type_t type, void* ctx);

/**
 * Destroy a ADM_STORAGE created by ADM_storage_destroy().
 *
 * @param[in] storage A valid ADM_STORAGE
 * @return ADM_SUCCESS or corresponding ADM error code
 */
ADM_return_t
ADM_storage_destroy(ADM_storage_t storage);

/** Information about resources assigned to a storage tier */
typedef struct {
@@ -270,18 +301,60 @@ typedef enum {
typedef struct adm_adhoc_data_distribution* ADM_adhoc_data_distribution_t;

/** The context for an  adhoc storage instance */
typedef struct {
    /** The adhoc storage system execution mode */
    ADM_adhoc_mode_t c_mode;
    /** The adhoc storage system access type */
    ADM_adhoc_access_t c_access;
    /** The number of nodes for the adhoc storage system */
    uint32_t c_nodes;
    /** The adhoc storage system walltime */
    uint32_t c_walltime;
    /** Whether the adhoc storage system should flush data in the background */
    bool c_should_bg_flush;
} ADM_adhoc_context_t;
typedef struct adm_adhoc_context* ADM_adhoc_context_t;

/**
 * Create an ADM_ADHOC_CONTEXT from information about how an adhoc storage
 * instance should be executed.
 *
 * @remark ADM_ADHOC_CONTEXTs need to be freed by calling
 * ADM_adhoc_context_destroy().
 *
 * @param[in] exec_mode The adhoc storage system execution mode
 * @param[in] access_type The adhoc storage system execution type
 * @param[in] nodes The number of nodes for the adhoc storage system
 * @param[in] walltime The adhoc storage system walltime
 * @param[in] should_flush Whether the adhoc storage system should flush data in
 * the background
 * @return A valid ADM_ADHOC_CONTEXT if successful. NULL otherwise.
 */
ADM_adhoc_context_t
ADM_adhoc_context_create(ADM_adhoc_mode_t exec_mode,
                         ADM_adhoc_access_t access_type, uint32_t nodes,
                         uint32_t walltime, bool should_flush);

/**
 * Destroy an ADM_ADHOC_CONTEXT created by ADM_adhoc_context_create().
 *
 * @param[in] ctx A valid ADM_ADHOC_CONTEXT
 * @return ADM_SUCCESS or corresponding ADM error code
 */
ADM_return_t
ADM_adhoc_context_destroy(ADM_adhoc_context_t ctx);

/** The context for a parallel file system storage */
typedef struct adm_pfs_context* ADM_pfs_context_t;

/**
 * Create an ADM_PFS_CONTEXT from information about how a PFS is configured.
 *
 * @remark ADM_PFS_CONTEXTs need to be freed by calling
 * ADM_pfs_context_destroy().
 *
 * @param[in] mountpoint The PFS mount point
 * @return A valid ADM_PFS_CONTEXT if successful. NULL otherwise.
 */
ADM_pfs_context_t
ADM_pfs_context_create(const char* mountpoint);

/**
 * Destroy an ADM_PFS_CONTEXT created by ADM_pfs_context_create().
 *
 * @param[in] ctx A valid ADM_PFS_CONTEXT
 * @return ADM_SUCCESS or corresponding ADM error code
 */
ADM_return_t
ADM_pfs_context_destroy(ADM_pfs_context_t ctx);

typedef ADM_adhoc_context_t* ADM_adhoc_storage_handle_t;

@@ -300,7 +373,7 @@ typedef struct adm_job_requirements* ADM_job_requirements_t;
 * @param[in] outputs An array of DATASET_DESCRIPTORS describing the output
 * information generated by the job.
 * @param[in] outputs_len The number of DATASET_DESCRIPTORS stored in outputs.
 * @param[in] adhoc_storage An optional ADHOC_DESCRIPTOR describing the adhoc
 * @param[in] storage An optional ADHOC_DESCRIPTOR describing the adhoc
 * storage system required by the job (can be set to NULL if no adhoc storage
 * system is required).
 * @return A valid ADM_job_requirements_t if sucessfull or NULL in case of
@@ -309,7 +382,7 @@ typedef struct adm_job_requirements* ADM_job_requirements_t;
ADM_job_requirements_t
ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len,
                            ADM_dataset_t outputs[], size_t outputs_len,
                            ADM_adhoc_storage_handle_t adhoc_storage);
                            ADM_storage_t storage);

/**
 * Destroy a ADM_job_requirements_t created by ADM_job_requirements_create().
+162 −3
Original line number Diff line number Diff line
@@ -73,6 +73,33 @@ struct adm_dataset_info {
    // TODO: empty for now
};

struct adm_storage {
    const char* s_id;
    ADM_storage_type_t s_type;
    union {
        ADM_adhoc_context_t s_adhoc_ctx;
        ADM_pfs_context_t s_pfs_ctx;
    };
};

struct adm_adhoc_context {
    /** The adhoc storage system execution mode */
    ADM_adhoc_mode_t c_mode;
    /** The adhoc storage system access type */
    ADM_adhoc_access_t c_access;
    /** The number of nodes for the adhoc storage system */
    uint32_t c_nodes;
    /** The adhoc storage system walltime */
    uint32_t c_walltime;
    /** Whether the adhoc storage system should flush data in the background */
    bool c_should_bg_flush;
};

struct adm_pfs_context {
    /** The PFS mount point */
    const char* c_mount;
};

/** The I/O requirements for a job */
struct adm_job_requirements {
    /** An array of input datasets */
@@ -84,7 +111,7 @@ struct adm_job_requirements {
    /** The number of datasets in r_outputs */
    size_t r_num_outputs;
    /** An optional definition for a specific adhoc storage instance */
    ADM_adhoc_storage_handle_t r_adhoc_storage;
    ADM_adhoc_context_t r_adhoc_ctx;
};

ADM_server_t
@@ -282,10 +309,131 @@ ADM_dataset_info_destroy(ADM_dataset_info_t dataset_info) {
    return ret;
}

ADM_storage_t
ADM_storage_create(const char* id, ADM_storage_type_t type, void* ctx) {

    struct adm_storage* adm_storage =
            (struct adm_storage*) malloc(sizeof(*adm_storage));

    if(!adm_storage) {
        LOGGER_ERROR("Could not allocate ADM_storage_t");
        return NULL;
    }

    if(!id) {
        LOGGER_ERROR("Null storage id")
        return NULL;
    }

    if(!ctx) {
        LOGGER_ERROR("Null storage context")
        return NULL;
    }

    adm_storage->s_id = id;
    adm_storage->s_type = type;

    switch(adm_storage->s_type) {
        case ADM_STORAGE_GEKKOFS:
        case ADM_STORAGE_DATACLAY:
        case ADM_STORAGE_EXPAND:
        case ADM_STORAGE_HERCULES:
            adm_storage->s_adhoc_ctx = *((ADM_adhoc_context_t*) ctx);
            break;

        case ADM_STORAGE_LUSTRE:
        case ADM_STORAGE_GPFS:
            adm_storage->s_pfs_ctx = *((ADM_pfs_context_t*) ctx);
            break;
    }

    return adm_storage;
}

ADM_return_t
ADM_storage_destroy(ADM_storage_t storage) {

    ADM_return_t ret = ADM_SUCCESS;

    if(!storage) {
        LOGGER_ERROR("Invalid ADM_storage_t")
        return ADM_EBADARGS;
    }

    free(storage);
    return ret;
}

ADM_adhoc_context_t
ADM_adhoc_context_create(ADM_adhoc_mode_t exec_mode,
                         ADM_adhoc_access_t access_type, uint32_t nodes,
                         uint32_t walltime, bool should_flush) {

    struct adm_adhoc_context* adm_adhoc_context =
            (struct adm_adhoc_context*) malloc(sizeof(*adm_adhoc_context));

    if(!adm_adhoc_context) {
        LOGGER_ERROR("Could not allocate ADM_adhoc_context_t");
        return NULL;
    }

    adm_adhoc_context->c_mode = exec_mode;
    adm_adhoc_context->c_access = access_type;
    adm_adhoc_context->c_nodes = nodes;
    adm_adhoc_context->c_walltime = walltime;
    adm_adhoc_context->c_should_bg_flush = should_flush;

    return adm_adhoc_context;
}

ADM_return_t
ADM_adhoc_context_destroy(ADM_adhoc_context_t ctx) {


    ADM_return_t ret = ADM_SUCCESS;

    if(!ctx) {
        LOGGER_ERROR("Invalid ADM_adhoc_context_t")
        return ADM_EBADARGS;
    }

    free(ctx);
    return ret;
}

ADM_pfs_context_t
ADM_pfs_context_create(const char* mountpoint) {

    struct adm_pfs_context* adm_pfs_context =
            (struct adm_pfs_context*) malloc(sizeof(*adm_pfs_context));

    if(!adm_pfs_context) {
        LOGGER_ERROR("Could not allocate ADM_adhoc_context_t");
        return NULL;
    }

    adm_pfs_context->c_mount = mountpoint;

    return adm_pfs_context;
}

ADM_return_t
ADM_pfs_context_destroy(ADM_pfs_context_t ctx) {
    ADM_return_t ret = ADM_SUCCESS;

    if(!ctx) {
        LOGGER_ERROR("Invalid ADM_pfs_context_t")
        return ADM_EBADARGS;
    }

    free(ctx);
    return ret;
}

ADM_job_requirements_t
ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len,
                            ADM_dataset_t outputs[], size_t outputs_len,
                            ADM_adhoc_storage_handle_t adhoc_storage) {
                            ADM_storage_t storage) {

    struct adm_job_requirements* adm_job_reqs =
            (struct adm_job_requirements*) malloc(
@@ -300,7 +448,18 @@ ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len,
    adm_job_reqs->r_num_inputs = inputs_len;
    adm_job_reqs->r_outputs = outputs;
    adm_job_reqs->r_num_outputs = outputs_len;
    adm_job_reqs->r_adhoc_storage = adhoc_storage;
    adm_job_reqs->r_adhoc_ctx = NULL;

    if(storage) {
        if(storage->s_type != ADM_STORAGE_GEKKOFS &&
           storage->s_type != ADM_STORAGE_DATACLAY &&
           storage->s_type != ADM_STORAGE_EXPAND &&
           storage->s_type != ADM_STORAGE_HERCULES) {
            LOGGER_ERROR("Invalid adhoc_storage")
            return NULL;
        }
        adm_job_reqs->r_adhoc_ctx = storage->s_adhoc_ctx;
    }

    return adm_job_reqs;
}