From f781ac5682cb56ed4255348c4348e8ffb7fef58e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 4 Oct 2022 10:37:24 +0200 Subject: [PATCH 1/2] ADM_adhoc_context_t: Replace `uint32_t nodes` field with `ADM_adhoc_resources_t resources` --- examples/c/ADM_cancel_transfer.c | 14 +- examples/c/ADM_connect_data_operation.c | 14 +- examples/c/ADM_define_data_operation.c | 14 +- examples/c/ADM_deploy_adhoc_storage.c | 14 +- examples/c/ADM_finalize_data_operation.c | 14 +- examples/c/ADM_get_pending_transfers.c | 14 +- examples/c/ADM_get_qos_constraints.c | 14 +- examples/c/ADM_get_statistics.c | 14 +- examples/c/ADM_get_transfer_priority.c | 14 +- .../c/ADM_link_transfer_to_data_operation.c | 14 +- examples/c/ADM_register_adhoc_storage.c | 14 +- examples/c/ADM_register_job.c | 14 +- examples/c/ADM_remove_adhoc_storage.c | 14 +- examples/c/ADM_remove_job.c | 14 +- examples/c/ADM_set_dataset_information.c | 14 +- examples/c/ADM_set_io_resources.c | 17 +- examples/c/ADM_set_qos_constraints.c | 14 +- examples/c/ADM_set_transfer_priority.c | 14 +- examples/c/ADM_transfer_datasets.c | 20 +- examples/c/ADM_update_adhoc_storage.c | 14 +- examples/c/ADM_update_job.c | 14 +- examples/c/common.c | 22 ++ examples/c/common.h | 3 + examples/cxx/ADM_register_adhoc_storage.cpp | 12 +- examples/cxx/ADM_register_job.cpp | 9 +- examples/cxx/ADM_set_io_resources.cpp | 2 +- examples/cxx/ADM_transfer_datasets.cpp | 15 +- examples/cxx/ADM_update_job.cpp | 12 +- examples/cxx/common.cpp | 11 + examples/cxx/common.hpp | 3 + src/common/api/admire_types.h | 57 ++++-- src/common/api/admire_types.hpp | 53 ++++- src/common/api/convert.cpp | 39 +++- src/common/api/convert.hpp | 62 +++++- src/common/api/types.cpp | 189 +++++++++++++++--- src/common/net/proto/rpc_types.c | 144 ++++++++++++- src/common/net/proto/rpc_types.h | 39 ++-- src/lib/admire.cpp | 2 +- src/lib/admire.h | 2 +- src/lib/admire.hpp | 2 +- src/lib/c_wrapper.cpp | 2 +- 41 files changed, 829 insertions(+), 154 deletions(-) diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 8feb1a29..0e086bc4 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_connect_data_operation.c b/examples/c/ADM_connect_data_operation.c index 6fc25c34..0145532a 100644 --- a/examples/c/ADM_connect_data_operation.c +++ b/examples/c/ADM_connect_data_operation.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_define_data_operation.c b/examples/c/ADM_define_data_operation.c index ac18ff92..b1d5a324 100644 --- a/examples/c/ADM_define_data_operation.c +++ b/examples/c/ADM_define_data_operation.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -45,13 +46,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index f3c32447..94b88da2 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_finalize_data_operation.c b/examples/c/ADM_finalize_data_operation.c index a38c393a..a54e320c 100644 --- a/examples/c/ADM_finalize_data_operation.c +++ b/examples/c/ADM_finalize_data_operation.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -45,13 +46,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_get_pending_transfers.c b/examples/c/ADM_get_pending_transfers.c index e089247c..37359ea7 100644 --- a/examples/c/ADM_get_pending_transfers.c +++ b/examples/c/ADM_get_pending_transfers.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_get_qos_constraints.c b/examples/c/ADM_get_qos_constraints.c index 88497d33..3cb0df77 100644 --- a/examples/c/ADM_get_qos_constraints.c +++ b/examples/c/ADM_get_qos_constraints.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_get_statistics.c b/examples/c/ADM_get_statistics.c index a980a2e4..5500bcea 100644 --- a/examples/c/ADM_get_statistics.c +++ b/examples/c/ADM_get_statistics.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index 8529a2f3..3007b7ba 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index 476cf605..57657ff6 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -45,13 +46,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index 6667bd74..4145c750 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_register_job.c b/examples/c/ADM_register_job.c index 72744b91..8ab2c28e 100644 --- a/examples/c/ADM_register_job.c +++ b/examples/c/ADM_register_job.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index 544ded6a..dcd08471 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_remove_job.c b/examples/c/ADM_remove_job.c index 4cd38d54..f8c68f58 100644 --- a/examples/c/ADM_remove_job.c +++ b/examples/c/ADM_remove_job.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_set_dataset_information.c b/examples/c/ADM_set_dataset_information.c index 53dbe38e..a207e06d 100644 --- a/examples/c/ADM_set_dataset_information.c +++ b/examples/c/ADM_set_dataset_information.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -45,13 +46,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_set_io_resources.c b/examples/c/ADM_set_io_resources.c index 785eb001..1854d9e7 100644 --- a/examples/c/ADM_set_io_resources.c +++ b/examples/c/ADM_set_io_resources.c @@ -28,9 +28,9 @@ #include #include "common.h" - -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -45,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); @@ -70,7 +77,7 @@ main(int argc, char* argv[]) { } ADM_storage_t tier = NULL; - ADM_storage_resources_t resources = NULL; + ADM_adhoc_resources_t resources = NULL; ret = ADM_set_io_resources(server, job, tier, resources); if(ret != ADM_SUCCESS) { diff --git a/examples/c/ADM_set_qos_constraints.c b/examples/c/ADM_set_qos_constraints.c index 61248527..543f1b8e 100644 --- a/examples/c/ADM_set_qos_constraints.c +++ b/examples/c/ADM_set_qos_constraints.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index bdd45e70..c543df56 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c index cd5ad062..841071f8 100644 --- a/examples/c/ADM_transfer_datasets.c +++ b/examples/c/ADM_transfer_datasets.c @@ -28,11 +28,12 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 -#define NSOURCES 5 -#define NTARGETS 5 -#define NLIMITS 3 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 3 int main(int argc, char* argv[]) { @@ -47,13 +48,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index 45a90d54..b0a20137 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/ADM_update_job.c b/examples/c/ADM_update_job.c index da4ee588..8e692d4d 100644 --- a/examples/c/ADM_update_job.c +++ b/examples/c/ADM_update_job.c @@ -28,8 +28,9 @@ #include #include "common.h" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -44,13 +45,20 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); + assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); assert(inputs); ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_adhoc_resources_t adhoc_resources = + ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( - ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 42, 100, false); + ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, adhoc_resources, + 100, false); assert(ctx); ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); diff --git a/examples/c/common.c b/examples/c/common.c index 1e47a170..ccb779b4 100644 --- a/examples/c/common.c +++ b/examples/c/common.c @@ -5,6 +5,28 @@ #include "common.h" #include "admire_types.h" +ADM_node_t* +prepare_nodes(size_t n) { + + ADM_node_t* nodes = calloc(n, sizeof(ADM_node_t)); + + if(!nodes) { + return NULL; + } + + for(size_t i = 0; i < n; ++i) { + size_t len = snprintf(NULL, 0, "node-%02zu", i); + char* id = (char*) alloca(len + 1); + snprintf(id, len + 1, "node-%02zu", i); + nodes[i] = ADM_node_create(id); + if(!nodes[i]) { + return NULL; + } + } + + return nodes; +} + ADM_dataset_t* prepare_datasets(const char* pattern, size_t n) { diff --git a/examples/c/common.h b/examples/c/common.h index 16b383dc..43e0536e 100644 --- a/examples/c/common.h +++ b/examples/c/common.h @@ -3,6 +3,9 @@ #include +ADM_node_t* +prepare_nodes(size_t n); + ADM_dataset_t* prepare_datasets(const char* pattern, size_t n); diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index 7cae76be..c8bf6bbf 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -26,8 +26,9 @@ #include #include "common.hpp" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -41,20 +42,23 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, - admire::adhoc_storage::access_type::read_write, 42, 100, false); + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false); admire::job_requirements reqs(inputs, outputs, std::move(p)); std::string user_id = "adhoc_storage_42"; const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{ admire::adhoc_storage::execution_mode::separate_new, - admire::adhoc_storage::access_type::read_write, 42, 100, false}; + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false}; ADM_return_t ret = ADM_SUCCESS; try { diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index bec2e5ed..771b8f1f 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -26,8 +26,9 @@ #include #include "common.hpp" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -40,13 +41,15 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, - admire::adhoc_storage::access_type::read_write, 42, 100, false); + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false); admire::job_requirements reqs(inputs, outputs, std::move(p)); diff --git a/examples/cxx/ADM_set_io_resources.cpp b/examples/cxx/ADM_set_io_resources.cpp index cf394845..bad2726d 100644 --- a/examples/cxx/ADM_set_io_resources.cpp +++ b/examples/cxx/ADM_set_io_resources.cpp @@ -39,7 +39,7 @@ main(int argc, char* argv[]) { ADM_job_t job{}; ADM_storage_t tier{}; - ADM_storage_resources_t resources{}; + ADM_adhoc_resources_t resources{}; ADM_return_t ret = ADM_SUCCESS; try { diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 7d39e932..74f3f1ff 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -26,11 +26,12 @@ #include #include "common.hpp" -#define NINPUTS 10 -#define NOUTPUTS 5 -#define NSOURCES 5 -#define NTARGETS 5 -#define NLIMITS 4 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 +#define NSOURCES 5 +#define NTARGETS 5 +#define NLIMITS 4 int main(int argc, char* argv[]) { @@ -43,6 +44,7 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); @@ -54,7 +56,8 @@ main(int argc, char* argv[]) { auto p = std::make_unique( admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, - admire::adhoc_storage::access_type::read_write, 42, 100, false); + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false); admire::job_requirements reqs(inputs, outputs, std::move(p)); diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index a1156911..01e70d37 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -26,8 +26,9 @@ #include #include "common.hpp" -#define NINPUTS 10 -#define NOUTPUTS 5 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 int main(int argc, char* argv[]) { @@ -40,13 +41,15 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); auto p = std::make_unique( admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, - admire::adhoc_storage::access_type::read_write, 42, 100, false); + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false); admire::job_requirements reqs{inputs, outputs, std::move(p)}; @@ -58,7 +61,8 @@ main(int argc, char* argv[]) { auto p2 = std::make_unique( admire::storage::type::gekkofs, "foobar", admire::adhoc_storage::execution_mode::separate_new, - admire::adhoc_storage::access_type::read_write, 42, 100, false); + admire::adhoc_storage::access_type::read_write, + admire::adhoc_storage::resources{adhoc_nodes}, 100, false); admire::job_requirements new_reqs{new_inputs, new_outputs, std::move(p2)}; diff --git a/examples/cxx/common.cpp b/examples/cxx/common.cpp index 5461521b..be3585d9 100644 --- a/examples/cxx/common.cpp +++ b/examples/cxx/common.cpp @@ -1,5 +1,16 @@ #include "common.hpp" +std::vector +prepare_nodes(size_t n) { + std::vector nodes; + nodes.reserve(n); + for(size_t i = 0; i < n; ++i) { + nodes.emplace_back(fmt::format("node-{:02d}", i)); + } + + return nodes; +} + std::vector prepare_datasets(const std::string& pattern, size_t n) { std::vector datasets; diff --git a/examples/cxx/common.hpp b/examples/cxx/common.hpp index f3618be5..ca08c7e5 100644 --- a/examples/cxx/common.hpp +++ b/examples/cxx/common.hpp @@ -4,6 +4,9 @@ #include #include +std::vector +prepare_nodes(size_t n); + std::vector prepare_datasets(const std::string& pattern, size_t n); diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index 0cb5db79..d4ad7564 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -61,6 +61,9 @@ typedef struct adm_server* ADM_server_t; /** A node */ typedef struct adm_node* ADM_node_t; +/** A list of nodes */ +typedef struct adm_node_list* ADM_node_list_t; + /** A job */ typedef struct adm_job* ADM_job_t; @@ -113,7 +116,7 @@ typedef enum { typedef struct adm_storage* ADM_storage_t; /** Information about resources assigned to a storage tier */ -typedef struct adm_storage_resources* ADM_storage_resources_t; +typedef struct adm_adhoc_resources* ADM_adhoc_resources_t; /** Execution modes for an adhoc storage system */ typedef enum { @@ -242,6 +245,29 @@ ADM_node_create(const char* hostname); ADM_return_t ADM_node_destroy(ADM_node_t node); +/** + * Create a node list from an array of ADM_NODEs and its + * length. + * + * @remark node lists need to be freed by calling ADM_node_list_destroy(). + * + * @param[in] datasets The array of nodes. + * @param[in] len The length of the array. + * @return A valid ADM_node_list_t if successful or NULL in case of + * failure. + */ +ADM_node_list_t +ADM_node_list_create(ADM_node_t nodes[], size_t len); + +/** + * Destroy a node list created by ADM_node_list_create(). + * + * @param[in] list A valid ADM_node_list_t + * @return ADM_SUCCESS or corresponding ADM error code + */ +ADM_return_t +ADM_node_list_destroy(ADM_node_list_t list); + /* ----------------------------------------------------- */ /* Jobs */ /* ----------------------------------------------------- */ @@ -379,24 +405,28 @@ ADM_return_t ADM_storage_destroy(ADM_storage_t storage); /** - * Create an ADM_STORAGE_RESOURCES from information about storage resources. + * Create an ADM_ADHOC_RESOURCES from information about storage resources. + * + * @remark ADM_ADHOC_RESOURCES need to be freed by calling + * ADM_adhoc_resources_destroy(). * - * @remark ADM_STORAGE_RESOURCES need to be freed by calling - * ADM_storage_resources_destroy(). + * @param[in] nodes An array of ADM_NODES describing the nodes assigned + * by the adhoc_storage. + * @param[in] nodes_len The number of ADM_NODES stored in nodes. * - * @return A valid ADM_STORAGE_RESOURCES, or NULL in case of failure + * @return A valid ADM_ADHOC_RESOURCES, or NULL in case of failure */ -ADM_storage_resources_t -ADM_storage_resources_create(); +ADM_adhoc_resources_t +ADM_adhoc_resources_create(ADM_node_t nodes[], size_t nodes_len); /** - * Destroy a ADM_STORAGE_RESOURCES created by ADM_storage_resources_create(). + * Destroy a ADM_ADHOC_RESOURCES created by ADM_adhoc_resources_create(). * - * @param[in] res A valid ADM_STORAGE + * @param[in] res A valid ADM_ADHOC_RESOURCES * @return ADM_SUCCESS or corresponding ADM error code */ ADM_return_t -ADM_storage_resources_destroy(ADM_storage_resources_t res); +ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res); /** * Create an ADM_ADHOC_CONTEXT from information about how an adhoc storage @@ -407,7 +437,7 @@ ADM_storage_resources_destroy(ADM_storage_resources_t res); * * @param[in] exec_mode The adhoc storage system execution mode * @param[in] access_type The adhoc storage system execution type - * @param[in] nodes The number of nodes for the adhoc storage system + * @param[in] adhoc_resources The resources assigned for the storage system * @param[in] walltime The adhoc storage system walltime * @param[in] should_flush Whether the adhoc storage system should flush data in * the background @@ -415,7 +445,8 @@ ADM_storage_resources_destroy(ADM_storage_resources_t res); */ ADM_adhoc_context_t ADM_adhoc_context_create(ADM_adhoc_mode_t exec_mode, - ADM_adhoc_access_t access_type, uint32_t nodes, + ADM_adhoc_access_t access_type, + ADM_adhoc_resources_t adhoc_resources, uint32_t walltime, bool should_flush); /** @@ -533,7 +564,7 @@ ADM_qos_limit_list_destroy(ADM_qos_limit_list_t list); * Create an ADM_DATA_OPERATION from information about storage resources. * * @remark ADM_DATA_OPERATION need to be freed by calling - * ADM_storage_resources_destroy(). + * ADM_adhoc_resources_destroy(). * * @return A valid ADM_DATA_OPERATION, or NULL in case of failure */ diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 28293cd3..b3fe951d 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -268,10 +268,22 @@ struct adhoc_storage : public storage { read_write = ADM_ADHOC_ACCESS_RDWR, }; + struct resources { + explicit resources(std::vector nodes); + explicit resources(ADM_adhoc_resources_t res); + + std::vector + nodes() const; + + private: + std::vector m_nodes; + }; + struct ctx : storage::ctx { ctx(execution_mode exec_mode, access_type access_type, - std::uint32_t nodes, std::uint32_t walltime, bool should_flush); + adhoc_storage::resources resources, std::uint32_t walltime, + bool should_flush); explicit ctx(ADM_adhoc_context_t ctx); @@ -279,8 +291,8 @@ struct adhoc_storage : public storage { exec_mode() const; enum access_type access_type() const; - std::uint32_t - nodes() const; + adhoc_storage::resources + resources() const; std::uint32_t walltime() const; bool @@ -289,14 +301,14 @@ struct adhoc_storage : public storage { private: execution_mode m_exec_mode; enum access_type m_access_type; - std::uint32_t m_nodes; + adhoc_storage::resources m_resources; std::uint32_t m_walltime; bool m_should_flush; }; adhoc_storage(enum storage::type type, std::string user_id, execution_mode exec_mode, access_type access_type, - std::uint32_t nodes, std::uint32_t walltime, + adhoc_storage::resources res, std::uint32_t walltime, bool should_flush); adhoc_storage(enum storage::type type, std::string user_id, ADM_adhoc_context_t ctx); @@ -627,6 +639,22 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter + : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::adhoc_storage::resources& r, + FormatContext& ctx) const { + + const auto str = fmt::format("{{nodes: {}}}", r.nodes()); + + return formatter::format(str, ctx); + } +}; + + template <> struct fmt::formatter : formatter { @@ -637,8 +665,8 @@ struct fmt::formatter const auto str = fmt::format("{{execution_mode: {}, access_type: {}, " - "nodes: {}, walltime: {}, should_flush: {}}}", - c.exec_mode(), c.access_type(), c.nodes(), + "resources: {}, walltime: {}, should_flush: {}}}", + c.exec_mode(), c.access_type(), c.resources(), c.walltime(), c.should_flush()); return formatter::format(str, ctx); @@ -825,6 +853,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter> : formatter { + // parse is inherited from formatter. + template + auto + format(const std::vector& v, FormatContext& ctx) const { + const auto str = fmt::format("[{}]", fmt::join(v, ", ")); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter> : formatter { diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index c9873ee6..de070c65 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -35,6 +35,19 @@ ADM_transfer_create(uint64_t id); namespace { +admire::api::managed_ctype_array +as_ctype_array(const std::vector& nodes) { + + std::vector tmp; + + std::transform(nodes.cbegin(), nodes.cend(), std::back_inserter(tmp), + [](const admire::node& n) { + return ADM_node_create(n.hostname().c_str()); + }); + + return admire::api::managed_ctype_array{std::move(tmp)}; +} + admire::api::managed_ctype_array as_ctype_array(const std::vector& datasets) { @@ -57,12 +70,30 @@ convert(const node& node) { return managed_ctype(ADM_node_create(node.hostname().c_str())); } +managed_ctype +convert(const adhoc_storage::resources& res) { + + auto managed_nodes = as_ctype_array(res.nodes()); + + ADM_adhoc_resources_t c_res = ADM_adhoc_resources_create( + managed_nodes.data(), managed_nodes.size()); + + return managed_ctype{c_res, + std::move(managed_nodes)}; +} + managed_ctype convert(const adhoc_storage::ctx& ctx) { - return managed_ctype{ADM_adhoc_context_create( - static_cast(ctx.exec_mode()), - static_cast(ctx.access_type()), ctx.nodes(), - ctx.walltime(), ctx.should_flush())}; + + auto managed_adhoc_resources = convert(ctx.resources()); + + return managed_ctype{ + ADM_adhoc_context_create( + static_cast(ctx.exec_mode()), + static_cast(ctx.access_type()), + managed_adhoc_resources.get(), ctx.walltime(), + ctx.should_flush()), + std::move(managed_adhoc_resources)}; } managed_ctype diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 2b569791..59692e98 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -42,6 +42,9 @@ struct managed_ctype_array; managed_ctype convert(const node& node); +managed_ctype +convert(const adhoc_storage::resources& res); + managed_ctype convert(const adhoc_storage::ctx& ctx); @@ -109,10 +112,66 @@ struct admire::api::managed_ctype { scord::utils::ctype_ptr m_node; }; +template <> +struct admire::api::managed_ctype_array { + + explicit managed_ctype_array(ADM_node_t* data, size_t size) + : m_nodes(data, size) {} + + explicit managed_ctype_array(std::vector&& v) + : m_nodes(v.data(), v.size()) {} + + constexpr size_t + size() const { + return m_nodes.size(); + } + + constexpr const ADM_node_t* + data() const noexcept { + return m_nodes.data(); + } + + constexpr ADM_node_t* + data() noexcept { + return m_nodes.data(); + } + + constexpr ADM_node_t* + release() noexcept { + return m_nodes.release(); + } + + scord::utils::ctype_ptr_vector m_nodes; +}; + +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_adhoc_resources_t res, + managed_ctype_array&& nodes) + : m_adhoc_resources(res), m_nodes(std::move(nodes)) {} + + ADM_adhoc_resources_t + get() const { + return m_adhoc_resources.get(); + } + + ADM_adhoc_resources_t + release() { + return m_adhoc_resources.release(); + } + + scord::utils::ctype_ptr + m_adhoc_resources; + managed_ctype_array m_nodes; +}; + template <> struct admire::api::managed_ctype { - explicit managed_ctype(ADM_adhoc_context_t ctx) : m_adhoc_context(ctx) {} + explicit managed_ctype(ADM_adhoc_context_t ctx, + managed_ctype&& resources) + : m_adhoc_context(ctx), m_adhoc_resources(std::move(resources)) {} ADM_adhoc_context_t get() const { @@ -126,6 +185,7 @@ struct admire::api::managed_ctype { scord::utils::ctype_ptr m_adhoc_context; + managed_ctype m_adhoc_resources; }; template <> diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index f778737d..90aa40ef 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -83,11 +83,35 @@ ADM_node_create(const char* hostname) { return NULL; } - adm_node->n_hostname = hostname; + if(hostname) { + size_t n = strlen(hostname); + adm_node->n_hostname = (const char*) calloc(n + 1, sizeof(char)); + strcpy((char*) adm_node->n_hostname, hostname); + } return adm_node; } +ADM_node_t +ADM_node_copy(ADM_node_t dst, const ADM_node_t src) { + + if(!src || !dst) { + return NULL; + } + + // copy all primitive types + *dst = *src; + + // duplicate copy any pointer types + if(src->n_hostname) { + size_t n = strlen(src->n_hostname); + dst->n_hostname = (const char*) calloc(n + 1, sizeof(char)); + strncpy((char*) dst->n_hostname, src->n_hostname, n); + } + + return dst; +} + ADM_return_t ADM_node_destroy(ADM_node_t node) { ADM_return_t ret = ADM_SUCCESS; @@ -97,10 +121,80 @@ ADM_node_destroy(ADM_node_t node) { return ADM_EBADARGS; } + if(node->n_hostname) { + free((void*) node->n_hostname); + } + free(node); return ret; } +ADM_node_list_t +ADM_node_list_create(ADM_node_t nodes[], size_t length) { + + ADM_node_list_t p = (ADM_node_list_t) malloc(sizeof(*p)); + + if(!p) { + LOGGER_ERROR("Could not allocate ADM_node_list_t") + return NULL; + } + + const char* error_msg = NULL; + + p->l_length = length; + p->l_nodes = (struct adm_node*) calloc(length, sizeof(adm_node)); + + if(!p->l_nodes) { + error_msg = "Could not allocate ADM_node_list_t"; + goto cleanup_on_error; + } + + for(size_t i = 0; i < length; ++i) { + + if(!ADM_node_copy(&p->l_nodes[i], nodes[i])) { + error_msg = "Could not allocate ADM_node_list_t"; + goto cleanup_on_error; + }; + } + + return p; + +cleanup_on_error: + if(p->l_nodes) { + free(p->l_nodes); + } + free(p); + + if(error_msg) { + LOGGER_ERROR(error_msg); + } + + return NULL; +} + +ADM_return_t +ADM_node_list_destroy(ADM_node_list_t list) { + ADM_return_t ret = ADM_SUCCESS; + + if(!list) { + LOGGER_ERROR("Invalid ADM_node_list_t") + return ADM_EBADARGS; + } + + // We cannot call ADM_node_destroy here because adm_nodes + // are stored as a consecutive array in memory. Thus, we free + // the node ids themselves and then the array. + if(list->l_nodes) { + for(size_t i = 0; i < list->l_length; ++i) { + free((void*) list->l_nodes[i].n_hostname); + } + free(list->l_nodes); + } + + free(list); + return ret; +} + ADM_dataset_t ADM_dataset_create(const char* id) { @@ -449,23 +543,45 @@ ADM_storage_destroy(ADM_storage_t storage) { return ret; } -ADM_storage_resources_t -ADM_storage_resources_create() { +ADM_adhoc_resources_t +ADM_adhoc_resources_create(ADM_node_t nodes[], size_t nodes_len) { - struct adm_storage_resources* adm_storage_resources = - (struct adm_storage_resources*) malloc( - sizeof(*adm_storage_resources)); + struct adm_adhoc_resources* adm_adhoc_resources = + (struct adm_adhoc_resources*) malloc(sizeof(*adm_adhoc_resources)); - if(!adm_storage_resources) { - LOGGER_ERROR("Could not allocate ADM_storage_resources_t"); - return NULL; + const char* error_msg = NULL; + ADM_node_list_t nodes_list = NULL; + + if(!adm_adhoc_resources) { + error_msg = "Could not allocate ADM_adhoc_resources_t"; + goto cleanup_on_error; + } + + nodes_list = ADM_node_list_create(nodes, nodes_len); + + if(!nodes_list) { + error_msg = "Could not allocate ADM_adhoc_resources_t"; + goto cleanup_on_error; + } + + adm_adhoc_resources->r_nodes = nodes_list; + + return adm_adhoc_resources; + +cleanup_on_error: + if(error_msg) { + LOGGER_ERROR(error_msg); } - return adm_storage_resources; + if(adm_adhoc_resources) { + ADM_adhoc_resources_destroy(adm_adhoc_resources); + } + + return NULL; } ADM_return_t -ADM_storage_resources_destroy(ADM_storage_resources_t res) { +ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res) { ADM_return_t ret = ADM_SUCCESS; @@ -474,6 +590,10 @@ ADM_storage_resources_destroy(ADM_storage_resources_t res) { return ADM_EBADARGS; } + if(res->r_nodes) { + ADM_node_list_destroy(res->r_nodes); + } + free(res); return ret; } @@ -509,7 +629,8 @@ ADM_data_operation_destroy(ADM_data_operation_t op) { ADM_adhoc_context_t ADM_adhoc_context_create(ADM_adhoc_mode_t exec_mode, - ADM_adhoc_access_t access_type, uint32_t nodes, + ADM_adhoc_access_t access_type, + ADM_adhoc_resources_t adhoc_resources, uint32_t walltime, bool should_flush) { struct adm_adhoc_context* adm_adhoc_context = @@ -522,7 +643,7 @@ ADM_adhoc_context_create(ADM_adhoc_mode_t exec_mode, adm_adhoc_context->c_mode = exec_mode; adm_adhoc_context->c_access = access_type; - adm_adhoc_context->c_nodes = nodes; + adm_adhoc_context->c_resources = adhoc_resources; adm_adhoc_context->c_walltime = walltime; adm_adhoc_context->c_should_bg_flush = should_flush; @@ -1071,18 +1192,36 @@ storage::type() const { return m_type; } +adhoc_storage::resources::resources(std::vector nodes) + : m_nodes(std::move(nodes)) {} + +adhoc_storage::resources::resources(ADM_adhoc_resources_t res) { + assert(res->r_nodes); + m_nodes.reserve(res->r_nodes->l_length); + + for(size_t i = 0; i < res->r_nodes->l_length; ++i) { + m_nodes.emplace_back(res->r_nodes->l_nodes[i].n_hostname); + } +} + +std::vector +adhoc_storage::resources::nodes() const { + return m_nodes; +} + adhoc_storage::ctx::ctx(adhoc_storage::execution_mode exec_mode, adhoc_storage::access_type access_type, - std::uint32_t nodes, std::uint32_t walltime, - bool should_flush) - : m_exec_mode(exec_mode), m_access_type(access_type), m_nodes(nodes), - m_walltime(walltime), m_should_flush(should_flush) {} + adhoc_storage::resources resources, + std::uint32_t walltime, bool should_flush) + : m_exec_mode(exec_mode), m_access_type(access_type), + m_resources(std::move(resources)), m_walltime(walltime), + m_should_flush(should_flush) {} adhoc_storage::ctx::ctx(ADM_adhoc_context_t ctx) : adhoc_storage::ctx(static_cast(ctx->c_mode), static_cast(ctx->c_access), - ctx->c_nodes, ctx->c_walltime, - ctx->c_should_bg_flush) {} + adhoc_storage::resources{ctx->c_resources}, + ctx->c_walltime, ctx->c_should_bg_flush) {} adhoc_storage::execution_mode adhoc_storage::ctx::exec_mode() const { @@ -1094,9 +1233,9 @@ adhoc_storage::ctx::access_type() const { return m_access_type; } -std::uint32_t -adhoc_storage::ctx::nodes() const { - return m_nodes; +adhoc_storage::resources +adhoc_storage::ctx::resources() const { + return m_resources; } std::uint32_t @@ -1144,11 +1283,11 @@ private: adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, execution_mode exec_mode, access_type access_type, - std::uint32_t nodes, std::uint32_t walltime, - bool should_flush) + adhoc_storage::resources res, + std::uint32_t walltime, bool should_flush) : storage(type, std::move(user_id)), m_pimpl(std::make_unique(adhoc_storage::ctx{ - exec_mode, access_type, nodes, walltime, should_flush})) {} + exec_mode, access_type, res, walltime, should_flush})) {} adhoc_storage::adhoc_storage(enum storage::type type, std::string user_id, ADM_adhoc_context_t ctx) diff --git a/src/common/net/proto/rpc_types.c b/src/common/net/proto/rpc_types.c index dfdb9c2b..8ab8e6ec 100644 --- a/src/common/net/proto/rpc_types.c +++ b/src/common/net/proto/rpc_types.c @@ -110,7 +110,7 @@ hg_proc_ADM_dataset_t(hg_proc_t proc, void* data) { case HG_ENCODE: // find out the length of the adm_dataset object we need to send - dataset_length = *dataset ? sizeof(adm_node) : 0; + dataset_length = *dataset ? sizeof(adm_dataset) : 0; ret = hg_proc_hg_size_t(proc, &dataset_length); if(ret != HG_SUCCESS) { @@ -821,3 +821,145 @@ hg_proc_ADM_qos_entity_t(hg_proc_t proc, void* data) { return ret; } + +hg_return_t +hg_proc_ADM_node_list_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_node_list_t* list = (ADM_node_list_t*) data; + ADM_node_list_t tmp = NULL; + + hg_size_t length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + tmp = *list; + // write the length of the list + length = tmp->l_length; + ret = hg_proc_hg_size_t(proc, &tmp->l_length); + + if(ret != HG_SUCCESS) { + break; + } + + // write the list + for(size_t i = 0; i < length; ++i) { + ret = hg_proc_adm_node(proc, &tmp->l_nodes[i]); + + if(ret != HG_SUCCESS) { + break; + } + } + break; + + case HG_DECODE: { + + // find out the length of the list + ret = hg_proc_hg_size_t(proc, &length); + + if(ret != HG_SUCCESS) { + break; + } + + // loop and create list elements + tmp = (ADM_node_list_t) calloc(1, sizeof(struct adm_node_list)); + tmp->l_length = length; + tmp->l_nodes = (adm_node*) calloc(length, sizeof(adm_node)); + + for(size_t i = 0; i < length; ++i) { + ret = hg_proc_adm_node(proc, &tmp->l_nodes[i]); + + if(ret != HG_SUCCESS) { + break; + } + } + + // return the newly-created list + *list = tmp; + + break; + } + + case HG_FREE: + tmp = *list; + free(tmp->l_nodes); + free(tmp); + ret = HG_SUCCESS; + break; + } + + return ret; +} + +hg_return_t +hg_proc_ADM_adhoc_resources_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_adhoc_resources_t* res = (ADM_adhoc_resources_t*) data; + ADM_adhoc_resources_t tmp = NULL; + hg_size_t res_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_adhoc_resources object we need to + // send + res_length = *res ? sizeof(adm_adhoc_resources) : 0; + ret = hg_proc_hg_size_t(proc, &res_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!res_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_transfer object, + // write it to the mercury buffer + tmp = *res; + + ret = hg_proc_adm_adhoc_resources(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_transfer object + ret = hg_proc_hg_size_t(proc, &res_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!res_length) { + *res = NULL; + break; + } + + // if the received adm_adhoc_resources object was not NULL, read + // each of its fields from the mercury buffer + tmp = (adm_adhoc_resources*) calloc(1, sizeof(adm_adhoc_resources)); + + ret = hg_proc_adm_adhoc_resources(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *res = tmp; + break; + + case HG_FREE: + tmp = *res; + free(tmp); + break; + } + + return ret; +} diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index e8b44482..dc33b76b 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -145,22 +145,25 @@ typedef struct adm_adhoc_context { ADM_adhoc_mode_t c_mode; /** The adhoc storage system access type */ ADM_adhoc_access_t c_access; - /** The number of nodes for the adhoc storage system */ - uint32_t c_nodes; + /** The resources assigned for the adhoc storage system */ + ADM_adhoc_resources_t c_resources; /** The adhoc storage system walltime */ uint32_t c_walltime; /** Whether the adhoc storage system should flush data in the background */ bool c_should_bg_flush; } adm_adhoc_context; +hg_return_t +hg_proc_ADM_adhoc_resources_t(hg_proc_t proc, void* data); + // clang-format off MERCURY_GEN_STRUCT_PROC( adm_adhoc_context, // NOLINT - ((hg_int32_t) (c_mode)) - ((hg_int32_t) (c_access)) - ((hg_uint32_t) (c_nodes)) - ((hg_uint32_t) (c_walltime)) - ((hg_bool_t) (c_should_bg_flush)) + ((hg_int32_t) (c_mode)) + ((hg_int32_t) (c_access)) + ((ADM_adhoc_resources_t) (c_resources)) + ((hg_uint32_t) (c_walltime)) + ((hg_bool_t) (c_should_bg_flush)) ) // clang-format on @@ -189,15 +192,25 @@ typedef struct adm_storage { hg_return_t hg_proc_ADM_storage_t(hg_proc_t proc, void* data); -typedef struct adm_storage_resources { - // TODO: undefined for now - int32_t placeholder; -} adm_storage_resources; + +struct adm_node_list { + /** An array of nodes */ + adm_node* l_nodes; + /** The length of the array */ + size_t l_length; +}; + +hg_return_t +hg_proc_ADM_node_list_t(hg_proc_t proc, void* data); + +typedef struct adm_adhoc_resources { + ADM_node_list_t r_nodes; +} adm_adhoc_resources; // clang-format off MERCURY_GEN_STRUCT_PROC( - adm_storage_resources, // NOLINT - ((hg_int32_t) (placeholder)) + adm_adhoc_resources, // NOLINT + ((ADM_node_list_t) (r_nodes)) ); // clang-format on diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 8b2af228..4b7db666 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -457,7 +457,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_return_t set_io_resources(const server& srv, ADM_job_t job, ADM_storage_t tier, - ADM_storage_resources_t resources) { + ADM_adhoc_resources_t resources) { (void) srv; (void) job; (void) tier; diff --git a/src/lib/admire.h b/src/lib/admire.h index 972a8d03..11f58f97 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -243,7 +243,7 @@ ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, */ ADM_return_t ADM_set_io_resources(ADM_server_t server, ADM_job_t job, ADM_storage_t tier, - ADM_storage_resources_t resources); + ADM_adhoc_resources_t resources); /** diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 4e3c59c9..332c900e 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -95,7 +95,7 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_return_t set_io_resources(const server& srv, ADM_job_t job, ADM_storage_t tier, - ADM_storage_resources_t resources); + ADM_adhoc_resources_t resources); ADM_return_t get_transfer_priority(const server& srv, ADM_job_t job, ADM_transfer_t transfer, diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 5a36a08d..732c84b1 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -184,7 +184,7 @@ ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_return_t ADM_set_io_resources(ADM_server_t server, ADM_job_t job, ADM_storage_t tier, - ADM_storage_resources_t resources) { + ADM_adhoc_resources_t resources) { const admire::server srv{server}; -- GitLab From 481cbbcf47c6c62ba282aa42dcbcfa4b5e771c84 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Fri, 7 Oct 2022 12:28:55 +0200 Subject: [PATCH 2/2] ADM_register_job, ADM_update_job: Add new ADM_job_resources_t argument --- examples/c/ADM_cancel_transfer.c | 9 ++- examples/c/ADM_connect_data_operation.c | 9 ++- examples/c/ADM_define_data_operation.c | 9 ++- examples/c/ADM_deploy_adhoc_storage.c | 9 ++- examples/c/ADM_finalize_data_operation.c | 9 ++- examples/c/ADM_get_pending_transfers.c | 9 ++- examples/c/ADM_get_qos_constraints.c | 9 ++- examples/c/ADM_get_statistics.c | 9 ++- examples/c/ADM_get_transfer_priority.c | 9 ++- .../c/ADM_link_transfer_to_data_operation.c | 9 ++- examples/c/ADM_register_adhoc_storage.c | 9 ++- examples/c/ADM_register_job.c | 9 ++- examples/c/ADM_remove_adhoc_storage.c | 9 ++- examples/c/ADM_remove_job.c | 9 ++- examples/c/ADM_set_dataset_information.c | 9 ++- examples/c/ADM_set_io_resources.c | 9 ++- examples/c/ADM_set_qos_constraints.c | 9 ++- examples/c/ADM_set_transfer_priority.c | 9 ++- examples/c/ADM_transfer_datasets.c | 9 ++- examples/c/ADM_update_adhoc_storage.c | 9 ++- examples/c/ADM_update_job.c | 11 ++- examples/cxx/ADM_register_adhoc_storage.cpp | 5 +- examples/cxx/ADM_register_job.cpp | 5 +- examples/cxx/ADM_transfer_datasets.cpp | 5 +- examples/cxx/ADM_update_job.cpp | 9 ++- src/common/api/admire_types.h | 27 +++++++ src/common/api/admire_types.hpp | 22 ++++++ src/common/api/convert.cpp | 11 +++ src/common/api/convert.hpp | 25 +++++++ src/common/api/types.cpp | 73 +++++++++++++++++++ src/common/net/proto/rpc_types.c | 72 ++++++++++++++++++ src/common/net/proto/rpc_types.h | 17 +++++ src/lib/admire.cpp | 11 +-- src/lib/admire.h | 8 +- src/lib/admire.hpp | 6 +- src/lib/c_wrapper.cpp | 10 ++- src/lib/detail/impl.cpp | 23 ++++-- src/lib/detail/impl.hpp | 6 +- src/scord/rpc_handlers.cpp | 10 ++- 39 files changed, 480 insertions(+), 56 deletions(-) diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 0e086bc4..14831a40 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_connect_data_operation.c b/examples/c/ADM_connect_data_operation.c index 0145532a..38cc1daa 100644 --- a/examples/c/ADM_connect_data_operation.c +++ b/examples/c/ADM_connect_data_operation.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -64,11 +67,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_define_data_operation.c b/examples/c/ADM_define_data_operation.c index b1d5a324..ac952793 100644 --- a/examples/c/ADM_define_data_operation.c +++ b/examples/c/ADM_define_data_operation.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -46,6 +47,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -53,6 +56,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -69,7 +76,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 94b88da2..a882a80e 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_finalize_data_operation.c b/examples/c/ADM_finalize_data_operation.c index a54e320c..cc6bb5a1 100644 --- a/examples/c/ADM_finalize_data_operation.c +++ b/examples/c/ADM_finalize_data_operation.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -46,6 +47,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -65,11 +68,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_get_pending_transfers.c b/examples/c/ADM_get_pending_transfers.c index 37359ea7..fe320b25 100644 --- a/examples/c/ADM_get_pending_transfers.c +++ b/examples/c/ADM_get_pending_transfers.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_get_qos_constraints.c b/examples/c/ADM_get_qos_constraints.c index 3cb0df77..36679095 100644 --- a/examples/c/ADM_get_qos_constraints.c +++ b/examples/c/ADM_get_qos_constraints.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -64,11 +67,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_get_statistics.c b/examples/c/ADM_get_statistics.c index 5500bcea..294cc6c2 100644 --- a/examples/c/ADM_get_statistics.c +++ b/examples/c/ADM_get_statistics.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -64,11 +67,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index 3007b7ba..c243fea9 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index 57657ff6..12f9024d 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -46,6 +47,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -65,11 +68,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index 4145c750..e6b7865e 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -29,6 +29,7 @@ #include "common.h" #define NADHOC_NODES 25 +#define NJOB_NODES 50 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_register_job.c b/examples/c/ADM_register_job.c index 8ab2c28e..6b4b02ca 100644 --- a/examples/c/ADM_register_job.c +++ b/examples/c/ADM_register_job.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index dcd08471..95826ebd 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_remove_job.c b/examples/c/ADM_remove_job.c index f8c68f58..979acc06 100644 --- a/examples/c/ADM_remove_job.c +++ b/examples/c/ADM_remove_job.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_set_dataset_information.c b/examples/c/ADM_set_dataset_information.c index a207e06d..c43169f6 100644 --- a/examples/c/ADM_set_dataset_information.c +++ b/examples/c/ADM_set_dataset_information.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -46,6 +47,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -65,11 +68,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_set_io_resources.c b/examples/c/ADM_set_io_resources.c index 1854d9e7..9baed9a5 100644 --- a/examples/c/ADM_set_io_resources.c +++ b/examples/c/ADM_set_io_resources.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -64,11 +67,15 @@ main(int argc, char* argv[]) { ADM_storage_t st = ADM_storage_create("foobar", ADM_STORAGE_GEKKOFS, ctx); assert(st); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_job_requirements_t reqs = ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_set_qos_constraints.c b/examples/c/ADM_set_qos_constraints.c index 543f1b8e..20cc389c 100644 --- a/examples/c/ADM_set_qos_constraints.c +++ b/examples/c/ADM_set_qos_constraints.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index c543df56..77875968 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c index 841071f8..f157c2ba 100644 --- a/examples/c/ADM_transfer_datasets.c +++ b/examples/c/ADM_transfer_datasets.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -48,6 +49,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -55,6 +58,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -71,7 +78,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index b0a20137..bd876193 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " diff --git a/examples/c/ADM_update_job.c b/examples/c/ADM_update_job.c index 8e692d4d..be0b22b6 100644 --- a/examples/c/ADM_update_job.c +++ b/examples/c/ADM_update_job.c @@ -28,6 +28,7 @@ #include #include "common.h" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -45,6 +46,8 @@ main(int argc, char* argv[]) { ADM_server_t server = ADM_server_create("tcp", argv[1]); ADM_job_t job; + ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES); + assert(job_nodes); ADM_node_t* adhoc_nodes = prepare_nodes(NADHOC_NODES); assert(adhoc_nodes); ADM_dataset_t* inputs = prepare_datasets("input-dataset-%d", NINPUTS); @@ -52,6 +55,10 @@ main(int argc, char* argv[]) { ADM_dataset_t* outputs = prepare_datasets("output-dataset-%d", NOUTPUTS); assert(outputs); + ADM_job_resources_t job_resources = + ADM_job_resources_create(job_nodes, NJOB_NODES); + assert(job_resources); + ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); @@ -68,7 +75,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_create(inputs, NINPUTS, outputs, NOUTPUTS, st); assert(reqs); - ADM_return_t ret = ADM_register_job(server, reqs, &job); + ADM_return_t ret = ADM_register_job(server, job_resources, reqs, &job); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_register_job() remote procedure not completed " @@ -99,7 +106,7 @@ main(int argc, char* argv[]) { ADM_job_requirements_t new_reqs = ADM_job_requirements_create( new_inputs, NINPUTS, new_outputs, NOUTPUTS, st); - ret = ADM_update_job(server, job, new_reqs); + ret = ADM_update_job(server, job, job_resources, new_reqs); if(ret != ADM_SUCCESS) { fprintf(stdout, "ADM_update_job() remote procedure not completed " diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index c8bf6bbf..542acad2 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -26,6 +26,7 @@ #include #include "common.hpp" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -42,6 +43,7 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); @@ -62,7 +64,8 @@ main(int argc, char* argv[]) { ADM_return_t ret = ADM_SUCCESS; try { - const auto job = admire::register_job(server, reqs); + const auto job = admire::register_job( + server, admire::job::resources{job_nodes}, reqs); const auto adhoc_storage = admire::register_adhoc_storage( server, job, user_id, adhoc_storage_ctx); } catch(const std::exception& e) { diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index 771b8f1f..c75c06b0 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -26,6 +26,7 @@ #include #include "common.hpp" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -41,6 +42,7 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); @@ -54,7 +56,8 @@ main(int argc, char* argv[]) { admire::job_requirements reqs(inputs, outputs, std::move(p)); try { - [[maybe_unused]] const auto job = admire::register_job(server, reqs); + [[maybe_unused]] const auto job = admire::register_job( + server, admire::job::resources{job_nodes}, reqs); // do something with job diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 74f3f1ff..c48ac934 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -26,6 +26,7 @@ #include #include "common.hpp" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -44,6 +45,7 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); @@ -62,7 +64,8 @@ main(int argc, char* argv[]) { admire::job_requirements reqs(inputs, outputs, std::move(p)); try { - const auto job = admire::register_job(server, reqs); + const auto job = admire::register_job( + server, admire::job::resources{job_nodes}, reqs); const auto transfer = admire::transfer_datasets( server, job, sources, targets, qos_limits, mapping); diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index 01e70d37..70bc1c8a 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -26,6 +26,7 @@ #include #include "common.hpp" +#define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 @@ -41,6 +42,7 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; + const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); @@ -67,10 +69,11 @@ main(int argc, char* argv[]) { admire::job_requirements new_reqs{new_inputs, new_outputs, std::move(p2)}; try { - [[maybe_unused]] const auto job = admire::register_job(server, reqs); + [[maybe_unused]] const auto job = admire::register_job( + server, admire::job::resources{job_nodes}, reqs); - [[maybe_unused]] ADM_return_t ret = - admire::update_job(server, job, new_reqs); + [[maybe_unused]] ADM_return_t ret = admire::update_job( + server, job, admire::job::resources{job_nodes}, new_reqs); fmt::print( stdout, diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index d4ad7564..10e419bd 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -72,6 +72,9 @@ typedef struct adm_job* ADM_job_t; /* Jobs */ /* ----------------------------------------------------- */ +/** Information about resources assigned to a job */ +typedef struct adm_job_resources* ADM_job_resources_t; + /** I/O stats from a job */ typedef struct { // TODO: empty for now @@ -272,6 +275,30 @@ ADM_node_list_destroy(ADM_node_list_t list); /* Jobs */ /* ----------------------------------------------------- */ +/** + * Create an ADM_JOB_RESOURCES from information about storage resources. + * + * @remark ADM_JOB_RESOURCES need to be freed by calling + * ADM_job_resources_destroy(). + * + * @param[in] nodes An array of ADM_NODES describing the nodes assigned + * by the job_storage. + * @param[in] nodes_len The number of ADM_NODES stored in nodes. + * + * @return A valid ADM_JOB_RESOURCES, or NULL in case of failure + */ +ADM_job_resources_t +ADM_job_resources_create(ADM_node_t nodes[], size_t nodes_len); + +/** + * Destroy a ADM_JOB_RESOURCES created by ADM_job_resources_create(). + * + * @param[in] res A valid ADM_JOB_RESOURCES + * @return ADM_SUCCESS or corresponding ADM error code + */ +ADM_return_t +ADM_job_resources_destroy(ADM_job_resources_t res); + /** * Create a JOB_REQUIREMENTS from user-provided information. * diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index b3fe951d..b6ad7136 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -82,6 +82,17 @@ private: struct job { + struct resources { + explicit resources(std::vector nodes); + explicit resources(ADM_job_resources_t res); + + std::vector + nodes() const; + + private: + std::vector m_nodes; + }; + explicit job(job_id id); explicit job(ADM_job_t job); job(const job&) noexcept; @@ -700,6 +711,17 @@ struct fmt::formatter : formatter { } }; +template <> +struct fmt::formatter : formatter { + // parse is inherited from formatter. + template + auto + format(const admire::job::resources& r, FormatContext& ctx) const { + const auto str = fmt::format("{{nodes: {}}}", r.nodes()); + return formatter::format(str, ctx); + } +}; + template <> struct fmt::formatter : formatter { // parse is inherited from formatter. diff --git a/src/common/api/convert.cpp b/src/common/api/convert.cpp index de070c65..8f6e5f80 100644 --- a/src/common/api/convert.cpp +++ b/src/common/api/convert.cpp @@ -160,6 +160,17 @@ convert(ADM_dataset_list_t list) { return rv; } +managed_ctype +convert(const job::resources& res) { + + auto managed_nodes = as_ctype_array(res.nodes()); + + ADM_job_resources_t c_res = ADM_job_resources_create(managed_nodes.data(), + managed_nodes.size()); + + return managed_ctype{c_res, std::move(managed_nodes)}; +} + managed_ctype convert(const admire::job_requirements& reqs) { diff --git a/src/common/api/convert.hpp b/src/common/api/convert.hpp index 59692e98..7d1e84a7 100644 --- a/src/common/api/convert.hpp +++ b/src/common/api/convert.hpp @@ -63,6 +63,9 @@ convert(ADM_dataset_t datasets[], size_t datasets_len); std::vector convert(ADM_dataset_list_t list); +managed_ctype +convert(const job::resources& res); + managed_ctype convert(const admire::job_requirements& reqs); @@ -279,6 +282,28 @@ struct admire::api::managed_ctype { m_list; }; +template <> +struct admire::api::managed_ctype { + + explicit managed_ctype(ADM_job_resources_t res, + managed_ctype_array&& nodes) + : m_adhoc_resources(res), m_nodes(std::move(nodes)) {} + + ADM_job_resources_t + get() const { + return m_adhoc_resources.get(); + } + + ADM_job_resources_t + release() { + return m_adhoc_resources.release(); + } + + scord::utils::ctype_ptr + m_adhoc_resources; + managed_ctype_array m_nodes; +}; + template <> struct admire::api::managed_ctype { diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 90aa40ef..3319471a 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -694,6 +694,62 @@ ADM_pfs_context_destroy(ADM_pfs_context_t ctx) { return ret; } +ADM_job_resources_t +ADM_job_resources_create(ADM_node_t nodes[], size_t nodes_len) { + + struct adm_job_resources* adm_job_resources = + (struct adm_job_resources*) malloc(sizeof(*adm_job_resources)); + + const char* error_msg = NULL; + ADM_node_list_t nodes_list = NULL; + + if(!adm_job_resources) { + error_msg = "Could not allocate ADM_job_resources_t"; + goto cleanup_on_error; + } + + nodes_list = ADM_node_list_create(nodes, nodes_len); + + if(!nodes_list) { + error_msg = "Could not allocate ADM_job_resources_t"; + goto cleanup_on_error; + } + + adm_job_resources->r_nodes = nodes_list; + + return adm_job_resources; + +cleanup_on_error: + if(error_msg) { + LOGGER_ERROR(error_msg); + } + + if(adm_job_resources) { + ADM_job_resources_destroy(adm_job_resources); + } + + return NULL; +} + +ADM_return_t +ADM_job_resources_destroy(ADM_job_resources_t res) { + + ADM_return_t ret = ADM_SUCCESS; + + if(!res) { + LOGGER_ERROR("Invalid ADM_job_resources_t") + return ADM_EBADARGS; + } + + if(res->r_nodes) { + ADM_node_list_destroy(res->r_nodes); + } + + free(res); + return ret; +} + + ADM_job_requirements_t ADM_job_requirements_create(ADM_dataset_t inputs[], size_t inputs_len, ADM_dataset_t outputs[], size_t outputs_len, @@ -1059,6 +1115,23 @@ private: job_id m_id; }; +job::resources::resources(std::vector nodes) + : m_nodes(std::move(nodes)) {} + +job::resources::resources(ADM_job_resources_t res) { + assert(res->r_nodes); + m_nodes.reserve(res->r_nodes->l_length); + + for(size_t i = 0; i < res->r_nodes->l_length; ++i) { + m_nodes.emplace_back(res->r_nodes->l_nodes[i].n_hostname); + } +} + +std::vector +job::resources::nodes() const { + return m_nodes; +} + job::job(job_id id) : m_pimpl(std::make_unique(id)) {} job::job(ADM_job_t job) : job::job(job->j_id) {} diff --git a/src/common/net/proto/rpc_types.c b/src/common/net/proto/rpc_types.c index 8ab8e6ec..8a2aa664 100644 --- a/src/common/net/proto/rpc_types.c +++ b/src/common/net/proto/rpc_types.c @@ -963,3 +963,75 @@ hg_proc_ADM_adhoc_resources_t(hg_proc_t proc, void* data) { return ret; } + +hg_return_t +hg_proc_ADM_job_resources_t(hg_proc_t proc, void* data) { + + hg_return_t ret = HG_SUCCESS; + ADM_job_resources_t* res = (ADM_job_resources_t*) data; + ADM_job_resources_t tmp = NULL; + hg_size_t res_length = 0; + + switch(hg_proc_get_op(proc)) { + + case HG_ENCODE: + // find out the length of the adm_job_resources object we need to + // send + res_length = *res ? sizeof(adm_job_resources) : 0; + ret = hg_proc_hg_size_t(proc, &res_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!res_length) { + return HG_SUCCESS; + } + + // if we actually need to send an adm_transfer object, + // write it to the mercury buffer + tmp = *res; + + ret = hg_proc_adm_job_resources(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + break; + + case HG_DECODE: + // find out the length of the adm_transfer object + ret = hg_proc_hg_size_t(proc, &res_length); + + if(ret != HG_SUCCESS) { + break; + } + + if(!res_length) { + *res = NULL; + break; + } + + // if the received adm_job_resources object was not NULL, read + // each of its fields from the mercury buffer + tmp = (adm_job_resources*) calloc(1, sizeof(adm_job_resources)); + + ret = hg_proc_adm_job_resources(proc, tmp); + + if(ret != HG_SUCCESS) { + break; + } + + // return the newly-created ctx + *res = tmp; + break; + + case HG_FREE: + tmp = *res; + free(tmp); + break; + } + + return ret; +} diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index dc33b76b..d4826fe8 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -263,6 +263,18 @@ MERCURY_GEN_STRUCT_PROC( ); // clang-format on +/** The resources assigned to a job */ +typedef struct adm_job_resources { + ADM_node_list_t r_nodes; +} adm_job_resources; + +// clang-format off +MERCURY_GEN_STRUCT_PROC( + adm_job_resources, // NOLINT + ((ADM_node_list_t) (r_nodes)) +); +// clang-format on + // clang-format off MERCURY_GEN_PROC( @@ -271,9 +283,13 @@ MERCURY_GEN_PROC( ((int32_t) (retval)) ); +hg_return_t +hg_proc_ADM_job_resources_t(hg_proc_t proc, void* data); + /// ADM_register_job MERCURY_GEN_PROC( ADM_register_job_in_t, + ((ADM_job_resources_t) (job_resources)) ((adm_job_requirements) (reqs)) ); @@ -288,6 +304,7 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_update_job_in_t, ((ADM_job_t) (job)) + ((ADM_job_resources_t) (job_resources)) ((adm_job_requirements) (reqs)) ); diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 4b7db666..e94ad7f7 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -209,9 +209,10 @@ ping(const server& srv) { admire::job -register_job(const server& srv, const job_requirements& reqs) { +register_job(const server& srv, const job::resources& resources, + const job_requirements& reqs) { - const auto rv = detail::register_job(srv, reqs); + const auto rv = detail::register_job(srv, resources, reqs); if(!rv) { throw std::runtime_error(fmt::format("ADM_register_job() error: {}", @@ -219,12 +220,12 @@ register_job(const server& srv, const job_requirements& reqs) { } return rv.value(); - } ADM_return_t -update_job(const server& srv, const job& job, const job_requirements& reqs) { - return detail::update_job(srv, job, reqs); +update_job(const server& srv, const job& job, + const job::resources& job_resources, const job_requirements& reqs) { + return detail::update_job(srv, job, job_resources, reqs); } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index 11f58f97..355da02e 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -68,17 +68,19 @@ ADM_ping(ADM_server_t server); * ADM_remove_job(). * * @param[in] server The server to which the request is directed + * @param[in] res The resources for the job. * @param[in] reqs The requirements for the job. * @param[out] job An ADM_JOB referring to the newly-registered job. * @return Returns ADM_SUCCESS if the remote procedure has completed * successfully. */ ADM_return_t -ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, - ADM_job_t* job); +ADM_register_job(ADM_server_t server, ADM_job_resources_t res, + ADM_job_requirements_t reqs, ADM_job_t* job); ADM_return_t -ADM_update_job(ADM_server_t server, ADM_job_t job, ADM_job_requirements_t reqs); +ADM_update_job(ADM_server_t server, ADM_job_t job, + ADM_job_resources_t job_resources, ADM_job_requirements_t reqs); ADM_return_t ADM_remove_job(ADM_server_t server, ADM_job_t job); diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 332c900e..d44275cb 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -48,10 +48,12 @@ void ping(const server& srv); admire::job -register_job(const server& srv, const job_requirements& reqs); +register_job(const server& srv, const job::resources& job_resources, + const job_requirements& reqs); ADM_return_t -update_job(const server& srv, const job&, const job_requirements& reqs); +update_job(const server& srv, const job&, const job::resources& job_resources, + const job_requirements& reqs); ADM_return_t remove_job(const server& srv, const job& job); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 732c84b1..cf375e0c 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -43,13 +43,14 @@ ADM_ping(ADM_server_t server) { } ADM_return_t -ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, - ADM_job_t* job) { +ADM_register_job(ADM_server_t server, ADM_job_resources_t resources, + ADM_job_requirements_t reqs, ADM_job_t* job) { const admire::server srv{server}; const auto rv = - admire::detail::register_job(srv, admire::job_requirements{reqs}); + admire::detail::register_job(srv, admire::job::resources{resources}, + admire::job_requirements{reqs}); if(!rv) { return rv.error(); @@ -62,11 +63,12 @@ ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, ADM_return_t ADM_update_job(ADM_server_t server, ADM_job_t job, - ADM_job_requirements_t reqs) { + ADM_job_resources_t job_resources, ADM_job_requirements_t reqs) { const admire::server srv{server}; return admire::update_job(srv, admire::job{job}, + admire::job::resources{job_resources}, admire::job_requirements{reqs}); } diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 49ce4597..e8188d7a 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -197,7 +197,9 @@ ping(const server& srv) { } tl::expected -register_job(const admire::server& srv, const admire::job_requirements& reqs) { +register_job(const admire::server& srv, + const admire::job::resources& job_resources, + const admire::job_requirements& reqs) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -205,13 +207,14 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_requirements: {}}}", + "body: {{job_resources: {}, job_requirements: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), reqs); + std::quoted(rpc_client.self_address()), job_resources, reqs); + auto rpc_job_resources = api::convert(job_resources); auto rpc_reqs = api::convert(reqs); - ADM_register_job_in_t in{*rpc_reqs.get()}; + ADM_register_job_in_t in{rpc_job_resources.get(), *rpc_reqs.get()}; ADM_register_job_out_t out; const auto rpc = endp.call("ADM_register_job", &in, &out); @@ -235,7 +238,8 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { } admire::error_code -update_job(const server& srv, const job& job, const job_requirements& reqs) { +update_job(const server& srv, const job& job, + const job::resources& job_resources, const job_requirements& reqs) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -243,14 +247,17 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job: {}, job_requirements: {}}}", + "body: {{job: {}, job_resources: {}, job_requirements: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), job, reqs); + std::quoted(rpc_client.self_address()), job, job_resources, + reqs); const auto rpc_job = api::convert(job); + const auto rpc_job_resources = api::convert(job_resources); const auto rpc_reqs = api::convert(reqs); - ADM_update_job_in_t in{rpc_job.get(), *rpc_reqs.get()}; + ADM_update_job_in_t in{rpc_job.get(), rpc_job_resources.get(), + *rpc_reqs.get()}; ADM_update_job_out_t out; const auto rpc = endp.call("ADM_update_job", &in, &out); diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 662f4e47..2a5eb1b1 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -35,10 +35,12 @@ admire::error_code ping(const server& srv); tl::expected -register_job(const server& srv, const job_requirements& reqs); +register_job(const server& srv, const job::resources& job_resources, + const job_requirements& reqs); admire::error_code -update_job(const server& srv, const job& job, const job_requirements& reqs); +update_job(const server& srv, const job& job, + const job::resources& job_resources, const job_requirements& reqs); admire::error_code remove_job(const server& srv, const job& job); diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 73c380f6..d236c32e 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -98,12 +98,13 @@ ADM_register_job(hg_handle_t h) { assert(ret == HG_SUCCESS); const admire::job_requirements reqs(&in.reqs); + const admire::job::resources job_resources(in.job_resources); const auto id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_requirements: {}}}", + "body: {{job_resources: {}, job_requirements: {}}}", id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), - reqs); + job_resources, reqs); const auto job = admire::job{42}; @@ -147,13 +148,14 @@ ADM_update_job(hg_handle_t h) { assert(ret == HG_SUCCESS); const admire::job job(in.job); + const admire::job::resources job_resources(in.job_resources); const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job: {}, job_requirements: {}}}", + "body: {{job: {}, job_resources: {}, job_requirements: {}}}", id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job, - reqs); + job_resources, reqs); admire::error_code rv = ADM_SUCCESS; out.op_id = id; -- GitLab