diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b5690ac4f9168c47c076a79c647b59d85dc9130e..752983ac7cba88450d88a6f391a7e9b0840c798c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -14,11 +14,12 @@ build: - mkdir -p build - cd build - cmake -DCMAKE_PREFIX_PATH:STRING=/usr/local -DCMAKE_INSTALL_PREFIX:STRING=${CI_PROJECT_DIR}/compiled -DSCORD_BUILD_EXAMPLES:BOOL=ON -DSCORD_TRANSPORT_LIBRARY=libfabric -DSCORD_TRANSPORT_PROTOCOL=ofi+tcp -DSCORD_BIND_ADDRESS=127.0.0.1 -DSCORD_BIND_PORT=52000 .. - - make install + - make -j$(nproc) install artifacts: paths: - compiled/bin/ - compiled/etc/ + - compiled/lib/ - build/examples/ # depending on your build setup it's most likely a good idea to cache outputs to reduce the build time cache: @@ -33,9 +34,9 @@ test: stage: test needs: [build] script: - - export LD_LIBRARY_PATH=/usr/local/lib:/usr/local/lib64 + - export LD_LIBRARY_PATH=/usr/local/lib:/usr/local/lib64:${CI_PROJECT_DIR}/compiled/lib - compiled/bin/scord -f --force-console & - build/examples/ping ofi+tcp://127.0.0.1:52000 - - pkill -9 scord + - pkill -TERM scord cache: key: $CI_COMMIT_REF_SLUG diff --git a/CMakeLists.txt b/CMakeLists.txt index e9d0f67616b5fedc228324976b6cd6031186eb06..3199c66c0cdd600eda898ab76fa9f0e28a2716cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -183,6 +183,7 @@ FetchContent_Declare( ) FetchContent_MakeAvailable(spdlog) +set_target_properties(spdlog PROPERTIES POSITION_INDEPENDENT_CODE ON) ### file_options: required for reading configuration files message(STATUS "[${PROJECT_NAME}] Downloading and building file_options") @@ -217,6 +218,15 @@ mark_variables_as_advanced(REGEX "^(FETCHCONTENT|fmt|FMT|spdlog|SPDLOG)_.*$") # ############################################################################## # Process subdirectories # ############################################################################## + +# set compile flags +add_compile_options("-Wall" "-Wextra" "$<$:-O3>") +if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") + add_compile_options("-stdlib=libc++") +else() + # nothing special for gcc at the moment +endif() + add_subdirectory(etc) add_subdirectory(src) diff --git a/examples/ADM_cancel_transfer.cpp b/examples/ADM_cancel_transfer.cpp index f8cee99f4c504bc7cfdf84473ba5ad08bb80d94a..3a0137fd2d5025758e1cbf9aa620a378c43acfc4 100644 --- a/examples/ADM_cancel_transfer.cpp +++ b/examples/ADM_cancel_transfer.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -12,35 +12,27 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_transfer_handle_t tx_handle{}; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_cancel_transfer remote procedure on {} with transfer id {} ...\n", - argv[1], argv[2]); - ADM_cancel_transfer_in_t in; try { - in.transfer_id = std::stoi(argv[2]); + ret = admire::cancel_transfer(server, job, tx_handle); } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - ADM_cancel_transfer_out_t out; - endp.call("ADM_cancel_transfer", &in, &out); - - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_cancel_transfer remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_cancel_transfer() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_cancel_transfer remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_cancel_transfer() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_connect_data_operation.cpp b/examples/ADM_connect_data_operation.cpp index 26c5118ee408069d1eb11fc753f42694d09b6684..a8cf7b12f33da835be34df1b4590622e48c0c69e 100644 --- a/examples/ADM_connect_data_operation.cpp +++ b/examples/ADM_connect_data_operation.cpp @@ -1,5 +1,5 @@ #include -#include +#include bool string_to_convert(std::string s) { @@ -24,52 +24,33 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_dataset_handle_t input{}; + ADM_dataset_handle_t output{}; + bool should_stream = false; + va_list args; // FIXME placeholder + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_connect_data_operation remote procedure on {} with operation id {}, input {}, stream {}, arguments {} and job id {} ...\n", - argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]); - ADM_connect_data_operation_in_t in; try { - in.operation_id = std::stoi(argv[2]); + ret = admire::connect_data_operation(server, job, input, output, + should_stream, args); } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_connect_data_operation() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - in.input = argv[3]; - try { - in.stream = string_to_convert(argv[4]); - } catch(const std::invalid_argument& ia) { - fmt::print(stderr, "ERROR: Incorrect input value. Please try again.\n"); - exit(EXIT_FAILURE); - } - in.arguments = argv[5]; - try { - in.job_id = std::stoi(argv[6]); - } catch(const std::exception& e) { - fmt::print( - stderr, - "ERROR: ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); - exit(EXIT_FAILURE); - } - - ADM_connect_data_operation_out_t out; - - endp.call("ADM_connect_data_operation", &in, &out); - - if(out.ret < 0) { + if(ret != ADM_SUCCESS) { fmt::print( stdout, - "ADM_connect_data_operation remote procedure not completed successfully\n"); + "ADM_connect_data_operation() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_connect_data_operation remote procedure completed successfully\n"); } + + fmt::print(stdout, + "ADM_connect_data_operation() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_define_data_operation.cpp b/examples/ADM_define_data_operation.cpp index a5b737d7d81ca5373c49344bf69d325665f5d6ca..571a19f763ed6bf06e22fc458ad6668b6fd17196 100644 --- a/examples/ADM_define_data_operation.cpp +++ b/examples/ADM_define_data_operation.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -13,39 +13,30 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + const char* path = ""; + ADM_data_operation_handle_t op_handle; + va_list args; // FIXME: placeholder + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_define_data_operation remote procedure on {} -> {} with operation id {} and arguments {} ...\n", - argv[1], argv[2], argv[3], argv[4]); - - ADM_define_data_operation_in_t in; - in.path = argv[2]; try { - in.operation_id = std::stoi(argv[3]); + ret = admire::define_data_operation(server, job, path, &op_handle, + args); } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_define_data_operation() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - in.arguments = argv[4]; - - ADM_define_data_operation_out_t out; - - endp.call("ADM_define_data_operation", &in, &out); - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_define_data_operation remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_define_data_operation() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_define_data_operation remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_define_data_operation() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_deploy_adhoc_storage.cpp b/examples/ADM_deploy_adhoc_storage.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0bf4fd9c2c738a0652780d10aecbf0c73a590c7f --- /dev/null +++ b/examples/ADM_deploy_adhoc_storage.cpp @@ -0,0 +1,38 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_deploy_adhoc_storage " + "\n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_adhoc_storage_handle_t adhoc_handle{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::deploy_adhoc_storage(server, job, adhoc_handle); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_deploy_adhoc_storage() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_deploy_adhoc_storage() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_deploy_adhoc_storage() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_finalize_data_operation.cpp b/examples/ADM_finalize_data_operation.cpp index 7ad3f08477a4722b2f5796395946cbaced86b7a1..8227ac342d55b4731e0539a467d0db8859ec3a25 100644 --- a/examples/ADM_finalize_data_operation.cpp +++ b/examples/ADM_finalize_data_operation.cpp @@ -1,5 +1,5 @@ #include -#include +#include int main(int argc, char* argv[]) { @@ -12,35 +12,30 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_data_operation_handle_t op_handle{}; + ADM_data_operation_status_t status; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_finalize_data_operation remote procedure on {} with operation id {} ...\n", - argv[1], argv[2]); - ADM_finalize_data_operation_in_t in; try { - in.operation_id = std::stoi(argv[2]); + ret = admire::finalize_data_operation(server, job, op_handle, &status); } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_finalize_data_operation() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - ADM_finalize_data_operation_out_t out; - endp.call("ADM_finalize_data_operation", &in, &out); - - - if(out.ret < 0) { + if(ret != ADM_SUCCESS) { fmt::print( stdout, - "ADM_finalize_data_operation remote procedure not completed successfully\n"); + "ADM_finalize_data_operation() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_finalize_data_operation remote procedure completed successfully\n"); } + + fmt::print(stdout, + "ADM_finalize_data_operation() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_get_pending_transfers.cpp b/examples/ADM_get_pending_transfers.cpp index 99fdb84884fb2e04f6c812d4783dc76661ae7d56..1729e7b41fe8c87959821bfc88c663e4e8686028 100644 --- a/examples/ADM_get_pending_transfers.cpp +++ b/examples/ADM_get_pending_transfers.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -11,29 +11,27 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); - - fmt::print(stdout, - "Calling ADM_get_pending_transfers remote procedure on {} ...\n", - argv[1]); - ADM_get_pending_transfers_in_t in; - in.value = NULL; - ADM_get_pending_transfers_out_t out; - - endp.call("ADM_get_pending_transfers", &in, &out); + ADM_job_handle_t job{}; + ADM_transfer_handle_t** tx_handles = nullptr; + ADM_return_t ret = ADM_SUCCESS; + try { + ret = admire::get_pending_transfers(server, job, tx_handles); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_get_pending_transfers() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_get_pending_transfers remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_get_pending_transfers() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_get_pending_transfers remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_get_pending_transfers() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_get_qos_constraints.cpp b/examples/ADM_get_qos_constraints.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3b52fd69c161424dd12420a772a2ccb401386603 --- /dev/null +++ b/examples/ADM_get_qos_constraints.cpp @@ -0,0 +1,38 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 4) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_get_qos_constraints \n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_qos_scope_t scope{}; + ADM_qos_entity_t entity{}; + ADM_limit_t* limits; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::get_qos_constraints(server, job, scope, entity, &limits); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_cancel_transfer() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } +} diff --git a/examples/ADM_get_statistics.cpp b/examples/ADM_get_statistics.cpp index 6d08b25d5694a25794128821e806c9430bd00823..c9e03dc3a00bb07384a5c3dc5dd351a56d6a74c6 100644 --- a/examples/ADM_get_statistics.cpp +++ b/examples/ADM_get_statistics.cpp @@ -1,5 +1,5 @@ #include -#include +#include int main(int argc, char* argv[]) { @@ -12,42 +12,27 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_job_stats_t* stats = nullptr; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_get_statistics remote procedure on {} with job id {} and job step {} ...\n", - argv[1], argv[2], argv[3]); - ADM_get_statistics_in_t in; try { - in.job_id = std::stoi(argv[2]); + ret = admire::get_statistics(server, job, &stats); } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - try { - in.job_step = std::stoi(argv[3]); - } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - - ADM_get_statistics_out_t out; - endp.call("ADM_get_statistics", &in, &out); - - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_get_statistics remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_cancel_transfer() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_get_statistics remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_cancel_transfer() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_get_transfer_priority.cpp b/examples/ADM_get_transfer_priority.cpp index a09663a378bbe759f51951330b7056892468507d..a8ef68a2052d339fd52951d4397617a516906062 100644 --- a/examples/ADM_get_transfer_priority.cpp +++ b/examples/ADM_get_transfer_priority.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -13,35 +13,28 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_transfer_handle_t tx_handle{}; + ADM_transfer_priority_t priority; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_get_transfer_priority remote procedure on {} with transfer id {} ...\n", - argv[1], argv[2]); - ADM_get_transfer_priority_in_t in; try { - in.transfer_id = std::stoi(argv[2]); + ret = admire::get_transfer_priority(server, job, tx_handle, &priority); } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_get_transfer_priority() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - ADM_get_transfer_priority_out_t out; - endp.call("ADM_get_transfer_priority", &in, &out); - - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_get_transfer_priority remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_get_transfer_priority() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_get_transfer_priority remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_get_transfer_priority() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_link_transfer_to_data_operation.cpp b/examples/ADM_link_transfer_to_data_operation.cpp index e939a981b31b2e0727caf94c1ad847f610b189f5..db1a4f0935b4e98af1284c6a5a5f75873c908440 100644 --- a/examples/ADM_link_transfer_to_data_operation.cpp +++ b/examples/ADM_link_transfer_to_data_operation.cpp @@ -1,17 +1,5 @@ #include -#include - -bool -string_to_convert(std::string s) { - if(s == "true" || s == "TRUE" || s == "True") { - return true; - } else if(s == "false" || s == "FALSE" || s == "False") { - return false; - } else { - throw std::invalid_argument( - "ERROR: Incorrect input value. Please try again.\n"); - } -} +#include int main(int argc, char* argv[]) { @@ -24,57 +12,30 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_data_operation_handle_t op_handle; + bool should_stream = false; + va_list args; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_link_transfer_to_data_operation remote procedure on {} with operation id {}, transfer id {}, stream {}, arguments {} and job id {} ...\n", - argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]); - ADM_link_transfer_to_data_operation_in_t in; - try { - in.operation_id = std::stoi(argv[2]); - } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - try { - in.transfer_id = std::stoi(argv[3]); - } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } try { - in.stream = string_to_convert(argv[4]); - } catch(const std::invalid_argument& ia) { - fmt::print( - stderr, - "ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); - exit(EXIT_FAILURE); - } - in.arguments = argv[5]; - try { - in.job_id = std::stoi(argv[6]); + ret = admire::link_transfer_to_data_operation(server, job, op_handle, + should_stream, args); } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - ADM_link_transfer_to_data_operation_out_t out; - - endp.call("ADM_link_transfer_to_data_operation", &in, &out); - - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_link_transfer_to_data_operation remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_cancel_transfer() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_link_transfer_to_data_operation remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_cancel_transfer() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_register_adhoc_storage.cpp b/examples/ADM_register_adhoc_storage.cpp new file mode 100644 index 0000000000000000000000000000000000000000..60dd044ac10d1d9eb2433b0e4b340a9ed27d3905 --- /dev/null +++ b/examples/ADM_register_adhoc_storage.cpp @@ -0,0 +1,41 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_register_adhoc_storage " + "\n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_adhoc_context_t ctx{}; + ADM_adhoc_storage_handle_t adhoc_handle{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::register_adhoc_storage(server, job, ctx, &adhoc_handle); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_register_adhoc_storage() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print( + stdout, + "ADM_register_adhoc_storage() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, + "ADM_register_adhoc_storage() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_register_job.cpp b/examples/ADM_register_job.cpp new file mode 100644 index 0000000000000000000000000000000000000000..56672509a2947b8279efa97a1e6e2dc85b694a88 --- /dev/null +++ b/examples/ADM_register_job.cpp @@ -0,0 +1,36 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_register_job \n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_job_requirements_t reqs{}; + ADM_dataset_info_t info{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::register_job(server, reqs, &job); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_register_job() failed: {}\n", e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, "ADM_register_job() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_register_job() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_remove_adhoc_storage.cpp b/examples/ADM_remove_adhoc_storage.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9cfd194def855ec53e83ef0b21a675d6692a9740 --- /dev/null +++ b/examples/ADM_remove_adhoc_storage.cpp @@ -0,0 +1,38 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_remove_adhoc_storage " + "\n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_adhoc_storage_handle_t adhoc_handle{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::remove_adhoc_storage(server, job, adhoc_handle); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_remove_adhoc_storage() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_remove_adhoc_storage() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_remove_adhoc_storage() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_remove_job.cpp b/examples/ADM_remove_job.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f489c2c1d13b14f66ab798ebf422d6ba4bf7f001 --- /dev/null +++ b/examples/ADM_remove_job.cpp @@ -0,0 +1,34 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_remove_job \n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::remove_job(server, job); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_remove_job() failed: {}\n", e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, "ADM_remove_job() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_remove_job() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_set_dataset_information.cpp b/examples/ADM_set_dataset_information.cpp index 78d5dd5fe9ef720f39f457f7c7f5e653164c26ee..0ac0e2e79ed487a5b9fe3a92ad67e823d196ecbd 100644 --- a/examples/ADM_set_dataset_information.cpp +++ b/examples/ADM_set_dataset_information.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -13,44 +13,30 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; + ADM_job_handle_t job{}; + ADM_dataset_handle_t target{}; + ADM_dataset_info_t info{}; + ADM_return_t ret = ADM_SUCCESS; - auto endp = rpc_client.lookup(argv[1]); - - fmt::print( - stdout, - "Calling ADM_set_dataset_information remote procedure on {} with resource id {}, info {} and" - " job id {} ...\n", - argv[1], argv[2], argv[3], argv[4]); - ADM_set_dataset_information_in_t in; - try { - in.resource_id = std::stoi(argv[2]); - } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - in.info = argv[3]; try { - in.job_id = std::stoi(argv[4]); + ret = admire::set_dataset_information(server, job, target, info); } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_set_dataset_information() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - ADM_set_dataset_information_out_t out; - endp.call("ADM_set_dataset_information", &in, &out); - - - if(out.ret < 0) { + if(ret != ADM_SUCCESS) { fmt::print( stdout, - "ADM_set_dataset_information remote procedure not completed successfully\n"); + "ADM_set_dataset_information() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_set_dataset_information remote procedure completed successfully\n"); } + + fmt::print(stdout, + "ADM_set_dataset_information() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_set_io_resources.cpp b/examples/ADM_set_io_resources.cpp index 8cbebb1a43c02232ab10eaacea4c05304ce9c103..d2a3be647fe8233a7fd73600f184a12e9ca316a9 100644 --- a/examples/ADM_set_io_resources.cpp +++ b/examples/ADM_set_io_resources.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -13,43 +13,28 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_storage_handle_t tier{}; + ADM_storage_resources_t resources{}; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_set_io_resources remote procedure on {} with tier id {}, resources {} and" - " job id {} ...\n", - argv[1], argv[2], argv[3], argv[4]); - ADM_set_io_resources_in_t in; try { - in.tier_id = std::stoi(argv[2]); + ret = admire::set_io_resources(server, job, tier, resources); } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - in.resources = argv[3]; - try { - in.job_id = std::stoi(argv[4]); - } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - ADM_set_io_resources_out_t out; - - endp.call("ADM_set_io_resources", &in, &out); - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_set_io_resources remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_cancel_transfer() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_set_io_resources remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_cancel_transfer() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_set_qos_constraints.cpp b/examples/ADM_set_qos_constraints.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ed2b0e05808a3c26173978acc3212705a58718bc --- /dev/null +++ b/examples/ADM_set_qos_constraints.cpp @@ -0,0 +1,39 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 6) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print( + stderr, + "Usage: ADM_set_qos_constraints \n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_limit_t limit{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::set_qos_constraints(server, job, limit); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_set_qos_constraints() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_set_qos_constraints() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_set_qos_constraints() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_set_qos_constraints_pull.cpp b/examples/ADM_set_qos_constraints_pull.cpp deleted file mode 100644 index ee57c9fb055a068a9d16afd134870d1ce000fe3b..0000000000000000000000000000000000000000 --- a/examples/ADM_set_qos_constraints_pull.cpp +++ /dev/null @@ -1,48 +0,0 @@ -#include -#include - - -int -main(int argc, char* argv[]) { - - if(argc != 4) { - fmt::print(stderr, "ERROR: no location provided\n"); - fmt::print( - stderr, - "Usage: ADM_set_qos_constraints_pull \n"); - exit(EXIT_FAILURE); - } - - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); - - auto endp = rpc_client.lookup(argv[1]); - - fmt::print( - stdout, - "Calling ADM_set_qos_constraints_pull remote procedure on {} with scope {} and element id {} ...\n", - argv[1], argv[2], argv[3]); - ADM_set_qos_constraints_pull_in_t in; - in.scope = argv[2]; - try { - in.element_id = std::stoi(argv[3]); - } catch(const std::exception& e) { - fmt::print(stderr, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - - ADM_set_qos_constraints_pull_out_t out; - - endp.call("ADM_set_qos_constraints_pull", &in, &out); - - if(out.ret < 0) { - fmt::print( - stderr, - "ADM_set_qos_constraints_pull remote procedure not completed successfully\n"); - exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_set_qos_constraints_pull remote procedure completed successfully\n"); - } -} diff --git a/examples/ADM_set_qos_constraints_push.cpp b/examples/ADM_set_qos_constraints_push.cpp deleted file mode 100644 index e5c2b1f58735f2e430bb714a1872220d9d8021fc..0000000000000000000000000000000000000000 --- a/examples/ADM_set_qos_constraints_push.cpp +++ /dev/null @@ -1,51 +0,0 @@ -#include -#include - - -int -main(int argc, char* argv[]) { - - if(argc != 6) { - fmt::print(stderr, "ERROR: no location provided\n"); - fmt::print( - stderr, - "Usage: ADM_set_qos_constraints_push \n"); - exit(EXIT_FAILURE); - } - - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); - - auto endp = rpc_client.lookup(argv[1]); - - fmt::print( - stdout, - "Calling ADM_set_qos_constraints_push remote procedure on {} with scope {}, QoS class {}, element id {} and class value {} ...\n", - argv[1], argv[2], argv[3], argv[4], argv[5]); - ADM_set_qos_constraints_push_in_t in; - in.scope = argv[2]; - in.qos_class = argv[3]; - try { - in.element_id = std::stoi(argv[4]); - } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - in.class_value = argv[4]; - - ADM_set_qos_constraints_push_out_t out; - - endp.call("ADM_set_qos_constraints_push", &in, &out); - - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_set_qos_constraints_push remote procedure not completed successfully\n"); - exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_set_qos_constraints_push remote procedure completed successfully\n"); - } -} diff --git a/examples/ADM_set_transfer_priority.cpp b/examples/ADM_set_transfer_priority.cpp index a82e3c0929ff7d46edad53c897cb64c467c395b0..b00e7283b6392497aebdd86d3fbfaa1e89f9a3f6 100644 --- a/examples/ADM_set_transfer_priority.cpp +++ b/examples/ADM_set_transfer_priority.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -13,41 +13,28 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); + ADM_job_handle_t job{}; + ADM_transfer_handle_t tx_handle{}; + int incr = 42; + ADM_return_t ret = ADM_SUCCESS; - fmt::print( - stdout, - "Calling ADM_set_transfer_priority remote procedure on {} with transfer id {} and number of positions {}...\n", - argv[1], argv[2], argv[3]); - ADM_set_transfer_priority_in_t in; try { - in.transfer_id = std::stoi(argv[2]); + ret = admire::set_transfer_priority(server, job, tx_handle, incr); } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_set_transfer_priority() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - try { - in.n_positions = std::stoi(argv[3]); - } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); - exit(EXIT_FAILURE); - } - ADM_set_transfer_priority_out_t out; - - endp.call("ADM_set_transfer_priority", &in, &out); - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_set_transfer_priority remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_set_transfer_priority() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); - } else { - fmt::print( - stdout, - "ADM_set_transfer_priority remote procedure completed successfully\n"); } + + fmt::print(stdout, "ADM_set_transfer_priority() remote procedure completed " + "successfully\n"); } diff --git a/examples/ADM_transfer_dataset.cpp b/examples/ADM_transfer_dataset.cpp index b1dd75f5e025225e8fac200a643f47cebbc4d101..96312a7ec6193e5fd44452c2a80bde2b4a41c37d 100644 --- a/examples/ADM_transfer_dataset.cpp +++ b/examples/ADM_transfer_dataset.cpp @@ -1,5 +1,5 @@ #include -#include +#include int @@ -14,41 +14,32 @@ main(int argc, char* argv[]) { exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; + ADM_job_handle_t job{}; + ADM_dataset_handle_t** sources = nullptr; + ADM_dataset_handle_t** targets = nullptr; + ADM_limit_t** limits = nullptr; + ADM_tx_mapping_t mapping = ADM_MAPPING_ONE_TO_ONE; + ADM_transfer_handle_t tx_handle{}; + ADM_return_t ret = ADM_SUCCESS; - auto endp = rpc_client.lookup(argv[1]); - - fmt::print( - stdout, - "Calling ADM_transfer_dataset remote procedure on {} : {} -> {} using " - " qos constraints {}, distribution {} and job id {} ...\n", - argv[1], argv[2], argv[3], argv[4], argv[5], argv[6]); - ADM_transfer_dataset_in_t in; - in.source = argv[2]; - in.destination = argv[3]; - in.qos_constraints = argv[4]; - in.distribution = argv[5]; try { - in.job_id = std::stoi(argv[6]); + ret = admire::transfer_dataset(server, job, sources, targets, limits, + mapping, &tx_handle); } catch(const std::exception& e) { - fmt::print(stdout, "ERROR: Incorrect input type. Please try again.\n"); + fmt::print(stderr, "FATAL: ADM_cancel_transfer() failed: {}\n", + e.what()); exit(EXIT_FAILURE); } - ADM_transfer_dataset_out_t out; - - endp.call("ADM_transfer_dataset", &in, &out); - - if(out.ret < 0) { - fmt::print( - stdout, - "ADM_transfer_dataset remote procedure not completed successfully\n"); + if(ret != ADM_SUCCESS) { + fmt::print(stdout, + "ADM_transfer_dataset() remote procedure not completed " + "successfully\n"); exit(EXIT_FAILURE); } else { - fmt::print( - stdout, - "ADM_transfer_dataset remote procedure completed successfully\n"); + fmt::print(stdout, "ADM_transfer_dataset() remote procedure completed " + "successfully\n"); } } diff --git a/examples/ADM_update_adhoc_storage.cpp b/examples/ADM_update_adhoc_storage.cpp new file mode 100644 index 0000000000000000000000000000000000000000..fc4355271bbf3ca0dcf1cedcd21e63140fb59baf --- /dev/null +++ b/examples/ADM_update_adhoc_storage.cpp @@ -0,0 +1,41 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_update_adhoc_storage " + "\n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_adhoc_context_t ctx{}; + ADM_adhoc_storage_handle_t adhoc_handle{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::update_adhoc_storage(server, job, ctx, adhoc_handle); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_update_adhoc_storage() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print( + stdout, + "ADM_update_adhoc_storage() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, + "ADM_update_adhoc_storage() remote procedure completed " + "successfully\n"); +} diff --git a/examples/ADM_update_job.cpp b/examples/ADM_update_job.cpp new file mode 100644 index 0000000000000000000000000000000000000000..ab82690fb8b87ca71bfe548909c52eaca5ec03f7 --- /dev/null +++ b/examples/ADM_update_job.cpp @@ -0,0 +1,36 @@ +#include +#include + + +int +main(int argc, char* argv[]) { + + if(argc != 3) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, "Usage: ADM_update_job \n"); + exit(EXIT_FAILURE); + } + + admire::server server{"tcp", argv[1]}; + + ADM_job_handle_t job{}; + ADM_job_requirements_t reqs{}; + ADM_dataset_info_t info{}; + ADM_return_t ret = ADM_SUCCESS; + + try { + ret = admire::update_job(server, job, reqs); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_update_job() failed: {}\n", e.what()); + exit(EXIT_FAILURE); + } + + if(ret != ADM_SUCCESS) { + fmt::print(stdout, "ADM_update_job() remote procedure not completed " + "successfully\n"); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_update_job() remote procedure completed " + "successfully\n"); +} diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 884538459e056bb6b7d7f80a5c366d85f95f3b7c..e7620f25ebad4ee8ab2f37b42c77aeeffe391745 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -23,17 +23,19 @@ ################################################################################ list(APPEND examples - ping ADM_input ADM_output ADM_inout ADM_adhoc_context ADM_adhoc_context_id - ADM_adhoc_nodes ADM_adhoc_walltime ADM_adhoc_access ADM_adhoc_distribution - ADM_adhoc_background_flush ADM_in_situ_ops ADM_in_transit_ops ADM_transfer_dataset + ping + ADM_register_job ADM_update_job ADM_remove_job + ADM_register_adhoc_storage ADM_update_adhoc_storage + ADM_remove_adhoc_storage ADM_deploy_adhoc_storage + ADM_in_situ_ops ADM_in_transit_ops ADM_transfer_dataset ADM_set_dataset_information ADM_set_io_resources ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers - ADM_set_qos_constraints_push ADM_define_data_operation ADM_connect_data_operation + ADM_set_qos_constraints ADM_get_qos_constraints ADM_define_data_operation ADM_connect_data_operation ADM_finalize_data_operation ADM_link_transfer_to_data_operation ADM_get_statistics) foreach (example IN LISTS examples) add_executable(${example}) target_sources(${example} PRIVATE ${example}.cpp) - target_link_libraries(${example} PUBLIC network_engine fmt::fmt) + target_link_libraries(${example} + PUBLIC network_engine fmt::fmt adm_iosched) endforeach() - diff --git a/examples/ADM_adhoc_access.cpp b/examples/deprecated/ADM_adhoc_access.cpp similarity index 99% rename from examples/ADM_adhoc_access.cpp rename to examples/deprecated/ADM_adhoc_access.cpp index 2d28cf3a65da24872aee2e5e871170363f35bb3c..13b6b2c9e13a4c1ec26851a4facf604bf5d3c818 100644 --- a/examples/ADM_adhoc_access.cpp +++ b/examples/deprecated/ADM_adhoc_access.cpp @@ -1,7 +1,6 @@ #include #include - int main(int argc, char* argv[]) { @@ -38,4 +37,3 @@ main(int argc, char* argv[]) { "ADM_adhoc_access remote procedure completed successfully\n"); } } - diff --git a/examples/ADM_adhoc_background_flush.cpp b/examples/deprecated/ADM_adhoc_background_flush.cpp similarity index 87% rename from examples/ADM_adhoc_background_flush.cpp rename to examples/deprecated/ADM_adhoc_background_flush.cpp index e8b5207139644341cf429c52669ed09fa0c476e9..588d227a89beea66c36385eeae3935787027fbfd 100644 --- a/examples/ADM_adhoc_background_flush.cpp +++ b/examples/deprecated/ADM_adhoc_background_flush.cpp @@ -8,7 +8,8 @@ string_to_convert(std::string s) { } else if(s == "false" || s == "FALSE" || s == "False") { return false; } else { - throw std::invalid_argument("ERROR: Incorrect input value. Please try again.\n"); + throw std::invalid_argument( + "ERROR: Incorrect input value. Please try again.\n"); } } @@ -38,8 +39,8 @@ main(int argc, char* argv[]) { in.b_flush = string_to_convert(argv[2]); } catch(const std::invalid_argument& ia) { fmt::print( - stderr, - "ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); + stderr, + "ERROR: Incorrect input value. Please introduce TRUE/FALSE value. \n"); exit(EXIT_FAILURE); } @@ -57,4 +58,4 @@ main(int argc, char* argv[]) { stdout, "ADM_adhoc_background_flush remote procedure completed successfully\n"); } -} \ No newline at end of file +} diff --git a/examples/ADM_adhoc_context.cpp b/examples/deprecated/ADM_adhoc_context.cpp similarity index 100% rename from examples/ADM_adhoc_context.cpp rename to examples/deprecated/ADM_adhoc_context.cpp diff --git a/examples/ADM_adhoc_context_id.cpp b/examples/deprecated/ADM_adhoc_context_id.cpp similarity index 100% rename from examples/ADM_adhoc_context_id.cpp rename to examples/deprecated/ADM_adhoc_context_id.cpp diff --git a/examples/ADM_adhoc_distribution.cpp b/examples/deprecated/ADM_adhoc_distribution.cpp similarity index 100% rename from examples/ADM_adhoc_distribution.cpp rename to examples/deprecated/ADM_adhoc_distribution.cpp diff --git a/examples/ADM_adhoc_nodes.cpp b/examples/deprecated/ADM_adhoc_nodes.cpp similarity index 100% rename from examples/ADM_adhoc_nodes.cpp rename to examples/deprecated/ADM_adhoc_nodes.cpp diff --git a/examples/ADM_adhoc_walltime.cpp b/examples/deprecated/ADM_adhoc_walltime.cpp similarity index 100% rename from examples/ADM_adhoc_walltime.cpp rename to examples/deprecated/ADM_adhoc_walltime.cpp diff --git a/examples/ADM_inout.cpp b/examples/deprecated/ADM_inout.cpp similarity index 100% rename from examples/ADM_inout.cpp rename to examples/deprecated/ADM_inout.cpp diff --git a/examples/ADM_input.cpp b/examples/deprecated/ADM_input.cpp similarity index 100% rename from examples/ADM_input.cpp rename to examples/deprecated/ADM_input.cpp diff --git a/examples/ADM_output.cpp b/examples/deprecated/ADM_output.cpp similarity index 100% rename from examples/ADM_output.cpp rename to examples/deprecated/ADM_output.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8af57333f12b9ae06e013d9262e4e93a22929f03..84cf47cfb1b02364d6afd224fe0d7f900fafe27d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -38,3 +38,5 @@ target_link_libraries( ) install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR}) + +add_subdirectory(api) diff --git a/src/api/CMakeLists.txt b/src/api/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ae30d58f94680bcbcfbcb507ee9cfc96b2102a48 --- /dev/null +++ b/src/api/CMakeLists.txt @@ -0,0 +1,40 @@ +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +add_library(adm_iosched SHARED c_wrapper.cpp admire.hpp) + +target_sources(adm_iosched + PUBLIC admire.h + PRIVATE admire.cpp) + +target_include_directories(adm_iosched PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) + +target_link_libraries(adm_iosched PRIVATE network_engine) + +install( + TARGETS adm_iosched + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/adm_iosched +) diff --git a/src/api/admire.cpp b/src/api/admire.cpp new file mode 100644 index 0000000000000000000000000000000000000000..c9ab9e7b0f241fb2e677ef5d5e404a365f036a52 --- /dev/null +++ b/src/api/admire.cpp @@ -0,0 +1,686 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include + + +namespace { + +void +init_library() __attribute__((constructor)); + +void +init_logger(); + +void +init_library() { + init_logger(); +} + +/** Logging for the library */ +void +init_logger() { + // for now, just create a simple console logger + scord::logger::create_global_logger("libadm_iosched", "console color"); +} + +} // namespace + + +namespace admire { + +ADM_return_t +register_job(const server& srv, ADM_job_requirements_t reqs, + ADM_job_handle_t* job) { + (void) srv; + (void) reqs; + (void) job; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_register_job(...)"); + + ADM_register_job_in_t in{}; + ADM_register_job_out_t out; + + endp.call("ADM_register_job", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_register_job() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +update_job(const server& srv, ADM_job_handle_t job, + ADM_job_requirements_t reqs) { + (void) srv; + (void) job; + (void) reqs; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_update_job(...)"); + + ADM_update_job_in_t in{}; + ADM_update_job_out_t out; + + endp.call("ADM_update_job", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_update_job() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_update_job() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +remove_job(const server& srv, ADM_job_handle_t job) { + (void) srv; + (void) job; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_remove_job(...)"); + + ADM_remove_job_in_t in{}; + ADM_remove_job_out_t out; + + endp.call("ADM_remove_job", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_remove_job() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_remove_job() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +register_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t* adhoc_handle) { + (void) srv; + (void) job; + (void) ctx; + (void) adhoc_handle; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_register_adhoc_storage(...)"); + + ADM_register_adhoc_storage_in_t in{}; + ADM_register_adhoc_storage_out_t out; + + endp.call("ADM_register_adhoc_storage", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_register_adhoc_storage() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_register_adhoc_storage() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +update_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t adhoc_handle) { + (void) srv; + (void) job; + (void) ctx; + (void) adhoc_handle; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_update_adhoc_storage(...)"); + + ADM_update_adhoc_storage_in_t in{}; + ADM_update_adhoc_storage_out_t out; + + endp.call("ADM_update_adhoc_storage", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_update_adhoc_storage() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_update_adhoc_storage() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +remove_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle) { + (void) srv; + (void) job; + (void) adhoc_handle; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_remove_adhoc_storage(...)"); + + ADM_remove_adhoc_storage_in_t in{}; + ADM_remove_adhoc_storage_out_t out; + + endp.call("ADM_remove_adhoc_storage", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_remove_adhoc_storage() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_remove_adhoc_storage() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +deploy_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle) { + (void) srv; + (void) job; + (void) adhoc_handle; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_deploy_adhoc_storage(...)"); + + ADM_deploy_adhoc_storage_in_t in{}; + ADM_deploy_adhoc_storage_out_t out; + + endp.call("ADM_deploy_adhoc_storage", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_deploy_adhoc_storage() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_deploy_adhoc_storage() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +transfer_dataset(const server& srv, ADM_job_handle_t job, + ADM_dataset_handle_t** sources, ADM_dataset_handle_t** targets, + ADM_limit_t** limits, ADM_tx_mapping_t mapping, + ADM_transfer_handle_t* tx_handle) { + (void) srv; + (void) job; + (void) sources; + (void) targets; + (void) limits; + (void) mapping; + (void) tx_handle; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_transfer_dataset(...)"); + + ADM_transfer_dataset_in_t in{}; + ADM_transfer_dataset_out_t out; + + endp.call("ADM_transfer_dataset", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_transfer_dataset() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_transfer_dataset() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +set_dataset_information(const server& srv, ADM_job_handle_t job, + ADM_dataset_handle_t target, ADM_dataset_info_t info) { + (void) srv; + (void) job; + (void) target; + (void) info; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_set_dataset_information(...)"); + + ADM_set_dataset_information_in_t in{}; + ADM_set_dataset_information_out_t out; + + endp.call("ADM_set_dataset_information", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_set_dataset_information() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_set_dataset_information() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +set_io_resources(const server& srv, ADM_job_handle_t job, + ADM_storage_handle_t tier, ADM_storage_resources_t resources) { + (void) srv; + (void) job; + (void) tier; + (void) resources; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_set_io_resources(...)"); + + ADM_set_io_resources_in_t in{}; + ADM_set_io_resources_out_t out; + + endp.call("ADM_set_io_resources", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_set_io_resources() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_set_io_resources() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +get_transfer_priority(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, + ADM_transfer_priority_t* priority) { + (void) srv; + (void) job; + (void) tx_handle; + (void) priority; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_get_transfer_priority(...)"); + + ADM_get_transfer_priority_in_t in{}; + ADM_get_transfer_priority_out_t out; + + endp.call("ADM_get_transfer_priority", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_get_transfer_priority() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_get_transfer_priority() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +set_transfer_priority(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, int incr) { + (void) srv; + (void) job; + (void) tx_handle; + (void) incr; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_set_transfer_priority(...)"); + + ADM_set_transfer_priority_in_t in{}; + ADM_set_transfer_priority_out_t out; + + endp.call("ADM_set_transfer_priority", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_set_transfer_priority() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_set_transfer_priority() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +cancel_transfer(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle) { + + (void) job; + (void) tx_handle; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_cancel_transfer(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_cancel_transfer_in_t in{42}; + ADM_cancel_transfer_out_t out; + + endp.call("ADM_cancel_transfer", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_cancel_transfer() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_cancel_transfer() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +get_pending_transfers(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t** pending_transfers) { + (void) srv; + (void) job; + (void) pending_transfers; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_get_pending_transfers(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_get_pending_transfers_in_t in{}; + ADM_get_pending_transfers_out_t out; + + endp.call("ADM_get_pending_transfers", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_get_pending_transfers() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_get_pending_transfers() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +set_qos_constraints(const server& srv, ADM_job_handle_t job, + ADM_limit_t limit) { + (void) srv; + (void) job; + (void) limit; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_set_qos_constraints(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_set_qos_constraints_in_t in{}; + ADM_set_qos_constraints_out_t out; + + endp.call("ADM_set_qos_constraints", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_set_qos_constraints() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_set_qos_constraints() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +get_qos_constraints(const server& srv, ADM_job_handle_t job, + ADM_qos_scope_t scope, ADM_qos_entity_t entity, + ADM_limit_t** limits) { + (void) srv; + (void) job; + (void) scope; + (void) entity; + (void) limits; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_get_qos_constraints(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_get_qos_constraints_in_t in{}; + ADM_get_qos_constraints_out_t out; + + endp.call("ADM_get_qos_constraints", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_get_qos_constraints() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_get_qos_constraints() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +define_data_operation(const server& srv, ADM_job_handle_t job, const char* path, + ADM_data_operation_handle_t* op, va_list args) { + (void) srv; + (void) job; + (void) path; + (void) op; + (void) args; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_define_data_operation(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_define_data_operation_in_t in{}; + ADM_define_data_operation_out_t out; + + endp.call("ADM_define_data_operation", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_define_data_operation() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_define_data_operation() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +connect_data_operation(const server& srv, ADM_job_handle_t job, + ADM_dataset_handle_t input, ADM_dataset_handle_t output, + bool should_stream, va_list args) { + (void) srv; + (void) job; + (void) input; + (void) output; + (void) should_stream; + (void) args; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_connect_data_operation(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_connect_data_operation_in_t in{}; + ADM_connect_data_operation_out_t out; + + endp.call("ADM_connect_data_operation", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_connect_data_operation() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_connect_data_operation() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +finalize_data_operation(const server& srv, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + ADM_data_operation_status_t* status) { + (void) srv; + (void) job; + (void) op; + (void) status; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_finalize_data_operation(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_finalize_data_operation_in_t in{}; + ADM_finalize_data_operation_out_t out; + + endp.call("ADM_finalize_data_operation", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_finalize_data_operation() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_finalize_data_operation() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +link_transfer_to_data_operation(const server& srv, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + bool should_stream, va_list args) { + (void) srv; + (void) job; + (void) op; + (void) should_stream; + (void) args; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_link_transfer_to_data_operation(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_link_transfer_to_data_operation_in_t in{}; + ADM_link_transfer_to_data_operation_out_t out; + + endp.call("ADM_link_transfer_to_data_operation", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_link_transfer_to_data_operation() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_link_transfer_to_data_operation() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +ADM_return_t +get_statistics(const server& srv, ADM_job_handle_t job, + ADM_job_stats_t** stats) { + (void) srv; + (void) job; + (void) stats; + + scord::network::rpc_client rpc_client{srv.m_protocol}; + + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_get_statistics(...)"); + + // FIXME: change RPC fields to ADM_transfer_handle_t + ADM_get_statistics_in_t in{}; + ADM_get_statistics_out_t out; + + endp.call("ADM_get_statistics", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_get_statistics() = {}", out.ret); + return static_cast(out.ret); + } + + LOGGER_INFO("ADM_get_statistics() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +} // namespace admire diff --git a/src/api/admire.h b/src/api/admire.h new file mode 100644 index 0000000000000000000000000000000000000000..73c9e19064a5ba340ba6e8af379da13ce6abe00a --- /dev/null +++ b/src/api/admire.h @@ -0,0 +1,522 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_ADMIRE_H +#define SCORD_ADMIRE_H + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define ADM_IOSCHED_API_VERSION "0.1.0" +#define ADM_IOSCHED_API_VERSION_MAJOR 0 +#define ADM_IOSCHED_API_VERSION_MINOR 1 +#define ADM_IOSCHED_API_VERSION_PATCH 0 + +/** + * Public type and struct definitions + */ + +/* Error return codes */ +enum ADM_return_t { + ADM_SUCCESS = 0, + // FIXME: generic error only for now + ADM_OTHER_ERROR +}; + +/* A server */ +typedef struct adm_server ADM_server_t; + +/* A node */ +typedef const char* ADM_node_t; + + +/* A dataset handle */ +typedef struct adm_dataset* ADM_dataset_handle_t; + +/* A job handle */ +typedef struct adm_job* ADM_job_handle_t; + +/* The scope affected by a QoS limit */ +enum ADM_qos_scope_t { + ADM_QOS_SCOPE_DATASET, + ADM_QOS_SCOPE_NODE, + ADM_QOS_SCOPE_JOB +}; + +/** The class of QoS limit applied to a scope */ +enum ADM_qos_class_t { ADM_QOS_CLASS_BANDWIDTH, ADM_QOS_CLASS_IOPS }; + +/** An ADMIRE entity upon which QoS can be defined */ +union ADM_qos_entity_t { + ADM_node_t l_node; + ADM_job_handle_t l_job; + ADM_dataset_handle_t l_dataset; +}; + +/** A QoS limit */ +typedef struct { + // TODO: empty for now + ADM_qos_scope_t l_scope; + ADM_qos_class_t l_class; + ADM_qos_entity_t l_element; +} ADM_limit_t; + +/** A transfer mapping */ +typedef enum { + ADM_MAPPING_ONE_TO_ONE, + ADM_MAPPING_ONE_TO_N, + ADM_MAPPING_N_TO_N +} ADM_tx_mapping_t; + +/** A handle to a created transfer */ +typedef struct { + // TODO: empty for now +} ADM_transfer_handle_t; + +/** Information about a dataset */ +typedef struct { + // TODO: empty for now +} ADM_dataset_info_t; + +/** A storage tier handle */ +typedef struct { + // TODO: empty for now +} ADM_storage_handle_t; + +/** Information about resources assigned to a storage tier */ +typedef struct { + // TODO: empty for now +} ADM_storage_resources_t; + +typedef int ADM_transfer_priority_t; + +typedef struct { + // TODO: empty for now +} ADM_data_operation_handle_t; + +typedef struct { + // TODO: empty for now +} ADM_data_operation_status_t; + +typedef struct { + // TODO: empty for now +} ADM_job_stats_t; + +/** Execution modes for an adhoc storage system */ +typedef enum { + ADM_ADHOC_MODE_IN_JOB_SHARED, + ADM_ADHOC_MODE_IN_JOB_DEDICATED, + ADM_ADHOC_MODE_SEPARATE_NEW, + ADM_ADHOC_MODE_SEPARATE_EXISTING +} ADM_adhoc_mode_t; + +/** Access modes for an adhoc storage system */ +typedef enum { + ADM_ADHOC_ACCESS_RDONLY, + ADM_ADHOC_ACCESS_WRONLY, + ADM_ADHOC_ACCESS_RDWR, +} ADM_adhoc_access_t; + +/** Abstract type to represent data distributions for adhoc storage systems */ +typedef struct adm_adhoc_data_distribution* ADM_adhoc_data_distribution_t; + +/** The context for an adhoc storage instance */ +typedef struct { + /** The adhoc storage system execution mode */ + ADM_adhoc_mode_t c_mode; + /** The adhoc storage system access type */ + ADM_adhoc_access_t c_access; + /** The number of nodes for the adhoc storage system */ + uint32_t c_nodes; + /** The adhoc storage system walltime */ + uint32_t c_walltime; + /** Whether the adhoc storage system should flush data in the background */ + bool c_should_bg_flush; +} ADM_adhoc_context_t; + +typedef ADM_adhoc_context_t* ADM_adhoc_storage_handle_t; + +/** The I/O requirements for a job */ +typedef struct { + /** A list of input datasets */ + ADM_dataset_handle_t** r_inputs; + /** A list of output datasets */ + ADM_dataset_handle_t** r_outputs; + /** A definition for a specific adhoc storage instance */ + ADM_adhoc_storage_handle_t r_adhoc_storage; +} ADM_job_requirements_t; + + +/******************************************************************************/ +/* Public prototypes */ +/******************************************************************************/ + +/** + * Register a job and its requirements. + * + * @param[in] server The server to which the request is directed + * @param[in] reqs The requirements for the job. + * @param[out] job An ADHOC_HANDLE referring to the newly-created + * adhoc storage instance. + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, + ADM_job_handle_t* job); + +ADM_return_t +ADM_update_job(ADM_server_t server, ADM_job_handle_t job, + ADM_job_requirements_t reqs); + +ADM_return_t +ADM_remove_job(ADM_server_t server, ADM_job_handle_t job); + +/** + * Register an adhoc storage system. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] ctx The EXECUTION_CONTEXT for the adhoc storage system. + * @param[out] adhoc_handle An ADHOC_HANDLE referring to the newly-created + * adhoc storage instance. + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t* adhoc_handle); + +/** + * Update an already-registered adhoc storage system. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] ctx The updated EXECUTION_CONTEXT for the adhoc storage system. + * @param[in] adhoc_handle An ADHOC_HANDLE referring to the adhoc storage + * instance of interest. + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t adhoc_handle); + +/** + * Remove an already-registered adhoc storage system. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] adhoc_handle An ADHOC_HANDLE referring to the adhoc storage + * instance of interest. + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle); + +/** + * Initiate the deployment of an adhoc storage system instance. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] adhoc_handle An ADHOC_HANDLE referring to the adhoc storage + * instance of interest. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle); + +/** + * Transfers the dataset identified by the source_name to the storage tier + * defined by destination_name, and apply the provided constraints during the + * transfer. This function returns a handle that can be used to track the + * operation (i.e., get statistics, or status). + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] sources A list of DATASETs identifying the source dataset/s + * to be transferred. + * @param[in] targets A list of DATASETs identifying the destination + * dataset/s and its/their desired locations in a storage tier. + * @param[in] limits A list of QOS_CONSTRAINTS that must be applied to + * the transfer. These may not exceed the global ones set at node, application, + * or resource level. + * @param[in] mapping A distribution strategy for the transfers (e.g. + * ONE_TO_ONE, ONE_TO_MANY, MANY_TO_MANY) + * @param[out] transfer_handle A TRANSFER_HANDLE allowing clients to interact + * with the transfer (e.g. wait for its completion, query its status, cancel it, + * etc. + * @return Returns if the remote procedure has been completed + * successfully or not. + */ +ADM_return_t +ADM_transfer_dataset(ADM_server_t server, ADM_job_handle_t job, + ADM_dataset_handle_t** sources, + ADM_dataset_handle_t** targets, ADM_limit_t** limits, + ADM_tx_mapping_t mapping, + ADM_transfer_handle_t* tx_handle); + + +/** + * Sets information for the dataset identified by resource_id. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] target A DATASET_HANDLE referring to the dataset of interest. + * @param[in] info A DATASET_INFO with information about the + * dataset (e.g. its lifespan, access methods, intended usage, etc.). + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_set_dataset_information(ADM_server_t server, ADM_job_handle_t job, + ADM_dataset_handle_t target, + ADM_dataset_info_t info); + +/** + * Changes the I/O resources used by a storage tier, typically an Ad hoc Storage + * System. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] tier A STORAGE_HANDLE referring to the target storage tier. + * @param[in] resources A RESOURCES argument containing information + * about the I/O resources to set (e.g. number of I/O nodes.). + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_set_io_resources(ADM_server_t server, ADM_job_handle_t job, + ADM_storage_handle_t tier, + ADM_storage_resources_t resources); + + +/** + * Returns the priority of the pending transfer identified by transfer_id. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] tx_handle A TRANSFER_HANDLE referring to a pending transfer + * @param[out] priority The priority of the pending transfer or an error code if + * it didn’t exist or is no longer pending. + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, + ADM_transfer_priority_t* priority); + + +/** + * Moves the operation identified by transfer_id up or down by n positions in + * its scheduling queue. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] tx_handle A TRANSFER_HANDLE referring to a pending transfer + * @param[in] incr A positive or negative number for the number of + * positions the transfer should go up or down in its scheduling queue. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, int incr); + + +/** + * Cancels the pending transfer identified by transfer_id. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] tx_handle A TRANSFER_HANDLE referring to a pending transfer. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_cancel_transfer(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle); + + +/** + * Returns a list of pending transfers. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[out] pending_transfers A list of pending_transfers. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t** pending_transfers); + + +/** + * Registers a QoS constraint defined by class, scope, and value for the element + * identified by id. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] limit A QOS_LIMIT specifying at least: + * - The QOS_SCOPE the limit should be applied to: e.g. + * dataset, node, or job. + * - The QOS_CLASS of the limit (e.g. "bandwidth", "iops", + * etc.). + * - The QOS_ENTITY it should be applied to (e.g. job, node, + * dataset, etc.) + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, + ADM_limit_t limit); + + +/** + * Returns a list of QoS constraints defined for an element identified for id. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] scope The scope being queried: dataset, node, or job. + * @param[in] entity An QOS_ENTITY referring to the target of the query, i.e. a + * RESOURCE_HANDLE, a NODE hostname, or a JOB_HANDLE. + * @param[in] limits A list of QOS_LIMITS that includes all the classes + * currently defined for the element as well as the values set for them. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, + ADM_qos_scope_t scope, ADM_qos_entity_t entity, + ADM_limit_t** limits); + + +/** + * Defines a new operation, with the code found in path. The code will be + * identified by the user-provided operation_id and will accept the arguments + * defined, using the next format "arg0, arg1, arg2, ... ". + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] path A valid path for the operation executable. + * @param[in] ... A list of ARGUMENTS for the operation. + * @param[out] op An OPERATION_HANDLE for the newly-defined operation. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_define_data_operation(ADM_server_t server, ADM_job_handle_t job, + const char* path, ADM_data_operation_handle_t* op, + ...); + + +/** + * Connects and starts the data operation referred to by OPERATION_HANDLE and + * with the arguments, using the input and output data storage (i.e., files). If + * the operation can be executed in a streaming fashion (i.e., it can start even + * if the input data is not entirely available), the stream parameter must be + * set to true. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] op The OPERATION_HANDLE of the operation to be connected. + * @param[in] input An input DATASET_HANDLE for the operation. + * @param[in] output An output DATASET_HANDLE where the result of + * the operation should be stored. + * @param[in] should_stream A boolean indicating if the operation + * should be executed in a streaming fashion. + * @param[in] ... The VALUES for the arguments required by the operation. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_connect_data_operation(ADM_server_t server, ADM_job_handle_t job, + ADM_dataset_handle_t input, + ADM_dataset_handle_t output, bool should_stream, + ...); + + +/** + * Finalises the operation defined with operation_id. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] op The OPERATION_HANDLE of the operation to be connected. + * @return[out] status An OPERATION_STATUS type indicating whether the + * operation was successful. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + ADM_data_operation_status_t* status); + + +/** + * Links the data operation defined with operation_id with the pending transfer + * identified by transf er_id using the values provided as arguments. If the + * operation can be executed in a streaming fashion (i.e., it can start even if + * the input data is not entirely available), the stream parameter must be set + * to true. + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] op The OPERATION_HANDLE of the operation to be connected. + * @param[in] tx_handle The TRANSFER_HANDLE referring to the pending transfer + * the operation should be linked to. + * @param[in] job A JOB_HANDLE identifying the originating job. + * @param[in] should_stream A boolean indicating whether the operation + * should be executed in a streaming fashion. + * @param[in] ... The VALUES for the arguments required by the operation. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + bool should_stream, ...); + + +/** + * Returns the current I/O statistics for a specified job_id and an optional + * corresponding job_step. The information will be returned in an + * easy-to-process format, e.g., JSON (see Listing 3.1). + * + * @param[in] server The server to which the request is directed + * @param[in] job A JOB_HANDLE identifying the originating job and, + * optionally, its JOB_STEP. + * @return[out] stats A list of JOB_STATS. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_get_statistics(ADM_server_t server, ADM_job_handle_t job, + ADM_job_stats_t** stats); + +#ifdef __cplusplus +} // extern "C" +#endif + +#endif // SCORD_ADMIRE_H diff --git a/src/api/admire.hpp b/src/api/admire.hpp new file mode 100644 index 0000000000000000000000000000000000000000..e340c1e6c9273a2ec82fa3fc454ece52ff7e28f2 --- /dev/null +++ b/src/api/admire.hpp @@ -0,0 +1,132 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include + +#ifndef SCORD_ADMIRE_HPP +#define SCORD_ADMIRE_HPP + +namespace admire { + +struct server { + std::string m_protocol; + std::string m_address; +}; + +ADM_return_t +register_job(const server& srv, ADM_job_requirements_t reqs, + ADM_job_handle_t* job); + +ADM_return_t +update_job(const server& srv, ADM_job_handle_t job, + ADM_job_requirements_t reqs); + +ADM_return_t +remove_job(const server& srv, ADM_job_handle_t job); + +ADM_return_t +register_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t* adhoc_handle); + +ADM_return_t +update_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t adhoc_handle); + +ADM_return_t +remove_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle); + +ADM_return_t +deploy_adhoc_storage(const server& srv, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle); + +ADM_return_t +transfer_dataset(const server& srv, ADM_job_handle_t job, + ADM_dataset_handle_t** sources, ADM_dataset_handle_t** targets, + ADM_limit_t** limits, ADM_tx_mapping_t mapping, + ADM_transfer_handle_t* tx_handle); + +ADM_return_t +set_dataset_information(const server& srv, ADM_job_handle_t job, + ADM_dataset_handle_t target, ADM_dataset_info_t info); + +ADM_return_t +set_io_resources(const server& srv, ADM_job_handle_t job, + ADM_storage_handle_t tier, ADM_storage_resources_t resources); + +ADM_return_t +get_transfer_priority(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, + ADM_transfer_priority_t* priority); + +ADM_return_t +set_transfer_priority(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, int incr); + +ADM_return_t +cancel_transfer(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle); + +ADM_return_t +get_pending_transfers(const server& srv, ADM_job_handle_t job, + ADM_transfer_handle_t** pending_transfers); + +ADM_return_t +set_qos_constraints(const server& srv, ADM_job_handle_t job, ADM_limit_t limit); + +ADM_return_t +get_qos_constraints(const server& srv, ADM_job_handle_t job, + ADM_qos_scope_t scope, ADM_qos_entity_t entity, + ADM_limit_t** limits); + +ADM_return_t +define_data_operation(const server& srv, ADM_job_handle_t job, const char* path, + ADM_data_operation_handle_t* op, va_list args); + +ADM_return_t +connect_data_operation(const server& srv, ADM_job_handle_t job, + ADM_dataset_handle_t input, ADM_dataset_handle_t output, + bool should_stream, va_list args); + +ADM_return_t +finalize_data_operation(const server& srv, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + ADM_data_operation_status_t* status); + +ADM_return_t +link_transfer_to_data_operation(const server& srv, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + bool should_stream, va_list args); + +ADM_return_t +get_statistics(const server& srv, ADM_job_handle_t job, + ADM_job_stats_t** stats); + +} // namespace admire + +#endif // SCORD_ADMIRE_HPP diff --git a/src/api/c_wrapper.cpp b/src/api/c_wrapper.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9c4547958a27ee3025cf1858e51c48cc20408f62 --- /dev/null +++ b/src/api/c_wrapper.cpp @@ -0,0 +1,249 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include + +struct adm_server { + const char* s_protocol; + const char* s_address; +}; + +ADM_return_t +ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, + ADM_job_handle_t* job) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::register_job(srv, reqs, job); +} + +ADM_return_t +ADM_update_job(ADM_server_t server, ADM_job_handle_t job, + ADM_job_requirements_t reqs) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::update_job(srv, job, reqs); +} + +ADM_return_t +ADM_remove_job(ADM_server_t server, ADM_job_handle_t job) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::remove_job(srv, job); +} + +ADM_return_t +ADM_register_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t* adhoc_handle) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::register_adhoc_storage(srv, job, ctx, adhoc_handle); +} + +ADM_return_t +ADM_update_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_context_t ctx, + ADM_adhoc_storage_handle_t adhoc_handle) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::update_adhoc_storage(srv, job, ctx, adhoc_handle); +} + +ADM_return_t +ADM_remove_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::remove_adhoc_storage(srv, job, adhoc_handle); +} + +ADM_return_t +ADM_deploy_adhoc_storage(ADM_server_t server, ADM_job_handle_t job, + ADM_adhoc_storage_handle_t adhoc_handle) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::deploy_adhoc_storage(srv, job, adhoc_handle); +} + +ADM_return_t +ADM_transfer_dataset(ADM_server_t server, ADM_job_handle_t job, + ADM_dataset_handle_t** sources, + ADM_dataset_handle_t** targets, ADM_limit_t** limits, + ADM_tx_mapping_t mapping, + ADM_transfer_handle_t* tx_handle) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::transfer_dataset(srv, job, sources, targets, limits, mapping, + tx_handle); +} + +ADM_return_t +ADM_set_dataset_information(ADM_server_t server, ADM_job_handle_t job, + ADM_dataset_handle_t target, + ADM_dataset_info_t info) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::set_dataset_information(srv, job, target, info); +} + +ADM_return_t +ADM_set_io_resources(ADM_server_t server, ADM_job_handle_t job, + ADM_storage_handle_t tier, + ADM_storage_resources_t resources) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::set_io_resources(srv, job, tier, resources); +} + +ADM_return_t +ADM_get_transfer_priority(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, + ADM_transfer_priority_t* priority) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::get_transfer_priority(srv, job, tx_handle, priority); +} + +ADM_return_t +ADM_set_transfer_priority(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle, int incr) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::set_transfer_priority(srv, job, tx_handle, incr); +} + +ADM_return_t +ADM_cancel_transfer(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t tx_handle) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::cancel_transfer(srv, job, tx_handle); +} + +ADM_return_t +ADM_get_pending_transfers(ADM_server_t server, ADM_job_handle_t job, + ADM_transfer_handle_t** pending_transfers) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::get_pending_transfers(srv, job, pending_transfers); +} + +ADM_return_t +ADM_set_qos_constraints(ADM_server_t server, ADM_job_handle_t job, + ADM_limit_t limit) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::set_qos_constraints(srv, job, limit); +} + +ADM_return_t +ADM_get_qos_constraints(ADM_server_t server, ADM_job_handle_t job, + ADM_qos_scope_t scope, ADM_qos_entity_t entity, + ADM_limit_t** limits) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::get_qos_constraints(srv, job, scope, entity, limits); +} + +ADM_return_t +ADM_define_data_operation(ADM_server_t server, ADM_job_handle_t job, + const char* path, ADM_data_operation_handle_t* op, + ...) { + + const admire::server srv{server.s_protocol, server.s_address}; + + va_list args; + va_start(args, op); + auto ret = admire::define_data_operation(srv, job, path, op, args); + va_end(args); + + return ret; +} + +ADM_return_t +ADM_connect_data_operation(ADM_server_t server, ADM_job_handle_t job, + ADM_dataset_handle_t input, + ADM_dataset_handle_t output, bool should_stream, + ...) { + + const admire::server srv{server.s_protocol, server.s_address}; + + va_list args; + va_start(args, should_stream); + auto ret = admire::connect_data_operation(srv, job, input, output, + should_stream, args); + va_end(args); + + return ret; +} + +ADM_return_t +ADM_finalize_data_operation(ADM_server_t server, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + ADM_data_operation_status_t* status) { + + const admire::server srv{server.s_protocol, server.s_address}; + + return admire::finalize_data_operation(srv, job, op, status); +} + +ADM_return_t +ADM_link_transfer_to_data_operation(ADM_server_t server, ADM_job_handle_t job, + ADM_data_operation_handle_t op, + bool should_stream, ...) { + + const admire::server srv{server.s_protocol, server.s_address}; + + va_list args; + va_start(args, should_stream); + auto ret = admire::link_transfer_to_data_operation(srv, job, op, + should_stream, args); + va_end(args); + + return ret; +} + +ADM_return_t +ADM_get_statistics(ADM_server_t server, ADM_job_handle_t job, + ADM_job_stats_t** stats) { + const admire::server srv{server.s_protocol, server.s_address}; + return admire::get_statistics(srv, job, stats); +} diff --git a/src/network/CMakeLists.txt b/src/network/CMakeLists.txt index 23fbb115f18ada994433702997204a1e1dfd1693..38e7e5b68570b232d120dd4792399cf18697f54c 100644 --- a/src/network/CMakeLists.txt +++ b/src/network/CMakeLists.txt @@ -34,3 +34,4 @@ target_link_libraries( network_engine PUBLIC logger transport_library Mercury::Mercury Argobots::Argobots Margo::Margo ) +set_property(TARGET network_engine PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/src/network/engine.hpp b/src/network/engine.hpp index dc8a15d3b7d73eeba990b67faa1969f5b6726924..c38a60f45a34b854fd423a4f377f40d8d011792c 100644 --- a/src/network/engine.hpp +++ b/src/network/engine.hpp @@ -1,5 +1,5 @@ /****************************************************************************** - * Copyright 2021, Barcelona Supercomputing Center (BSC), Spain + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). @@ -107,8 +107,43 @@ struct engine { REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ping", void, void, ping, false); - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_input", + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_register_job", ADM_register_job_in_t, + ADM_register_job_out_t, ADM_register_job, true); + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_update_job", + ADM_update_job_in_t, ADM_update_job_out_t, ADM_update_job, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_remove_job", + ADM_remove_job_in_t, ADM_remove_job_out_t, ADM_remove_job, + true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_register_adhoc_storage", + ADM_register_adhoc_storage_in_t, + ADM_register_adhoc_storage_out_t, + ADM_register_adhoc_storage, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_update_adhoc_storage", + ADM_update_adhoc_storage_in_t, + ADM_update_adhoc_storage_out_t, + ADM_update_adhoc_storage, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_remove_adhoc_storage", + ADM_remove_adhoc_storage_in_t, + ADM_remove_adhoc_storage_out_t, + ADM_remove_adhoc_storage, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, + "ADM_deploy_adhoc_storage", + ADM_deploy_adhoc_storage_in_t, + ADM_deploy_adhoc_storage_out_t, + ADM_deploy_adhoc_storage, true); + + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_input", ADM_input_in_t, ADM_input_out_t, ADM_input, true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_output", @@ -193,16 +228,14 @@ struct engine { true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_set_qos_constraints_push", - ADM_set_qos_constraints_push_in_t, - ADM_set_qos_constraints_push_out_t, - ADM_set_qos_constraints_push, true); + "ADM_set_qos_constraints", ADM_set_qos_constraints_in_t, + ADM_set_qos_constraints_out_t, ADM_set_qos_constraints, + true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_set_qos_constraints_pull", - ADM_set_qos_constraints_pull_in_t, - ADM_set_qos_constraints_pull_out_t, - ADM_set_qos_constraints_pull, true); + "ADM_get_qos_constraints", ADM_get_qos_constraints_in_t, + ADM_get_qos_constraints_out_t, ADM_get_qos_constraints, + true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_define_data_operation", diff --git a/src/network/rpcs.cpp b/src/network/rpcs.cpp index 7fb34105c14bcb46b62ef70d4c3d99b43107c3e4..c154f3eb4358a1153a0b81b7c86298e3a16e4927 100644 --- a/src/network/rpcs.cpp +++ b/src/network/rpcs.cpp @@ -39,6 +39,225 @@ ping(hg_handle_t h) { DEFINE_MARGO_RPC_HANDLER(ping); +static void +ADM_register_job(hg_handle_t h) { + + hg_return_t ret; + + ADM_register_job_in_t in; + ADM_register_job_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_register_job()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_register_job); + + +static void +ADM_update_job(hg_handle_t h) { + + hg_return_t ret; + + ADM_update_job_in_t in; + ADM_update_job_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_update_job()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_update_job); + + +static void +ADM_remove_job(hg_handle_t h) { + + hg_return_t ret; + + ADM_remove_job_in_t in; + ADM_remove_job_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_remove_job()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_remove_job); + +static void +ADM_register_adhoc_storage(hg_handle_t h) { + + hg_return_t ret; + + ADM_register_adhoc_storage_in_t in; + ADM_register_adhoc_storage_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_register_adhoc_storage()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage); + +static void +ADM_update_adhoc_storage(hg_handle_t h) { + + hg_return_t ret; + + ADM_update_adhoc_storage_in_t in; + ADM_update_adhoc_storage_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_update_adhoc_storage()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage); + +static void +ADM_remove_adhoc_storage(hg_handle_t h) { + + hg_return_t ret; + + ADM_remove_adhoc_storage_in_t in; + ADM_remove_adhoc_storage_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_remove_adhoc_storage()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage); + +static void +ADM_deploy_adhoc_storage(hg_handle_t h) { + + hg_return_t ret; + + ADM_deploy_adhoc_storage_in_t in; + ADM_deploy_adhoc_storage_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = -1; + + LOGGER_INFO("ADM_deploy_adhoc_storage()"); + + out.ret = 0; + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage); + /** * Specifes the origin location in a storage tier where input is located, as * well as the target location where it should be placed in a different storage @@ -930,6 +1149,8 @@ ADM_get_pending_transfers(hg_handle_t h) { out.ret = 0; out.pending_transfers = "list"; + LOGGER_INFO("ADM_get_pending_transfers()"); + ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -957,45 +1178,42 @@ DEFINE_MARGO_RPC_HANDLER(ADM_get_pending_transfers) * successfully or not. */ static void -ADM_set_qos_constraints_push(hg_handle_t h) { +ADM_set_qos_constraints(hg_handle_t h) { hg_return_t ret; - ADM_set_qos_constraints_push_in_t in; - ADM_set_qos_constraints_push_out_t out; + ADM_set_qos_constraints_in_t in; + ADM_set_qos_constraints_out_t out; margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - const std::string scp(in.scope); - out.ret = -1; out.status = -1; if(in.scope == nullptr) { - LOGGER_ERROR("ADM_set_qos_constraints_push(): invalid scope (nullptr)"); + LOGGER_ERROR("ADM_set_qos_constraints(): invalid scope (nullptr)"); } else if(in.qos_class == nullptr) { - LOGGER_ERROR( - "ADM_set_qos_constraints_push(): invalid qos_class (nullptr)"); + LOGGER_ERROR("ADM_set_qos_constraints(): invalid qos_class (nullptr)"); } else if(in.element_id < 0) { - LOGGER_ERROR( - "ADM_set_qos_constraints_push(): invalid element_id (< 0)"); + LOGGER_ERROR("ADM_set_qos_constraints(): invalid element_id (< 0)"); } else if(in.class_value == nullptr) { LOGGER_ERROR( - "ADM_set_qos_constraints_push(): invalid class_value (nullptr)"); + "ADM_set_qos_constraints(): invalid class_value (nullptr)"); } else { - LOGGER_INFO("ADM_set_qos_constraints_push({}, {}, {}, {})", in.scope, + LOGGER_INFO("ADM_set_qos_constraints({}, {}, {}, {})", in.scope, in.qos_class, in.element_id, in.class_value); + const std::string scp(in.scope); if((scp == "dataset") || (scp == "node") || (scp == "job")) { LOGGER_INFO( - "ADM_set_qos_constraints_push scope value is acceptable ({})", + "ADM_set_qos_constraints scope value is acceptable ({})", in.scope); out.ret = 0; out.status = 0; } else { LOGGER_ERROR( - "ADM_set_qos_constraints_push scope value is not valid. Please use: dataset, node or job"); + "ADM_set_qos_constraints scope value is not valid. Please use: dataset, node or job"); } } @@ -1010,7 +1228,7 @@ ADM_set_qos_constraints_push(hg_handle_t h) { assert(ret == HG_SUCCESS); } -DEFINE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_push) +DEFINE_MARGO_RPC_HANDLER(ADM_set_qos_constraints) /** * Returns a list of QoS constraints defined for an element identified for id. @@ -1024,37 +1242,38 @@ DEFINE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_push) * successfully or not. */ static void -ADM_set_qos_constraints_pull(hg_handle_t h) { +ADM_get_qos_constraints(hg_handle_t h) { hg_return_t ret; - ADM_set_qos_constraints_pull_in_t in; - ADM_set_qos_constraints_pull_out_t out; + ADM_get_qos_constraints_in_t in; + ADM_get_qos_constraints_out_t out; margo_instance_id mid = margo_hg_handle_get_instance(h); ret = margo_get_input(h, &in); assert(ret == HG_SUCCESS); - const std::string scp(in.scope); - out.ret = -1; out.list = nullptr; if(in.scope == nullptr) { - LOGGER_ERROR("ADM_set_qos_constraints_pull(): invalid scope (nullptr)"); + LOGGER_ERROR("ADM_get_qos_constraints(): invalid scope (nullptr)"); } else if(in.element_id < 0) { - LOGGER_ERROR("ADM_set_qos_constraints_pull(): invalid element_id (< 0)"); + LOGGER_ERROR("ADM_get_qos_constraints(): invalid element_id (< 0)"); } else { - LOGGER_INFO("ADM_set_qos_constraints_pull({}, {})", in.scope, in.element_id); + LOGGER_INFO("ADM_get_qos_constraints({}, {})", in.scope, in.element_id); + + const std::string scp(in.scope); + if((scp == "dataset") || (scp == "node") || (scp == "job")) { LOGGER_INFO( - "ADM_set_qos_constraints_pull scope value is acceptable ({})", + "ADM_get_qos_constraints scope value is acceptable ({})", in.scope); out.ret = 0; out.list = "list"; } else { LOGGER_ERROR( - "ADM_set_qos_constraints_pull scope value is not valid. Please use: dataset, node or job "); + "ADM_get_qos_constraints scope value is not valid. Please use: dataset, node or job "); } } @@ -1069,7 +1288,7 @@ ADM_set_qos_constraints_pull(hg_handle_t h) { assert(ret == HG_SUCCESS); } -DEFINE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_pull) +DEFINE_MARGO_RPC_HANDLER(ADM_get_qos_constraints) /** * Defines a new operation, with the code found in path. The code will be @@ -1344,8 +1563,7 @@ ADM_get_statistics(hg_handle_t h) { } else if(in.job_step < 0) { LOGGER_ERROR("ADM_get_statistics(): invalid job_step (< 0)"); } else { - LOGGER_INFO("ADM_get_statistics ({}, {})", - in.job_id, in.job_step); + LOGGER_INFO("ADM_get_statistics ({}, {})", in.job_id, in.job_step); out.ret = 0; out.job_statistics = "job_statistics"; } diff --git a/src/network/rpcs.hpp b/src/network/rpcs.hpp index 170fb249db014271909597943d13b9fb2d31ebb5..0579aae4283967515cbb1a288b11f2d8d5c20071 100644 --- a/src/network/rpcs.hpp +++ b/src/network/rpcs.hpp @@ -38,6 +38,69 @@ /// ping DECLARE_MARGO_RPC_HANDLER(ping); +/// ADM_register_job +MERCURY_GEN_PROC(ADM_register_job_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_register_job_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_register_job); + +/// ADM_update_job +MERCURY_GEN_PROC(ADM_update_job_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_update_job_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_update_job); + +/// ADM_remove_job +MERCURY_GEN_PROC(ADM_remove_job_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_remove_job_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_remove_job); + +/// ADM_register_adhoc_storage +MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage); + +/// ADM_update_adhoc_storage +MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_update_adhoc_storage_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage); + +/// ADM_remove_adhoc_storage +MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_remove_adhoc_storage_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage); + +/// ADM_deploy_adhoc_storage +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, + ((int32_t) (reqs))) + +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, + ((int32_t) (ret))) + +DECLARE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage); + /// ADM_input MERCURY_GEN_PROC(ADM_input_in_t, ((hg_const_string_t) (origin))((hg_const_string_t) (target))) @@ -212,27 +275,27 @@ MERCURY_GEN_PROC(ADM_get_pending_transfers_out_t, DECLARE_MARGO_RPC_HANDLER(ADM_get_pending_transfers); -/// ADM_set_qos_constraints_push +/// ADM_set_qos_constraints MERCURY_GEN_PROC( - ADM_set_qos_constraints_push_in_t, + ADM_set_qos_constraints_in_t, ((hg_const_string_t) (scope))((hg_const_string_t) (qos_class))( (int32_t) (element_id))((hg_const_string_t) (class_value))) -MERCURY_GEN_PROC(ADM_set_qos_constraints_push_out_t, +MERCURY_GEN_PROC(ADM_set_qos_constraints_out_t, ((int32_t) (ret))((int32_t) (status))) -DECLARE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_push); +DECLARE_MARGO_RPC_HANDLER(ADM_set_qos_constraints); -/// ADM_set_qos_constraints_pull +/// ADM_get_qos_constraints -MERCURY_GEN_PROC(ADM_set_qos_constraints_pull_in_t, +MERCURY_GEN_PROC(ADM_get_qos_constraints_in_t, ((hg_const_string_t) (scope))((int32_t) (element_id))) -MERCURY_GEN_PROC(ADM_set_qos_constraints_pull_out_t, +MERCURY_GEN_PROC(ADM_get_qos_constraints_out_t, ((int32_t) (ret))((hg_const_string_t) (list))) -DECLARE_MARGO_RPC_HANDLER(ADM_set_qos_constraints_pull); +DECLARE_MARGO_RPC_HANDLER(ADM_get_qos_constraints); /// ADM_define_data_operation @@ -291,7 +354,6 @@ MERCURY_GEN_PROC(ADM_get_statistics_out_t, DECLARE_MARGO_RPC_HANDLER(ADM_get_statistics); - //} // namespace scord::network::rpc #endif // SCORD_NETWORK_RPCS_HPP