diff --git a/examples/ADM_adhoc_access.cpp b/examples/ADM_adhoc_access.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2d28cf3a65da24872aee2e5e871170363f35bb3c --- /dev/null +++ b/examples/ADM_adhoc_access.cpp @@ -0,0 +1,41 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_adhoc_access \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_access remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_access_in_t in; + in.access = argv[2]; + ADM_adhoc_access_out_t out; + + endp.call("ADM_adhoc_access", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_adhoc_access remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_adhoc_access remote procedure completed successfully\n"); + } +} + diff --git a/examples/ADM_adhoc_background_flush.cpp b/examples/ADM_adhoc_background_flush.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e8b5207139644341cf429c52669ed09fa0c476e9 --- /dev/null +++ b/examples/ADM_adhoc_background_flush.cpp @@ -0,0 +1,60 @@ +#include +#include + +bool +string_to_convert(std::string s) { + if(s == "true" || s == "TRUE" || s == "True") { + return true; + } else if(s == "false" || s == "FALSE" || s == "False") { + return false; + } else { + throw std::invalid_argument("ERROR: Incorrect input value. Please try again.\n"); + } +} + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_adhoc_background_flush \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_background_flush remote procedure on {} -> flush true/false: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_background_flush_in_t in; + + try { + in.b_flush = string_to_convert(argv[2]); + } catch(const std::invalid_argument& ia) { + fmt::print( + stderr, + "ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); + exit(EXIT_FAILURE); + } + + ADM_adhoc_background_flush_out_t out; + + endp.call("ADM_adhoc_background_flush", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_adhoc_background_flush remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_adhoc_background_flush remote procedure completed successfully\n"); + } +} \ No newline at end of file diff --git a/examples/ADM_adhoc_context.cpp b/examples/ADM_adhoc_context.cpp new file mode 100644 index 0000000000000000000000000000000000000000..527d87477997248017ac1c241989d0b7d8c89dc6 --- /dev/null +++ b/examples/ADM_adhoc_context.cpp @@ -0,0 +1,41 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_adhoc_context \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_context remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_context_in_t in; + in.context = argv[2]; + ADM_adhoc_context_out_t out; + + endp.call("ADM_adhoc_context", &in, &out); + + + if(out.ret < 0 || out.adhoc_context < 0) { + fmt::print( + stdout, + "ADM_adhoc_context remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_adhoc_context remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_adhoc_context_id.cpp b/examples/ADM_adhoc_context_id.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cf6705b0a5ccbbfe5070d069a492a9a8592342a7 --- /dev/null +++ b/examples/ADM_adhoc_context_id.cpp @@ -0,0 +1,47 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_adhoc_context_id \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_context_id remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_context_id_in_t in; + + try { + in.context_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + + ADM_adhoc_context_id_out_t out; + + endp.call("ADM_adhoc_context_id", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_adhoc_context_id remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_adhoc_context_id remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_adhoc_distribution.cpp b/examples/ADM_adhoc_distribution.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4379b73501235c27d03081e897a96cb5338dba09 --- /dev/null +++ b/examples/ADM_adhoc_distribution.cpp @@ -0,0 +1,41 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_adhoc_distribution \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_distribution remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_distribution_in_t in; + in.data_distribution = argv[2]; + ADM_adhoc_distribution_out_t out; + + endp.call("ADM_adhoc_distribution", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_adhoc_distribution remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_adhoc_distribution remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_adhoc_nodes.cpp b/examples/ADM_adhoc_nodes.cpp new file mode 100644 index 0000000000000000000000000000000000000000..91b36cf514d75ee669d394598e71f5483c94b4bf --- /dev/null +++ b/examples/ADM_adhoc_nodes.cpp @@ -0,0 +1,46 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_adhoc_nodes \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_nodes remote procedure on {} -> node numbers: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_nodes_in_t in; + + try { + in.nodes = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + + ADM_adhoc_nodes_out_t out; + + endp.call("ADM_adhoc_nodes", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_adhoc_nodes remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print(stdout, + "ADM_adhoc_nodes remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_adhoc_walltime.cpp b/examples/ADM_adhoc_walltime.cpp new file mode 100644 index 0000000000000000000000000000000000000000..49a380718dbeee0402ec2be3b997fab7c9665f40 --- /dev/null +++ b/examples/ADM_adhoc_walltime.cpp @@ -0,0 +1,47 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_adhoc_walltime \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_adhoc_walltime remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_adhoc_walltime_in_t in; + + try { + in.walltime = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + + ADM_adhoc_walltime_out_t out; + + endp.call("ADM_adhoc_walltime", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_adhoc_walltime remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_adhoc_walltime remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_cancel_transfer.cpp b/examples/ADM_cancel_transfer.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f8cee99f4c504bc7cfdf84473ba5ad08bb80d94a --- /dev/null +++ b/examples/ADM_cancel_transfer.cpp @@ -0,0 +1,46 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_cancel_transfer \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_cancel_transfer remote procedure on {} with transfer id {} ...\n", + argv[1], argv[2]); + ADM_cancel_transfer_in_t in; + try { + in.transfer_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_cancel_transfer_out_t out; + + endp.call("ADM_cancel_transfer", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_cancel_transfer remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_cancel_transfer remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_connect_data_operation.cpp b/examples/ADM_connect_data_operation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..26c5118ee408069d1eb11fc753f42694d09b6684 --- /dev/null +++ b/examples/ADM_connect_data_operation.cpp @@ -0,0 +1,75 @@ +#include +#include + +bool +string_to_convert(std::string s) { + if(s == "true" || s == "TRUE" || s == "True") { + return true; + } else if(s == "false" || s == "FALSE" || s == "False") { + return false; + } else { + throw std::invalid_argument( + "ERROR: Incorrect input value. Please try again.\n"); + } +} + +int +main(int argc, char* argv[]) { + + if(argc != 7) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_connect_data_operation \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_connect_data_operation remote procedure on {} with operation id {}, input {}, stream {}, arguments {} and job id {} ...\n", + argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]); + ADM_connect_data_operation_in_t in; + try { + in.operation_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + in.input = argv[3]; + try { + in.stream = string_to_convert(argv[4]); + } catch(const std::invalid_argument& ia) { + fmt::print(stderr, "ERROR: Incorrect input value. Please try again.\n"); + exit(EXIT_FAILURE); + } + in.arguments = argv[5]; + try { + in.job_id = std::stoi(argv[6]); + } catch(const std::exception& e) { + fmt::print( + stderr, + "ERROR: ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); + exit(EXIT_FAILURE); + } + + ADM_connect_data_operation_out_t out; + + endp.call("ADM_connect_data_operation", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_connect_data_operation remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_connect_data_operation remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_define_data_operation.cpp b/examples/ADM_define_data_operation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a5b737d7d81ca5373c49344bf69d325665f5d6ca --- /dev/null +++ b/examples/ADM_define_data_operation.cpp @@ -0,0 +1,51 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 5) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_define_data_operation \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_define_data_operation remote procedure on {} -> {} with operation id {} and arguments {} ...\n", + argv[1], argv[2], argv[3], argv[4]); + + ADM_define_data_operation_in_t in; + in.path = argv[2]; + try { + in.operation_id = std::stoi(argv[3]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + in.arguments = argv[4]; + + ADM_define_data_operation_out_t out; + + endp.call("ADM_define_data_operation", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_define_data_operation remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_define_data_operation remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_finalize_data_operation.cpp b/examples/ADM_finalize_data_operation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7ad3f08477a4722b2f5796395946cbaced86b7a1 --- /dev/null +++ b/examples/ADM_finalize_data_operation.cpp @@ -0,0 +1,46 @@ +#include +#include + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_finalize_data_operation \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_finalize_data_operation remote procedure on {} with operation id {} ...\n", + argv[1], argv[2]); + ADM_finalize_data_operation_in_t in; + try { + in.operation_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_finalize_data_operation_out_t out; + + endp.call("ADM_finalize_data_operation", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_finalize_data_operation remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_finalize_data_operation remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_get_pending_transfers.cpp b/examples/ADM_get_pending_transfers.cpp new file mode 100644 index 0000000000000000000000000000000000000000..99fdb84884fb2e04f6c812d4783dc76661ae7d56 --- /dev/null +++ b/examples/ADM_get_pending_transfers.cpp @@ -0,0 +1,39 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 2) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_get_pending_transfers \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print(stdout, + "Calling ADM_get_pending_transfers remote procedure on {} ...\n", + argv[1]); + ADM_get_pending_transfers_in_t in; + in.value = NULL; + ADM_get_pending_transfers_out_t out; + + endp.call("ADM_get_pending_transfers", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_get_pending_transfers remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_get_pending_transfers remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_get_statistics.cpp b/examples/ADM_get_statistics.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6d08b25d5694a25794128821e806c9430bd00823 --- /dev/null +++ b/examples/ADM_get_statistics.cpp @@ -0,0 +1,53 @@ +#include +#include + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_get_statistics \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_get_statistics remote procedure on {} with job id {} and job step {} ...\n", + argv[1], argv[2], argv[3]); + ADM_get_statistics_in_t in; + try { + in.job_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + try { + in.job_step = std::stoi(argv[3]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + + ADM_get_statistics_out_t out; + + endp.call("ADM_get_statistics", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_get_statistics remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_get_statistics remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_get_transfer_priority.cpp b/examples/ADM_get_transfer_priority.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a09663a378bbe759f51951330b7056892468507d --- /dev/null +++ b/examples/ADM_get_transfer_priority.cpp @@ -0,0 +1,47 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_get_transfer_priority \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_get_transfer_priority remote procedure on {} with transfer id {} ...\n", + argv[1], argv[2]); + ADM_get_transfer_priority_in_t in; + try { + in.transfer_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_get_transfer_priority_out_t out; + + endp.call("ADM_get_transfer_priority", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_get_transfer_priority remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_get_transfer_priority remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_in_situ_ops.cpp b/examples/ADM_in_situ_ops.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b870e6ec3a84205220bdd7d0073404baa32053db --- /dev/null +++ b/examples/ADM_in_situ_ops.cpp @@ -0,0 +1,39 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_in_situ_ops \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_in_situ_ops remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_in_situ_ops_in_t in; + in.in_situ = argv[2]; + ADM_in_situ_ops_out_t out; + + endp.call("ADM_in_situ_ops", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_in_situ_ops remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print(stdout, + "ADM_in_situ_ops remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_in_transit_ops.cpp b/examples/ADM_in_transit_ops.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4eef3e9496ea40980411d79c77a72ce481644a9b --- /dev/null +++ b/examples/ADM_in_transit_ops.cpp @@ -0,0 +1,40 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_in_transit_ops \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_in_transit_ops remote procedure on {} -> access method: {} ...\n", + argv[1], argv[2]); + ADM_in_transit_ops_in_t in; + in.in_transit = argv[2]; + ADM_in_transit_ops_out_t out; + + endp.call("ADM_in_transit_ops", &in, &out); + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_in_transit_ops remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_in_transit_ops remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_inout.cpp b/examples/ADM_inout.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4b88dcdbfe94d09268ab6818a0108fbdea8a3414 --- /dev/null +++ b/examples/ADM_inout.cpp @@ -0,0 +1,40 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_inout \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print(stdout, + "Calling ADM_inout remote procedure on {} : {} -> {} ...\n", + argv[1], argv[2], argv[3]); + ADM_inout_in_t in; + in.origin = argv[2]; + in.target = argv[3]; + ADM_inout_out_t out; + + endp.call("ADM_inout", &in, &out); + + if(out.ret < 0) { + fmt::print(stdout, + "ADM_inout remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print(stdout, + "ADM_inout remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_input.cpp b/examples/ADM_input.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6a3199809fd379e4754c064af21e8b9282be789b --- /dev/null +++ b/examples/ADM_input.cpp @@ -0,0 +1,41 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_input \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print(stdout, + "Calling ADM_input remote procedure on {} : {} -> {} ...\n", + argv[1], argv[2], argv[3]); + ADM_input_in_t in; + in.origin = argv[2]; + in.target = argv[3]; + ADM_input_out_t out; + + endp.call("ADM_input", &in, &out); + + + if(out.ret < 0) { + fmt::print(stdout, + "ADM_input remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print(stdout, + "ADM_input remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_link_transfer_to_data_operation.cpp b/examples/ADM_link_transfer_to_data_operation.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e939a981b31b2e0727caf94c1ad847f610b189f5 --- /dev/null +++ b/examples/ADM_link_transfer_to_data_operation.cpp @@ -0,0 +1,80 @@ +#include +#include + +bool +string_to_convert(std::string s) { + if(s == "true" || s == "TRUE" || s == "True") { + return true; + } else if(s == "false" || s == "FALSE" || s == "False") { + return false; + } else { + throw std::invalid_argument( + "ERROR: Incorrect input value. Please try again.\n"); + } +} + +int +main(int argc, char* argv[]) { + + if(argc != 7) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_link_transfer_to_data_operation \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_link_transfer_to_data_operation remote procedure on {} with operation id {}, transfer id {}, stream {}, arguments {} and job id {} ...\n", + argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]); + ADM_link_transfer_to_data_operation_in_t in; + try { + in.operation_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + try { + in.transfer_id = std::stoi(argv[3]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + try { + in.stream = string_to_convert(argv[4]); + } catch(const std::invalid_argument& ia) { + fmt::print( + stderr, + "ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); + exit(EXIT_FAILURE); + } + in.arguments = argv[5]; + try { + in.job_id = std::stoi(argv[6]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + + ADM_link_transfer_to_data_operation_out_t out; + + endp.call("ADM_link_transfer_to_data_operation", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_link_transfer_to_data_operation remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_link_transfer_to_data_operation remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_output.cpp b/examples/ADM_output.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b1b3c94a7ee4929c90c562ed8fb160b7aac8b737 --- /dev/null +++ b/examples/ADM_output.cpp @@ -0,0 +1,40 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_output \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print(stdout, + "Calling ADM_output remote procedure on {} : {} -> {} ...\n", + argv[1], argv[2], argv[3]); + ADM_output_in_t in; + in.origin = argv[2]; + in.target = argv[3]; + ADM_output_out_t out; + + endp.call("ADM_output", &in, &out); + + if(out.ret < 0) { + fmt::print(stdout, + "ADM_output remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print(stdout, + "ADM_output remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_set_dataset_information.cpp b/examples/ADM_set_dataset_information.cpp new file mode 100644 index 0000000000000000000000000000000000000000..78d5dd5fe9ef720f39f457f7c7f5e653164c26ee --- /dev/null +++ b/examples/ADM_set_dataset_information.cpp @@ -0,0 +1,56 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 5) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_set_dataset_information \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_set_dataset_information remote procedure on {} with resource id {}, info {} and" + " job id {} ...\n", + argv[1], argv[2], argv[3], argv[4]); + ADM_set_dataset_information_in_t in; + try { + in.resource_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + in.info = argv[3]; + try { + in.job_id = std::stoi(argv[4]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_set_dataset_information_out_t out; + + endp.call("ADM_set_dataset_information", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_set_dataset_information remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_set_dataset_information remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_set_io_resources.cpp b/examples/ADM_set_io_resources.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8cbebb1a43c02232ab10eaacea4c05304ce9c103 --- /dev/null +++ b/examples/ADM_set_io_resources.cpp @@ -0,0 +1,55 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 5) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_set_io_resources \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_set_io_resources remote procedure on {} with tier id {}, resources {} and" + " job id {} ...\n", + argv[1], argv[2], argv[3], argv[4]); + ADM_set_io_resources_in_t in; + try { + in.tier_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + in.resources = argv[3]; + try { + in.job_id = std::stoi(argv[4]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_set_io_resources_out_t out; + + endp.call("ADM_set_io_resources", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_set_io_resources remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_set_io_resources remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_set_qos_constraints_pull.cpp b/examples/ADM_set_qos_constraints_pull.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ee57c9fb055a068a9d16afd134870d1ce000fe3b --- /dev/null +++ b/examples/ADM_set_qos_constraints_pull.cpp @@ -0,0 +1,48 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_set_qos_constraints_pull \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_set_qos_constraints_pull remote procedure on {} with scope {} and element id {} ...\n", + argv[1], argv[2], argv[3]); + ADM_set_qos_constraints_pull_in_t in; + in.scope = argv[2]; + try { + in.element_id = std::stoi(argv[3]); + } catch(const std::exception& e) { + fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + + ADM_set_qos_constraints_pull_out_t out; + + endp.call("ADM_set_qos_constraints_pull", &in, &out); + + if(out.ret < 0) { + fmt::print( + stderr, + "ADM_set_qos_constraints_pull remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_set_qos_constraints_pull remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_set_qos_constraints_push.cpp b/examples/ADM_set_qos_constraints_push.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e5c2b1f58735f2e430bb714a1872220d9d8021fc --- /dev/null +++ b/examples/ADM_set_qos_constraints_push.cpp @@ -0,0 +1,51 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 6) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_set_qos_constraints_push \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_set_qos_constraints_push remote procedure on {} with scope {}, QoS class {}, element id {} and class value {} ...\n", + argv[1], argv[2], argv[3], argv[4], argv[5]); + ADM_set_qos_constraints_push_in_t in; + in.scope = argv[2]; + in.qos_class = argv[3]; + try { + in.element_id = std::stoi(argv[4]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + in.class_value = argv[4]; + + ADM_set_qos_constraints_push_out_t out; + + endp.call("ADM_set_qos_constraints_push", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_set_qos_constraints_push remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_set_qos_constraints_push remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_set_transfer_priority.cpp b/examples/ADM_set_transfer_priority.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a82e3c0929ff7d46edad53c897cb64c467c395b0 --- /dev/null +++ b/examples/ADM_set_transfer_priority.cpp @@ -0,0 +1,53 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_set_transfer_priority \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_set_transfer_priority remote procedure on {} with transfer id {} and number of positions {}...\n", + argv[1], argv[2], argv[3]); + ADM_set_transfer_priority_in_t in; + try { + in.transfer_id = std::stoi(argv[2]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + try { + in.n_positions = std::stoi(argv[3]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_set_transfer_priority_out_t out; + + endp.call("ADM_set_transfer_priority", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_set_transfer_priority remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_set_transfer_priority remote procedure completed successfully\n"); + } +} diff --git a/examples/ADM_transfer_dataset.cpp b/examples/ADM_transfer_dataset.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b1dd75f5e025225e8fac200a643f47cebbc4d101 --- /dev/null +++ b/examples/ADM_transfer_dataset.cpp @@ -0,0 +1,54 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 7) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_transfer_dataset " + " \n"); + exit(EXIT_FAILURE); + } + + scord::network::rpc_client rpc_client{"tcp"}; + rpc_client.register_rpcs(); + + + auto endp = rpc_client.lookup(argv[1]); + + fmt::print( + stdout, + "Calling ADM_transfer_dataset remote procedure on {} : {} -> {} using " + " qos constraints {}, distribution {} and job id {} ...\n", + argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]); + ADM_transfer_dataset_in_t in; + in.source = argv[2]; + in.destination = argv[3]; + in.qos_constraints = argv[4]; + in.distribution = argv[5]; + try { + in.job_id = std::stoi(argv[6]); + } catch(const std::exception& e) { + fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + exit(EXIT_FAILURE); + } + ADM_transfer_dataset_out_t out; + + endp.call("ADM_transfer_dataset", &in, &out); + + + if(out.ret < 0) { + fmt::print( + stdout, + "ADM_transfer_dataset remote procedure not completed successfully\n"); + exit(EXIT_FAILURE); + } else { + fmt::print( + stdout, + "ADM_transfer_dataset remote procedure completed successfully\n"); + } +} diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 97ef05ea606b6340d7a6c0f5f09d7625966823ec..884538459e056bb6b7d7f80a5c366d85f95f3b7c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -22,7 +22,18 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_executable(ping) -target_sources(ping PRIVATE ping.cpp) +list(APPEND examples + ping ADM_input ADM_output ADM_inout ADM_adhoc_context ADM_adhoc_context_id + ADM_adhoc_nodes ADM_adhoc_walltime ADM_adhoc_access ADM_adhoc_distribution + ADM_adhoc_background_flush ADM_in_situ_ops ADM_in_transit_ops ADM_transfer_dataset + ADM_set_dataset_information ADM_set_io_resources ADM_get_transfer_priority + ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers + ADM_set_qos_constraints_push ADM_define_data_operation ADM_connect_data_operation + ADM_finalize_data_operation ADM_link_transfer_to_data_operation ADM_get_statistics) + +foreach (example IN LISTS examples) + add_executable(${example}) + target_sources(${example} PRIVATE ${example}.cpp) + target_link_libraries(${example} PUBLIC network_engine fmt::fmt) +endforeach() -target_link_libraries(ping PUBLIC network_engine fmt::fmt) diff --git a/src/network/engine.hpp b/src/network/engine.hpp index 54b99dc0b021fa344ace2fa26181a02df1731dc5..dc8a15d3b7d73eeba990b67faa1969f5b6726924 100644 --- a/src/network/engine.hpp +++ b/src/network/engine.hpp @@ -38,17 +38,18 @@ namespace scord::network { namespace detail { -#define REGISTER_RPC(__mid, __m_rpc_names, __func_name, __in_t, __out_t, __handler, requires_response) \ -{ hg_id_t id = margo_provider_register_name(__mid, __func_name, \ - BOOST_PP_CAT(hg_proc_, __in_t), \ - BOOST_PP_CAT(hg_proc_, __out_t), \ - _handler_for_##__handler, \ - MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL); \ - __m_rpc_names.emplace(__func_name, id); \ - if(!requires_response) { \ - ::margo_registered_disable_response(__mid, id, HG_TRUE); \ - } \ -} +#define REGISTER_RPC(__mid, __m_rpc_names, __func_name, __in_t, __out_t, \ + __handler, requires_response) \ + { \ + hg_id_t id = margo_provider_register_name( \ + __mid, __func_name, BOOST_PP_CAT(hg_proc_, __in_t), \ + BOOST_PP_CAT(hg_proc_, __out_t), _handler_for_##__handler, \ + MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL); \ + __m_rpc_names.emplace(__func_name, id); \ + if(!requires_response) { \ + ::margo_registered_disable_response(__mid, id, HG_TRUE); \ + } \ + } struct margo_context { @@ -103,8 +104,133 @@ struct engine { register_rpcs() { // register RPCs manually for now - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ping", void, void, ping, false); - + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ping", void, + void, ping, false); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_input", + + ADM_input_in_t, ADM_input_out_t, ADM_input, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_output", + ADM_output_in_t, ADM_output_out_t, ADM_output, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_inout", + ADM_inout_in_t, ADM_inout_out_t, ADM_inout, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_context", ADM_adhoc_context_in_t, + ADM_adhoc_context_out_t, ADM_adhoc_context, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t, + ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t, + ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t, + ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_access", ADM_adhoc_access_in_t, + ADM_adhoc_access_out_t, ADM_adhoc_access, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t, + ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_adhoc_background_flush", + ADM_adhoc_background_flush_in_t, + ADM_adhoc_background_flush_out_t, + ADM_adhoc_background_flush, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_in_situ_ops", ADM_in_situ_ops_in_t, + ADM_in_situ_ops_out_t, ADM_in_situ_ops, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_in_transit_ops", ADM_in_transit_ops_in_t, + ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_transfer_dataset", ADM_transfer_dataset_in_t, + ADM_transfer_dataset_out_t, ADM_transfer_dataset, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_set_dataset_information", + ADM_set_dataset_information_in_t, + ADM_set_dataset_information_out_t, + ADM_set_dataset_information, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_set_io_resources", ADM_set_io_resources_in_t, + ADM_set_io_resources_out_t, ADM_set_io_resources, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_get_transfer_priority", + ADM_get_transfer_priority_in_t, + ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_set_transfer_priority", + ADM_set_transfer_priority_in_t, + ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_cancel_transfer", ADM_cancel_transfer_in_t, + ADM_cancel_transfer_out_t, ADM_cancel_transfer, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_get_pending_transfers", + ADM_get_pending_transfers_in_t, + ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_set_qos_constraints_push", + ADM_set_qos_constraints_push_in_t, + ADM_set_qos_constraints_push_out_t, + ADM_set_qos_constraints_push, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_set_qos_constraints_pull", + ADM_set_qos_constraints_pull_in_t, + ADM_set_qos_constraints_pull_out_t, + ADM_set_qos_constraints_pull, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_define_data_operation", + ADM_define_data_operation_in_t, + ADM_define_data_operation_out_t, ADM_define_data_operation, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_connect_data_operation", + ADM_connect_data_operation_in_t, + ADM_connect_data_operation_out_t, + ADM_connect_data_operation, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_finalize_data_operation", + ADM_finalize_data_operation_in_t, + ADM_finalize_data_operation_out_t, + ADM_finalize_data_operation, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_link_transfer_to_data_operation", + ADM_link_transfer_to_data_operation_in_t, + ADM_link_transfer_to_data_operation_out_t, + ADM_link_transfer_to_data_operation, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_get_statistics", ADM_get_statistics_in_t, + ADM_get_statistics_out_t, ADM_get_statistics, true); } void @@ -191,12 +317,11 @@ public: } /** - * Deprecated call, used to support Margo directly - * - **/ + * Deprecated call, used to support Margo directly + * + **/ template - [[deprecated("It should be eventually replaced by a generic call")]] - void + [[deprecated("It should be eventually replaced by a generic call")]] void call(const std::string& id, T1 input = nullptr, T2 output = nullptr) { const auto it = m_margo_context->m_rpc_names.find(id); @@ -224,7 +349,7 @@ public: ::HG_Error_to_string(ret))); } - if (output != nullptr) { + if(output != nullptr) { ret = ::margo_get_output(handle, output); } diff --git a/src/network/rpcs.cpp b/src/network/rpcs.cpp index 2d393d644bf135ed4f140dc36442c959a8fa9576..7fb34105c14bcb46b62ef70d4c3d99b43107c3e4 100644 --- a/src/network/rpcs.cpp +++ b/src/network/rpcs.cpp @@ -31,10 +31,1333 @@ ping(hg_handle_t h) { margo_instance_id mid = margo_hg_handle_get_instance(h); - LOGGER_INFO("remote_procedure::PING(noargs)"); + LOGGER_INFO("PING(noargs)"); ret = margo_destroy(h); assert(ret == HG_SUCCESS); } DEFINE_MARGO_RPC_HANDLER(ping); + +/** + * Specifes the origin location in a storage tier where input is located, as + * well as the target location where it should be placed in a different storage + * tier. + * + * @param in.origin An origin location for the source dataset. + * @param in.target A target location for the destination dataset. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_input(hg_handle_t h) { + hg_return_t ret; + + ADM_input_in_t in; + ADM_input_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.origin == nullptr) { + LOGGER_ERROR("ADM_input(): invalid origin (nullptr)"); + } else if(in.target == nullptr) { + LOGGER_ERROR("ADM_input(): invalid target (nullptr)"); + } else { + LOGGER_INFO("ADM_input({}, {})", in.origin, in.target); + out.ret = 0; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_input) + +/** + * Specifies the origin location in a storage tier where output is located, as + * well as the target location where it should be placed in a different storage + * tier. + * + * @param in.origin An origin location for the source dataset. + * @param in.target A target location for the destination dataset. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_output(hg_handle_t h) { + hg_return_t ret; + + ADM_output_in_t in; + ADM_output_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.origin == nullptr) { + LOGGER_ERROR("ADM_output(): invalid origin (nullptr)"); + } else if(in.target == nullptr) { + LOGGER_ERROR("ADM_output(): invalid target (nullptr)"); + } else { + LOGGER_INFO("ADM_output({}, {})", in.origin, in.target); + out.ret = 0; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_output) + +/** + * Specifies both the input and output locations in a storage tier. This + * combines both ADM_input and ADM_output for user convenience: the input data + * provided by origin is overwritten by the output data generated at target. + * + * @param in.origin An origin location for the source dataset. + * @param in.target A target location for the destination dataset. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_inout(hg_handle_t h) { + hg_return_t ret; + + ADM_inout_in_t in; + ADM_inout_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.origin == nullptr) { + LOGGER_ERROR("ADM_inout(): invalid origin (nullptr)"); + } else if(in.target == nullptr) { + LOGGER_ERROR("ADM_inout(): invalid target (nullptr)"); + } else { + LOGGER_INFO("ADM_inout({}, {})", in.origin, in.target); + out.ret = 0; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_inout) + +/** + * Specifies the execution_mode an Ad hoc Storage System should use. Valid + * options: in_job:shared (run while sharing the application’s compute nodes), + * in_job:dedicated (run using a subset of the application’s compute nodes), + * separate:new (ask the system to allocate a separate job with separate runtime + * and number of nodes) and separate:existing (ask the system to reuse an + * already running Ad hoc Storage System instance). The number of nodes assigned + * for the Ad hoc Storage System must be specified with ADM_adhoc_nodes. In the + * separate:new execution_mode, the lifetime of the Ad hoc Storage System will + * be controlled with ADM_adhoc_walltime. In the separate:existing + * execution_mode, a valid context ID must be provided with + * ADM_adhoc_context_id. + * + * @param in.context A valid execution_mode describing how the Ad hoc Storage + * System should behave. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + * @return out.adhoc_context_id A number that identifies the context. + */ +static void +ADM_adhoc_context(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_context_in_t in; + ADM_adhoc_context_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + const std::string ctx(in.context); + + out.ret = -1; + out.adhoc_context = -1; + + if(in.context == nullptr) { + LOGGER_ERROR("ADM_adhoc_context(): invalid context (nullptr)"); + } else { + LOGGER_INFO("ADM_adhoc_context({})", in.context); + + if(ctx == "in_job:shared" || ctx == "in_job:dedicated" || + ctx == "separate:new" || ctx == "separate:existing") { + LOGGER_INFO("ADM_adhoc_context value is acceptable ({})", + in.context); + out.ret = 0; + out.adhoc_context = rand(); + } else { + LOGGER_ERROR( + "ADM_adhoc_context is not valid. Please use: in_job:shared, in_job:dedicated, separate:new or separate:existing"); + } + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_context) + +/** + * Specifies an existing Ad hoc Storage System to use via its ID. + * + * @param in.context_id A valid context_id for a separate instance of an Ad hoc + * Storage System. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_adhoc_context_id(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_context_id_in_t in; + ADM_adhoc_context_id_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + + out.ret = -1; + + if(in.context_id < 0) { + LOGGER_ERROR("ADM_adhoc_context_id(): invalid context_id (< 0)"); + } else { + LOGGER_INFO("ADM_adhoc_context_id({})", in.context_id); + out.ret = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_context_id) + +/** + * Specifies the number of nodes for the Ad hoc Storage System. If the + * ADM_adhoc_execution_mode is shared, the number cannot exceed the number of + * allocated nodes within the compute job. If the ADM_adhoc_execution_mode is + * dedicated, the number of nodes is not restricted. + * + * @param in.number_of_nodes The desired number_of_nodes. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_adhoc_nodes(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_nodes_in_t in; + ADM_adhoc_nodes_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.nodes <= 0) { + LOGGER_ERROR("ADM_adhoc_nodes(): invalid n_nodes (<= 0)"); + } else { + LOGGER_INFO("ADM_adhoc_nodes({})", in.nodes); + out.ret = 0; + } + + + /*Specifies the number of nodes for the Ad hoc Storage System. If the + ADM_adhoc_execution_mode is shared, the number cannot exceed the number of + allocated nodes within the compute job. If the ADM_adhoc_execution_mode is + dedicated, the number of nodes is not restricted. Should this be checked + now? */ + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_nodes) + +/** + * Specifies for how long the ad hoc storage system should run before should + * down. Only relevant in the context of the ADM_adhoc_context function. + * + * @param in.walltime The desired walltime in minutes. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_adhoc_walltime(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_walltime_in_t in; + ADM_adhoc_walltime_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.walltime < 0) { + LOGGER_ERROR("ADM_adhoc_walltime(): invalid walltime (< 0)"); + } else { + LOGGER_INFO("ADM_adhoc_walltime({})", in.walltime); + out.ret = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_walltime) + +/** + * Specifies access to the ad hoc storage system: write-only, read-only, + * read-write. Cannot be used when using an existing Ad hoc Storage System + * instance. + * + * @param in.access The desired access method + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_adhoc_access(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_access_in_t in; + ADM_adhoc_access_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + const std::string acc(in.access); + + out.ret = -1; + + if(in.access == nullptr) { + LOGGER_ERROR("ADM_adhoc_access(): invalid access (nullptr)"); + } else { + LOGGER_INFO("ADM_adhoc_access({}, {})", in.access); + + if((acc == "write-only") || (acc == "read-only") || + (acc == "read-write")) { + out.ret = 0; + LOGGER_INFO("ADM_adhoc_access value is acceptable ({})", in.access); + } else { + LOGGER_ERROR( + "ADM_adhoc_access is not valid. Please use: write-only, read-only or read-write"); + } + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_access) + + +/** + * Specifies the data distribution within the ad hoc storage system, e.g., + * wide-striping, local, local-data-global-metadata. + * + * @param in.data_distribution The desired data distribution + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_adhoc_distribution(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_distribution_in_t in; + ADM_adhoc_distribution_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.data_distribution == nullptr) { + LOGGER_ERROR( + "ADM_adhoc_distribution(): invalid data_distribution (nullptr)"); + } else { + LOGGER_INFO("ADM_adhoc_distribution({})", in.data_distribution); + out.ret = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_distribution) + +/** + * Specifies if data in the output location should be moved to the shared + * backend storage system in the background (default false). + * + * @param in.b_flush A boolean enabling or disabling the option. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_adhoc_background_flush(hg_handle_t h) { + hg_return_t ret; + + ADM_adhoc_background_flush_in_t in; + ADM_adhoc_background_flush_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + LOGGER_INFO("ADM_adhoc_background_flush({})", in.b_flush); + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_adhoc_background_flush) + +/** + * In situ data operations specified in a given configuration file. + * + * @param in.in_situ A path to the configuration file. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_in_situ_ops(hg_handle_t h) { + hg_return_t ret; + + ADM_in_situ_ops_in_t in; + ADM_in_situ_ops_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.in_situ == nullptr) { + LOGGER_ERROR("ADM_in_situ_ops(): invalid in_situ_ops (nullptr)"); + } else { + LOGGER_INFO("ADM_in_situ_ops({})", in.in_situ); + out.ret = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_in_situ_ops) + +/** + * In transit data operations specified in a given configuration file. + * + * @param in.in_transit A path to the configuration file. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_in_transit_ops(hg_handle_t h) { + hg_return_t ret; + + ADM_in_transit_ops_in_t in; + ADM_in_transit_ops_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + if(in.in_transit == nullptr) { + LOGGER_ERROR("ADM_in_transit_ops(): invalid in_transit (nullptr)"); + } else { + LOGGER_INFO("ADM_in_transit_ops({})", in.in_transit); + out.ret = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops) + + +/** + * Transfers the dataset identified by the source_name to the storage tier + * defined by destination_name, and apply the provided constraints during the + * transfer. This function returns a handle that can be used to track the + * operation (i.e., get statistics, or status). + * + * @param in.source A source_location identifying the source dataset/s in the + * source storage tier. + * @param in.destination A destination_location identifying the destination + * dataset/s in its desired location in a storage tier. + * @param in.qos_constraints A list of qos_constraints that must be applied to + * the transfer. These may not exceed the global ones set at node, application, + * or resource level (see Section 3.4). + * @param in.distribution A distribution strategy for data (e.g. one-to-one, + * one-to-many, many-to-many) + * @param in.job_id A job_id identifying the originating job. + * @param out.transfer_handle A transfer_handle allowing clients to interact + * with the transfer (e.g. wait for its completion, query its status, cancel it, + * etc. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_transfer_dataset(hg_handle_t h) { + hg_return_t ret; + + ADM_transfer_dataset_in_t in; + ADM_transfer_dataset_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.transfer_handle = "fail"; + + if(in.source == nullptr) { + LOGGER_ERROR("ADM_transfer_dataset(): invalid source (nullptr)"); + } else if(in.destination == nullptr) { + LOGGER_ERROR("ADM_transfer_dataset(): invalid destination (nullptr)"); + } else if(in.qos_constraints == nullptr) { + LOGGER_ERROR( + "ADM_transfer_dataset(): invalid qos_constraints (nullptr)"); + } else if(in.distribution == nullptr) { + LOGGER_ERROR("ADM_transfer_dataset(): invalid distribution (nullptr)"); + } else if(in.job_id < 0) { + LOGGER_ERROR("ADM_transfer_dataset(): invalid job_id (< 0)"); + } else { + LOGGER_INFO("ADM_transfer_dataset({},{},{},{},{})", in.source, + in.destination, in.qos_constraints, in.distribution, + in.job_id); + out.ret = 0; + out.transfer_handle = "ok"; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_transfer_dataset) + +/** + * Sets information for the dataset identified by resource_id. + * + * @param in.resource_id A resource_id identifying the dataset of interest. + * @param in.info An opaque inf o argument containing information about the + * dataset (e.g. its lifespan, access methods, intended usage, etc.). + * @param in.job_id A job_id identifying the originating job. + * @param out.status A status code determining whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_set_dataset_information(hg_handle_t h) { + hg_return_t ret; + + ADM_set_dataset_information_in_t in; + ADM_set_dataset_information_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.status = -1; + + if(in.resource_id < 0) { + LOGGER_ERROR( + "ADM_set_dataset_information(): invalid resource_id (< 0)"); + } else if(in.info == nullptr) { + LOGGER_ERROR("ADM_set_dataset_information(): invalid info (nullptr)"); + } else if(in.job_id < 0) { + LOGGER_ERROR("ADM_set_dataset_information(): invalid job_id (< 0)"); + } else { + LOGGER_INFO("ADM_set_dataset_information({},{},{})", in.resource_id, + in.info, in.job_id); + out.ret = 0; + out.status = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_set_dataset_information) + +/** + * Changes the I/O resources used by a storage tier, typically an Ad hoc Storage + * System. + * + * @param in.tier_id A tier_id specifying the target storage tier. + * @param in.resources An opaque resources argument containing information about + * the I/O resources to modify (e.g. number of I/O nodes.). + * @param in.job_id A job_id identifying the originating job. + * @param out.status A status code determining whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_set_io_resources(hg_handle_t h) { + hg_return_t ret; + + ADM_set_io_resources_in_t in; + ADM_set_io_resources_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.status = -1; + + if(in.tier_id < 0) { + LOGGER_ERROR("ADM_set_io_resources(): invalid tier_id (nullptr)"); + } else if(in.resources == nullptr) { + LOGGER_ERROR("ADM_set_io_resources(): invalid resources (nullptr)"); + } else if(in.job_id < 0) { + LOGGER_ERROR("ADM_set_io_resources(): invalid job_id (< 0)"); + } else { + LOGGER_INFO("ADM_set_io_resources({},{},{})", in.tier_id, in.resources, + in.job_id); + out.ret = 0; + out.status = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_set_io_resources) + + +/** + * Returns the priority of the pending transfer identified by transfer_id. + * + * @param in.transfer_id A tier_id specifying the target storage tier. + * @param out.priority The priority of the pending transfer or an error code if + * it didn’t exist or is no longer pending. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_get_transfer_priority(hg_handle_t h) { + hg_return_t ret; + + ADM_get_transfer_priority_in_t in; + ADM_get_transfer_priority_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.priority = -1; + + if(in.transfer_id < 0) { + LOGGER_ERROR( + "ADM_get_transfer_priority(): invalid transfer_id (nullptr)"); + } else { + LOGGER_INFO("ADM_get_transfer_priority({})", in.transfer_id); + out.ret = 0; + out.priority = 0; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_get_transfer_priority) + +/** + * Moves the operation identified by transfer_id up or down by n positions in + * its scheduling queue. + * + * @param in.transfer_id A transf er_id identifying a pending transfer. + * @param in.n_positions A positive or negative number n for the number of + * positions the transfer should go up or down in its scheduling queue. + * @param out.status A status code indicating whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_set_transfer_priority(hg_handle_t h) { + hg_return_t ret; + + ADM_set_transfer_priority_in_t in; + ADM_set_transfer_priority_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.status = -1; + + if(in.transfer_id < 0) { + LOGGER_ERROR( + "ADM_set_transfer_priority(): invalid transfer_id (nullptr)"); + } else { + LOGGER_INFO("ADM_set_transfer_priority({}, {})", in.transfer_id, + in.n_positions); + out.ret = 0; + out.status = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_set_transfer_priority) + +/** + * Cancels the pending transfer identified by transfer_id. + * + * @param in.transfer_id A transfer_id identifying a pending transfer. + * @param out.status A status code indicating whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_cancel_transfer(hg_handle_t h) { + hg_return_t ret; + + ADM_cancel_transfer_in_t in; + ADM_cancel_transfer_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.status = -1; + + if(in.transfer_id < 0) { + LOGGER_ERROR("ADM_cancel_transfer(): invalid transfer_id (< 0)"); + } else { + LOGGER_INFO("ADM_cancel_transfer({})", in.transfer_id); + out.ret = 0; + out.status = 0; + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_cancel_transfer) + +/** + * Returns a list of pending transfers. Each operation will include a transf + * er_id as well as information about the involved resources and tiers. + * + * @param out.pending_transfers A list of pending_transfers. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_get_pending_transfers(hg_handle_t h) { + hg_return_t ret; + + ADM_get_pending_transfers_in_t in; + ADM_get_pending_transfers_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = 0; + out.pending_transfers = "list"; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_get_pending_transfers) + +/** + * Registers a QoS constraint defined by class, scope, and value for the element + * identified by id. + * + * @param in.scope The scope it should be applied to: dataset, node, or job. + * @param in.qos_class A QoS class (e.g. "badwidth", "iops", etc.). + * @param in.element_id A valid id for the element that should be constrained, + * i.e. a resource ID, a node hostname, or a Job ID. + * @param in.class_value An appropriate value for the selected class. + * @param out.status A status code indicating whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_set_qos_constraints_push(hg_handle_t h) { + hg_return_t ret; + + ADM_set_qos_constraints_push_in_t in; + ADM_set_qos_constraints_push_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + const std::string scp(in.scope); + + out.ret = -1; + out.status = -1; + + if(in.scope == nullptr) { + LOGGER_ERROR("ADM_set_qos_constraints_push(): invalid scope (nullptr)"); + } else if(in.qos_class == nullptr) { + LOGGER_ERROR( + "ADM_set_qos_constraints_push(): invalid qos_class (nullptr)"); + } else if(in.element_id < 0) { + LOGGER_ERROR( + "ADM_set_qos_constraints_push(): invalid element_id (< 0)"); + } else if(in.class_value == nullptr) { + LOGGER_ERROR( + "ADM_set_qos_constraints_push(): invalid class_value (nullptr)"); + } else { + LOGGER_INFO("ADM_set_qos_constraints_push({}, {}, {}, {})", in.scope, + in.qos_class, in.element_id, in.class_value); + if((scp == "dataset") || (scp == "node") || (scp == "job")) { + LOGGER_INFO( + "ADM_set_qos_constraints_push scope value is acceptable ({})", + in.scope); + out.ret = 0; + out.status = 0; + } else { + LOGGER_ERROR( + "ADM_set_qos_constraints_push scope value is not valid. Please use: dataset, node or job"); + } + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_push) + +/** + * Returns a list of QoS constraints defined for an element identified for id. + * + * @param in.scope The scope being queried: dataset, node, or job. + * @param in.element_id A valid id for the element of interest, i.e. a resource + * ID, a node hostname, or a Job ID. + * @param out.list A list of QoS constraints that includes all the classes + * currently defined for the element as well as the values set for them. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_set_qos_constraints_pull(hg_handle_t h) { + hg_return_t ret; + + ADM_set_qos_constraints_pull_in_t in; + ADM_set_qos_constraints_pull_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + const std::string scp(in.scope); + + out.ret = -1; + out.list = nullptr; + + if(in.scope == nullptr) { + LOGGER_ERROR("ADM_set_qos_constraints_pull(): invalid scope (nullptr)"); + } else if(in.element_id < 0) { + LOGGER_ERROR("ADM_set_qos_constraints_pull(): invalid element_id (< 0)"); + } else { + LOGGER_INFO("ADM_set_qos_constraints_pull({}, {})", in.scope, in.element_id); + if((scp == "dataset") || (scp == "node") || (scp == "job")) { + LOGGER_INFO( + "ADM_set_qos_constraints_pull scope value is acceptable ({})", + in.scope); + out.ret = 0; + out.list = "list"; + } else { + LOGGER_ERROR( + "ADM_set_qos_constraints_pull scope value is not valid. Please use: dataset, node or job "); + } + } + + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_pull) + +/** + * Defines a new operation, with the code found in path. The code will be + * identified by the user-provided operation_id and will accept the arguments + * defined, using the next format "arg0, arg1, arg2, . . . ". + * + * @param in.path A valid path for the operation code. + * @param in.operation_id A user-defined operation_id for the operation. + * @param in.arguments A list of arguments for the operation. + * @param out.status A status code indicating whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_define_data_operation(hg_handle_t h) { + hg_return_t ret; + + ADM_define_data_operation_in_t in; + ADM_define_data_operation_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.status = -1; + + if(in.path == nullptr) { + LOGGER_ERROR("ADM_define_data_operation(): invalid path (nullptr)"); + } else if(in.operation_id < 0) { + LOGGER_ERROR("ADM_define_data_operation(): invalid operation_id (< 0)"); + } else if(in.arguments == nullptr) { + LOGGER_ERROR( + "ADM_define_data_operation(): invalid arguments (nullptr)"); + } else { + LOGGER_INFO("ADM_define_data_operation ({}, {}, {})", in.path, + in.operation_id, in.arguments); + out.ret = 0; + out.status = 0; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_define_data_operation) + + +/** + * Connects and starts the data operation defined with operation_id and with the + * arguments, using the input and output data storage (i.e., files). If the + * operation can be executed in a streaming fashion (i.e., it can start even if + * the input data is not entirely available), the stream parameter must be set + * to true. + * + * @param in.operation_id The operation_id of the operation to be connected. + * @param in.input An input data resource for the operation. + * @param in.stream A stream boolean indicating if the operation should be + * executed in a streaming fashion. + * @param in.arguments The values for the arguments required by the operation. + * @param in.job_id A job_id identifying the originating job. + * @param out.data An output data resource where the result of the operation + * should be stored. + * @return out.operation_handle An operation_handle for the operation that + * allows clients to further interact with the operation (e.g query its status, + * cancel it, etc.). + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_connect_data_operation(hg_handle_t h) { + hg_return_t ret; + + ADM_connect_data_operation_in_t in; + ADM_connect_data_operation_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.data = nullptr; + out.operation_handle = nullptr; + + if(in.operation_id < 0) { + LOGGER_ERROR( + "ADM_connect_data_operation(): invalid operation_id (< 0)"); + } else if(in.input == nullptr) { + LOGGER_ERROR("ADM_define_data_operation(): invalid input (nullptr)"); + } else if(in.stream != true && in.stream != false) { + LOGGER_ERROR( + "ADM_connect_data_operation(): invalid stream (not true/false)"); + } else if(in.arguments == nullptr) { + LOGGER_ERROR( + "ADM_connect_data_operation(): invalid arguments (nullptr)"); + } else if(in.job_id < 0) { + LOGGER_ERROR("ADM_connect_data_operation(): invalid job_id (< 0)"); + } else { + LOGGER_INFO("ADM_connect_data_operation({}, {}, {}, {}, {})", + in.operation_id, in.input, in.stream, in.arguments, + in.job_id); + out.ret = 0; + out.data = "ouput"; + out.operation_handle = "operation_handle"; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_connect_data_operation) + +/** + * Finalises the operation defined with operation_id. + * + * @param in.operation_id The operation_id of the operation to be connected. + * @return out.status A status code indicating whether the operation was + * successful. + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_finalize_data_operation(hg_handle_t h) { + hg_return_t ret; + + ADM_finalize_data_operation_in_t in; + ADM_finalize_data_operation_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.status = -1; + + if(in.operation_id < 0) { + LOGGER_ERROR( + "ADM_finalize_data_operation(): invalid operation_id (< 0)"); + } else { + LOGGER_INFO("ADM_finalize_data_operation({})", in.operation_id); + out.ret = 0; + out.status = 0; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_finalize_data_operation) + +/** + * Links the data operation defined with operation_id with the pending transfer + * identified by transf er_id using the values provided as arguments. If the + * operation can be executed in a streaming fashion (i.e., it can start even if + * the input data is not entirely available), the stream parameter must be set + * to true. + * + * @param in.operation_id The operation_id of the operation to be connected. + * @param in.transfer_id The transfer_id of the pending transfer the operation + * should be linked to. + * @param in.stream A stream boolean indicating if the operation should be + * executed in a streaming fashion. + * @param in.arguments The values for the arguments required by the operation. + * @param in.job_id A job_id identifying the originating job. + * @return out.operation_handle An operation_handle for the operation that + * allows clients to further interact with the operation (e.g query its status, + * cancel it, etc.). + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_link_transfer_to_data_operation(hg_handle_t h) { + hg_return_t ret; + + ADM_link_transfer_to_data_operation_in_t in; + ADM_link_transfer_to_data_operation_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.operation_handle = nullptr; + + if(in.operation_id < 0) { + LOGGER_ERROR( + "ADM_link_transfer_to_data_operation(): invalid operation_id (< 0)"); + } else if(in.transfer_id < 0) { + LOGGER_ERROR( + "ADM_link_transfer_to_data_operation(): invalid transfer_id (< 0)"); + } else if(in.arguments == nullptr) { + LOGGER_ERROR( + "ADM_link_transfer_to_data_operation(): invalid arguments (nullptr)"); + } else if(in.stream != true && in.stream != false) { + LOGGER_ERROR( + "ADM_link_transfer_to_data_operation(): invalid stream (not true/false)"); + } else if(in.job_id < 0) { + LOGGER_ERROR( + "ADM_link_transfer_to_data_operation(): invalid job_id (< 0)"); + } else { + LOGGER_INFO("ADM_link_transfer_to_data_operation ({}, {}, {}, {}, {})", + in.operation_id, in.transfer_id, in.stream, in.arguments, + in.job_id); + out.ret = 0; + out.operation_handle = "operation_handle"; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_link_transfer_to_data_operation) + + +/** + * Returns the current I/O statistics for a specified job_id and an optional + * corresponding job_step. The information will be returned in an + * easy-to-process format, e.g., JSON (see Listing 3.1). + * + * @param in.job_id + * @param in.job_step + * @return out.job_statistics + * @return out.ret Returns if the remote procedure has been completed + * successfully or not. + */ +static void +ADM_get_statistics(hg_handle_t h) { + hg_return_t ret; + + ADM_get_statistics_in_t in; + ADM_get_statistics_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + out.job_statistics = nullptr; + + if(in.job_id < 0) { + LOGGER_ERROR("ADM_get_statistics(): invalid job_id (< 0)"); + } else if(in.job_step < 0) { + LOGGER_ERROR("ADM_get_statistics(): invalid job_step (< 0)"); + } else { + LOGGER_INFO("ADM_get_statistics ({}, {})", + in.job_id, in.job_step); + out.ret = 0; + out.job_statistics = "job_statistics"; + } + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_get_statistics) diff --git a/src/network/rpcs.hpp b/src/network/rpcs.hpp index df1302a218f54ce48ebf321d0a564c743d26c2c8..170fb249db014271909597943d13b9fb2d31ebb5 100644 --- a/src/network/rpcs.hpp +++ b/src/network/rpcs.hpp @@ -27,6 +27,10 @@ #include #include +#include +#include +#include + // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { @@ -34,6 +38,260 @@ /// ping DECLARE_MARGO_RPC_HANDLER(ping); +/// ADM_input +MERCURY_GEN_PROC(ADM_input_in_t, + ((hg_const_string_t) (origin))((hg_const_string_t) (target))) + +MERCURY_GEN_PROC(ADM_input_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_input); + +/// ADM_output + +MERCURY_GEN_PROC(ADM_output_in_t, + ((hg_const_string_t) (origin))((hg_const_string_t) (target))) + +MERCURY_GEN_PROC(ADM_output_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_output); + +/// ADM_inout + +MERCURY_GEN_PROC(ADM_inout_in_t, + ((hg_const_string_t) (origin))((hg_const_string_t) (target))) + +MERCURY_GEN_PROC(ADM_inout_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_inout); + +/// ADM_adhoc_context + +MERCURY_GEN_PROC(ADM_adhoc_context_in_t, ((hg_const_string_t) (context))) + +MERCURY_GEN_PROC(ADM_adhoc_context_out_t, + ((int32_t) (ret))((int32_t) (adhoc_context))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_context); + +/// ADM_adhoc_context_id + +MERCURY_GEN_PROC(ADM_adhoc_context_id_in_t, ((int32_t) (context_id))) + +MERCURY_GEN_PROC(ADM_adhoc_context_id_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_context_id); + +/// ADM_adhoc_nodes + +MERCURY_GEN_PROC(ADM_adhoc_nodes_in_t, ((int32_t) (nodes))) + +MERCURY_GEN_PROC(ADM_adhoc_nodes_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_nodes) + +/// ADM_adhoc_walltime + +MERCURY_GEN_PROC(ADM_adhoc_walltime_in_t, ((int32_t) (walltime))) + +MERCURY_GEN_PROC(ADM_adhoc_walltime_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_walltime); + + +/// ADM_adhoc_access + +MERCURY_GEN_PROC(ADM_adhoc_access_in_t, ((hg_const_string_t) (access))) + +MERCURY_GEN_PROC(ADM_adhoc_access_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_access); + +/// ADM_adhoc_distribution + +MERCURY_GEN_PROC(ADM_adhoc_distribution_in_t, + ((hg_const_string_t) (data_distribution))) + +MERCURY_GEN_PROC(ADM_adhoc_distribution_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_distribution); + +/// ADM_adhoc_background_flush + +MERCURY_GEN_PROC(ADM_adhoc_background_flush_in_t, ((hg_bool_t) (b_flush))) + +MERCURY_GEN_PROC(ADM_adhoc_background_flush_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_adhoc_background_flush); + +/// ADM_in_situ_ops + +MERCURY_GEN_PROC(ADM_in_situ_ops_in_t, ((hg_const_string_t) (in_situ))) + +MERCURY_GEN_PROC(ADM_in_situ_ops_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_in_situ_ops); + +/// ADM_in_transit_ops + +MERCURY_GEN_PROC(ADM_in_transit_ops_in_t, ((hg_const_string_t) (in_transit))) + +MERCURY_GEN_PROC(ADM_in_transit_ops_out_t, ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_in_transit_ops); + + +/// ADM_transfer_dataset + +MERCURY_GEN_PROC( + ADM_transfer_dataset_in_t, + ((hg_const_string_t) (source))((hg_const_string_t) (destination))( + (hg_const_string_t) (qos_constraints))( + (hg_const_string_t) (distribution))((int32_t) (job_id))) + +MERCURY_GEN_PROC(ADM_transfer_dataset_out_t, + ((int32_t) (ret))((hg_const_string_t) (transfer_handle))) + +DECLARE_MARGO_RPC_HANDLER(ADM_transfer_dataset); + +/// ADM_set_dataset_information + +MERCURY_GEN_PROC(ADM_set_dataset_information_in_t, + ((int32_t) (resource_id))((hg_const_string_t) (info))( + (int32_t) (job_id))) + +MERCURY_GEN_PROC(ADM_set_dataset_information_out_t, + ((int32_t) (ret))((int32_t) (status))) + +DECLARE_MARGO_RPC_HANDLER(ADM_set_dataset_information); + +/// ADM_set_io_resources + +MERCURY_GEN_PROC(ADM_set_io_resources_in_t, + ((int32_t) (tier_id))((hg_const_string_t) (resources))( + (int32_t) (job_id))) + +MERCURY_GEN_PROC(ADM_set_io_resources_out_t, + ((int32_t) (ret))((int32_t) (status))) + +DECLARE_MARGO_RPC_HANDLER(ADM_set_io_resources); + +/// ADM_get_transfer_priority + +MERCURY_GEN_PROC(ADM_get_transfer_priority_in_t, ((int32_t) (transfer_id))) + +MERCURY_GEN_PROC(ADM_get_transfer_priority_out_t, + ((int32_t) (ret))((int32_t) (priority))) + +DECLARE_MARGO_RPC_HANDLER(ADM_get_transfer_priority); + +/// ADM_set_transfer_priority + +MERCURY_GEN_PROC(ADM_set_transfer_priority_in_t, + ((int32_t) (transfer_id))((int32_t) (n_positions))) + +MERCURY_GEN_PROC(ADM_set_transfer_priority_out_t, + ((int32_t) (ret))((int32_t) (status))) + +DECLARE_MARGO_RPC_HANDLER(ADM_set_transfer_priority); + +/// ADM_cancel_transfer + +MERCURY_GEN_PROC(ADM_cancel_transfer_in_t, ((int32_t) (transfer_id))) + +MERCURY_GEN_PROC(ADM_cancel_transfer_out_t, + ((int32_t) (ret))((int32_t) (status))) + +DECLARE_MARGO_RPC_HANDLER(ADM_cancel_transfer); + +/// ADM_get_pending_transfers + +MERCURY_GEN_PROC(ADM_get_pending_transfers_in_t, ((hg_const_string_t) (value))) + +MERCURY_GEN_PROC(ADM_get_pending_transfers_out_t, + ((int32_t) (ret))((hg_const_string_t) (pending_transfers))) + +DECLARE_MARGO_RPC_HANDLER(ADM_get_pending_transfers); + +/// ADM_set_qos_constraints_push + +MERCURY_GEN_PROC( + ADM_set_qos_constraints_push_in_t, + ((hg_const_string_t) (scope))((hg_const_string_t) (qos_class))( + (int32_t) (element_id))((hg_const_string_t) (class_value))) + +MERCURY_GEN_PROC(ADM_set_qos_constraints_push_out_t, + ((int32_t) (ret))((int32_t) (status))) + +DECLARE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_push); + +/// ADM_set_qos_constraints_pull + +MERCURY_GEN_PROC(ADM_set_qos_constraints_pull_in_t, + ((hg_const_string_t) (scope))((int32_t) (element_id))) + +MERCURY_GEN_PROC(ADM_set_qos_constraints_pull_out_t, + ((int32_t) (ret))((hg_const_string_t) (list))) + +DECLARE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_pull); + +/// ADM_define_data_operation + +MERCURY_GEN_PROC(ADM_define_data_operation_in_t, + ((hg_const_string_t) (path))((int32_t) (operation_id))( + (hg_const_string_t) (arguments))) + +MERCURY_GEN_PROC(ADM_define_data_operation_out_t, + ((int32_t) (ret))((int32_t) (status))) + +DECLARE_MARGO_RPC_HANDLER(ADM_define_data_operation); + +/// ADM_connect_data_operation + +MERCURY_GEN_PROC(ADM_connect_data_operation_in_t, + ((int32_t) (operation_id))((hg_const_string_t) (input))( + (hg_bool_t) (stream))((hg_const_string_t) (arguments))( + (int32_t) (job_id))) + +MERCURY_GEN_PROC(ADM_connect_data_operation_out_t, + ((int32_t) (ret))((hg_const_string_t) (data))( + (hg_const_string_t) (operation_handle))) + +DECLARE_MARGO_RPC_HANDLER(ADM_connect_data_operation); + +/// ADM_finalize_data_operation + +MERCURY_GEN_PROC(ADM_finalize_data_operation_in_t, ((int32_t) (operation_id))) + +MERCURY_GEN_PROC(ADM_finalize_data_operation_out_t, + ((int32_t) (ret))((int32_t) (status))) + + +DECLARE_MARGO_RPC_HANDLER(ADM_finalize_data_operation); + +/// ADM_link_transfer_to_data_operation + +MERCURY_GEN_PROC(ADM_link_transfer_to_data_operation_in_t, + ((int32_t) (operation_id))((int32_t) (transfer_id))( + (hg_bool_t) (stream))((hg_const_string_t) (arguments))( + (int32_t) (job_id))) + +MERCURY_GEN_PROC(ADM_link_transfer_to_data_operation_out_t, + ((int32_t) (ret))((hg_const_string_t) (operation_handle))) + +DECLARE_MARGO_RPC_HANDLER(ADM_link_transfer_to_data_operation); + +/// ADM_get_statistics + +MERCURY_GEN_PROC(ADM_get_statistics_in_t, + ((int32_t) (job_id))((int32_t) (job_step))) + +MERCURY_GEN_PROC(ADM_get_statistics_out_t, + ((int32_t) (ret))((hg_const_string_t) (job_statistics))) + +DECLARE_MARGO_RPC_HANDLER(ADM_get_statistics); + + + //} // namespace scord::network::rpc #endif // SCORD_NETWORK_RPCS_HPP