diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 77bb2a0b2ecf5c768390990dce1225f6b274653f..8dd6fc82fd157f17b9b9d4ae1456c6d7ae922f06 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +69,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; @@ -143,7 +144,8 @@ main(int argc, char* argv[]) { cleanup: ADM_remove_job(server, job); ADM_server_destroy(server); - destroy_datasets(inputs, NINPUTS); - destroy_datasets(outputs, NOUTPUTS); + destroy_routes(inputs, NINPUTS); + destroy_routes(outputs, NOUTPUTS); + destroy_routes(expected_outputs, NEXPOUTPUTS); exit(exit_status); } diff --git a/examples/c/ADM_connect_data_operation.c b/examples/c/ADM_connect_data_operation.c index 8ae8f55ea0a3ce493f1eca9288ae299f85f285f5..b766b0e903cffae4d885d7a0b16f1f67b3ba6afb 100644 --- a/examples/c/ADM_connect_data_operation.c +++ b/examples/c/ADM_connect_data_operation.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, 
cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; @@ -108,8 +109,8 @@ main(int argc, char* argv[]) { exit_status = EXIT_SUCCESS; bool should_stream = false; - ret = ADM_connect_data_operation(server, job, inputs[0], outputs[0], - should_stream); + ret = ADM_connect_data_operation(server, job, /*inputs[0]*/ NULL, + /*outputs[0]*/ NULL, should_stream); if(ret != ADM_SUCCESS) { @@ -125,6 +126,7 @@ main(int argc, char* argv[]) { "successfully\n"); cleanup: + /* for(int i = 0; i < NINPUTS; ++i) { ADM_dataset_destroy(inputs[i]); } @@ -132,6 +134,7 @@ cleanup: for(int i = 0; i < NOUTPUTS; ++i) { ADM_dataset_destroy(outputs[i]); } + */ ADM_remove_job(server, job); ADM_server_destroy(server); diff --git a/examples/c/ADM_define_data_operation.c b/examples/c/ADM_define_data_operation.c index 04a151c497c4e88b95c434b7a4b6506458e11a2e..c7a225f691a9e49e2809d851c39f0522497789e2 100644 --- a/examples/c/ADM_define_data_operation.c +++ b/examples/c/ADM_define_data_operation.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -56,10 +51,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -70,8 +70,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -91,7 +91,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index b5317eb193e9e283915cff495e2a5fef29ca8a8e..f568818b01c63db95bdaf83c03131f63d0ef96be 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -27,10 +27,6 @@ #include #include "common.h" -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -82,9 +78,9 @@ main(int argc, char* argv[]) { } // 3. 
the adhoc storage execution context - adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); if(adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing adhoc context\n"); @@ -115,9 +111,9 @@ main(int argc, char* argv[]) { // system, let's prepare a new execution context for the adhoc // storage system - new_adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 200, false); + new_adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 200, false); if(new_adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing new adhoc context\n"); diff --git a/examples/c/ADM_finalize_data_operation.c b/examples/c/ADM_finalize_data_operation.c index 89c421ea1c78e51ab66f1141a0a4e570c283236e..f32b5e4711cb863aa9b70ff68c6450e03a94b984 100644 --- a/examples/c/ADM_finalize_data_operation.c +++ b/examples/c/ADM_finalize_data_operation.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_get_pending_transfers.c b/examples/c/ADM_get_pending_transfers.c index 6789136cf19e5018d395e82067fe8df395f80feb..4184db301cd807ae61b4423bcabeb4a047a63d6c 100644 --- a/examples/c/ADM_get_pending_transfers.c +++ b/examples/c/ADM_get_pending_transfers.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", 
NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +69,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_get_qos_constraints.c b/examples/c/ADM_get_qos_constraints.c index be8498ee58579ce6312c60101a29f936146d1cc3..92807bccad69571c3708d9ff32149ed020b91b74 100644 --- a/examples/c/ADM_get_qos_constraints.c +++ b/examples/c/ADM_get_qos_constraints.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_get_statistics.c b/examples/c/ADM_get_statistics.c index 4f0c7db51a95a18898b27b71aa4eea33039328f5..e657ab818f249fd3df9cb60a4143df8c8953307f 100644 --- a/examples/c/ADM_get_statistics.c +++ b/examples/c/ADM_get_statistics.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = 
prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index 5a5957bdeb46646c39045ec455a06d4b3ecd4008..cb97a730c798e36118c321099683f750f907a983 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +69,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index bfab8b6b1098e511859f9b2567d430fbe01ecb4e..34552d9170816544c8a01a624ab4d6681dcc9e12 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, 
char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index 0d2ff17e3ac23b331683feec6be8c819dd9b9fec..b86d29a60f6a360afdc9d02d137b869a304d992e 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -27,10 +27,6 @@ #include #include "common.h" -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -78,9 +74,9 @@ main(int argc, char* argv[]) { } // 3. define the adhoc execution context - adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); if(adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing adhoc context\n"); diff --git a/examples/c/ADM_register_job.c b/examples/c/ADM_register_job.c index e4be41538f28117bcaec7e0ad95911a76039e14d..cbff276e17045894aa5b69cc5a433b4de47386eb 100644 --- a/examples/c/ADM_register_job.c +++ b/examples/c/ADM_register_job.c @@ -27,11 +27,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -63,8 +58,9 @@ main(int argc, char* argv[]) { ADM_job_resources_t job_resources = NULL; ADM_job_requirements_t reqs = NULL; uint64_t slurm_job_id = 42; - ADM_dataset_t* inputs = NULL; - ADM_dataset_t* outputs = NULL; + ADM_dataset_route_t* inputs = NULL; + ADM_dataset_route_t* outputs = NULL; + ADM_dataset_route_t* expected_outputs = NULL; // Let's prepare all the information required by the API calls. // ADM_register_job() often requires an adhoc storage to have been @@ -88,9 +84,9 @@ main(int argc, char* argv[]) { } // 3. 
the adhoc storage execution context - adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); if(adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing adhoc context\n"); @@ -137,21 +133,29 @@ main(int argc, char* argv[]) { } // 2. the job's requirements - inputs = prepare_datasets("input-dataset-%d", NINPUTS); + inputs = prepare_routes("%s-input-dataset-%d", NINPUTS); if(inputs == NULL) { fprintf(stderr, "Fatal error preparing input datasets\n"); goto cleanup; } - outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + outputs = prepare_routes("%s-output-dataset-%d", NOUTPUTS); if(outputs == NULL) { fprintf(stderr, "Fatal error preparing output datasets\n"); goto cleanup; } + expected_outputs = prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + + if(expected_outputs == NULL) { + fprintf(stderr, "Fatal error preparing expected output datasets\n"); + goto cleanup; + } + if((reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, + expected_outputs, NEXPOUTPUTS, adhoc_storage)) == NULL) { fprintf(stderr, "ADM_job_requirements_create() failed"); goto cleanup; @@ -191,8 +195,9 @@ cleanup: ADM_server_destroy(server); ADM_job_requirements_destroy(reqs); - destroy_datasets(outputs, NOUTPUTS); - destroy_datasets(inputs, NINPUTS); + destroy_routes(outputs, NOUTPUTS); + destroy_routes(inputs, NINPUTS); + destroy_routes(expected_outputs, NEXPOUTPUTS); ADM_job_resources_destroy(job_resources); destroy_nodes(job_nodes, NJOB_NODES); diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index 17d8cf78292b611b48f94fbe0a2ed2566e740c1b..28f47c1bf172bdeb0b971d4052d2f7895ae8c3d9 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -27,10 +27,6 @@ #include #include "common.h" -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -80,9 +76,9 @@ main(int argc, char* argv[]) { } // 3. 
the adhoc storage execution context - adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); if(adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing adhoc context\n"); diff --git a/examples/c/ADM_remove_job.c b/examples/c/ADM_remove_job.c index ddf7a2d6f2064df0241c056c767b485e16ba55dc..7ced330c4c6377e7f14580ce8214fb2af06d3e61 100644 --- a/examples/c/ADM_remove_job.c +++ b/examples/c/ADM_remove_job.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +69,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_set_dataset_information.c b/examples/c/ADM_set_dataset_information.c index 548befd80a4773c717dcff6645cf35455784dee3..2a8b4120ca88113b31135752d6c5ea279a092f45 100644 --- a/examples/c/ADM_set_dataset_information.c +++ b/examples/c/ADM_set_dataset_information.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - 
ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_set_io_resources.c b/examples/c/ADM_set_io_resources.c index cdf07a3bbefc20c5f840cb60522bf980ad37ee77..b38a7d334e2966b5c098c3a1d1b813b854c36677 100644 --- a/examples/c/ADM_set_io_resources.c +++ b/examples/c/ADM_set_io_resources.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,18 +50,23 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { assert(job_resources); ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_set_qos_constraints.c b/examples/c/ADM_set_qos_constraints.c index a7f07bc11165cafae34ad9b1fff07328e0019b15..80c5e6ba412e09580959c1d202e6cae8d9e45885 100644 --- a/examples/c/ADM_set_qos_constraints.c +++ b/examples/c/ADM_set_qos_constraints.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +69,8 @@ main(int argc, char* argv[]) { 
assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index 8ba26be5909a38059d6f4e5377b8016b8b645af8..2a1d06f8a5314ab64a5466fe4a0d8c0fdf80db2f 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +69,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_terminate_adhoc_storage.c b/examples/c/ADM_terminate_adhoc_storage.c index dafd05e84a27dab0871aa0cca46ab14b52b92761..6677c5fee8ced0fac7ea99cfbe31ec648d497af5 100644 --- a/examples/c/ADM_terminate_adhoc_storage.c +++ b/examples/c/ADM_terminate_adhoc_storage.c @@ -27,10 +27,6 @@ #include #include "common.h" -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -82,9 +78,9 @@ main(int argc, char* argv[]) { } // 3. 
the adhoc storage execution context - adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); if(adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing adhoc context\n"); @@ -115,9 +111,9 @@ main(int argc, char* argv[]) { // system, let's prepare a new execution context for the adhoc // storage system - new_adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 200, false); + new_adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 200, false); if(new_adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing new adhoc context\n"); diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c index 343c603e223ae667938eed9b86808488ec65d82a..2433091f417c5a7dce19ad9585d2ba1357740069 100644 --- a/examples/c/ADM_transfer_datasets.c +++ b/examples/c/ADM_transfer_datasets.c @@ -28,14 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 -#define NSOURCES 5 -#define NTARGETS 5 -#define NLIMITS 3 - int main(int argc, char* argv[]) { @@ -58,10 +50,15 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); + assert(expected_outputs); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -72,8 +69,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -93,7 +90,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index 823f5504ef32b92dffcc1ae6f4489effbcb41e6a..ed33af03448372aea35181db262f126fa3607544 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -27,11 +27,6 @@ #include #include "common.h" -#define NADHOC_NODES 25 -#define N_NEW_ADHOC_NODES 10 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -84,9 +79,9 @@ main(int argc, char* argv[]) { } // 3. 
the adhoc storage execution context - adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); if(adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing adhoc context\n"); @@ -124,9 +119,9 @@ main(int argc, char* argv[]) { goto cleanup; } - new_adhoc_ctx = ADM_adhoc_context_create(cli_args.controller_address, - ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 200, false); + new_adhoc_ctx = ADM_adhoc_context_create( + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 200, false); if(new_adhoc_ctx == NULL) { fprintf(stderr, "Fatal error preparing new adhoc context\n"); diff --git a/examples/c/ADM_update_job.c b/examples/c/ADM_update_job.c index 25ca673525c4455c0438cbfa9498bb6b9c6cf5d7..70b8180c71fee278fa7acc120e823e5fd112a697 100644 --- a/examples/c/ADM_update_job.c +++ b/examples/c/ADM_update_job.c @@ -28,11 +28,6 @@ #include #include "common.h" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -55,10 +50,14 @@ main(int argc, char* argv[]) { assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); - ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); + ADM_dataset_route_t* inputs = + prepare_routes("%s-input-dataset-%d", NINPUTS); assert(inputs); - ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); + ADM_dataset_route_t* outputs = + prepare_routes("%s-output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_dataset_route_t* expected_outputs = + prepare_routes("%s-exp-output-dataset-%d", NEXPOUTPUTS); ADM_job_resources_t job_resources = ADM_job_resources_create(job_nodes, NJOB_NODES); @@ -69,8 +68,8 @@ main(int argc, char* argv[]) { assert(adhoc_resources); ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW, - ADM_ADHOC_ACCESS_RDWR, 100, false); + cli_args.controller_address, cli_args.data_stager_address, + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); assert(ctx); const char* name = "adhoc_storage_42"; @@ -90,7 +89,8 @@ main(int argc, char* argv[]) { } ADM_job_requirements_t reqs = ADM_job_requirements_create( - inputs, NINPUTS, outputs, NOUTPUTS, adhoc_storage); + inputs, NINPUTS, outputs, NOUTPUTS, expected_outputs, NEXPOUTPUTS, + adhoc_storage); assert(reqs); uint64_t slurm_job_id = 42; @@ -128,14 +128,9 @@ main(int argc, char* argv[]) { cleanup: - for(int i = 0; i < NINPUTS; ++i) { - ADM_dataset_destroy(inputs[i]); - } - - for(int i = 0; i < NOUTPUTS; ++i) { - ADM_dataset_destroy(outputs[i]); - } - + destroy_routes(inputs, NINPUTS); + destroy_routes(outputs, NOUTPUTS); + destroy_routes(expected_outputs, NEXPOUTPUTS); ADM_remove_job(server, job); ADM_server_destroy(server); exit(exit_status); diff --git a/examples/c/common.c b/examples/c/common.c index b19935986c31a73fde8798e65a0b0d2e660fa8a0..af6622058c7aea36199ab4d0403a8e12b0dc92a2 100644 --- a/examples/c/common.c +++ b/examples/c/common.c @@ -18,11 +18,16 @@ process_args(int argc, char* argv[], test_info_t test_info, cli_args_t* args) { ++required_args; } + if(test_info.requires_data_stager) { + ++required_args; + } + if(argc != required_args) { fprintf(stderr, "ERROR: missing arguments\n"); 
- fprintf(stderr, "Usage: %s%s%s\n", test_info.name, + fprintf(stderr, "Usage: %s%s%s%s\n", test_info.name, test_info.requires_server ? " " : "", - test_info.requires_controller ? " " : ""); + test_info.requires_controller ? " " : "", + test_info.requires_data_stager ? " " : ""); return -1; } @@ -109,6 +114,52 @@ destroy_datasets(ADM_dataset_t datasets[], size_t n) { free(datasets); } +ADM_dataset_route_t* +prepare_routes(const char* pattern, size_t n) { + + ADM_dataset_route_t* routes = calloc(n, sizeof(ADM_dataset_route_t)); + + if(!routes) { + return NULL; + } + + for(size_t i = 0; i < n; ++i) { + size_t len = snprintf(NULL, 0, pattern, "XXX", i); + char* id = (char*) alloca(len + 1); + snprintf(id, len + 1, pattern, "src", i); + ADM_dataset_t src = ADM_dataset_create(id); + snprintf(id, len + 1, pattern, "dst", i); + ADM_dataset_t dst = ADM_dataset_create(id); + + if(!src || !dst) { + return NULL; + } + + routes[i] = ADM_dataset_route_create(src, dst); + if(!routes[i]) { + return NULL; + } + } + + return routes; +} + +void +destroy_routes(ADM_dataset_route_t routes[], size_t n) { + + if(!routes) { + return; + } + + for(size_t i = 0; i < n; ++i) { + if(routes[i]) { + ADM_dataset_route_destroy(routes[i]); + } + } + + free(routes); +} + ADM_qos_limit_t* prepare_qos_limits(size_t n) { diff --git a/examples/c/common.h b/examples/c/common.h index 53d472113ab6c0e1d831319f0979adb99fd4001a..08ed9a8c784fd121bf18eb7f627a3bec2bcb229a 100644 --- a/examples/c/common.h +++ b/examples/c/common.h @@ -3,6 +3,16 @@ #include +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define N_NEW_ADHOC_NODES 10 +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NEXPOUTPUTS 1 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 3 + #define TESTNAME \ (__builtin_strrchr(__FILE__, '/') ? 
__builtin_strrchr(__FILE__, '/') + 1 \ : __FILE__) @@ -11,11 +21,13 @@ typedef struct { const char* name; bool requires_server; bool requires_controller; + bool requires_data_stager; } test_info_t; typedef struct { const char* server_address; const char* controller_address; + const char* data_stager_address; } cli_args_t; int @@ -33,6 +45,12 @@ prepare_datasets(const char* pattern, size_t n); void destroy_datasets(ADM_dataset_t datasets[], size_t n); +ADM_dataset_route_t* +prepare_routes(const char* pattern, size_t n); + +void +destroy_routes(ADM_dataset_route_t routes[], size_t n); + ADM_qos_limit_t* prepare_qos_limits(size_t n); diff --git a/examples/cxx/ADM_cancel_transfer.cpp b/examples/cxx/ADM_cancel_transfer.cpp index d2791147037f6c826c404c9f8e470d93b8a81a8e..7ed4a55c3a36e3e9c37b505065f196d3a13eaf4d 100644 --- a/examples/cxx/ADM_cancel_transfer.cpp +++ b/examples/cxx/ADM_cancel_transfer.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_connect_data_operation.cpp b/examples/cxx/ADM_connect_data_operation.cpp index d3bc9ac1d63c15841e4cea875d81ff3b7a5d1176..74499a8deb53f492e4cebbf71206c7aa26191585 100644 --- a/examples/cxx/ADM_connect_data_operation.cpp +++ b/examples/cxx/ADM_connect_data_operation.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_define_data_operation.cpp b/examples/cxx/ADM_define_data_operation.cpp index e34a8d05568000ab5243c7f1e2c980b756825aef..097a8c00926639f750bbbd3576d9e2778cacee94 100644 --- a/examples/cxx/ADM_define_data_operation.cpp +++ b/examples/cxx/ADM_define_data_operation.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_deploy_adhoc_storage.cpp b/examples/cxx/ADM_deploy_adhoc_storage.cpp index 93030a2706abe9862c864a96f7a36c191b583ef0..0944ee1a694436215482e5a4fcb6a36c0210713c 100644 --- a/examples/cxx/ADM_deploy_adhoc_storage.cpp +++ b/examples/cxx/ADM_deploy_adhoc_storage.cpp @@ -38,6 +38,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -51,8 +52,11 @@ main(int argc, char* argv[]) { std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { diff --git a/examples/cxx/ADM_finalize_data_operation.cpp b/examples/cxx/ADM_finalize_data_operation.cpp index a9148a199b83d1d589412b3a1f612b0bac99a588..acc18a1978b87b745e785435e66e567088ce4895 100644 --- a/examples/cxx/ADM_finalize_data_operation.cpp +++ b/examples/cxx/ADM_finalize_data_operation.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + 
.requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_get_pending_transfers.cpp b/examples/cxx/ADM_get_pending_transfers.cpp index 60f7d9eb7976f33614b0d2ad16798050016204ed..e5bd97e072cce04fbf4a36bb0037de78b5e21b5f 100644 --- a/examples/cxx/ADM_get_pending_transfers.cpp +++ b/examples/cxx/ADM_get_pending_transfers.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_get_qos_constraints.cpp b/examples/cxx/ADM_get_qos_constraints.cpp index 389ac2b199d14312b396c109f156eb1e8bf2ad26..04d3c62e01a3cdf2bdf7c4db442a7c931cb80a09 100644 --- a/examples/cxx/ADM_get_qos_constraints.cpp +++ b/examples/cxx/ADM_get_qos_constraints.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_get_statistics.cpp b/examples/cxx/ADM_get_statistics.cpp index a19e506b061a2dd7b21ec8ea9c3f909ad9e06294..15a12c95ff66e4cd9d2e53f76861500c3ead083d 100644 --- a/examples/cxx/ADM_get_statistics.cpp +++ b/examples/cxx/ADM_get_statistics.cpp @@ -29,11 +29,10 @@ int main(int argc, char* argv[]) { - test_info test_info{ - .name = TESTNAME, - .requires_server = true, - .requires_controller = true, - }; + test_info test_info{.name = TESTNAME, + .requires_server = true, + .requires_controller = true, + .requires_data_stager = true}; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_get_transfer_priority.cpp b/examples/cxx/ADM_get_transfer_priority.cpp index 983c39c18bed25e01368f5744a79f8f91b78c45c..6db261c9ad038cba2fcab7a48165c2dab9606dcc 100644 --- a/examples/cxx/ADM_get_transfer_priority.cpp +++ b/examples/cxx/ADM_get_transfer_priority.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_link_transfer_to_data_operation.cpp b/examples/cxx/ADM_link_transfer_to_data_operation.cpp index 2227afd1e9ef3095e524c8d65252c0cbd9c863dd..a8256cc2e65886c4bd2197fc20a7fd984155b075 100644 --- a/examples/cxx/ADM_link_transfer_to_data_operation.cpp +++ b/examples/cxx/ADM_link_transfer_to_data_operation.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_ping.cpp b/examples/cxx/ADM_ping.cpp index ef796074bc7d6f9ba4aad2dbf59d2394157421de..abc6259613dc656221b243975f2e2f0bb34f1319 100644 --- a/examples/cxx/ADM_ping.cpp +++ b/examples/cxx/ADM_ping.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = false, + .requires_data_stager = false, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 0dfd0f9ba7ccfea5e2de05ec45105d0c67ebbb5b..d2c07d40f67ec12317881d77e015a54ebc012789 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -38,6 +38,7 @@ main(int 
argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -51,8 +52,11 @@ main(int argc, char* argv[]) { std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index 55ac84d6570dc2739a12d9ce07846cdbfcc78e26..2da8de734b33a6695a02bf785c93e84e6d33760e 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -38,6 +38,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -46,14 +47,19 @@ main(int argc, char* argv[]) { const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); - const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); - const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS); + const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS); + const auto expected_outputs = + prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { @@ -62,7 +68,8 @@ main(int argc, char* argv[]) { server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); - scord::job::requirements reqs(inputs, outputs, adhoc_storage); + scord::job::requirements reqs(inputs, outputs, expected_outputs, + adhoc_storage); [[maybe_unused]] const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); diff --git a/examples/cxx/ADM_register_pfs_storage.cpp b/examples/cxx/ADM_register_pfs_storage.cpp index 1078262c91a9233a35a53175064961d7fe7f28dd..54ac2f0092139a6561eff10baf874d9b23a20324 100644 --- a/examples/cxx/ADM_register_pfs_storage.cpp +++ b/examples/cxx/ADM_register_pfs_storage.cpp @@ -34,6 +34,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = false, + .requires_data_stager = false, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_remove_adhoc_storage.cpp b/examples/cxx/ADM_remove_adhoc_storage.cpp index aa496ee6fd002006a39fa7ce4626b5a698b85980..05d9d8b0bf2ca54b5ca52460b82badf4deaf531f 100644 --- a/examples/cxx/ADM_remove_adhoc_storage.cpp +++ b/examples/cxx/ADM_remove_adhoc_storage.cpp @@ -38,6 +38,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -51,8 +52,11 @@ main(int argc, char* argv[]) { std::string name 
= "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { diff --git a/examples/cxx/ADM_remove_job.cpp b/examples/cxx/ADM_remove_job.cpp index 4a3ae71cdf7b09ad908520267e879a989ae1a6b6..408a16ef27a91997503d44152a9818281404afd2 100644 --- a/examples/cxx/ADM_remove_job.cpp +++ b/examples/cxx/ADM_remove_job.cpp @@ -26,11 +26,6 @@ #include #include "common.hpp" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -38,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -46,14 +42,19 @@ main(int argc, char* argv[]) { const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); - const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); - const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS); + const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS); + const auto expected_outputs = + prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; @@ -63,7 +64,8 @@ main(int argc, char* argv[]) { server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); - scord::job::requirements reqs(inputs, outputs, adhoc_storage); + scord::job::requirements reqs(inputs, outputs, expected_outputs, + adhoc_storage); [[maybe_unused]] const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); diff --git a/examples/cxx/ADM_remove_pfs_storage.cpp b/examples/cxx/ADM_remove_pfs_storage.cpp index 8e5bd527ea2a0de334092552c4b4b3616a11f082..21bcfddf4d309275daa622ec42a57b02c5cf292f 100644 --- a/examples/cxx/ADM_remove_pfs_storage.cpp +++ b/examples/cxx/ADM_remove_pfs_storage.cpp @@ -34,6 +34,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = false, + .requires_data_stager = false, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_set_dataset_information.cpp b/examples/cxx/ADM_set_dataset_information.cpp index 863e677424d113350f2eaeb9f5b4c7a95ab5a1e6..7ddc6c0d4641e41210060ce3f0a0f40f85faec02 100644 --- a/examples/cxx/ADM_set_dataset_information.cpp +++ b/examples/cxx/ADM_set_dataset_information.cpp @@ -34,6 +34,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_set_io_resources.cpp b/examples/cxx/ADM_set_io_resources.cpp index 
425b4642379614b73bd0115e6669b2c64fdec12e..17acf0ae9e2eed45826e1cfb0ae4eea9a85c5a23 100644 --- a/examples/cxx/ADM_set_io_resources.cpp +++ b/examples/cxx/ADM_set_io_resources.cpp @@ -34,6 +34,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_set_qos_constraints.cpp b/examples/cxx/ADM_set_qos_constraints.cpp index d487b57603e81a9adc07f0bd3a248aea628c9c2f..55e6efb04eda1ace73e28f9370067d88fbb936d1 100644 --- a/examples/cxx/ADM_set_qos_constraints.cpp +++ b/examples/cxx/ADM_set_qos_constraints.cpp @@ -33,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_set_transfer_priority.cpp b/examples/cxx/ADM_set_transfer_priority.cpp index 39819b478966842685c233044fc90d3c84d48b09..86416f1c06264db81850f13093d7b34fd20f8cd1 100644 --- a/examples/cxx/ADM_set_transfer_priority.cpp +++ b/examples/cxx/ADM_set_transfer_priority.cpp @@ -34,6 +34,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/ADM_terminate_adhoc_storage.cpp b/examples/cxx/ADM_terminate_adhoc_storage.cpp index e7b26594d6ba785851e54a85f941daecffc16eb9..737e1d4fbf3b766786fe15ccb557c2105c7b2bbf 100644 --- a/examples/cxx/ADM_terminate_adhoc_storage.cpp +++ b/examples/cxx/ADM_terminate_adhoc_storage.cpp @@ -38,6 +38,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -51,8 +52,11 @@ main(int argc, char* argv[]) { std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 2b4e6f02c6373f7f404c305a449b97bdb328ba57..1028658c880a016ab6676618323d6bafd38b4269 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -41,6 +41,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -49,8 +50,10 @@ main(int argc, char* argv[]) { const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); - const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); - const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS); + const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS); + const auto expected_outputs = + prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); 
@@ -60,8 +63,11 @@ main(int argc, char* argv[]) { std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { @@ -69,7 +75,8 @@ main(int argc, char* argv[]) { server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); - scord::job::requirements reqs(inputs, outputs, adhoc_storage); + scord::job::requirements reqs(inputs, outputs, expected_outputs, + adhoc_storage); const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); diff --git a/examples/cxx/ADM_update_adhoc_storage.cpp b/examples/cxx/ADM_update_adhoc_storage.cpp index dbb2ac93c1c3fa9be59fed26ae1c89e38db0c3ce..a7b88deb0b3d1ee0c43b34c43a3e2629f6198d55 100644 --- a/examples/cxx/ADM_update_adhoc_storage.cpp +++ b/examples/cxx/ADM_update_adhoc_storage.cpp @@ -37,6 +37,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -51,8 +52,11 @@ main(int argc, char* argv[]) { std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, + cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, - scord::adhoc_storage::access_type::read_write, 100, false}; + scord::adhoc_storage::access_type::read_write, + 100, + false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; const auto new_adhoc_resources = diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index 1ee22d589cfbcf9573c5d315aa44344478f00e8e..72533de8ba11277d0fb26778789ff30dff0d0e36 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -26,11 +26,6 @@ #include #include "common.hpp" -#define NJOB_NODES 50 -#define NADHOC_NODES 25 -#define NINPUTS 10 -#define NOUTPUTS 5 - int main(int argc, char* argv[]) { @@ -38,6 +33,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = true, + .requires_data_stager = true, }; const auto cli_args = process_args(argc, argv, test_info); @@ -47,18 +43,21 @@ main(int argc, char* argv[]) { const auto job_nodes = prepare_nodes(NJOB_NODES); const auto new_job_nodes = prepare_nodes(NJOB_NODES * 2); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); - const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); - const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + const auto inputs = prepare_routes("{}-input-dataset-{}", NINPUTS); + const auto outputs = prepare_routes("{}-output-dataset-{}", NOUTPUTS); + const auto expected_outputs = + prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); const auto gkfs_storage = scord::register_adhoc_storage( server, "foobar", scord::adhoc_storage::type::gekkofs, scord::adhoc_storage::ctx{ - cli_args.controller_address, + cli_args.controller_address, cli_args.data_stager_address, scord::adhoc_storage::execution_mode::separate_new, scord::adhoc_storage::access_type::read_write, 100, false}, scord::adhoc_storage::resources{adhoc_nodes}); - scord::job::requirements reqs{inputs, outputs, gkfs_storage}; + 
scord::job::requirements reqs{inputs, outputs, expected_outputs, + gkfs_storage}; const auto new_inputs = prepare_datasets("input-new-dataset-{}", NINPUTS); const auto new_outputs = diff --git a/examples/cxx/ADM_update_pfs_storage.cpp b/examples/cxx/ADM_update_pfs_storage.cpp index 019fe1104fa0ffb466822cacb1db9dcca6a8168b..699d0ea9a238d584b1578a4f97cfe6ebb2ebc67c 100644 --- a/examples/cxx/ADM_update_pfs_storage.cpp +++ b/examples/cxx/ADM_update_pfs_storage.cpp @@ -34,6 +34,7 @@ main(int argc, char* argv[]) { .name = TESTNAME, .requires_server = true, .requires_controller = false, + .requires_data_stager = false, }; const auto cli_args = process_args(argc, argv, test_info); diff --git a/examples/cxx/common.cpp b/examples/cxx/common.cpp index cf83a8a90369d9b5145576297ac78f0870c8bf36..bc290c1f5861e1725308548ba3af273304a3af86 100644 --- a/examples/cxx/common.cpp +++ b/examples/cxx/common.cpp @@ -15,17 +15,24 @@ process_args(int argc, char* argv[], const test_info& test_info) { ++required_args; } + if(test_info.requires_data_stager) { + ++required_args; + } + if(argc != required_args) { fmt::print(stderr, "ERROR: missing arguments\n"); - fmt::print(stderr, "Usage: {}{}{}\n", test_info.name, + fmt::print(stderr, "Usage: {}{}{}{}\n", test_info.name, test_info.requires_server ? " " : "", - test_info.requires_controller ? " " - : ""); + test_info.requires_controller ? " " : "", + test_info.requires_data_stager ? " " + : ""); exit(EXIT_FAILURE); } return cli_args{test_info.requires_server ? std::string{argv[1]} : ""s, - test_info.requires_controller ? std::string{argv[2]} : ""s}; + test_info.requires_controller ? std::string{argv[2]} : ""s, + test_info.requires_data_stager ? std::string{argv[3]} + : ""s}; } std::vector @@ -50,6 +57,19 @@ prepare_datasets(const std::string& pattern, size_t n) { return datasets; } +std::vector +prepare_routes(const std::string& pattern, size_t n) { + std::vector routes; + routes.reserve(n); + for(size_t i = 0; i < n; ++i) { + routes.emplace_back( + scord::dataset{fmt::format(fmt::runtime(pattern), "src", i)}, + scord::dataset{fmt::format(fmt::runtime(pattern), "dst", i)}); + } + + return routes; +} + std::vector prepare_qos_limits(size_t n) { diff --git a/examples/cxx/common.hpp b/examples/cxx/common.hpp index 374ad2644fff45251062ba13f54644f12fff9754..4cc0f836f058a7a4d96549727ee257ceedf5a316 100644 --- a/examples/cxx/common.hpp +++ b/examples/cxx/common.hpp @@ -4,6 +4,12 @@ #include #include +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NEXPOUTPUTS 1 + #define TESTNAME \ (__builtin_strrchr(__FILE__, '/') ? 
__builtin_strrchr(__FILE__, '/') + 1 \ : __FILE__) @@ -12,11 +18,13 @@ struct test_info { std::string name; bool requires_server; bool requires_controller; + bool requires_data_stager; }; struct cli_args { std::string server_address; std::string controller_address; + std::string data_stager_address; }; cli_args @@ -28,6 +36,9 @@ prepare_nodes(size_t n); std::vector prepare_datasets(const std::string& pattern, size_t n); +std::vector +prepare_routes(const std::string& pattern, size_t n); + std::vector prepare_qos_limits(size_t n); diff --git a/plugins/slurm/defaults.h.in b/plugins/slurm/defaults.h.in index 0568e0990df4a4d11348b24e1f215257aa411cf9..9de23bc60bdbf9b9bacde3be859b1928c1cad93d 100644 --- a/plugins/slurm/defaults.h.in +++ b/plugins/slurm/defaults.h.in @@ -37,6 +37,9 @@ #define SCORDCTL_PROTO_DEFAULT SCORD_PROTO_DEFAULT #define SCORDCTL_PORT_DEFAULT @SCORD_CTL_BIND_PORT@ #define SCORDCTL_TMPDIR_DEFAULT "/tmp" +#define CARGO_PROG_DEFAULT "@CARGO_PROGRAM@" +#define CARGO_PROTO_DEFAULT SCORD_PROTO_DEFAULT +#define CARGO_PORT_DEFAULT 62000 // clang-format on diff --git a/plugins/slurm/slurmadmcli.c b/plugins/slurm/slurmadmcli.c index 204c084005da5898d05bc578015152f308d6ea49..61d10078627ca702b97ff695265cdfc11a9dbe80 100644 --- a/plugins/slurm/slurmadmcli.c +++ b/plugins/slurm/slurmadmcli.c @@ -32,6 +32,7 @@ #include #include +#include #include "defaults.h" #include "utils.h" @@ -48,16 +49,16 @@ #define ADHOCID_LEN 64 #define INT32_STR_LEN 16 /* 16 chars are enough to fit an int32 in decimal */ -#define TAG_NNODES 0 -#define TAG_ADHOC_TYPE 1 -#define TAG_ADHOC_OVERLAP 2 -#define TAG_ADHOC_EXCLUSIVE 3 -#define TAG_ADHOC_DEDICATED 4 -#define TAG_ADHOC_REMOTE 5 -#define TAG_DATASET_INPUT 6 -#define TAG_DATASET_OUTPUT 7 -#define TAG_DATASET_EXPECT_OUTPUT 8 -#define TAG_DATASET_INOUT 9 +#define TAG_NNODES 0 +#define TAG_ADHOC_TYPE 1 +#define TAG_ADHOC_OVERLAP 2 +#define TAG_ADHOC_EXCLUSIVE 3 +#define TAG_ADHOC_DEDICATED 4 +#define TAG_ADHOC_REMOTE 5 +#define TAG_DATASET_INPUT 6 +#define TAG_DATASET_OUTPUT 7 +#define TAG_DATASET_EXPECTED_OUTPUT 8 +#define TAG_DATASET_EXPECTED_INOUT_DATASET 9 // clang-format off SPANK_PLUGIN (admire-cli, 1) @@ -71,6 +72,14 @@ static long adhoc_walltime = 0; static ADM_adhoc_mode_t adhoc_mode = ADM_ADHOC_MODE_IN_JOB_SHARED; static ADM_adhoc_storage_type_t adhoc_type = 0; static char adhoc_id[ADHOCID_LEN] = {0}; +ADM_dataset_route_t* input_datasets = NULL; +size_t input_datasets_count = 0; +ADM_dataset_route_t* output_datasets = NULL; +size_t output_datasets_count = 0; +ADM_dataset_route_t* expected_output_datasets = NULL; +size_t expected_output_datasets_count = 0; +ADM_dataset_route_t* expected_inout_datasets = NULL; +size_t expected_inout_datasets_count = 0; /* server-related options */ typedef struct { @@ -84,6 +93,7 @@ typedef struct { typedef struct { scord_server_info_t scord_info; scord_server_info_t scordctl_info; + scord_server_info_t cargo_info; } scord_plugin_config_t; @@ -97,7 +107,12 @@ static scord_plugin_config_t default_cfg = { .proto = SCORDCTL_PROTO_DEFAULT, .port = SCORDCTL_PORT_DEFAULT, .prog = SCORDCTL_PROG_DEFAULT, - .tmpdir = SCORDCTL_TMPDIR_DEFAULT}}; + .tmpdir = SCORDCTL_TMPDIR_DEFAULT}, + .cargo_info = {.addr = NULL, + .proto = CARGO_PROTO_DEFAULT, + .port = CARGO_PORT_DEFAULT, + .prog = CARGO_PROG_DEFAULT, + .tmpdir = NULL}}; static int process_opts(int tag, const char* optarg, int remote); @@ -169,9 +184,9 @@ struct spank_option spank_opts[] = { (spank_opt_cb_f) process_opts /* callback */ }, { - "adm-input", "dataset-routing", + 
"adm-input-datasets", "dataset-route[,dataset-route...]", "Define datasets that should be transferred between the PFS " - "and the ad-hoc storage service. The `dataset-routing` is " + "and the ad-hoc storage service. The `dataset-route` is " "defined as `ORIGIN-TIER:PATH TARGET-TIER:PATH`. For example," "to transfer the file `input000.dat` from the Lustre PFS to " "the an on-demand GekkoFS ad-hoc storage service, the option " @@ -182,7 +197,7 @@ struct spank_option spank_opts[] = { (spank_opt_cb_f) process_opts /* callback */ }, { - "adm-output", "dataset-routing", + "adm-output-datasets", "dataset-route[,dataset-route...]", "Define datasets that should be automatically transferred " "between the ad-hoc storage system and the PFS. The ad-hoc " "storage will guarantee that the dataset is not transferred " @@ -194,22 +209,24 @@ struct spank_option spank_opts[] = { (spank_opt_cb_f) process_opts /* callback */ }, { - "adm-expect-output", "dataset-routing", + "adm-expected-output-datasets", + "dataset-route[,dataset-route...]", "Define datasets that are expected to be generated by the " "application. When using this option, the application itself " "MUST use the programmatic APIs defined in `scord-user.h`to " "explicitly request the transfer of the datasets.", 1, /* option takes an argument */ - TAG_DATASET_EXPECT_OUTPUT, /* option tag */ + TAG_DATASET_EXPECTED_OUTPUT, /* option tag */ (spank_opt_cb_f) process_opts /* callback */ }, { - "adm-expect-inout", "dataset-routing", + "adm-expected-inout-datasets", + "dataset-route[,dataset-route...]", "Define the datasets that should be transferred INTO " "the ad-hoc storage AND BACK when finished.", - 1, /* option takes an argument */ - TAG_DATASET_INOUT, /* option tag */ - (spank_opt_cb_f) process_opts /* callback */ + 1, /* option takes an argument */ + TAG_DATASET_EXPECTED_INOUT_DATASET, /* option tag */ + (spank_opt_cb_f) process_opts /* callback */ }, SPANK_OPTIONS_TABLE_END}; @@ -217,7 +234,8 @@ int process_opts(int tag, const char* optarg, int remote) { (void) remote; - slurm_debug("%s: %s() called", plugin_name, __func__); + slurm_debug("%s: %s(tag: %d, optarg: %s, remote: %d) called", plugin_name, + __func__, tag, optarg, remote); /* srun & sbatch/salloc */ spank_context_t sctx = spank_context(); @@ -278,6 +296,62 @@ process_opts(int tag, const char* optarg, int remote) { adhoc_id[ADHOCID_LEN - 1] = '\0'; return 0; + case TAG_DATASET_INPUT: + if(input_datasets) { + free(input_datasets); + } + + if(scord_utils_parse_dataset_routes(optarg, &input_datasets, + &input_datasets_count) != + ADM_SUCCESS) { + slurm_error("%s: %s: failed to parse dataset route: %s", + plugin_name, __func__, optarg); + return -1; + } + return 0; + + case TAG_DATASET_OUTPUT: + if(output_datasets) { + free(output_datasets); + } + + if(scord_utils_parse_dataset_routes(optarg, &output_datasets, + &output_datasets_count) != + ADM_SUCCESS) { + slurm_error("%s: %s: failed to parse dataset route: %s", + plugin_name, __func__, optarg); + return -1; + } + return 0; + + case TAG_DATASET_EXPECTED_OUTPUT: + if(expected_output_datasets) { + free(expected_output_datasets); + } + + if(scord_utils_parse_dataset_routes( + optarg, &expected_output_datasets, + &expected_output_datasets_count) != ADM_SUCCESS) { + slurm_error("%s: %s: failed to parse dataset route: %s", + plugin_name, __func__, optarg); + return -1; + } + return 0; + + case TAG_DATASET_EXPECTED_INOUT_DATASET: + if(expected_inout_datasets) { + free(expected_inout_datasets); + } + + if(scord_utils_parse_dataset_routes( + 
optarg, &expected_inout_datasets, + &expected_inout_datasets_count) != ADM_SUCCESS) { + slurm_error("%s: %s: failed to parse dataset route: %s", + plugin_name, __func__, optarg); + return -1; + } + return 0; + default: return -1; } @@ -305,6 +379,8 @@ process_config(int ac, char** av, scord_plugin_config_t* cfg) { &cfg->scordctl_info.port), EXPAND_SCORD_OPT("scordctl_tmpdir", TYPE_STR, &cfg->scordctl_info.tmpdir), + EXPAND_SCORD_OPT("cargo_prog", TYPE_STR, &cfg->cargo_info.prog), + EXPAND_SCORD_OPT("cargo_port", TYPE_INT, &cfg->cargo_info.port), }; #undef EXPAND_SCORD_OPT @@ -389,6 +465,12 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, return -1; } + /* The Cargo master will also typically reside on the first node of the + * allocation */ + cfg.cargo_info.addr = margo_address_create(cfg.cargo_info.proto, + ADM_node_get_hostname(ctl_node), + cfg.cargo_info.port); + slurm_debug("%s: %s: scord_info:", plugin_name, __func__); slurm_debug("%s: %s: addr: \"%s\",", plugin_name, __func__, cfg.scord_info.addr); @@ -405,6 +487,14 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, slurm_debug("%s: %s: port: %d,", plugin_name, __func__, cfg.scordctl_info.port); + slurm_debug("%s: %s: cargo_info:", plugin_name, __func__); + slurm_debug("%s: %s: addr: \"%s\",", plugin_name, __func__, + cfg.cargo_info.addr); + slurm_debug("%s: %s: proto: \"%s\",", plugin_name, __func__, + cfg.cargo_info.proto); + slurm_debug("%s: %s: port: %d,", plugin_name, __func__, + cfg.cargo_info.port); + /* Register the job with the scord server */ scord_server = ADM_server_create(cfg.scord_info.proto, cfg.scord_info.addr); if(!scord_server) { @@ -443,9 +533,9 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, goto end; } - adhoc_ctx = ADM_adhoc_context_create(cfg.scordctl_info.addr, adhoc_mode, - ADM_ADHOC_ACCESS_RDWR, adhoc_walltime, - false); + adhoc_ctx = ADM_adhoc_context_create( + cfg.scordctl_info.addr, cfg.cargo_info.addr, adhoc_mode, + ADM_ADHOC_ACCESS_RDWR, adhoc_walltime, false); if(!adhoc_ctx) { slurm_error("%s: adhoc_context creation failed", plugin_name); rc = -1; @@ -460,8 +550,14 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, goto end; } + slurm_debug("Creating job requirements: %zu inputs, %zu outputs", + input_datasets_count, output_datasets_count); + /* no inputs or outputs */ - scord_reqs = ADM_job_requirements_create(NULL, 0, NULL, 0, adhoc_storage); + scord_reqs = ADM_job_requirements_create( + input_datasets, input_datasets_count, output_datasets, + output_datasets_count, expected_output_datasets, + expected_output_datasets_count, adhoc_storage); if(!scord_reqs) { slurm_error("%s: scord job_requirements creation", plugin_name); rc = -1; diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index 086288087cbfd7ce92d5e00ccc71a52450977e9d..cc6b18107b3bb41216801d760b682809813693d0 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -85,12 +85,13 @@ add_library(libscord SHARED) target_sources( libscord - PUBLIC scord/scord.h scord/scord.hpp - PRIVATE libscord.cpp c_wrapper.cpp detail/impl.hpp detail/impl.cpp env.hpp + PUBLIC scord/scord.h scord/scord.hpp scord/types.hpp + PRIVATE libscord.cpp c_wrapper.cpp utils.cpp detail/impl.hpp detail/impl.cpp + env.hpp ) set(public_headers, "") -list(APPEND public_headers "scord/scord.h" "scord/scord.hpp") +list(APPEND public_headers "scord/scord.h" "scord/scord.hpp" "scord/types.hpp") set_target_properties(libscord PROPERTIES PUBLIC_HEADER 
"${public_headers}") diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 3ace0a2d16734bf6fa5aa5afe707f9ae894454a5..17c8c2c5e5b20e1d33e6387c68a316687b9b1708 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -234,7 +234,7 @@ register_adhoc_storage(const server& srv, const std::string& name, LOGGER_INFO("rpc {:<} body: {{name: {}, type: {}, adhoc_ctx: {}, " "adhoc_resources: {}}}", - rpc, name, type, ctx, resources); + rpc, std::quoted(name), type, ctx, resources); if(const auto& call_rv = endp.call(rpc.name(), name, type, ctx, resources); @@ -336,7 +336,7 @@ register_pfs_storage(const server& srv, const std::string& name, const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc {:<} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, - name, type, ctx); + std::quoted(name), type, ctx); if(const auto& call_rv = endp.call(rpc.name(), name, type, ctx); call_rv.has_value()) { diff --git a/src/lib/scord/types.h b/src/lib/scord/types.h index 633f849bb4fb2d33a68286782163474649c42621..e518cb8f3962f8cdc4053af98bbe83224973b3f8 100644 --- a/src/lib/scord/types.h +++ b/src/lib/scord/types.h @@ -108,12 +108,18 @@ typedef struct adm_job_requirements* ADM_job_requirements_t; /** A dataset */ typedef struct adm_dataset* ADM_dataset_t; +/** Routing information for a dataset */ +typedef struct adm_dataset_route* ADM_dataset_route_t; + /** Information about a dataset */ typedef struct adm_dataset_info* ADM_dataset_info_t; /** A list of datasets */ typedef struct adm_dataset_list* ADM_dataset_list_t; +/** A list of dataset routes */ +typedef struct adm_dataset_route_list* ADM_dataset_route_list_t; + /** A list of QoS limits */ typedef struct adm_qos_limit_list* ADM_qos_limit_list_t; @@ -360,12 +366,16 @@ ADM_job_resources_destroy(ADM_job_resources_t res); * @remark JOB_REQUIREMENTS created by this function need to be freed by calling * ADM_job_requirements_destroy(). * - * @param[in] inputs An array of DATASET_DESCRIPTORS describing the input + * @param[in] inputs An array of DATASET_ROUTES describing the input * information required by the job. - * @param[in] inputs_len The number of DATASET_DESCRIPTORS stored in inputs. - * @param[in] outputs An array of DATASET_DESCRIPTORS describing the output + * @param[in] inputs_len The number of DATASET_ROUTES stored in inputs. + * @param[in] outputs An array of DATASET_ROUTES describing the output * information generated by the job. - * @param[in] outputs_len The number of DATASET_DESCRIPTORS stored in outputs. + * @param[in] outputs_len The number of DATASET_ROUTES stored in outputs. + * @param[in] expected_outputs An array of DATASET_ROUTES describing the + * expected output information generated by the job. + * @param[in] expected_outputs_len The number of DATASET_ROUTES stored in + * expected_outputs. * @param[in] adhoc_storage An optional ADHOC_DESCRIPTOR describing the adhoc * storage system required by the job (can be set to NULL if no adhoc storage * system is required). @@ -373,8 +383,10 @@ ADM_job_resources_destroy(ADM_job_resources_t res); * failure. 
*/ ADM_job_requirements_t -ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len, - ADM_dataset_t outputs[], size_t outputs_len, +ADM_job_requirements_create(ADM_dataset_route_t inputs[], size_t inputs_len, + ADM_dataset_route_t outputs[], size_t outputs_len, + ADM_dataset_route_t expected_outputs[], + size_t expected_outputs_len, ADM_adhoc_storage_t adhoc_storage); /** @@ -425,6 +437,28 @@ ADM_dataset_create(const char* id); ADM_return_t ADM_dataset_destroy(ADM_dataset_t dataset); +/** + * Create a dataset route from a source and destination dataset. + * + * @remark Dataset routes need to be freed by calling + * ADM_dataset_route_destroy(). + * + * @param source The source dataset + * @param destination The destination dataset + * @return A valid ADM_dataset_route_t if successful or NULL in case of failure. + */ +ADM_dataset_route_t +ADM_dataset_route_create(ADM_dataset_t source, ADM_dataset_t destination); + +/** + * Destroy a dataset route created by ADM_dataset_route_create(). + * + * @param route A valid ADM_dataset_route_t + * @return ADM_SUCCESS or corresponding ADM error code + */ +ADM_return_t +ADM_dataset_route_destroy(ADM_dataset_route_t route); + /** * Create a dataset from a user-provided id (e.g. a path for POSIX-like file * systems or key for key-value stores). @@ -536,6 +570,8 @@ ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res); * * @param[in] ctl_address The address of the control node for the * adhoc storage system + * @param[in] stager_address The address of the data stager for the + * adhoc storage system * @param[in] exec_mode The adhoc storage system execution mode * @param[in] access_type The adhoc storage system execution type * @param[in] walltime The adhoc storage system walltime @@ -544,7 +580,8 @@ ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res); * @return A valid ADM_ADHOC_CONTEXT if successful. NULL otherwise. 
*/ ADM_adhoc_context_t -ADM_adhoc_context_create(const char* ctl_address, ADM_adhoc_mode_t exec_mode, +ADM_adhoc_context_create(const char* ctl_address, const char* stager_address, + ADM_adhoc_mode_t exec_mode, ADM_adhoc_access_t access_type, uint32_t walltime, bool should_flush); diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index 4f34f36aadb4f2ea11aa0ab6ec015e7a83b6b995..f7445b30f786581aae08e22571c5c6cfeb92ef49 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -190,6 +190,7 @@ private: }; struct dataset; +struct dataset_route; struct adhoc_storage { @@ -236,14 +237,19 @@ struct adhoc_storage { ctx() = default; - ctx(std::string controller_address, execution_mode exec_mode, - access_type access_type, std::uint32_t walltime, bool should_flush); + ctx(std::string controller_address, std::string data_stager_address, + execution_mode exec_mode, access_type access_type, + std::uint32_t walltime, bool should_flush); explicit ctx(ADM_adhoc_context_t ctx); explicit operator ADM_adhoc_context_t() const; std::string const& controller_address() const; + + std::string const& + data_stager_address() const; + execution_mode exec_mode() const; enum access_type @@ -257,6 +263,7 @@ struct adhoc_storage { void serialize(Archive&& ar) { ar & m_controller_address; + ar & m_data_stager_address; ar & m_exec_mode; ar & m_access_type; ar & m_walltime; @@ -265,6 +272,7 @@ struct adhoc_storage { private: std::string m_controller_address; + std::string m_data_stager_address; execution_mode m_exec_mode; enum access_type m_access_type; std::uint32_t m_walltime; @@ -409,17 +417,21 @@ struct job { struct requirements { requirements(); - requirements(std::vector inputs, - std::vector outputs); - requirements(std::vector inputs, - std::vector outputs, + requirements(std::vector inputs, + std::vector outputs, + std::vector expected_outputs); + requirements(std::vector inputs, + std::vector outputs, + std::vector expected_outputs, scord::adhoc_storage adhoc_storage); explicit requirements(ADM_job_requirements_t reqs); - std::vector + std::vector const& inputs() const; - std::vector + std::vector const& outputs() const; + std::vector const& + expected_outputs() const; std::optional adhoc_storage() const; @@ -430,12 +442,14 @@ struct job { serialize(Archive& ar) { ar & m_inputs; ar & m_outputs; + ar & m_expected_outputs; ar & m_adhoc_storage; } private: - std::vector m_inputs; - std::vector m_outputs; + std::vector m_inputs; + std::vector m_outputs; + std::vector m_expected_outputs; std::optional m_adhoc_storage; }; @@ -656,6 +670,33 @@ private: std::unique_ptr m_pimpl; }; +struct dataset_route { + dataset_route(); + explicit dataset_route(scord::dataset src, scord::dataset dst); + explicit dataset_route(ADM_dataset_route_t route); + dataset_route(const dataset_route&) noexcept; + dataset_route(dataset_route&&) noexcept; + dataset_route& + operator=(const dataset_route&) noexcept; + dataset_route& + operator=(dataset_route&&) noexcept; + ~dataset_route(); + + scord::dataset const& + source() const; + + scord::dataset const& + destination() const; + + template + void + serialize(Archive& ar); + +private: + class impl; + std::unique_ptr m_pimpl; +}; + } // namespace scord @@ -719,6 +760,31 @@ struct fmt::formatter> } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. 
+ template + auto + format(const scord::dataset_route& r, FormatContext& ctx) const { + const auto str = fmt::format("{{src: {}, dst: {}}}", r.source(), + r.destination()); + return formatter::format(str, ctx); + } +}; + +template <> +struct fmt::formatter> + : fmt::formatter { + // parse is inherited from formatter. + template + auto + format(const std::vector& v, + FormatContext& ctx) const { + const auto str = fmt::format("[{}]", fmt::join(v, ", ")); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter : fmt::formatter { // parse is inherited from formatter. @@ -923,11 +989,13 @@ struct fmt::formatter : formatter { template auto format(const scord::adhoc_storage::ctx& c, FormatContext& ctx) const { - return format_to(ctx.out(), - "{{controller: {}, execution_mode: {}, " - "access_type: {}, walltime: {}, should_flush: {}}}", - std::quoted(c.controller_address()), c.exec_mode(), - c.access_type(), c.walltime(), c.should_flush()); + return format_to( + ctx.out(), + "{{controller: {}, data_stager: {}, execution_mode: {}, " + "access_type: {}, walltime: {}, should_flush: {}}}", + std::quoted(c.controller_address()), + std::quoted(c.data_stager_address()), c.exec_mode(), + c.access_type(), c.walltime(), c.should_flush()); } }; @@ -1045,8 +1113,10 @@ struct fmt::formatter : formatter { auto format(const scord::job::requirements& r, FormatContext& ctx) const { return formatter::format( - fmt::format("{{inputs: {}, outputs: {}, adhoc_storage: {}}}", - r.inputs(), r.outputs(), r.adhoc_storage()), + fmt::format("{{inputs: {}, outputs: {}, " + "expected_outputs: {}, adhoc_storage: {}}}", + r.inputs(), r.outputs(), r.expected_outputs(), + r.adhoc_storage()), ctx); } }; diff --git a/src/lib/scord/utils.h b/src/lib/scord/utils.h new file mode 100644 index 0000000000000000000000000000000000000000..bcdb134c873093699fc66f6043ed82dff7740045 --- /dev/null +++ b/src/lib/scord/utils.h @@ -0,0 +1,42 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of the scord API. + * + * The scord API is free software: you can redistribute it and/or modify + * it under the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The scord API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with the scord API. If not, see . 
+ * + * SPDX-License-Identifier: LGPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_UTILS_H +#define SCORD_UTILS_H + +#include "types.h" + +#ifdef __cplusplus +extern "C" { +#endif + +ADM_return_t +scord_utils_parse_dataset_routes(const char* routes, + ADM_dataset_route_t** parsed_routes, + size_t* parsed_routes_count); +#ifdef __cplusplus +} // extern "C" +#endif + +#endif // SCORD_UTILS_H diff --git a/src/lib/types.c b/src/lib/types.c index ab439d33911b70bbb97f6b4d0fa82ecaf83d4882..eef6ac3832fe4501f9a794314acff17971f83ae4 100644 --- a/src/lib/types.c +++ b/src/lib/types.c @@ -264,6 +264,79 @@ ADM_dataset_destroy(ADM_dataset_t dataset) { return ret; } +ADM_dataset_route_t +ADM_dataset_route_create(ADM_dataset_t source, ADM_dataset_t destination) { + + struct adm_dataset_route* adm_dataset_route = + (struct adm_dataset_route*) malloc( + sizeof(struct adm_dataset_route)); + + if(!adm_dataset_route) { + LOGGER_ERROR("Could not allocate ADM_dataset_route_t"); + return NULL; + } + + adm_dataset_route->d_src = ADM_dataset_create(source->d_id); + + if(!adm_dataset_route->d_src) { + LOGGER_ERROR("Could not allocate ADM_dataset_t"); + return NULL; + } + + adm_dataset_route->d_dst = ADM_dataset_create(destination->d_id); + + if(!adm_dataset_route->d_dst) { + LOGGER_ERROR("Could not allocate ADM_dataset_t"); + return NULL; + } + + return adm_dataset_route; +} + +ADM_dataset_route_t +ADM_dataset_route_copy(ADM_dataset_route_t dst, const ADM_dataset_route_t src) { + + if(!src || !dst) { + return NULL; + } + + // copy all primitive types + *dst = *src; + + // duplicate copy any pointer types + if(src->d_src) { + dst->d_src = ADM_dataset_create(src->d_src->d_id); + } + + if(src->d_dst) { + dst->d_dst = ADM_dataset_create(src->d_dst->d_id); + } + + return dst; +} + +ADM_return_t +ADM_dataset_route_destroy(ADM_dataset_route_t route) { + + ADM_return_t ret = ADM_SUCCESS; + + if(!route) { + LOGGER_ERROR("Invalid ADM_dataset_route_t"); + return ADM_EBADARGS; + } + + if(route->d_src) { + ADM_dataset_destroy(route->d_src); + } + + if(route->d_dst) { + ADM_dataset_destroy(route->d_dst); + } + + free(route); + return ret; +} + ADM_qos_entity_t ADM_qos_entity_create(ADM_qos_scope_t scope, void* data) { @@ -448,6 +521,70 @@ ADM_dataset_list_destroy(ADM_dataset_list_t list) { return ret; } +ADM_dataset_route_list_t +ADM_dataset_route_list_create(ADM_dataset_route_t routes[], size_t length) { + + ADM_dataset_route_list_t p = (ADM_dataset_route_list_t) malloc(sizeof(*p)); + + if(!p) { + LOGGER_ERROR("Could not allocate ADM_dataset_route_list_t"); + return NULL; + } + + const char* error_msg = NULL; + + p->l_length = length; + p->l_routes = (struct adm_dataset_route*) calloc( + length, sizeof(struct adm_dataset_route)); + + if(!p->l_routes) { + error_msg = "Could not allocate ADM_dataset_route_list_t"; + goto cleanup_on_error; + } + + for(size_t i = 0; i < length; ++i) { + if(!ADM_dataset_route_copy(&p->l_routes[i], routes[i])) { + error_msg = "Could not allocate ADM_dataset_route_list_t"; + goto cleanup_on_error; + }; + } + + return p; + +cleanup_on_error: + if(p->l_routes) { + free(p->l_routes); + } + free(p); + + LOGGER_ERROR(error_msg); + + return NULL; +} + +ADM_return_t +ADM_dataset_route_list_destroy(ADM_dataset_route_list_t list) { + ADM_return_t ret = ADM_SUCCESS; + + if(!list) { + LOGGER_ERROR("Invalid ADM_dataset_route_list_t"); + return ADM_EBADARGS; + } + + // We cannot call ADM_dataset_route_destroy here because adm_dataset_routes + // are 
stored as a consecutive array in memory. Thus, we free + // the dataset route ids themselves and then the array. + if(list->l_routes) { + for(size_t i = 0; i < list->l_length; ++i) { + ADM_dataset_route_destroy(&list->l_routes[i]); + } + free(list->l_routes); + } + + free(list); + return ret; +} + ADM_adhoc_storage_t ADM_adhoc_storage_create(const char* name, ADM_adhoc_storage_type_t type, uint64_t id, ADM_adhoc_context_t adhoc_ctx, @@ -676,7 +813,8 @@ ADM_data_operation_destroy(ADM_data_operation_t op) { } ADM_adhoc_context_t -ADM_adhoc_context_create(const char* ctl_address, ADM_adhoc_mode_t exec_mode, +ADM_adhoc_context_create(const char* ctl_address, const char* stager_address, + ADM_adhoc_mode_t exec_mode, ADM_adhoc_access_t access_type, uint32_t walltime, bool should_flush) { @@ -685,6 +823,11 @@ ADM_adhoc_context_create(const char* ctl_address, ADM_adhoc_mode_t exec_mode, return NULL; } + if(!stager_address) { + LOGGER_ERROR("The address to the stager cannot be NULL"); + return NULL; + } + struct adm_adhoc_context* adm_adhoc_context = (struct adm_adhoc_context*) malloc(sizeof(*adm_adhoc_context)); @@ -699,6 +842,11 @@ ADM_adhoc_context_create(const char* ctl_address, ADM_adhoc_mode_t exec_mode, (const char*) calloc(n + 1, sizeof(char)); strcpy((char*) adm_adhoc_context->c_ctl_address, ctl_address); + n = strlen(stager_address); + adm_adhoc_context->c_stager_address = + (const char*) calloc(n + 1, sizeof(char)); + strcpy((char*) adm_adhoc_context->c_stager_address, stager_address); + adm_adhoc_context->c_mode = exec_mode; adm_adhoc_context->c_access = access_type; adm_adhoc_context->c_walltime = walltime; @@ -810,8 +958,10 @@ ADM_job_resources_destroy(ADM_job_resources_t res) { ADM_job_requirements_t -ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len, - ADM_dataset_t outputs[], size_t outputs_len, +ADM_job_requirements_create(ADM_dataset_route_t inputs[], size_t inputs_len, + ADM_dataset_route_t outputs[], size_t outputs_len, + ADM_dataset_route_t expected_outputs[], + size_t expected_outputs_len, ADM_adhoc_storage_t adhoc_storage) { struct adm_job_requirements* adm_job_reqs = @@ -823,26 +973,36 @@ ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len, return NULL; } - ADM_dataset_list_t inputs_list = NULL; - ADM_dataset_list_t outputs_list = NULL; + ADM_dataset_route_list_t inputs_list = NULL; + ADM_dataset_route_list_t outputs_list = NULL; + ADM_dataset_route_list_t expected_outputs_list = NULL; const char* error_msg = NULL; - inputs_list = ADM_dataset_list_create(inputs, inputs_len); + inputs_list = ADM_dataset_route_list_create(inputs, inputs_len); if(!inputs_list) { error_msg = "Could not allocate ADM_job_requirements_t"; goto cleanup_on_error; } - outputs_list = ADM_dataset_list_create(outputs, outputs_len); + outputs_list = ADM_dataset_route_list_create(outputs, outputs_len); if(!outputs_list) { error_msg = "Could not allocate ADM_job_requirements_t"; goto cleanup_on_error; } + expected_outputs_list = ADM_dataset_route_list_create(expected_outputs, + expected_outputs_len); + + if(!expected_outputs_list) { + error_msg = "Could not allocate ADM_job_requirements_t"; + goto cleanup_on_error; + } + adm_job_reqs->r_inputs = inputs_list; adm_job_reqs->r_outputs = outputs_list; + adm_job_reqs->r_expected_outputs = expected_outputs_list; if(!adhoc_storage) { return adm_job_reqs; @@ -878,11 +1038,15 @@ ADM_job_requirements_destroy(ADM_job_requirements_t reqs) { } if(reqs->r_inputs) { - ADM_dataset_list_destroy(reqs->r_inputs); + 
ADM_dataset_route_list_destroy(reqs->r_inputs); } if(reqs->r_outputs) { - ADM_dataset_list_destroy(reqs->r_outputs); + ADM_dataset_route_list_destroy(reqs->r_outputs); + } + + if(reqs->r_expected_outputs) { + ADM_dataset_route_list_destroy(reqs->r_expected_outputs); } if(reqs->r_adhoc_storage) { diff --git a/src/lib/types.cpp b/src/lib/types.cpp index cb6782e8a7ed5ee8baa523737e57ea6facb92250..e51c2a08ee7e9dc6812fc96e54213718f106174a 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -232,14 +232,20 @@ private: job::requirements::requirements() = default; -job::requirements::requirements(std::vector inputs, - std::vector outputs) - : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)) {} +job::requirements::requirements( + std::vector inputs, + std::vector outputs, + std::vector expected_outputs) + : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)), + m_expected_outputs(std::move(expected_outputs)) {} -job::requirements::requirements(std::vector inputs, - std::vector outputs, - scord::adhoc_storage adhoc_storage) +job::requirements::requirements( + std::vector inputs, + std::vector outputs, + std::vector expected_outputs, + scord::adhoc_storage adhoc_storage) : m_inputs(std::move(inputs)), m_outputs(std::move(outputs)), + m_expected_outputs(std::move(expected_outputs)), m_adhoc_storage(std::move(adhoc_storage)) {} job::requirements::requirements(ADM_job_requirements_t reqs) { @@ -247,13 +253,23 @@ job::requirements::requirements(ADM_job_requirements_t reqs) { m_inputs.reserve(reqs->r_inputs->l_length); for(size_t i = 0; i < reqs->r_inputs->l_length; ++i) { - m_inputs.emplace_back(reqs->r_inputs->l_datasets[i].d_id); + m_inputs.emplace_back(dataset{reqs->r_inputs->l_routes[i].d_src}, + dataset{reqs->r_inputs->l_routes[i].d_dst}); } m_outputs.reserve(reqs->r_outputs->l_length); for(size_t i = 0; i < reqs->r_outputs->l_length; ++i) { - m_outputs.emplace_back(reqs->r_outputs->l_datasets[i].d_id); + m_outputs.emplace_back(dataset{reqs->r_inputs->l_routes[i].d_src}, + dataset{reqs->r_inputs->l_routes[i].d_dst}); + } + + m_expected_outputs.reserve(reqs->r_expected_outputs->l_length); + + for(size_t i = 0; i < reqs->r_expected_outputs->l_length; ++i) { + m_expected_outputs.emplace_back( + dataset{reqs->r_expected_outputs->l_routes[i].d_src}, + dataset{reqs->r_expected_outputs->l_routes[i].d_dst}); } if(reqs->r_adhoc_storage) { @@ -261,16 +277,21 @@ job::requirements::requirements(ADM_job_requirements_t reqs) { } } -std::vector +std::vector const& job::requirements::inputs() const { return m_inputs; } -std::vector +std::vector const& job::requirements::outputs() const { return m_outputs; } +std::vector const& +job::requirements::expected_outputs() const { + return m_expected_outputs; +} + std::optional job::requirements::adhoc_storage() const { return m_adhoc_storage; @@ -528,6 +549,110 @@ template void dataset::serialize( network::serialization::input_archive&); +class dataset_route::impl { +public: + impl() = default; + explicit impl(dataset src, dataset dst) + : m_source(std::move(src)), m_destination(std::move(dst)) {} + impl(const impl& rhs) = default; + impl(impl&& rhs) = default; + impl& + operator=(const impl& other) noexcept = default; + impl& + operator=(impl&&) noexcept = default; + ~impl() = default; + + dataset const& + source() const { + return m_source; + } + + dataset const& + destination() const { + return m_destination; + } + + template + void + load(Archive& ar) { + ar(SCORD_SERIALIZATION_NVP(m_source)); + ar(SCORD_SERIALIZATION_NVP(m_destination)); + } + + 
template + void + save(Archive& ar) const { + ar(SCORD_SERIALIZATION_NVP(m_source)); + ar(SCORD_SERIALIZATION_NVP(m_destination)); + } + +private: + dataset m_source; + dataset m_destination; +}; + +dataset_route::dataset_route() = default; + +dataset_route::dataset_route(dataset src, dataset dst) + : m_pimpl(std::make_unique(std::move(src), + std::move(dst))) {} + +dataset_route::dataset_route(ADM_dataset_route_t dataset_route) + : dataset_route::dataset_route(dataset{dataset_route->d_src}, + dataset{dataset_route->d_dst}) {} + +dataset_route::dataset_route(const dataset_route& other) noexcept + : m_pimpl(std::make_unique(*other.m_pimpl)) {} + +dataset_route::dataset_route(dataset_route&&) noexcept = default; + +dataset_route& +dataset_route::operator=(const dataset_route& other) noexcept { + this->m_pimpl = std::make_unique(*other.m_pimpl); + return *this; +} + +dataset_route& +dataset_route::operator=(dataset_route&&) noexcept = default; + +dataset_route::~dataset_route() = default; + +dataset const& +dataset_route::source() const { + return m_pimpl->source(); +} + +dataset const& +dataset_route::destination() const { + return m_pimpl->destination(); +} + +// since the PIMPL class is fully defined at this point, we can now +// define the serialization function +template +inline void +dataset_route::serialize(Archive& ar) { + ar(SCORD_SERIALIZATION_NVP(m_pimpl)); +} + +// we must also explicitly instantiate our template functions for +// serialization in the desired archives +template void +dataset_route::impl::save( + network::serialization::output_archive&) const; + +template void +dataset_route::impl::load( + network::serialization::input_archive&); + +template void +dataset_route::serialize( + network::serialization::output_archive&); + +template void +dataset_route::serialize( + network::serialization::input_archive&); + adhoc_storage::resources::resources(std::vector nodes) : m_nodes(std::move(nodes)) {} @@ -562,22 +687,24 @@ adhoc_storage::resources::nodes() const { } adhoc_storage::ctx::ctx(std::string controller_address, + std::string data_stager_address, adhoc_storage::execution_mode exec_mode, adhoc_storage::access_type access_type, std::uint32_t walltime, bool should_flush) : m_controller_address(std::move(controller_address)), + m_data_stager_address(std::move(data_stager_address)), m_exec_mode(exec_mode), m_access_type(access_type), m_walltime(walltime), m_should_flush(should_flush) {} adhoc_storage::ctx::ctx(ADM_adhoc_context_t ctx) - : adhoc_storage::ctx(ctx->c_ctl_address, + : adhoc_storage::ctx(ctx->c_ctl_address, ctx->c_stager_address, static_cast(ctx->c_mode), static_cast(ctx->c_access), ctx->c_walltime, ctx->c_should_bg_flush) {} adhoc_storage::ctx::operator ADM_adhoc_context_t() const { return ADM_adhoc_context_create( - m_controller_address.c_str(), + m_controller_address.c_str(), m_data_stager_address.c_str(), static_cast(m_exec_mode), static_cast(m_access_type), m_walltime, m_should_flush); @@ -588,6 +715,11 @@ adhoc_storage::ctx::controller_address() const { return m_controller_address; } +std::string const& +adhoc_storage::ctx::data_stager_address() const { + return m_data_stager_address; +} + adhoc_storage::execution_mode adhoc_storage::ctx::exec_mode() const { return m_exec_mode; diff --git a/src/lib/types_private.h b/src/lib/types_private.h index 716766e35950eaf83adec2378bda0daa91f8bb65..3e0ba099bf1e3ec0c3448d9e71bb20793e4a6983 100644 --- a/src/lib/types_private.h +++ b/src/lib/types_private.h @@ -45,6 +45,11 @@ struct adm_dataset { const char* d_id; }; 
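At the C level the same information is expressed through the new `ADM_dataset_route_t` handle and the widened `ADM_job_requirements_create()` signature shown above. The following sketch is illustrative only (written as C++ against the C API): the dataset ids are placeholders and the header path is assumed to be `scord/scord.h`. It highlights the ownership model implied by `ADM_dataset_route_copy()`: routes are deep-copied into the requirements object, so the caller may release its own handles afterwards.

// Illustrative sketch only, not part of the patch. Placeholder dataset ids;
// header path assumed.
#include <scord/scord.h>
#include <cassert>

static ADM_job_requirements_t
build_example_requirements() {

    // a route pairs a source dataset with a destination dataset
    ADM_dataset_t src = ADM_dataset_create("lustre:/scratch/input000.dat");
    ADM_dataset_t dst = ADM_dataset_create("gekkofs:/input000.dat");
    ADM_dataset_route_t in_route = ADM_dataset_route_create(src, dst);
    assert(in_route);

    ADM_dataset_t osrc = ADM_dataset_create("gekkofs:/output000.dat");
    ADM_dataset_t odst = ADM_dataset_create("lustre:/scratch/output000.dat");
    ADM_dataset_route_t out_route = ADM_dataset_route_create(osrc, odst);
    assert(out_route);

    ADM_dataset_route_t inputs[] = {in_route};
    ADM_dataset_route_t outputs[] = {out_route};
    ADM_dataset_route_t expected_outputs[] = {out_route};

    // inputs, outputs and expected outputs are now all dataset routes;
    // the adhoc storage argument remains optional
    ADM_job_requirements_t reqs = ADM_job_requirements_create(
            inputs, 1, outputs, 1, expected_outputs, 1, nullptr);
    assert(reqs);

    // ADM_job_requirements_create() deep-copies the routes, and route
    // creation already copied the datasets, so the local handles can be
    // destroyed here
    ADM_dataset_route_destroy(in_route);
    ADM_dataset_route_destroy(out_route);
    ADM_dataset_destroy(src);
    ADM_dataset_destroy(dst);
    ADM_dataset_destroy(osrc);
    ADM_dataset_destroy(odst);

    return reqs;
}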
+struct adm_dataset_route { + ADM_dataset_t d_src; + ADM_dataset_t d_dst; +}; + struct adm_job { uint64_t j_id; uint64_t j_slurm_id; @@ -85,6 +90,8 @@ struct adm_dataset_info { struct adm_adhoc_context { /** The address to the node responsible for this adhoc storage system */ const char* c_ctl_address; + /** The address to the data stager for this adhoc storage system */ + const char* c_stager_address; /** The adhoc storage system execution mode */ ADM_adhoc_mode_t c_mode; /** The adhoc storage system access type */ @@ -127,9 +134,11 @@ struct adm_data_operation { struct adm_job_requirements { /** An array of input datasets */ - ADM_dataset_list_t r_inputs; + ADM_dataset_route_list_t r_inputs; /** An array of output datasets */ - ADM_dataset_list_t r_outputs; + ADM_dataset_route_list_t r_outputs; + /** An array of expected output datasets */ + ADM_dataset_route_list_t r_expected_outputs; /** An optional definition for a specific storage instance */ ADM_adhoc_storage_t r_adhoc_storage; }; @@ -147,6 +156,13 @@ struct adm_dataset_list { size_t l_length; }; +struct adm_dataset_route_list { + /** An array of dataset routes */ + struct adm_dataset_route* l_routes; + /** The length of the array */ + size_t l_length; +}; + struct adm_qos_limit_list { /** An array of QoS limits */ struct adm_qos_limit* l_limits; diff --git a/src/lib/utils.cpp b/src/lib/utils.cpp new file mode 100644 index 0000000000000000000000000000000000000000..846fc0eb9b230b15672dba96e87d7dd4beb1ebcc --- /dev/null +++ b/src/lib/utils.cpp @@ -0,0 +1,99 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of the scord API. + * + * The scord API is free software: you can redistribute it and/or modify + * it under the terms of the Lesser GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * The scord API is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the Lesser GNU General Public License + * along with the scord API. If not, see . 
+ * + * SPDX-License-Identifier: LGPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include + + +namespace { + +std::vector +split(const std::string& text, char sep) { + std::vector tokens; + std::size_t start = 0, end; + + while((end = text.find(sep, start)) != std::string::npos) { + tokens.push_back(text.substr(start, end - start)); + start = end + 1; + } + + tokens.push_back(text.substr(start)); + return tokens; +} + +} // namespace + +extern "C" ADM_return_t +scord_utils_parse_dataset_routes(const char* routes, + ADM_dataset_route_t** parsed_routes, + size_t* parsed_routes_count) { + + std::vector tmp; + + if(routes == nullptr || parsed_routes == nullptr || + parsed_routes_count == nullptr) { + return ADM_EBADARGS; + } + + const std::string route_str(routes); + + if(route_str.empty()) { + return ADM_EBADARGS; + } + + for(auto&& rs : split(route_str, ';')) { + + const auto parts = split(rs, '='); + + if(parts.size() != 2) { + return ADM_EBADARGS; + } + + ADM_dataset_route_t dr = + ADM_dataset_route_create(ADM_dataset_create(parts[0].c_str()), + ADM_dataset_create(parts[1].c_str())); + + if(dr == nullptr) { + return ADM_ENOMEM; + } + + tmp.push_back(dr); + } + + *parsed_routes = static_cast( + malloc(tmp.size() * sizeof(ADM_dataset_route_t))); + + if(*parsed_routes == nullptr) { + return ADM_ENOMEM; + } + + *parsed_routes_count = tmp.size(); + + for(std::size_t i = 0; i < tmp.size(); i++) { + (*parsed_routes)[i] = tmp[i]; + } + + return ADM_SUCCESS; +} diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 5ccfb19ebfa0bd0d6e35fd7835bb90639ace92c2..839e283a2fd7dc781ef4b7c9b55132c2d30f423f 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -26,7 +26,7 @@ add_executable(scord) target_sources(scord PRIVATE scord.cpp - job_manager.hpp adhoc_storage_manager.hpp + job_manager.hpp adhoc_storage_manager.hpp transfer_manager.hpp pfs_storage_manager.hpp ${CMAKE_CURRENT_BINARY_DIR}/defaults.hpp internal_types.hpp internal_types.cpp rpc_server.hpp rpc_server.cpp) @@ -51,6 +51,7 @@ target_link_libraries( CLI11::CLI11 RedisPlusPlus::RedisPlusPlus ryml::ryml + cargo::cargo ) install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index 993cefb2a8a84df963f01bfd705c5b505c314d8d..2f723ef6119c7ee03863259e98fe6af8a0bb1e4f 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -79,6 +79,11 @@ adhoc_storage_metadata::controller_address() const { return m_adhoc_storage.context().controller_address(); } +std::string const& +adhoc_storage_metadata::data_stager_address() const { + return m_adhoc_storage.context().data_stager_address(); +} + void adhoc_storage_metadata::update(scord::adhoc_storage::resources new_resources) { m_adhoc_storage.update(std::move(new_resources)); diff --git a/src/scord/internal_types.hpp b/src/scord/internal_types.hpp index e22f862143587161b3c51aeae78264508a689d78..92c7ee8b3f6d7ed369add8f18313a2706d9deff4 100644 --- a/src/scord/internal_types.hpp +++ b/src/scord/internal_types.hpp @@ -81,6 +81,9 @@ struct adhoc_storage_metadata { std::string const& controller_address() const; + std::string const& + data_stager_address() const; + void update(scord::adhoc_storage::resources new_resources); @@ -114,6 +117,44 @@ struct pfs_storage_metadata { std::shared_ptr m_client_info; }; +template +struct transfer_metadata { + transfer_metadata(transfer_id id, TransferHandle&& handle, + std::vector 
qos) + : m_id(id), m_handle(handle), m_qos(std::move(qos)) {} + + transfer_id + id() const { + return m_id; + } + + TransferHandle + transfer() const { + return m_handle; + } + + std::vector const& + qos() const { + return m_qos; + } + + float + measured_bandwidth() const { + return m_measured_bandwidth; + } + + void + update(float bandwidth) { + m_measured_bandwidth = bandwidth; + } + + transfer_id m_id; + TransferHandle m_handle; + std::vector m_qos; + float m_measured_bandwidth = -1.0; +}; + + } // namespace scord::internal #endif // SCORD_INTERNAL_TYPES_HPP diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 737b93197f24763dcb0c1f765d114830a49e6049..55b692f4fb926c9d919e9b0806452c70bc79808f 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include "rpc_server.hpp" template @@ -275,7 +276,7 @@ rpc_server::register_adhoc_storage( LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, adhoc_ctx: {}, " "adhoc_resources: {}}}", - rpc, name, type, ctx, resources); + rpc, std::quoted(name), type, ctx, resources); scord::error_code ec; std::optional adhoc_id; @@ -517,8 +518,8 @@ rpc_server::register_pfs_storage(const network::request& req, const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, name, - type, ctx); + LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, + std::quoted(name), type, ctx); scord::error_code ec; std::optional pfs_id = 0; @@ -611,17 +612,72 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, "limits: {}, mapping: {}}}", rpc, job_id, sources, targets, limits, mapping); - scord::error_code ec; + const auto jm_result = m_job_manager.find(job_id); + + if(!jm_result) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error finding job: {}\"", + rpc.id(), job_id); + const auto resp = response_with_id{rpc.id(), jm_result.error()}; + LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + req.respond(resp); + return; + } + + const auto& job_metadata_ptr = jm_result.value(); + + if(!job_metadata_ptr->adhoc_storage_metadata()) { + LOGGER_ERROR("rpc id: {} error_msg: \"Job has no adhoc storage\"", + rpc.id(), job_id); + const auto resp = response_with_id{rpc.id(), error_code::no_resources}; + LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + req.respond(resp); + return; + } + + const auto data_stager_address = + job_metadata_ptr->adhoc_storage_metadata()->data_stager_address(); + + // Transform the `scord::dataset`s into `cargo::dataset`s and contact the + // Cargo service associated with the job's adhoc storage instance to + // execute the transfers. 
+ cargo::server srv{data_stager_address}; - std::optional tx_id; + std::vector inputs; + std::vector outputs; - // TODO: generate a global ID for the transfer and contact Cargo to - // actually request it - tx_id = 42; + // TODO: check type of storage tier to enable parallel transfers + std::transform(sources.cbegin(), sources.cend(), std::back_inserter(inputs), + [](const auto& src) { return cargo::dataset{src.id()}; }); - const auto resp = response_with_id{rpc.id(), ec, tx_id}; + std::transform(targets.cbegin(), targets.cend(), + std::back_inserter(outputs), + [](const auto& tgt) { return cargo::dataset{tgt.id()}; }); - LOGGER_INFO("rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, ec, tx_id); + const auto cargo_tx = cargo::transfer_datasets(srv, inputs, outputs); + + // Register the transfer into the `tranfer_manager`. + // We embed the generated `cargo::transfer` object into + // scord's `transfer_metadata` so that we can later query the Cargo + // service for the transfer's status. + const auto rv = + m_transfer_manager.create(cargo_tx, limits) + .or_else([&](auto&& ec) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error creating " + "transfer: {}\"", + rpc.id(), ec); + }) + .and_then([&](auto&& transfer_metadata_ptr) + -> tl::expected { + return transfer_metadata_ptr->id(); + }); + + const auto resp = + rv ? response_with_id{rpc.id(), error_code::success, rv.value()} + : response_with_id{rpc.id(), rv.error()}; + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, + resp.error_code(), resp.value_or_none()); req.respond(resp); } diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 494c85fe164f77b40042639662fd2fda67065630..f86a60c8aa52ccacfddc44c9f416673e22aab093 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -31,6 +31,11 @@ #include "job_manager.hpp" #include "adhoc_storage_manager.hpp" #include "pfs_storage_manager.hpp" +#include "transfer_manager.hpp" + +namespace cargo { +class transfer; +} namespace scord { @@ -103,6 +108,7 @@ private: job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; + transfer_manager m_transfer_manager; }; } // namespace scord diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..6abb74f2deb8e87d6bde6343dfcd45beef8800b4 --- /dev/null +++ b/src/scord/transfer_manager.hpp @@ -0,0 +1,129 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . 
+ * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_TRANSFER_MANAGER_HPP +#define SCORD_TRANSFER_MANAGER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include "internal_types.hpp" + +namespace scord { + +template +struct transfer_manager { + + tl::expected< + std::shared_ptr>, + scord::error_code> + create(TransferHandle tx, std::vector limits) { + + static std::atomic_uint64_t current_id; + scord::transfer_id id = current_id++; + + abt::unique_lock lock(m_transfer_mutex); + + if(const auto it = m_transfer.find(id); it == m_transfer.end()) { + const auto& [it_transfer, inserted] = m_transfer.emplace( + id, std::make_shared< + internal::transfer_metadata>( + id, std::move(tx), std::move(limits))); + + if(!inserted) { + LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); + return tl::make_unexpected(scord::error_code::snafu); + } + + return it_transfer->second; + } + + LOGGER_ERROR("{}: Transfer '{}' already exists", __FUNCTION__, id); + return tl::make_unexpected(scord::error_code::entity_exists); + } + + scord::error_code + update(scord::transfer_id id, float obtained_bw) { + + abt::unique_lock lock(m_transfer_mutex); + + if(const auto it = m_transfer.find(id); it != m_transfer.end()) { + const auto& current_transfer_info = it->second; + current_transfer_info->update(obtained_bw); + return scord::error_code::success; + } + + LOGGER_ERROR("{}: Transfer '{}' does not exist", __FUNCTION__, id); + return scord::error_code::no_such_entity; + } + + tl::expected< + std::shared_ptr>, + scord::error_code> + find(scord::transfer_id id) { + + abt::shared_lock lock(m_transfer_mutex); + + if(auto it = m_transfer.find(id); it != m_transfer.end()) { + return it->second; + } + + LOGGER_ERROR("Transfer '{}' was not registered or was already deleted", + id); + return tl::make_unexpected(scord::error_code::no_such_entity); + } + + tl::expected< + std::shared_ptr>, + scord::error_code> + remove(scord::transfer_id id) { + + abt::unique_lock lock(m_transfer_mutex); + + if(const auto it = m_transfer.find(id); it != m_transfer.end()) { + auto nh = m_transfer.extract(it); + return nh.mapped(); + } + + LOGGER_ERROR("Transfer '{}' was not registered or was already deleted", + id); + + return tl::make_unexpected(scord::error_code::no_such_entity); + } + +private: + mutable abt::shared_mutex m_transfer_mutex; + std::unordered_map< + scord::transfer_id, + std::shared_ptr>> + m_transfer; +}; + +} // namespace scord + +#endif // SCORD_TRANSFER_MANAGER_HPP
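For reference, this is roughly how `rpc_server::transfer_datasets()` above is expected to drive the new `transfer_manager`. The sketch is illustrative only: the template argument (`cargo::transfer`), the Cargo client header name and the QoS limit type are assumptions inferred from the surrounding hunks, not confirmed by the patch.

// Illustrative sketch only, not part of the patch. Template argument, Cargo
// header name and QoS limit type are assumptions.
#include <vector>
#include <cargo.hpp> // assumed name of the Cargo client header
#include "transfer_manager.hpp"

void
track_transfer(scord::transfer_manager<cargo::transfer>& mgr,
               cargo::transfer tx, std::vector<scord::qos::limit> limits) {

    // create() hands out a fresh transfer_id and stores the Cargo handle
    // together with the requested QoS limits
    auto rv = mgr.create(std::move(tx), std::move(limits));
    if(!rv) {
        return; // e.g. error_code::snafu if the emplace failed
    }

    const auto id = rv.value()->id();

    // a monitoring loop can later record the bandwidth it measured...
    mgr.update(id, /*obtained_bw=*/1250.0f);

    // ...or fetch the stored metadata to query Cargo again
    if(const auto md = mgr.find(id); md) {
        const cargo::transfer handle = md.value()->transfer();
        (void) handle;
    }

    // and drop the entry once the transfer has completed
    mgr.remove(id);
}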