Loading src/common/network/proto/rpc_types.c +118 −0 Original line number Diff line number Diff line Loading @@ -95,6 +95,124 @@ hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) { return ret; } hg_return_t hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { (void) proc; (void) data; hg_return_t ret = HG_SUCCESS; ADM_storage_t* storage = (ADM_storage_t*) data; ADM_storage_t tmp = NULL; hg_size_t storage_length = 0; switch(hg_proc_get_op(proc)) { case HG_ENCODE: // find out the length of the adm_storage object we need to send storage_length = *storage ? sizeof(adm_storage) : 0; ret = hg_proc_hg_size_t(proc, &storage_length); if(ret != HG_SUCCESS) { break; } if(!storage_length) { return HG_SUCCESS; } // if we actually need to send an adm_storage object, // write each of its fields to the mercury buffer tmp = *storage; // 1. the storage type ret = hg_proc_hg_uint32_t(proc, &tmp->s_type); if(ret != HG_SUCCESS) { break; } // 2. the storage id ret = hg_proc_hg_const_string_t(proc, &tmp->s_id); if(ret != HG_SUCCESS) { break; } // 3. the appropriate storage context switch(tmp->s_type) { case ADM_STORAGE_GEKKOFS: case ADM_STORAGE_DATACLAY: case ADM_STORAGE_EXPAND: case ADM_STORAGE_HERCULES: ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: case ADM_STORAGE_GPFS: ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_adhoc_ctx); break; } break; case HG_DECODE: // find out the length of the adm_storage object ret = hg_proc_hg_size_t(proc, &storage_length); if(ret != HG_SUCCESS) { break; } if(!storage_length) { *storage = NULL; break; } // if the received adm_storage object was not NULL, read each of // its fields from the mercury buffer tmp = (adm_storage*) calloc(1, sizeof(adm_storage)); // 1. the storage type ret = hg_proc_uint32_t(proc, &tmp->s_type); if(ret != HG_SUCCESS) { break; } // 2. the storage id ret = hg_proc_hg_const_string_t(proc, &tmp->s_id); if(ret != HG_SUCCESS) { break; } // 3. the appropriate storage context switch(tmp->s_type) { case ADM_STORAGE_GEKKOFS: case ADM_STORAGE_DATACLAY: case ADM_STORAGE_EXPAND: case ADM_STORAGE_HERCULES: ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: case ADM_STORAGE_GPFS: ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_adhoc_ctx); break; } // return the newly-created ctx *storage = tmp; break; case HG_FREE: tmp = *storage; free(tmp); break; } return ret; } hg_return_t hg_proc_ADM_adhoc_context_t(hg_proc_t proc, void* data) { Loading src/common/network/proto/rpc_types.h +53 −43 Original line number Diff line number Diff line Loading @@ -126,40 +126,6 @@ MERCURY_GEN_STRUCT_PROC( ); // clang-format on // TODO: union decoder struct adm_storage { const char* s_id; ADM_storage_type_t s_type; union { ADM_adhoc_context_t s_adhoc_ctx; ADM_pfs_context_t s_pfs_ctx; }; }; typedef struct adm_storage_resources { // TODO: undefined for now int32_t placeholder; } adm_storage_resources; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_storage_resources, ((hg_int32_t) (placeholder)) ); // clang-format on typedef struct adm_data_operation { // TODO: undefined for now int32_t placeholder; } adm_data_operation; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_data_operation, ((hg_int32_t) (placeholder)) ); // clang-format on typedef struct adm_adhoc_context { /** The adhoc storage system execution mode */ ADM_adhoc_mode_t c_mode; Loading Loading @@ -196,6 +162,44 @@ MERCURY_GEN_STRUCT_PROC( ); // clang-format on // TODO: union decoder typedef struct adm_storage { const char* s_id; ADM_storage_type_t s_type; union { adm_adhoc_context s_adhoc_ctx; adm_pfs_context s_pfs_ctx; }; } adm_storage; hg_return_t hg_proc_ADM_storage_t(hg_proc_t proc, void* data); typedef struct adm_storage_resources { // TODO: undefined for now int32_t placeholder; } adm_storage_resources; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_storage_resources, ((hg_int32_t) (placeholder)) ); // clang-format on typedef struct adm_data_operation { // TODO: undefined for now int32_t placeholder; } adm_data_operation; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_data_operation, ((hg_int32_t) (placeholder)) ); // clang-format on struct adm_dataset_list { /** An array of datasets */ adm_dataset* l_datasets; Loading @@ -209,25 +213,30 @@ hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data); hg_return_t hg_proc_ADM_adhoc_context_t(hg_proc_t proc, void* data); hg_return_t hg_proc_ADM_pfs_context_t(hg_proc_t proc, void* data); /** The I/O requirements for a job */ typedef struct adm_job_requirements { /** An array of input datasets */ ADM_dataset_list_t r_inputs; /** An array of output datasets */ ADM_dataset_list_t r_outputs; /** An optional definition for a specific adhoc storage instance */ adm_adhoc_context* r_adhoc_ctx; /** An optional definition for a specific storage instance */ ADM_storage_t r_storage; } adm_job_requirements; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_job_requirements, adm_job_requirements, // NOLINT ((ADM_dataset_list_t) (r_inputs)) ((ADM_dataset_list_t) (r_outputs)) ((ADM_adhoc_context_t) (r_adhoc_ctx)) ((ADM_storage_t) (r_storage)) ); // clang-format on // clang-format off /// ADM_register_job MERCURY_GEN_PROC(ADM_register_job_in_t, ((adm_job_requirements) (reqs))) Loading Loading @@ -479,6 +488,7 @@ MERCURY_GEN_PROC(ADM_get_statistics_in_t, MERCURY_GEN_PROC(ADM_get_statistics_out_t, ((int32_t) (ret))((hg_const_string_t) (job_statistics))) // clang-format on #ifdef __cplusplus }; // extern "C" Loading Loading
src/common/network/proto/rpc_types.c +118 −0 Original line number Diff line number Diff line Loading @@ -95,6 +95,124 @@ hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data) { return ret; } hg_return_t hg_proc_ADM_storage_t(hg_proc_t proc, void* data) { (void) proc; (void) data; hg_return_t ret = HG_SUCCESS; ADM_storage_t* storage = (ADM_storage_t*) data; ADM_storage_t tmp = NULL; hg_size_t storage_length = 0; switch(hg_proc_get_op(proc)) { case HG_ENCODE: // find out the length of the adm_storage object we need to send storage_length = *storage ? sizeof(adm_storage) : 0; ret = hg_proc_hg_size_t(proc, &storage_length); if(ret != HG_SUCCESS) { break; } if(!storage_length) { return HG_SUCCESS; } // if we actually need to send an adm_storage object, // write each of its fields to the mercury buffer tmp = *storage; // 1. the storage type ret = hg_proc_hg_uint32_t(proc, &tmp->s_type); if(ret != HG_SUCCESS) { break; } // 2. the storage id ret = hg_proc_hg_const_string_t(proc, &tmp->s_id); if(ret != HG_SUCCESS) { break; } // 3. the appropriate storage context switch(tmp->s_type) { case ADM_STORAGE_GEKKOFS: case ADM_STORAGE_DATACLAY: case ADM_STORAGE_EXPAND: case ADM_STORAGE_HERCULES: ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: case ADM_STORAGE_GPFS: ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_adhoc_ctx); break; } break; case HG_DECODE: // find out the length of the adm_storage object ret = hg_proc_hg_size_t(proc, &storage_length); if(ret != HG_SUCCESS) { break; } if(!storage_length) { *storage = NULL; break; } // if the received adm_storage object was not NULL, read each of // its fields from the mercury buffer tmp = (adm_storage*) calloc(1, sizeof(adm_storage)); // 1. the storage type ret = hg_proc_uint32_t(proc, &tmp->s_type); if(ret != HG_SUCCESS) { break; } // 2. the storage id ret = hg_proc_hg_const_string_t(proc, &tmp->s_id); if(ret != HG_SUCCESS) { break; } // 3. the appropriate storage context switch(tmp->s_type) { case ADM_STORAGE_GEKKOFS: case ADM_STORAGE_DATACLAY: case ADM_STORAGE_EXPAND: case ADM_STORAGE_HERCULES: ret = hg_proc_ADM_adhoc_context_t(proc, &tmp->s_adhoc_ctx); break; case ADM_STORAGE_LUSTRE: case ADM_STORAGE_GPFS: ret = hg_proc_ADM_pfs_context_t(proc, &tmp->s_adhoc_ctx); break; } // return the newly-created ctx *storage = tmp; break; case HG_FREE: tmp = *storage; free(tmp); break; } return ret; } hg_return_t hg_proc_ADM_adhoc_context_t(hg_proc_t proc, void* data) { Loading
src/common/network/proto/rpc_types.h +53 −43 Original line number Diff line number Diff line Loading @@ -126,40 +126,6 @@ MERCURY_GEN_STRUCT_PROC( ); // clang-format on // TODO: union decoder struct adm_storage { const char* s_id; ADM_storage_type_t s_type; union { ADM_adhoc_context_t s_adhoc_ctx; ADM_pfs_context_t s_pfs_ctx; }; }; typedef struct adm_storage_resources { // TODO: undefined for now int32_t placeholder; } adm_storage_resources; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_storage_resources, ((hg_int32_t) (placeholder)) ); // clang-format on typedef struct adm_data_operation { // TODO: undefined for now int32_t placeholder; } adm_data_operation; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_data_operation, ((hg_int32_t) (placeholder)) ); // clang-format on typedef struct adm_adhoc_context { /** The adhoc storage system execution mode */ ADM_adhoc_mode_t c_mode; Loading Loading @@ -196,6 +162,44 @@ MERCURY_GEN_STRUCT_PROC( ); // clang-format on // TODO: union decoder typedef struct adm_storage { const char* s_id; ADM_storage_type_t s_type; union { adm_adhoc_context s_adhoc_ctx; adm_pfs_context s_pfs_ctx; }; } adm_storage; hg_return_t hg_proc_ADM_storage_t(hg_proc_t proc, void* data); typedef struct adm_storage_resources { // TODO: undefined for now int32_t placeholder; } adm_storage_resources; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_storage_resources, ((hg_int32_t) (placeholder)) ); // clang-format on typedef struct adm_data_operation { // TODO: undefined for now int32_t placeholder; } adm_data_operation; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_data_operation, ((hg_int32_t) (placeholder)) ); // clang-format on struct adm_dataset_list { /** An array of datasets */ adm_dataset* l_datasets; Loading @@ -209,25 +213,30 @@ hg_proc_ADM_dataset_list_t(hg_proc_t proc, void* data); hg_return_t hg_proc_ADM_adhoc_context_t(hg_proc_t proc, void* data); hg_return_t hg_proc_ADM_pfs_context_t(hg_proc_t proc, void* data); /** The I/O requirements for a job */ typedef struct adm_job_requirements { /** An array of input datasets */ ADM_dataset_list_t r_inputs; /** An array of output datasets */ ADM_dataset_list_t r_outputs; /** An optional definition for a specific adhoc storage instance */ adm_adhoc_context* r_adhoc_ctx; /** An optional definition for a specific storage instance */ ADM_storage_t r_storage; } adm_job_requirements; // clang-format off MERCURY_GEN_STRUCT_PROC( adm_job_requirements, adm_job_requirements, // NOLINT ((ADM_dataset_list_t) (r_inputs)) ((ADM_dataset_list_t) (r_outputs)) ((ADM_adhoc_context_t) (r_adhoc_ctx)) ((ADM_storage_t) (r_storage)) ); // clang-format on // clang-format off /// ADM_register_job MERCURY_GEN_PROC(ADM_register_job_in_t, ((adm_job_requirements) (reqs))) Loading Loading @@ -479,6 +488,7 @@ MERCURY_GEN_PROC(ADM_get_statistics_in_t, MERCURY_GEN_PROC(ADM_get_statistics_out_t, ((int32_t) (ret))((hg_const_string_t) (job_statistics))) // clang-format on #ifdef __cplusplus }; // extern "C" Loading