From 27279c27e6f4ea34e903e15ff46cf11454be7ecb Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 24 Jul 2023 12:06:36 +0200 Subject: [PATCH 1/7] WIP WIP tests Lock and scheduling output Updated Thread Move scheduling to processing thread (one shot) Thallium threading Updated stager_address Stager logger Added stager contact point in scord log remove stager_address remove stager_addrss from cpp --- examples/c/ADM_cancel_transfer.c | 1 + examples/c/ADM_connect_data_operation.c | 2 +- examples/c/ADM_define_data_operation.c | 2 +- examples/c/ADM_deploy_adhoc_storage.c | 2 +- examples/c/ADM_finalize_data_operation.c | 1 + examples/c/ADM_get_pending_transfers.c | 2 +- examples/c/ADM_get_qos_constraints.c | 2 +- examples/c/ADM_get_statistics.c | 2 +- examples/c/ADM_get_transfer_priority.c | 2 +- .../c/ADM_link_transfer_to_data_operation.c | 2 +- examples/c/ADM_register_adhoc_storage.c | 1 + examples/c/ADM_register_job.c | 2 + examples/c/ADM_remove_adhoc_storage.c | 2 +- examples/c/ADM_remove_job.c | 3 + examples/c/ADM_set_dataset_information.c | 2 +- examples/c/ADM_set_io_resources.c | 2 +- examples/c/ADM_set_qos_constraints.c | 2 +- examples/c/ADM_set_transfer_priority.c | 2 +- examples/c/ADM_terminate_adhoc_storage.c | 2 +- examples/c/ADM_transfer_datasets.c | 2 + examples/c/ADM_update_adhoc_storage.c | 1 + examples/c/ADM_update_job.c | 1 + examples/cxx/ADM_deploy_adhoc_storage.cpp | 2 + examples/cxx/ADM_in_situ_ops.cpp | 3 +- examples/cxx/ADM_in_transit_ops.cpp | 3 +- examples/cxx/ADM_register_adhoc_storage.cpp | 2 +- examples/cxx/ADM_register_job.cpp | 1 + examples/cxx/ADM_remove_adhoc_storage.cpp | 1 + examples/cxx/ADM_remove_job.cpp | 1 + examples/cxx/ADM_terminate_adhoc_storage.cpp | 1 + examples/cxx/ADM_transfer_datasets.cpp | 1 + examples/cxx/ADM_transfer_update.cpp | 2 + examples/cxx/ADM_update_adhoc_storage.cpp | 1 + plugins/slurm/defaults.h.in | 3 + plugins/slurm/slurmadmcli.c | 13 +++ src/lib/scord/types.hpp | 8 +- src/lib/types.c | 5 ++ src/scord/rpc_server.cpp | 80 +++++++++++++++++-- src/scord/rpc_server.hpp | 34 ++++++++ src/scord/transfer_manager.hpp | 27 +++++-- 40 files changed, 195 insertions(+), 33 deletions(-) diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 76fff6aa..3f9075e4 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -69,6 +69,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_connect_data_operation.c b/examples/c/ADM_connect_data_operation.c index f860dd80..a4be478f 100644 --- a/examples/c/ADM_connect_data_operation.c +++ b/examples/c/ADM_connect_data_operation.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_define_data_operation.c b/examples/c/ADM_define_data_operation.c index 6f08e8b8..13c332b2 100644 --- a/examples/c/ADM_define_data_operation.c +++ b/examples/c/ADM_define_data_operation.c @@ -69,7 +69,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index fdc5b331..66854b24 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_finalize_data_operation.c b/examples/c/ADM_finalize_data_operation.c index 2605cc69..80035b77 100644 --- a/examples/c/ADM_finalize_data_operation.c +++ b/examples/c/ADM_finalize_data_operation.c @@ -65,6 +65,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_pending_transfers.c b/examples/c/ADM_get_pending_transfers.c index 7f9e49d3..1c8edd8a 100644 --- a/examples/c/ADM_get_pending_transfers.c +++ b/examples/c/ADM_get_pending_transfers.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_qos_constraints.c b/examples/c/ADM_get_qos_constraints.c index f3e0b8f8..ca04e7b7 100644 --- a/examples/c/ADM_get_qos_constraints.c +++ b/examples/c/ADM_get_qos_constraints.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_statistics.c b/examples/c/ADM_get_statistics.c index 5ce8a715..4c482672 100644 --- a/examples/c/ADM_get_statistics.c +++ b/examples/c/ADM_get_statistics.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index 0442740e..bbf14cda 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index cbadc2a6..ef61a6de 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index 106251a0..fb9ba1f9 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -48,6 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_register_job.c b/examples/c/ADM_register_job.c index e20a3fae..5fd01aaf 100644 --- a/examples/c/ADM_register_job.c +++ b/examples/c/ADM_register_job.c @@ -63,6 +63,8 @@ main(int argc, char* argv[]) { ADM_dataset_route_t* outputs = NULL; ADM_dataset_route_t* expected_outputs = NULL; + // stager information + // Let's prepare all the information required by the API calls. // ADM_register_job() often requires an adhoc storage to have been // registered onto the system, so let's prepare first the data required diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index 797f1d56..72c3265d 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_remove_job.c b/examples/c/ADM_remove_job.c index 08c8d076..19631d7e 100644 --- a/examples/c/ADM_remove_job.c +++ b/examples/c/ADM_remove_job.c @@ -69,6 +69,9 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); + + + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_dataset_information.c b/examples/c/ADM_set_dataset_information.c index 3c5b77ca..b955c8ec 100644 --- a/examples/c/ADM_set_dataset_information.c +++ b/examples/c/ADM_set_dataset_information.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_io_resources.c b/examples/c/ADM_set_io_resources.c index de5547fd..baf17f84 100644 --- a/examples/c/ADM_set_io_resources.c +++ b/examples/c/ADM_set_io_resources.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_qos_constraints.c b/examples/c/ADM_set_qos_constraints.c index f5929ae8..f170b96d 100644 --- a/examples/c/ADM_set_qos_constraints.c +++ b/examples/c/ADM_set_qos_constraints.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index 3d4bb8dc..a04ac42b 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_terminate_adhoc_storage.c b/examples/c/ADM_terminate_adhoc_storage.c index fa58bda3..f728c6da 100644 --- a/examples/c/ADM_terminate_adhoc_storage.c +++ b/examples/c/ADM_terminate_adhoc_storage.c @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c index e1bc1d80..574c19b7 100644 --- a/examples/c/ADM_transfer_datasets.c +++ b/examples/c/ADM_transfer_datasets.c @@ -69,6 +69,8 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); + + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index dfe2bccf..82b91aff 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -48,6 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; + ADM_node_t* adhoc_nodes = NULL; ADM_node_t* new_adhoc_nodes = NULL; diff --git a/examples/c/ADM_update_job.c b/examples/c/ADM_update_job.c index 401e76d4..1c94c36f 100644 --- a/examples/c/ADM_update_job.c +++ b/examples/c/ADM_update_job.c @@ -68,6 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/cxx/ADM_deploy_adhoc_storage.cpp b/examples/cxx/ADM_deploy_adhoc_storage.cpp index 0944ee1a..cc4bdb57 100644 --- a/examples/cxx/ADM_deploy_adhoc_storage.cpp +++ b/examples/cxx/ADM_deploy_adhoc_storage.cpp @@ -50,6 +50,8 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; + + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_in_situ_ops.cpp b/examples/cxx/ADM_in_situ_ops.cpp index 563cf0bc..8ea97e36 100644 --- a/examples/cxx/ADM_in_situ_ops.cpp +++ b/examples/cxx/ADM_in_situ_ops.cpp @@ -45,7 +45,8 @@ main(int argc, char* argv[]) { fmt::print( stdout, "Calling ADM_in_situ_ops remote procedure on {} -> access method: {} ...\n", - cli_args.controller_address, argv[2]); + cli_args.controller_address, + argv[2]); ADM_in_situ_ops_in_t in; in.in_situ = argv[2]; ADM_in_situ_ops_out_t out; diff --git a/examples/cxx/ADM_in_transit_ops.cpp b/examples/cxx/ADM_in_transit_ops.cpp index 8e743799..dfe6de1f 100644 --- a/examples/cxx/ADM_in_transit_ops.cpp +++ b/examples/cxx/ADM_in_transit_ops.cpp @@ -45,7 +45,8 @@ main(int argc, char* argv[]) { fmt::print( stdout, "Calling ADM_in_transit_ops remote procedure on {} -> access method: {} ...\n", - cli_args.controller_address, argv[2]); + cli_args.controller_address, + argv[2]); ADM_in_transit_ops_in_t in; in.in_transit = argv[2]; ADM_in_transit_ops_out_t out; diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index d2c07d40..faf20f08 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); - + std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index 2da8de73..ff3e45ea 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -53,6 +53,7 @@ main(int argc, char* argv[]) { prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_remove_adhoc_storage.cpp b/examples/cxx/ADM_remove_adhoc_storage.cpp index 05d9d8b0..9c840bef 100644 --- a/examples/cxx/ADM_remove_adhoc_storage.cpp +++ b/examples/cxx/ADM_remove_adhoc_storage.cpp @@ -50,6 +50,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_remove_job.cpp b/examples/cxx/ADM_remove_job.cpp index 408a16ef..b659554e 100644 --- a/examples/cxx/ADM_remove_job.cpp +++ b/examples/cxx/ADM_remove_job.cpp @@ -48,6 +48,7 @@ main(int argc, char* argv[]) { prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_terminate_adhoc_storage.cpp b/examples/cxx/ADM_terminate_adhoc_storage.cpp index 737e1d4f..4ff1a5be 100644 --- a/examples/cxx/ADM_terminate_adhoc_storage.cpp +++ b/examples/cxx/ADM_terminate_adhoc_storage.cpp @@ -50,6 +50,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 1028658c..645c12c2 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -61,6 +61,7 @@ main(int argc, char* argv[]) { const auto mapping = scord::transfer::mapping::n_to_n; std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index 060633b1..0aea57a0 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -61,6 +61,8 @@ main(int argc, char* argv[]) { const auto mapping = scord::transfer::mapping::n_to_n; std::string name = "adhoc_storage_42"; + + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_update_adhoc_storage.cpp b/examples/cxx/ADM_update_adhoc_storage.cpp index a7b88deb..1b9d1a67 100644 --- a/examples/cxx/ADM_update_adhoc_storage.cpp +++ b/examples/cxx/ADM_update_adhoc_storage.cpp @@ -50,6 +50,7 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/plugins/slurm/defaults.h.in b/plugins/slurm/defaults.h.in index 9de23bc6..f2bb3572 100644 --- a/plugins/slurm/defaults.h.in +++ b/plugins/slurm/defaults.h.in @@ -41,6 +41,9 @@ #define CARGO_PROTO_DEFAULT SCORD_PROTO_DEFAULT #define CARGO_PORT_DEFAULT 62000 +#define CARGO_PROTO_DEFAULT SCORD_PROTO_DEFAULT +#define CARGO_PORT_DEFAULT 62000 + // clang-format on #endif // SCORD_SLURM_PLUGIN_DEFAULTS_H diff --git a/plugins/slurm/slurmadmcli.c b/plugins/slurm/slurmadmcli.c index 61d10078..0cb39d0c 100644 --- a/plugins/slurm/slurmadmcli.c +++ b/plugins/slurm/slurmadmcli.c @@ -459,6 +459,10 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, cfg.scordctl_info.proto, ADM_node_get_hostname(ctl_node), cfg.scordctl_info.port); + cfg.cargo_info.addr = margo_address_create(cfg.cargo_info.proto, + ADM_node_get_hostname(ctl_node), + cfg.cargo_info.port); + if(!cfg.scordctl_info.addr) { slurm_error("%s: failed to compute address scordctl server", plugin_name); @@ -480,6 +484,15 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, cfg.scord_info.port); slurm_debug("%s: %s: scordctl_info:", plugin_name, __func__); + slurm_debug("%s: %s: addr: \"%s\",", plugin_name, __func__, + cfg.cargo_info.addr); + slurm_debug("%s: %s: proto: \"%s\",", plugin_name, __func__, + cfg.cargo_info.proto); + slurm_debug("%s: %s: port: %d,", plugin_name, __func__, + cfg.cargo_info.port); + + + slurm_debug("%s: %s: cargo_info:", plugin_name, __func__); slurm_debug("%s: %s: addr: \"%s\",", plugin_name, __func__, cfg.scordctl_info.addr); slurm_debug("%s: %s: proto: \"%s\",", plugin_name, __func__, diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index ec767ccb..6ec23f13 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -106,7 +106,7 @@ struct error_code { template void serialize(Archive&& ar) { - ar & m_value; + ar& m_value; } private: @@ -231,7 +231,7 @@ struct adhoc_storage { template void serialize(Archive&& ar) { - ar & m_nodes; + ar& m_nodes; } private: @@ -350,7 +350,7 @@ struct pfs_storage { template void serialize(Archive&& ar) { - ar & m_mount_point; + ar& m_mount_point; } private: @@ -412,7 +412,7 @@ struct job { template void serialize(Archive&& ar) { - ar & m_nodes; + ar& m_nodes; } private: diff --git a/src/lib/types.c b/src/lib/types.c index 1e426909..d1ca7637 100644 --- a/src/lib/types.c +++ b/src/lib/types.c @@ -822,6 +822,11 @@ ADM_adhoc_context_create(const char* ctl_address, const char* stager_address, LOGGER_ERROR("The address to the controller cannot be NULL"); return NULL; } + /* We may continue if there is no transfers.. */ + if(!stg_address) { + LOGGER_ERROR("The stager address is null"); + } + if(!stager_address) { LOGGER_ERROR("The address to the stager cannot be NULL"); diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 8ec74eb6..45e35bd9 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -29,6 +29,7 @@ #include #include #include "rpc_server.hpp" +#include template constexpr std::optional @@ -756,9 +757,29 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, resp.error_code(), resp.value_or_none()); + // TODO: create a transfer in transfer manager + // We need the contact point, and different qos + + if(const auto transfer_result = + m_transfer_manager.create(tx_id.value(), stager_address, limits); + !transfer_result.has_value()) { + LOGGER_ERROR( + "rpc id: {} error_msg: \"Error creating transfer_storage: {}\"", + rpc.id(), transfer_result.error()); + ec = transfer_result.error(); + } + + req.respond(resp); } +void +rpc_server::start_scheduler() { + + thallium::xstream es = thallium::xstream::self(); + thallium::managed th = + es.make_thread([this]() { scheduler_runnable((void*) this); }); +} void rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, @@ -775,25 +796,70 @@ rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, scord::error_code ec; - // TODO: generate a global ID for the transfer and contact Cargo to - // actually request it - const auto resp = response_with_id{rpc.id(), ec, transfer_id}; LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); - // TODO: create a transfer in transfer manager - // We need the contact point, and different qos - ec = m_transfer_manager.update(transfer_id, obtained_bw); if(ec.no_such_entity) { LOGGER_ERROR( "rpc id: {} error_msg: \"Error updating transfer_storage\"", rpc.id()); } + req.respond(resp); + // Wake Up Scheduling thread, as the status has changed + start_scheduler(); +} +void +rpc_server::scheduler_runnable(void* arg) { + scord::rpc_server* server = ((scord::rpc_server*) arg); - req.respond(resp); + auto scheduling = server->scheduler_update(); + LOGGER_INFO("Internal Size: {}", scheduling.size()); + // Call expand/shrink with the info +} + + +std::vector> +rpc_server::scheduler_update() { + std::vector> return_set; + const auto threshold = 0.1f; + m_transfer_manager.lock(); + const auto transfer = m_transfer_manager.transfer(); + + for(const auto& tr_unit : transfer) { + const auto tr_info = tr_unit.second.get(); + auto bw = tr_info->obtained_bw(); + if(bw == -1) { + continue; + } + + LOGGER_DEBUG("update for unit {} - {} >? {}", tr_unit.first, + tr_info->qos().front().value(), bw); + + auto qos = tr_info->qos().front().value(); + if(bw + bw * threshold > qos) { + // Send decrease / slow signal to cargo + LOGGER_DEBUG("Action for unit {} --> Decrease {}", tr_unit.first, + tr_info->contact_point()); + std::pair entity = + std::make_pair(tr_info->contact_point(), -1); + return_set.push_back(entity); + + } else if(bw - bw * threshold < qos) { + // Send increase / speed up signal to cargo + LOGGER_DEBUG("Action for unit {} --> Increase {}", tr_unit.first, + tr_info->contact_point()); + std::pair entity = + std::make_pair(tr_info->contact_point(), +1); + return_set.push_back(entity); + } + // Remove from next computations + tr_info->obtained_bw(-1); + } + m_transfer_manager.unlock(); + return return_set; } diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index ef6b066c..3a713ad1 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -26,6 +26,7 @@ #define SCORD_RPC_SERVER_HPP #include +#include #include #include #include "job_manager.hpp" @@ -109,6 +110,39 @@ private: transfer_update(const network::request& req, uint64_t transfer_id, float obtained_bw); + /** + * @brief Function that schedules transfers + * + * @param arg , is a pointer to rpc_server to access structures. + */ + static void + scheduler_runnable(void* arg); + /** + * @brief Starts the scheduler thread for transfers + * + */ + void + start_scheduler(); + + job_manager m_job_manager; + adhoc_storage_manager m_adhoc_manager; + pfs_storage_manager m_pfs_manager; + transfer_manager m_transfer_manager; + + +public: + /** + * @brief Generates scheduling information, a set of pairs (contact point, + * and action) + * + * It causes a lock-unlock of the transfer_manager structure. + * + * @return a vector with a string contact_point and an action encoded in a + * integer (-1, or 1) + */ + std::vector> + scheduler_update(); + job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp index 6abb74f2..7cf75da6 100644 --- a/src/scord/transfer_manager.hpp +++ b/src/scord/transfer_manager.hpp @@ -49,7 +49,7 @@ struct transfer_manager { abt::unique_lock lock(m_transfer_mutex); - if(const auto it = m_transfer.find(id); it == m_transfer.end()) { + if(const auto it = m_transfer.find(tx_id); it == m_transfer.end()) { const auto& [it_transfer, inserted] = m_transfer.emplace( id, std::make_shared< internal::transfer_metadata>( @@ -105,15 +105,32 @@ struct transfer_manager { abt::unique_lock lock(m_transfer_mutex); - if(const auto it = m_transfer.find(id); it != m_transfer.end()) { - auto nh = m_transfer.extract(it); - return nh.mapped(); + if(m_transfer.count(id) != 0) { + m_transfer.erase(id); + return scord::error_code::success; } + LOGGER_ERROR("Transfer '{}' was not registered or was already deleted", id); - return tl::make_unexpected(scord::error_code::no_such_entity); + return scord::error_code::no_such_entity; + } + + std::unordered_map> + transfer() { + return m_transfer; + } + + void + lock() { + m_transfer_mutex.lock(); + } + + void + unlock() { + m_transfer_mutex.unlock(); } private: -- GitLab From c6120d89fdcb257c592781cff9fdac6b15270ea8 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 14 Nov 2023 17:59:29 +0100 Subject: [PATCH 2/7] Recover examples --- examples/c/ADM_cancel_transfer.c | 1 - examples/c/ADM_connect_data_operation.c | 2 +- examples/c/ADM_define_data_operation.c | 2 +- examples/c/ADM_deploy_adhoc_storage.c | 2 +- examples/c/ADM_finalize_data_operation.c | 1 - examples/c/ADM_get_pending_transfers.c | 2 +- examples/c/ADM_get_qos_constraints.c | 2 +- examples/c/ADM_get_statistics.c | 2 +- examples/c/ADM_get_transfer_priority.c | 2 +- examples/c/ADM_link_transfer_to_data_operation.c | 2 +- examples/c/ADM_register_adhoc_storage.c | 1 - examples/c/ADM_register_job.c | 2 -- examples/c/ADM_remove_adhoc_storage.c | 2 +- examples/c/ADM_remove_job.c | 3 --- examples/c/ADM_set_dataset_information.c | 2 +- examples/c/ADM_set_io_resources.c | 2 +- examples/c/ADM_set_qos_constraints.c | 2 +- examples/c/ADM_set_transfer_priority.c | 2 +- examples/c/ADM_terminate_adhoc_storage.c | 2 +- examples/c/ADM_transfer_datasets.c | 2 -- examples/c/ADM_update_adhoc_storage.c | 1 - examples/c/ADM_update_job.c | 1 - examples/cxx/ADM_deploy_adhoc_storage.cpp | 2 -- examples/cxx/ADM_in_situ_ops.cpp | 3 +-- examples/cxx/ADM_in_transit_ops.cpp | 3 +-- examples/cxx/ADM_register_adhoc_storage.cpp | 2 +- examples/cxx/ADM_register_job.cpp | 1 - examples/cxx/ADM_remove_adhoc_storage.cpp | 1 - examples/cxx/ADM_remove_job.cpp | 1 - examples/cxx/ADM_terminate_adhoc_storage.cpp | 1 - examples/cxx/ADM_transfer_datasets.cpp | 1 - examples/cxx/ADM_transfer_update.cpp | 2 -- examples/cxx/ADM_update_adhoc_storage.cpp | 1 - 33 files changed, 17 insertions(+), 41 deletions(-) diff --git a/examples/c/ADM_cancel_transfer.c b/examples/c/ADM_cancel_transfer.c index 3f9075e4..76fff6aa 100644 --- a/examples/c/ADM_cancel_transfer.c +++ b/examples/c/ADM_cancel_transfer.c @@ -69,7 +69,6 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_connect_data_operation.c b/examples/c/ADM_connect_data_operation.c index a4be478f..f860dd80 100644 --- a/examples/c/ADM_connect_data_operation.c +++ b/examples/c/ADM_connect_data_operation.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_define_data_operation.c b/examples/c/ADM_define_data_operation.c index 13c332b2..6f08e8b8 100644 --- a/examples/c/ADM_define_data_operation.c +++ b/examples/c/ADM_define_data_operation.c @@ -69,7 +69,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 66854b24..fdc5b331 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_finalize_data_operation.c b/examples/c/ADM_finalize_data_operation.c index 80035b77..2605cc69 100644 --- a/examples/c/ADM_finalize_data_operation.c +++ b/examples/c/ADM_finalize_data_operation.c @@ -65,7 +65,6 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_pending_transfers.c b/examples/c/ADM_get_pending_transfers.c index 1c8edd8a..7f9e49d3 100644 --- a/examples/c/ADM_get_pending_transfers.c +++ b/examples/c/ADM_get_pending_transfers.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_qos_constraints.c b/examples/c/ADM_get_qos_constraints.c index ca04e7b7..f3e0b8f8 100644 --- a/examples/c/ADM_get_qos_constraints.c +++ b/examples/c/ADM_get_qos_constraints.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_statistics.c b/examples/c/ADM_get_statistics.c index 4c482672..5ce8a715 100644 --- a/examples/c/ADM_get_statistics.c +++ b/examples/c/ADM_get_statistics.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_get_transfer_priority.c b/examples/c/ADM_get_transfer_priority.c index bbf14cda..0442740e 100644 --- a/examples/c/ADM_get_transfer_priority.c +++ b/examples/c/ADM_get_transfer_priority.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_link_transfer_to_data_operation.c b/examples/c/ADM_link_transfer_to_data_operation.c index ef61a6de..cbadc2a6 100644 --- a/examples/c/ADM_link_transfer_to_data_operation.c +++ b/examples/c/ADM_link_transfer_to_data_operation.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_register_adhoc_storage.c b/examples/c/ADM_register_adhoc_storage.c index fb9ba1f9..106251a0 100644 --- a/examples/c/ADM_register_adhoc_storage.c +++ b/examples/c/ADM_register_adhoc_storage.c @@ -48,7 +48,6 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_register_job.c b/examples/c/ADM_register_job.c index 5fd01aaf..e20a3fae 100644 --- a/examples/c/ADM_register_job.c +++ b/examples/c/ADM_register_job.c @@ -63,8 +63,6 @@ main(int argc, char* argv[]) { ADM_dataset_route_t* outputs = NULL; ADM_dataset_route_t* expected_outputs = NULL; - // stager information - // Let's prepare all the information required by the API calls. // ADM_register_job() often requires an adhoc storage to have been // registered onto the system, so let's prepare first the data required diff --git a/examples/c/ADM_remove_adhoc_storage.c b/examples/c/ADM_remove_adhoc_storage.c index 72c3265d..797f1d56 100644 --- a/examples/c/ADM_remove_adhoc_storage.c +++ b/examples/c/ADM_remove_adhoc_storage.c @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_remove_job.c b/examples/c/ADM_remove_job.c index 19631d7e..08c8d076 100644 --- a/examples/c/ADM_remove_job.c +++ b/examples/c/ADM_remove_job.c @@ -69,9 +69,6 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - - - ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_dataset_information.c b/examples/c/ADM_set_dataset_information.c index b955c8ec..3c5b77ca 100644 --- a/examples/c/ADM_set_dataset_information.c +++ b/examples/c/ADM_set_dataset_information.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_io_resources.c b/examples/c/ADM_set_io_resources.c index baf17f84..de5547fd 100644 --- a/examples/c/ADM_set_io_resources.c +++ b/examples/c/ADM_set_io_resources.c @@ -64,7 +64,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_qos_constraints.c b/examples/c/ADM_set_qos_constraints.c index f170b96d..f5929ae8 100644 --- a/examples/c/ADM_set_qos_constraints.c +++ b/examples/c/ADM_set_qos_constraints.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_set_transfer_priority.c b/examples/c/ADM_set_transfer_priority.c index a04ac42b..3d4bb8dc 100644 --- a/examples/c/ADM_set_transfer_priority.c +++ b/examples/c/ADM_set_transfer_priority.c @@ -68,7 +68,7 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_t adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - + ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_terminate_adhoc_storage.c b/examples/c/ADM_terminate_adhoc_storage.c index f728c6da..fa58bda3 100644 --- a/examples/c/ADM_terminate_adhoc_storage.c +++ b/examples/c/ADM_terminate_adhoc_storage.c @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - + ADM_node_t* adhoc_nodes = NULL; ADM_adhoc_resources_t adhoc_resources = NULL; ADM_adhoc_context_t adhoc_ctx = NULL; diff --git a/examples/c/ADM_transfer_datasets.c b/examples/c/ADM_transfer_datasets.c index 574c19b7..e1bc1d80 100644 --- a/examples/c/ADM_transfer_datasets.c +++ b/examples/c/ADM_transfer_datasets.c @@ -69,8 +69,6 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - - ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/c/ADM_update_adhoc_storage.c b/examples/c/ADM_update_adhoc_storage.c index 82b91aff..dfe2bccf 100644 --- a/examples/c/ADM_update_adhoc_storage.c +++ b/examples/c/ADM_update_adhoc_storage.c @@ -48,7 +48,6 @@ main(int argc, char* argv[]) { // adhoc information const char* adhoc_name = "adhoc_storage_42"; - ADM_node_t* adhoc_nodes = NULL; ADM_node_t* new_adhoc_nodes = NULL; diff --git a/examples/c/ADM_update_job.c b/examples/c/ADM_update_job.c index 1c94c36f..401e76d4 100644 --- a/examples/c/ADM_update_job.c +++ b/examples/c/ADM_update_job.c @@ -68,7 +68,6 @@ main(int argc, char* argv[]) { ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); assert(adhoc_resources); - ADM_adhoc_context_t ctx = ADM_adhoc_context_create( cli_args.controller_address, cli_args.data_stager_address, ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false); diff --git a/examples/cxx/ADM_deploy_adhoc_storage.cpp b/examples/cxx/ADM_deploy_adhoc_storage.cpp index cc4bdb57..0944ee1a 100644 --- a/examples/cxx/ADM_deploy_adhoc_storage.cpp +++ b/examples/cxx/ADM_deploy_adhoc_storage.cpp @@ -50,8 +50,6 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; - - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_in_situ_ops.cpp b/examples/cxx/ADM_in_situ_ops.cpp index 8ea97e36..563cf0bc 100644 --- a/examples/cxx/ADM_in_situ_ops.cpp +++ b/examples/cxx/ADM_in_situ_ops.cpp @@ -45,8 +45,7 @@ main(int argc, char* argv[]) { fmt::print( stdout, "Calling ADM_in_situ_ops remote procedure on {} -> access method: {} ...\n", - cli_args.controller_address, - argv[2]); + cli_args.controller_address, argv[2]); ADM_in_situ_ops_in_t in; in.in_situ = argv[2]; ADM_in_situ_ops_out_t out; diff --git a/examples/cxx/ADM_in_transit_ops.cpp b/examples/cxx/ADM_in_transit_ops.cpp index dfe6de1f..8e743799 100644 --- a/examples/cxx/ADM_in_transit_ops.cpp +++ b/examples/cxx/ADM_in_transit_ops.cpp @@ -45,8 +45,7 @@ main(int argc, char* argv[]) { fmt::print( stdout, "Calling ADM_in_transit_ops remote procedure on {} -> access method: {} ...\n", - cli_args.controller_address, - argv[2]); + cli_args.controller_address, argv[2]); ADM_in_transit_ops_in_t in; in.in_transit = argv[2]; ADM_in_transit_ops_out_t out; diff --git a/examples/cxx/ADM_register_adhoc_storage.cpp b/examples/cxx/ADM_register_adhoc_storage.cpp index faf20f08..d2c07d40 100644 --- a/examples/cxx/ADM_register_adhoc_storage.cpp +++ b/examples/cxx/ADM_register_adhoc_storage.cpp @@ -48,7 +48,7 @@ main(int argc, char* argv[]) { const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); - + std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, diff --git a/examples/cxx/ADM_register_job.cpp b/examples/cxx/ADM_register_job.cpp index ff3e45ea..2da8de73 100644 --- a/examples/cxx/ADM_register_job.cpp +++ b/examples/cxx/ADM_register_job.cpp @@ -53,7 +53,6 @@ main(int argc, char* argv[]) { prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); std::string name = "adhoc_storage_42"; - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_remove_adhoc_storage.cpp b/examples/cxx/ADM_remove_adhoc_storage.cpp index 9c840bef..05d9d8b0 100644 --- a/examples/cxx/ADM_remove_adhoc_storage.cpp +++ b/examples/cxx/ADM_remove_adhoc_storage.cpp @@ -50,7 +50,6 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_remove_job.cpp b/examples/cxx/ADM_remove_job.cpp index b659554e..408a16ef 100644 --- a/examples/cxx/ADM_remove_job.cpp +++ b/examples/cxx/ADM_remove_job.cpp @@ -48,7 +48,6 @@ main(int argc, char* argv[]) { prepare_routes("{}-exp-output-dataset-{}", NEXPOUTPUTS); std::string name = "adhoc_storage_42"; - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_terminate_adhoc_storage.cpp b/examples/cxx/ADM_terminate_adhoc_storage.cpp index 4ff1a5be..737e1d4f 100644 --- a/examples/cxx/ADM_terminate_adhoc_storage.cpp +++ b/examples/cxx/ADM_terminate_adhoc_storage.cpp @@ -50,7 +50,6 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_transfer_datasets.cpp b/examples/cxx/ADM_transfer_datasets.cpp index 645c12c2..1028658c 100644 --- a/examples/cxx/ADM_transfer_datasets.cpp +++ b/examples/cxx/ADM_transfer_datasets.cpp @@ -61,7 +61,6 @@ main(int argc, char* argv[]) { const auto mapping = scord::transfer::mapping::n_to_n; std::string name = "adhoc_storage_42"; - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index 0aea57a0..060633b1 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -61,8 +61,6 @@ main(int argc, char* argv[]) { const auto mapping = scord::transfer::mapping::n_to_n; std::string name = "adhoc_storage_42"; - - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, diff --git a/examples/cxx/ADM_update_adhoc_storage.cpp b/examples/cxx/ADM_update_adhoc_storage.cpp index 1b9d1a67..a7b88deb 100644 --- a/examples/cxx/ADM_update_adhoc_storage.cpp +++ b/examples/cxx/ADM_update_adhoc_storage.cpp @@ -50,7 +50,6 @@ main(int argc, char* argv[]) { const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); std::string name = "adhoc_storage_42"; - const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, cli_args.data_stager_address, -- GitLab From 174615620cb9080ff7c68ee835235ba3f972fd32 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 14 Nov 2023 18:04:23 +0100 Subject: [PATCH 3/7] merge errors --- plugins/slurm/defaults.h.in | 3 --- src/lib/scord/types.hpp | 8 ++++---- src/lib/types.c | 5 ----- src/scord/transfer_manager.hpp | 27 +++++---------------------- 4 files changed, 9 insertions(+), 34 deletions(-) diff --git a/plugins/slurm/defaults.h.in b/plugins/slurm/defaults.h.in index f2bb3572..9de23bc6 100644 --- a/plugins/slurm/defaults.h.in +++ b/plugins/slurm/defaults.h.in @@ -41,9 +41,6 @@ #define CARGO_PROTO_DEFAULT SCORD_PROTO_DEFAULT #define CARGO_PORT_DEFAULT 62000 -#define CARGO_PROTO_DEFAULT SCORD_PROTO_DEFAULT -#define CARGO_PORT_DEFAULT 62000 - // clang-format on #endif // SCORD_SLURM_PLUGIN_DEFAULTS_H diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index 6ec23f13..ec767ccb 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -106,7 +106,7 @@ struct error_code { template void serialize(Archive&& ar) { - ar& m_value; + ar & m_value; } private: @@ -231,7 +231,7 @@ struct adhoc_storage { template void serialize(Archive&& ar) { - ar& m_nodes; + ar & m_nodes; } private: @@ -350,7 +350,7 @@ struct pfs_storage { template void serialize(Archive&& ar) { - ar& m_mount_point; + ar & m_mount_point; } private: @@ -412,7 +412,7 @@ struct job { template void serialize(Archive&& ar) { - ar& m_nodes; + ar & m_nodes; } private: diff --git a/src/lib/types.c b/src/lib/types.c index d1ca7637..1e426909 100644 --- a/src/lib/types.c +++ b/src/lib/types.c @@ -822,11 +822,6 @@ ADM_adhoc_context_create(const char* ctl_address, const char* stager_address, LOGGER_ERROR("The address to the controller cannot be NULL"); return NULL; } - /* We may continue if there is no transfers.. */ - if(!stg_address) { - LOGGER_ERROR("The stager address is null"); - } - if(!stager_address) { LOGGER_ERROR("The address to the stager cannot be NULL"); diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp index 7cf75da6..6abb74f2 100644 --- a/src/scord/transfer_manager.hpp +++ b/src/scord/transfer_manager.hpp @@ -49,7 +49,7 @@ struct transfer_manager { abt::unique_lock lock(m_transfer_mutex); - if(const auto it = m_transfer.find(tx_id); it == m_transfer.end()) { + if(const auto it = m_transfer.find(id); it == m_transfer.end()) { const auto& [it_transfer, inserted] = m_transfer.emplace( id, std::make_shared< internal::transfer_metadata>( @@ -105,32 +105,15 @@ struct transfer_manager { abt::unique_lock lock(m_transfer_mutex); - if(m_transfer.count(id) != 0) { - m_transfer.erase(id); - return scord::error_code::success; + if(const auto it = m_transfer.find(id); it != m_transfer.end()) { + auto nh = m_transfer.extract(it); + return nh.mapped(); } - LOGGER_ERROR("Transfer '{}' was not registered or was already deleted", id); - return scord::error_code::no_such_entity; - } - - std::unordered_map> - transfer() { - return m_transfer; - } - - void - lock() { - m_transfer_mutex.lock(); - } - - void - unlock() { - m_transfer_mutex.unlock(); + return tl::make_unexpected(scord::error_code::no_such_entity); } private: -- GitLab From 6c399b429fe8ef8a4fd30c95c001653b8aa3f1f6 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 14 Nov 2023 18:05:05 +0100 Subject: [PATCH 4/7] merge error --- plugins/slurm/slurmadmcli.c | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/plugins/slurm/slurmadmcli.c b/plugins/slurm/slurmadmcli.c index 0cb39d0c..61d10078 100644 --- a/plugins/slurm/slurmadmcli.c +++ b/plugins/slurm/slurmadmcli.c @@ -459,10 +459,6 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, cfg.scordctl_info.proto, ADM_node_get_hostname(ctl_node), cfg.scordctl_info.port); - cfg.cargo_info.addr = margo_address_create(cfg.cargo_info.proto, - ADM_node_get_hostname(ctl_node), - cfg.cargo_info.port); - if(!cfg.scordctl_info.addr) { slurm_error("%s: failed to compute address scordctl server", plugin_name); @@ -484,15 +480,6 @@ scord_register_job(scord_plugin_config_t cfg, scord_nodelist_t nodelist, cfg.scord_info.port); slurm_debug("%s: %s: scordctl_info:", plugin_name, __func__); - slurm_debug("%s: %s: addr: \"%s\",", plugin_name, __func__, - cfg.cargo_info.addr); - slurm_debug("%s: %s: proto: \"%s\",", plugin_name, __func__, - cfg.cargo_info.proto); - slurm_debug("%s: %s: port: %d,", plugin_name, __func__, - cfg.cargo_info.port); - - - slurm_debug("%s: %s: cargo_info:", plugin_name, __func__); slurm_debug("%s: %s: addr: \"%s\",", plugin_name, __func__, cfg.scordctl_info.addr); slurm_debug("%s: %s: proto: \"%s\",", plugin_name, __func__, -- GitLab From 8d29b922f2b6aa36a478dd30a8b028be2684cdb1 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 14 Nov 2023 21:01:51 +0100 Subject: [PATCH 5/7] Recover bw --- examples/cxx/ADM_transfer_update.cpp | 2 ++ examples/cxx/CMakeLists.txt | 2 +- src/scord/rpc_server.cpp | 38 +++++++++------------------- src/scord/rpc_server.hpp | 7 ++--- src/scord/transfer_manager.hpp | 17 +++++++++++++ 5 files changed, 34 insertions(+), 32 deletions(-) diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index 060633b1..c4e1bd24 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -80,10 +80,12 @@ main(int argc, char* argv[]) { const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); + const auto transfer = scord::transfer_datasets( server, job, sources, targets, qos_limits, mapping); scord::transfer_update(server, transfer.id(), 10.0f); + fmt::print(stdout, "ADM_transfer_update() remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 67bd704d..96780d99 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -77,7 +77,7 @@ if(SCORD_BUILD_TESTS) ${SCORD_CTL_ADDRESS_STRING} ${DATA_STAGER_ADDRESS_STRING}) set_tests_properties(run_${TEST_NAME} - PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl" + PROPERTIES FIXTURES_REQUIRED "scord_daemon;scord_ctl;cargo" ENVIRONMENT "${TEST_ENV}") add_test(validate_${TEST_NAME} diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 45e35bd9..9b585202 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -756,20 +756,6 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, resp.error_code(), resp.value_or_none()); - - // TODO: create a transfer in transfer manager - // We need the contact point, and different qos - - if(const auto transfer_result = - m_transfer_manager.create(tx_id.value(), stager_address, limits); - !transfer_result.has_value()) { - LOGGER_ERROR( - "rpc id: {} error_msg: \"Error creating transfer_storage: {}\"", - rpc.id(), transfer_result.error()); - ec = transfer_result.error(); - } - - req.respond(resp); } @@ -830,7 +816,7 @@ rpc_server::scheduler_update() { for(const auto& tr_unit : transfer) { const auto tr_info = tr_unit.second.get(); - auto bw = tr_info->obtained_bw(); + auto bw = tr_info->measured_bandwidth(); if(bw == -1) { continue; } @@ -841,22 +827,22 @@ rpc_server::scheduler_update() { auto qos = tr_info->qos().front().value(); if(bw + bw * threshold > qos) { // Send decrease / slow signal to cargo - LOGGER_DEBUG("Action for unit {} --> Decrease {}", tr_unit.first, - tr_info->contact_point()); - std::pair entity = - std::make_pair(tr_info->contact_point(), -1); - return_set.push_back(entity); + LOGGER_DEBUG("Action for unit {} --> Decrease", tr_unit.first + ); + + tr_info->update(20); + // std::pair entity = + // std::make_pair(tr_info->contact_point(), -1); + //return_set.push_back(entity); } else if(bw - bw * threshold < qos) { // Send increase / speed up signal to cargo - LOGGER_DEBUG("Action for unit {} --> Increase {}", tr_unit.first, - tr_info->contact_point()); - std::pair entity = + LOGGER_DEBUG("Action for unit {} --> Increase", tr_unit.first + ); + /*std::pair entity = std::make_pair(tr_info->contact_point(), +1); - return_set.push_back(entity); + return_set.push_back(entity);*/ } - // Remove from next computations - tr_info->obtained_bw(-1); } m_transfer_manager.unlock(); return return_set; diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 3a713ad1..4c97cda3 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -127,7 +127,7 @@ private: job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; - transfer_manager m_transfer_manager; + transfer_manager m_transfer_manager; public: @@ -143,10 +143,7 @@ public: std::vector> scheduler_update(); - job_manager m_job_manager; - adhoc_storage_manager m_adhoc_manager; - pfs_storage_manager m_pfs_manager; - transfer_manager m_transfer_manager; + }; } // namespace scord diff --git a/src/scord/transfer_manager.hpp b/src/scord/transfer_manager.hpp index 6abb74f2..dfe4fd89 100644 --- a/src/scord/transfer_manager.hpp +++ b/src/scord/transfer_manager.hpp @@ -116,6 +116,23 @@ struct transfer_manager { return tl::make_unexpected(scord::error_code::no_such_entity); } + std::unordered_map< + scord::transfer_id, + std::shared_ptr>> + transfer() { + return m_transfer; + } + + void + lock() { + m_transfer_mutex.lock(); + } + + void + unlock() { + m_transfer_mutex.unlock(); + } + private: mutable abt::shared_mutex m_transfer_mutex; std::unordered_map< -- GitLab From c0e9927b525a30851848fa8aa4a6ce22e6946f13 Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Fri, 17 Nov 2023 14:03:40 +0100 Subject: [PATCH 6/7] Removed transfer_update RPC, cargo workflow finished Added support for gekkofs: etc types --- examples/cxx/ADM_transfer_update.cpp | 17 ++- examples/cxx/CMakeLists.txt | 2 +- spack/packages/scord/package.py | 9 +- src/common/net/server.cpp | 1 + src/common/net/server.hpp | 1 + src/lib/c_wrapper.cpp | 9 -- src/lib/detail/impl.cpp | 32 ----- src/lib/detail/impl.hpp | 3 - src/lib/libscord.cpp | 12 -- src/scord/rpc_server.cpp | 182 ++++++++++++++------------- src/scord/rpc_server.hpp | 30 ++--- 11 files changed, 125 insertions(+), 173 deletions(-) diff --git a/examples/cxx/ADM_transfer_update.cpp b/examples/cxx/ADM_transfer_update.cpp index c4e1bd24..dc335e87 100644 --- a/examples/cxx/ADM_transfer_update.cpp +++ b/examples/cxx/ADM_transfer_update.cpp @@ -75,6 +75,19 @@ main(int argc, char* argv[]) { server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); + + std::vector ins; + std::vector outs; + + scord::dataset in1; + scord::dataset out1; + + in1 = scord::dataset("lustre:/tmp/input-dataset-1"); + out1 = scord::dataset("gekkofs:/tmp/input-dataset-cp"); + + ins.push_back (in1); + outs.push_back (out1); + scord::job::requirements reqs(inputs, outputs, expected_outputs, adhoc_storage); @@ -82,9 +95,9 @@ main(int argc, char* argv[]) { server, scord::job::resources{job_nodes}, reqs, 0); const auto transfer = scord::transfer_datasets( - server, job, sources, targets, qos_limits, mapping); + server, job, ins, outs, qos_limits, mapping); - scord::transfer_update(server, transfer.id(), 10.0f); + // scord::transfer_update(server, transfer.id(), 10.0f); fmt::print(stdout, "ADM_transfer_update() remote procedure completed " "successfully\n"); diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 96780d99..5d47a59b 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority - ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update + ADM_cancel_transfer ADM_get_pending_transfers # qos ADM_set_qos_constraints ADM_get_qos_constraints # data operations diff --git a/spack/packages/scord/package.py b/spack/packages/scord/package.py index 53616ca2..334911b4 100644 --- a/spack/packages/scord/package.py +++ b/spack/packages/scord/package.py @@ -83,18 +83,19 @@ class Scord(CMakePackage): # specific dependencies # v0.2.0+ depends_on("argobots@1.1", when='@0.2.0:') - depends_on("mochi-margo@0.9.8", when='@0.2.0:') - depends_on("mochi-thallium@0.10.1", when='@0.2.0:') + depends_on("mochi-margo@0.9.8:", when='@0.2.0:') + depends_on("mochi-thallium@0.10.1:", when='@0.2.0:') depends_on("boost@1.71 +program_options", when='@0.2.0:') depends_on("redis-plus-plus@1.3.3:", when='@0.2.0:') + depends_on("cargo@0.3.2:", when='@0.3.1:') with when("@0.2.0: +ofi"): depends_on("libfabric@1.14.0 fabrics=sockets,tcp,rxm") - depends_on("mercury@2.1.0 +ofi") + depends_on("mercury@2.1.0: +ofi") with when("@0.2.0: +ucx"): depends_on("ucx@1.12.0") - depends_on("mercury@2.1.0 +ucx") + depends_on("mercury@2.1.0: +ucx") def cmake_args(self): """Setup scord CMake arguments""" diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp index 464a02ea..8e8c9676 100644 --- a/src/common/net/server.cpp +++ b/src/common/net/server.cpp @@ -391,6 +391,7 @@ server::teardown_and_exit() { void server::shutdown() { + m_shutting_down = true; m_network_engine.finalize(); } diff --git a/src/common/net/server.hpp b/src/common/net/server.hpp index 267ac598..f405db48 100644 --- a/src/common/net/server.hpp +++ b/src/common/net/server.hpp @@ -110,6 +110,7 @@ private: protected: thallium::engine m_network_engine; + std::atomic m_shutting_down; private: scord::utils::signal_listener m_signal_listener; diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 71f03dc5..1b9f797a 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -245,15 +245,6 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, return ADM_SUCCESS; } -ADM_return_t -ADM_transfer_update(ADM_server_t server, uint64_t transfer_id, - float obtained_bw) { - - return scord::detail::transfer_update(scord::server{server}, transfer_id, - obtained_bw); -} - - ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index cc37fb09..8d26b4ee 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -531,36 +531,4 @@ transfer_datasets(const server& srv, const job& job, } -scord::error_code -transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { - - network::client rpc_client{srv.protocol()}; - - const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); - - if(const auto& lookup_rv = rpc_client.lookup(srv.address()); - lookup_rv.has_value()) { - const auto& endp = lookup_rv.value(); - - LOGGER_INFO("rpc {:<} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, - transfer_id, obtained_bw); - - if(const auto& call_rv = - endp.call(rpc.name(), transfer_id, obtained_bw); - call_rv.has_value()) { - - const network::generic_response resp{call_rv.value()}; - - LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, - resp.error_code(), resp.op_id()); - - return resp.error_code(); - } - } - - LOGGER_ERROR("rpc call failed"); - return scord::error_code::other; -} - } // namespace scord::detail diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index d26575c1..c1c5f054 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -86,9 +86,6 @@ transfer_datasets(const server& srv, const job& job, const std::vector& limits, transfer::mapping mapping); -scord::error_code -transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw); - } // namespace scord::detail diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index 4b341d64..19e3d58e 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -378,18 +378,6 @@ transfer_datasets(const server& srv, const job& job, } -void -transfer_update(const server& srv, uint64_t transfer_id, float obtained_bw) { - - const auto ec = detail::transfer_update(srv, transfer_id, obtained_bw); - - if(!ec) { - throw std::runtime_error( - fmt::format("ADM_transfer_update() error: {}", ec.message())); - } -} - - ADM_return_t set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 9b585202..f98b82f3 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -42,13 +42,45 @@ value_or_none(tl::expected&& e) { using namespace std::literals; +namespace { +cargo::dataset +dataset_process(std::string id) { + + cargo::dataset::type type = cargo::dataset::type::none; + if(id.find("lustre:") != std::string::npos) { + id = id.substr(strlen("lustre:")); + type = cargo::dataset::type::parallel; + } else if(id.find("gekkofs:") != std::string::npos) { + id = id.substr(strlen("gekkofs:")); + type = cargo::dataset::type::posix; + } else if(id.find("hercules:") != std::string::npos) { + id = id.substr(strlen("hercules:")); + type = cargo::dataset::type::hercules; + } else if(id.find("expand:") != std::string::npos) { + id = id.substr(strlen("expand:")); + type = cargo::dataset::type::expand; + } else if(id.find("dataclay:") != std::string::npos) { + id = id.substr(strlen("dataclay:")); + type = cargo::dataset::type::dataclay; + } else + type = cargo::dataset::type::posix; + + return cargo::dataset{id, type}; +} +} // namespace namespace scord { rpc_server::rpc_server(std::string name, std::string address, bool daemonize, std::filesystem::path rundir) : server::server(std::move(name), std::move(address), std::move(daemonize), std::move(rundir)), - provider::provider(m_network_engine, 0) { + provider::provider(m_network_engine, 0), + m_scheduler_ess(thallium::xstream::create()), + m_scheduler_ult( + m_scheduler_ess->make_thread([this]() { scheduler_update(); })) { + + ; + #define EXPAND(rpc_name) "ADM_" #rpc_name##s, &rpc_server::rpc_name @@ -66,11 +98,17 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(update_pfs_storage)); provider::define(EXPAND(remove_pfs_storage)); provider::define(EXPAND(transfer_datasets)); - provider::define(EXPAND(transfer_update)); #undef EXPAND + m_network_engine.push_prefinalize_callback([this]() { + m_scheduler_ult->join(); + m_scheduler_ult = thallium::managed{}; + m_scheduler_ess->join(); + m_scheduler_ess = thallium::managed{}; + }); } + #define RPC_NAME() ("ADM_"s + __FUNCTION__) void @@ -373,8 +411,8 @@ rpc_server::update_adhoc_storage( name, adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", - child_rpc, adhoc_metadata_ptr->uuid(), - adhoc_storage.type(), adhoc_storage.get_resources()); + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), + adhoc_storage.get_resources()); if(const auto call_rv = endp->call( child_rpc.name(), adhoc_metadata_ptr->uuid(), @@ -470,8 +508,8 @@ rpc_server::deploy_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}, resources: {}}}", - child_rpc, adhoc_metadata_ptr->uuid(), - adhoc_storage.type(), adhoc_storage.get_resources()); + child_rpc, adhoc_metadata_ptr->uuid(), adhoc_storage.type(), + adhoc_storage.get_resources()); if(const auto call_rv = endp->call( rpc.name(), adhoc_metadata_ptr->uuid(), adhoc_storage.type(), @@ -549,8 +587,7 @@ rpc_server::terminate_adhoc_storage(const network::request& req, rpc.add_child(adhoc_storage.context().controller_address()); LOGGER_INFO("rpc {:<} body: {{uuid: {:?}, type: {}}}", child_rpc, - adhoc_metadata_ptr->uuid(), - adhoc_storage.type()); + adhoc_metadata_ptr->uuid(), adhoc_storage.type()); if(const auto call_rv = endp->call(rpc.name(), adhoc_metadata_ptr->uuid(), @@ -725,11 +762,11 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, // TODO: check type of storage tier to enable parallel transfers std::transform(sources.cbegin(), sources.cend(), std::back_inserter(inputs), - [](const auto& src) { return cargo::dataset{src.id()}; }); + [](const auto& src) { return ::dataset_process(src.id()); }); std::transform(targets.cbegin(), targets.cend(), std::back_inserter(outputs), - [](const auto& tgt) { return cargo::dataset{tgt.id()}; }); + [](const auto& tgt) { return ::dataset_process(tgt.id()); }); const auto cargo_tx = cargo::transfer_datasets(srv, inputs, outputs); @@ -759,93 +796,62 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, req.respond(resp); } -void -rpc_server::start_scheduler() { - - thallium::xstream es = thallium::xstream::self(); - thallium::managed th = - es.make_thread([this]() { scheduler_runnable((void*) this); }); -} - -void -rpc_server::transfer_update(const network::request& req, uint64_t transfer_id, - float obtained_bw) { - - using network::get_address; - using network::response_with_id; - using network::rpc_info; - - const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - - LOGGER_INFO("rpc {:>} body: {{transfer_id: {}, obtained_bw: {}}}", rpc, - transfer_id, obtained_bw); - - scord::error_code ec; - - const auto resp = response_with_id{rpc.id(), ec, transfer_id}; - - LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); - - ec = m_transfer_manager.update(transfer_id, obtained_bw); - if(ec.no_such_entity) { - LOGGER_ERROR( - "rpc id: {} error_msg: \"Error updating transfer_storage\"", - rpc.id()); - } - req.respond(resp); - // Wake Up Scheduling thread, as the status has changed - start_scheduler(); -} +/* Scheduling is done each 0.5 s*/ void -rpc_server::scheduler_runnable(void* arg) { - scord::rpc_server* server = ((scord::rpc_server*) arg); - - auto scheduling = server->scheduler_update(); - LOGGER_INFO("Internal Size: {}", scheduling.size()); - // Call expand/shrink with the info -} - - -std::vector> rpc_server::scheduler_update() { std::vector> return_set; const auto threshold = 0.1f; - m_transfer_manager.lock(); - const auto transfer = m_transfer_manager.transfer(); - - for(const auto& tr_unit : transfer) { - const auto tr_info = tr_unit.second.get(); - auto bw = tr_info->measured_bandwidth(); - if(bw == -1) { - continue; + while(!m_shutting_down) { + thallium::thread::self().sleep(m_network_engine, 500); + m_transfer_manager.lock(); + const auto transfer = m_transfer_manager.transfer(); + std::vector v_ids; + for(const auto& tr_unit : transfer) { + const auto tr_info = tr_unit.second.get(); + + // Contact for transfer status + const auto status = tr_info->transfer().status(); + tr_info->update(status.bw()); + auto bw = tr_info->measured_bandwidth(); + auto qos = tr_info->qos().front().value(); + + switch(status.state()) { + case cargo::transfer_state::completed: + LOGGER_INFO("Completed"); + v_ids.push_back(tr_unit.first); + continue; + break; + case cargo::transfer_state::failed: + LOGGER_INFO("Failed"); + v_ids.push_back(tr_unit.first); + continue; + break; + case cargo::transfer_state::pending: + continue; + break; + case cargo::transfer_state::running: + break; + } + if(bw == -1) { + continue; + } + + if(bw + bw * threshold > qos) { + // Send decrease / slow signal to cargo + tr_info->transfer().bw_control(+1); + } else if(bw - bw * threshold < qos) { + // Send increase / speed up signal to cargo + tr_info->transfer().bw_control(-1); + } } + m_transfer_manager.unlock(); - LOGGER_DEBUG("update for unit {} - {} >? {}", tr_unit.first, - tr_info->qos().front().value(), bw); - - auto qos = tr_info->qos().front().value(); - if(bw + bw * threshold > qos) { - // Send decrease / slow signal to cargo - LOGGER_DEBUG("Action for unit {} --> Decrease", tr_unit.first - ); - - tr_info->update(20); - // std::pair entity = - // std::make_pair(tr_info->contact_point(), -1); - //return_set.push_back(entity); - - } else if(bw - bw * threshold < qos) { - // Send increase / speed up signal to cargo - LOGGER_DEBUG("Action for unit {} --> Increase", tr_unit.first - ); - /*std::pair entity = - std::make_pair(tr_info->contact_point(), +1); - return_set.push_back(entity);*/ + // Remove all failed/done transfers + for(const auto id : v_ids) { + m_transfer_manager.remove(id); } } - m_transfer_manager.unlock(); - return return_set; } diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 4c97cda3..08788fcc 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -106,29 +106,17 @@ private: const std::vector& limits, enum scord::transfer::mapping mapping); - void - transfer_update(const network::request& req, uint64_t transfer_id, - float obtained_bw); - - /** - * @brief Function that schedules transfers - * - * @param arg , is a pointer to rpc_server to access structures. - */ - static void - scheduler_runnable(void* arg); - /** - * @brief Starts the scheduler thread for transfers - * - */ - void - start_scheduler(); job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; transfer_manager m_transfer_manager; + // Dedicated execution stream for the MPI listener ULT + thallium::managed m_scheduler_ess; + // ULT for the MPI listener + thallium::managed m_scheduler_ult; + public: /** @@ -136,14 +124,12 @@ public: * and action) * * It causes a lock-unlock of the transfer_manager structure. + * Is a thread * - * @return a vector with a string contact_point and an action encoded in a - * integer (-1, or 1) + * @return none */ - std::vector> + void scheduler_update(); - - }; } // namespace scord -- GitLab From 6d5b0780a0717ce5bbabc4cbaf63653168673d2b Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Sat, 18 Nov 2023 12:19:13 +0100 Subject: [PATCH 7/7] Solved segfault in scheduler --- examples/CMakeLists.txt | 4 ++-- src/scord/rpc_server.cpp | 15 +++++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 6d47717c..7398808a 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -84,8 +84,8 @@ if(SCORD_BUILD_TESTS) set_tests_properties(start_cargo PROPERTIES FIXTURES_SETUP cargo) - add_test(stop_cargo - ${CMAKE_SOURCE_DIR}/scripts/runner.sh stop TERM cargo.pid) + + add_test(stop_cargo ${CARGO_BIN_INSTALL_DIR}/cargo_shutdown --server ${DATA_STAGER_ADDRESS_STRING}) set_tests_properties(stop_cargo PROPERTIES FIXTURES_CLEANUP cargo) diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index f98b82f3..98d6a62c 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -46,7 +46,7 @@ namespace { cargo::dataset dataset_process(std::string id) { - cargo::dataset::type type = cargo::dataset::type::none; + cargo::dataset::type type = cargo::dataset::type::posix; if(id.find("lustre:") != std::string::npos) { id = id.substr(strlen("lustre:")); type = cargo::dataset::type::parallel; @@ -812,9 +812,6 @@ rpc_server::scheduler_update() { // Contact for transfer status const auto status = tr_info->transfer().status(); - tr_info->update(status.bw()); - auto bw = tr_info->measured_bandwidth(); - auto qos = tr_info->qos().front().value(); switch(status.state()) { case cargo::transfer_state::completed: @@ -833,6 +830,16 @@ rpc_server::scheduler_update() { case cargo::transfer_state::running: break; } + + tr_info->update(status.bw()); + auto bw = tr_info->measured_bandwidth(); + uint64_t qos = 0; + try { + qos = tr_info->qos().front().value(); + } catch(const std::exception& e) { + continue; + } + if(bw == -1) { continue; } -- GitLab