diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 329a0f0ac83e3a5512369f95bd7bffd7ff8a28bf..87629b4b062be318626c217a978931863269eab8 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -46,7 +46,7 @@ rpc: script: - export LD_LIBRARY_PATH=/usr/local/lib:/usr/local/lib64:${CI_PROJECT_DIR}/compiled/lib - compiled/bin/scord -f --force-console & - - build/examples/cxx/ping ofi+tcp://127.0.0.1:52000 + - build/examples/cxx/ADM_ping ofi+tcp://127.0.0.1:52000 - pkill -TERM scord cache: key: $CI_COMMIT_REF_SLUG diff --git a/README.md b/README.md index 343519048ac6a2ed5f0eaa83348b8755206246ef..ab7dd106b6754db646a9e5b5dfc796f10f2da41e 100644 --- a/README.md +++ b/README.md @@ -180,7 +180,7 @@ Now we can use one of the example programs to send a `ping` RPC to `scord`: ```bash cd $HOME/scord/build/examples -./ping ofi+tcp://192.168.0.111:52000 +./ADM_ping ofi+tcp://192.168.0.111:52000 ``` And the server logs should update with an entry similar the following one: diff --git a/examples/cxx/ping.cpp b/examples/cxx/ADM_ping.cpp similarity index 76% rename from examples/cxx/ping.cpp rename to examples/cxx/ADM_ping.cpp index 32b65480183f9f63d841a8c0f7ee9508c72c5bf5..d687e60f11c106be87507880964a163c7705916a 100644 --- a/examples/cxx/ping.cpp +++ b/examples/cxx/ADM_ping.cpp @@ -23,25 +23,25 @@ *****************************************************************************/ #include -#include +#include int main(int argc, char* argv[]) { if(argc != 2) { fmt::print(stderr, "ERROR: no server address provided\n"); - fmt::print(stderr, "Usage: ping \n"); + fmt::print(stderr, "Usage: ADM_ping \n"); exit(EXIT_FAILURE); } - scord::network::rpc_client rpc_client{"tcp"}; - rpc_client.register_rpcs(); + admire::server server{"tcp", argv[1]}; - auto endp = rpc_client.lookup(argv[1]); - - fmt::print(stdout, "Calling PING remote procedure on {}...\n", argv[1]); - - endp.call("ping"); + try { + admire::ping(server); + } catch(const std::exception& e) { + fmt::print(stderr, "FATAL: ADM_ping() failed: {}\n", e.what()); + exit(EXIT_FAILURE); + } - fmt::print(stdout, "PING remote procedure completed successfully\n"); + fmt::print(stdout, "ADM_ping() remote procedure completed successfully\n"); } diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index a884f425762597e699388ebc9eac04f054d91b1e..c1e49ffb8a3c1bc2dc6b58c0486624c6917c76ee 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -23,7 +23,7 @@ ################################################################################ list(APPEND examples_cxx - ping + ADM_ping ADM_register_job ADM_update_job ADM_remove_job ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage ADM_deploy_adhoc_storage diff --git a/src/api/admire.cpp b/src/api/admire.cpp index 79a83e103be951a63943d6fea9545b32ebe26e2b..39113f4082d139d73eb371787b36e4e67f0876ec 100644 --- a/src/api/admire.cpp +++ b/src/api/admire.cpp @@ -54,6 +54,16 @@ init_logger() { namespace admire { +void +ping(const server& srv) { + + if(const auto ec = detail::ping(srv)) { + throw std::runtime_error( + fmt::format("ADM_register_job() error: {}", ADM_strerror(ec))); + } +} + + admire::job register_job(const server& srv, ADM_job_requirements_t reqs) { diff --git a/src/api/admire.h b/src/api/admire.h index a103d4a05aabad5e267f60a7d45169ca4fe707d5..ea992ed9e3c8ab3aa3716d4f1609c673e1e8688b 100644 --- a/src/api/admire.h +++ b/src/api/admire.h @@ -243,6 +243,16 @@ ADM_job_requirements_destroy(ADM_job_requirements_t reqs); /* Public prototypes */ /******************************************************************************/ +/** + * Send an RPC to a server to check if it's online. + * + * @param[in] server The server to which the request is directed + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ +ADM_return_t +ADM_ping(ADM_server_t server); + /** * Register a job and its requirements. * diff --git a/src/api/admire.hpp b/src/api/admire.hpp index 5ab690281fd87281882a379f27e947e5eee79c2a..1ab5f55ef3ebf547ed388c39a5a9fc93e979a420 100644 --- a/src/api/admire.hpp +++ b/src/api/admire.hpp @@ -43,13 +43,14 @@ struct job { job_id m_id; }; +void +ping(const server& srv); admire::job register_job(const server& srv, ADM_job_requirements_t reqs); ADM_return_t -update_job(const server& srv, ADM_job_t job, - ADM_job_requirements_t reqs); +update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs); ADM_return_t remove_job(const server& srv, ADM_job_t job); @@ -60,8 +61,7 @@ register_adhoc_storage(const server& srv, ADM_job_t job, ADM_adhoc_storage_handle_t* adhoc_handle); ADM_return_t -update_adhoc_storage(const server& srv, ADM_job_t job, - ADM_adhoc_context_t ctx, +update_adhoc_storage(const server& srv, ADM_job_t job, ADM_adhoc_context_t ctx, ADM_adhoc_storage_handle_t adhoc_handle); ADM_return_t @@ -83,8 +83,8 @@ set_dataset_information(const server& srv, ADM_job_t job, ADM_dataset_handle_t target, ADM_dataset_info_t info); ADM_return_t -set_io_resources(const server& srv, ADM_job_t job, - ADM_storage_handle_t tier, ADM_storage_resources_t resources); +set_io_resources(const server& srv, ADM_job_t job, ADM_storage_handle_t tier, + ADM_storage_resources_t resources); ADM_return_t get_transfer_priority(const server& srv, ADM_job_t job, @@ -107,9 +107,8 @@ ADM_return_t set_qos_constraints(const server& srv, ADM_job_t job, ADM_limit_t limit); ADM_return_t -get_qos_constraints(const server& srv, ADM_job_t job, - ADM_qos_scope_t scope, ADM_qos_entity_t entity, - ADM_limit_t** limits); +get_qos_constraints(const server& srv, ADM_job_t job, ADM_qos_scope_t scope, + ADM_qos_entity_t entity, ADM_limit_t** limits); ADM_return_t define_data_operation(const server& srv, ADM_job_t job, const char* path, @@ -131,8 +130,7 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job, bool should_stream, va_list args); ADM_return_t -get_statistics(const server& srv, ADM_job_t job, - ADM_job_stats_t** stats); +get_statistics(const server& srv, ADM_job_t job, ADM_job_stats_t** stats); } // namespace admire diff --git a/src/api/c_wrapper.cpp b/src/api/c_wrapper.cpp index a8816f58c57879d5eb094198dee31a1bd25972e6..1a4e03c2dbdb17ab6069076146169abe32efbc77 100644 --- a/src/api/c_wrapper.cpp +++ b/src/api/c_wrapper.cpp @@ -173,6 +173,12 @@ ADM_job_create(uint64_t id) { return adm_job; } +ADM_return_t +ADM_ping(ADM_server_t server) { + const admire::server srv{server->s_protocol, server->s_address}; + return admire::detail::ping(srv); +} + ADM_return_t ADM_register_job(ADM_server_t server, ADM_job_requirements_t reqs, ADM_job_t* job) { diff --git a/src/api/detail/impl.cpp b/src/api/detail/impl.cpp index b14f1fe0eae7a3a0cc79c1a06fc86a2a60600cc4..ba09387a1460c967f9f24309d60aee0b45683067 100644 --- a/src/api/detail/impl.cpp +++ b/src/api/detail/impl.cpp @@ -28,6 +28,21 @@ namespace admire::detail { +admire::error_code +ping(const server& srv) { + + scord::network::rpc_client rpc_client{srv.m_protocol}; + rpc_client.register_rpcs(); + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_ping()"); + endp.call("ADM_ping"); + + LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + tl::expected register_job(const admire::server& srv, ADM_job_requirements_t reqs) { (void) srv; diff --git a/src/api/detail/impl.hpp b/src/api/detail/impl.hpp index 501489c3888660975a2262bf4899e9a2ce51c184..cacc056ff3572e735014d77a21472acd3cd18079 100644 --- a/src/api/detail/impl.hpp +++ b/src/api/detail/impl.hpp @@ -34,6 +34,9 @@ using error_code = ADM_return_t; namespace admire::detail { +admire::error_code +ping(const server& srv); + tl::expected register_job(const server& srv, ADM_job_requirements_t reqs); diff --git a/src/network/engine.hpp b/src/network/engine.hpp index c38a60f45a34b854fd423a4f377f40d8d011792c..3ecd59633d25bb948e2ed414c7cdf937c38131f1 100644 --- a/src/network/engine.hpp +++ b/src/network/engine.hpp @@ -58,7 +58,7 @@ struct margo_context { void register_rpc(const std::string& name, bool requires_response) { - auto id = MARGO_REGISTER(m_mid, name.c_str(), void, void, ping); + auto id = MARGO_REGISTER(m_mid, name.c_str(), void, void, ADM_ping); m_rpc_names.emplace(name, id); if(!requires_response) { @@ -104,8 +104,8 @@ struct engine { register_rpcs() { // register RPCs manually for now - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ping", void, - void, ping, false); + REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_ping", void, + void, ADM_ping, false); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_register_job", ADM_register_job_in_t, @@ -126,22 +126,19 @@ struct engine { ADM_register_adhoc_storage, true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_update_adhoc_storage", - ADM_update_adhoc_storage_in_t, - ADM_update_adhoc_storage_out_t, - ADM_update_adhoc_storage, true); + "ADM_update_adhoc_storage", ADM_update_adhoc_storage_in_t, + ADM_update_adhoc_storage_out_t, ADM_update_adhoc_storage, + true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_remove_adhoc_storage", - ADM_remove_adhoc_storage_in_t, - ADM_remove_adhoc_storage_out_t, - ADM_remove_adhoc_storage, true); + "ADM_remove_adhoc_storage", ADM_remove_adhoc_storage_in_t, + ADM_remove_adhoc_storage_out_t, ADM_remove_adhoc_storage, + true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_deploy_adhoc_storage", - ADM_deploy_adhoc_storage_in_t, - ADM_deploy_adhoc_storage_out_t, - ADM_deploy_adhoc_storage, true); + "ADM_deploy_adhoc_storage", ADM_deploy_adhoc_storage_in_t, + ADM_deploy_adhoc_storage_out_t, ADM_deploy_adhoc_storage, + true); REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_input", ADM_input_in_t, ADM_input_out_t, ADM_input, true); diff --git a/src/network/rpcs.cpp b/src/network/rpcs.cpp index c154f3eb4358a1153a0b81b7c86298e3a16e4927..3f6dd76645eedd720db719821a7975ab92bf3ace 100644 --- a/src/network/rpcs.cpp +++ b/src/network/rpcs.cpp @@ -25,7 +25,7 @@ #include "rpcs.hpp" static void -ping(hg_handle_t h) { +ADM_ping(hg_handle_t h) { hg_return_t ret; @@ -37,7 +37,7 @@ ping(hg_handle_t h) { assert(ret == HG_SUCCESS); } -DEFINE_MARGO_RPC_HANDLER(ping); +DEFINE_MARGO_RPC_HANDLER(ADM_ping); static void ADM_register_job(hg_handle_t h) { diff --git a/src/network/rpcs.hpp b/src/network/rpcs.hpp index 0579aae4283967515cbb1a288b11f2d8d5c20071..23931fb01ff420e0face78368fb591ed34e1127a 100644 --- a/src/network/rpcs.hpp +++ b/src/network/rpcs.hpp @@ -35,69 +35,55 @@ // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { -/// ping -DECLARE_MARGO_RPC_HANDLER(ping); +/// ADM_ping +DECLARE_MARGO_RPC_HANDLER(ADM_ping); /// ADM_register_job -MERCURY_GEN_PROC(ADM_register_job_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_register_job_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_register_job_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_register_job_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_register_job); /// ADM_update_job -MERCURY_GEN_PROC(ADM_update_job_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_update_job_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_update_job_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_update_job_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_update_job); /// ADM_remove_job -MERCURY_GEN_PROC(ADM_remove_job_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_remove_job_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_remove_job_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_remove_job_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_remove_job); /// ADM_register_adhoc_storage -MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_register_adhoc_storage_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_register_adhoc_storage_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage); /// ADM_update_adhoc_storage -MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_update_adhoc_storage_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_update_adhoc_storage_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_update_adhoc_storage_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage); /// ADM_remove_adhoc_storage -MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_remove_adhoc_storage_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_remove_adhoc_storage_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage); /// ADM_deploy_adhoc_storage -MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, - ((int32_t) (reqs))) +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs))) -MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, - ((int32_t) (ret))) +MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_out_t, ((int32_t) (ret))) DECLARE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage);