diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index c1e49ffb8a3c1bc2dc6b58c0486624c6917c76ee..4df33f0d8d0a1d61b2cbaf3c03ca2513158fdd3f 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -27,7 +27,8 @@ list(APPEND examples_cxx ADM_register_job ADM_update_job ADM_remove_job ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage ADM_deploy_adhoc_storage - ADM_in_situ_ops ADM_in_transit_ops ADM_transfer_dataset + # ADM_in_situ_ops ADM_in_transit_ops + ADM_transfer_dataset ADM_set_dataset_information ADM_set_io_resources ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers ADM_set_qos_constraints ADM_get_qos_constraints ADM_define_data_operation ADM_connect_data_operation @@ -37,6 +38,6 @@ foreach (example IN LISTS examples_cxx) add_executable(${example}_cxx) target_sources(${example}_cxx PRIVATE ${example}.cpp) target_link_libraries(${example}_cxx - PUBLIC network_engine fmt::fmt adm_iosched) + PUBLIC common::network::engine fmt::fmt adm_iosched) set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example}) endforeach() diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 84cf47cfb1b02364d6afd224fe0d7f900fafe27d..7c2edf2d5f402c014340e12fa6b728b4268d307f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -22,21 +22,9 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_subdirectory(config) -add_subdirectory(utils) -add_subdirectory(logger) -add_subdirectory(network) +add_subdirectory(common) +add_subdirectory(scord) +add_subdirectory(scord-ctl) -add_executable(scord server.cpp main.cpp) - -target_include_directories( - scord PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} -) - -target_link_libraries( - scord PRIVATE config logger network_engine fmt::fmt Boost::program_options -) - -install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR}) - -add_subdirectory(api) +# public libraries +add_subdirectory(lib) diff --git a/src/api/detail/impl.cpp b/src/api/detail/impl.cpp deleted file mode 100644 index ba09387a1460c967f9f24309d60aee0b45683067..0000000000000000000000000000000000000000 --- a/src/api/detail/impl.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/****************************************************************************** - * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain - * - * This software was partially supported by the EuroHPC-funded project ADMIRE - * (Project ID: 956748, https://www.admire-eurohpc.eu). - * - * This file is part of scord. - * - * scord is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * scord is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with scord. If not, see . - * - * SPDX-License-Identifier: GPL-3.0-or-later - *****************************************************************************/ - -#include -#include -#include "impl.hpp" - -namespace admire::detail { - -admire::error_code -ping(const server& srv) { - - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); - - auto endp = rpc_client.lookup(srv.m_address); - - LOGGER_INFO("ADM_ping()"); - endp.call("ADM_ping"); - - LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS); - return ADM_SUCCESS; -} - -tl::expected -register_job(const admire::server& srv, ADM_job_requirements_t reqs) { - (void) srv; - (void) reqs; - - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); - - auto endp = rpc_client.lookup(srv.m_address); - - LOGGER_INFO("ADM_register_job(...)"); - - ADM_register_job_in_t in{}; - ADM_register_job_out_t out; - - endp.call("ADM_register_job", &in, &out); - - if(out.ret < 0) { - LOGGER_ERROR("ADM_register_job() = {}", out.ret); - return tl::make_unexpected(static_cast(out.ret)); - } - - LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS); - return admire::job{42}; -} - -} // namespace admire::detail diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..f3fbb8b22569acb1ebb1998ba0efddb01dd6bb25 --- /dev/null +++ b/src/common/CMakeLists.txt @@ -0,0 +1,46 @@ +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +add_library(common INTERFACE) +target_include_directories(common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) + +add_subdirectory(utils) +target_include_directories(_utils INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) +add_library(common::utils ALIAS _utils) + +add_subdirectory(config) +target_include_directories(_config INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) +add_library(common::config ALIAS _config) + +add_subdirectory(logger) +target_include_directories(_logger INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) +add_library(common::logger ALIAS _logger) + +add_subdirectory(network) +target_include_directories(_network_engine INTERFACE + ${CMAKE_CURRENT_SOURCE_DIR}) +add_library(common::network::engine ALIAS _network_engine) +target_include_directories(_rpc_server INTERFACE + ${CMAKE_CURRENT_SOURCE_DIR}) +add_library(common::network::rpc_server ALIAS _rpc_server) diff --git a/src/config/CMakeLists.txt b/src/common/config/CMakeLists.txt similarity index 94% rename from src/config/CMakeLists.txt rename to src/common/config/CMakeLists.txt index 872dfb27cafa0d0cb064ea061a5578c5722b9e04..fa464f06961b2d49ad30770b4964386fa2a3a969 100644 --- a/src/config/CMakeLists.txt +++ b/src/common/config/CMakeLists.txt @@ -23,18 +23,18 @@ ################################################################################ # Create a config target for all configuration code -add_library(config STATIC) +add_library(_config STATIC) # Since some of the sources will be auto-generated, we need to search for # includes in ${CMAKE_CURRENT_BINARY_DIR} target_include_directories( - config PRIVATE ${CMAKE_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR} + _config PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR} ) -set_property(TARGET config PROPERTY POSITION_INDEPENDENT_CODE ON) +set_property(TARGET _config PROPERTY POSITION_INDEPENDENT_CODE ON) target_sources( - config + _config PRIVATE config.hpp defaults.hpp ${CMAKE_CURRENT_BINARY_DIR}/defaults.cpp @@ -46,7 +46,7 @@ target_sources( settings.hpp ) -target_link_libraries(config PRIVATE utils file_options::file_options) +target_link_libraries(_config PRIVATE common::utils file_options::file_options) # ############################################################################## # Produce several auto-generated files for 'config' diff --git a/src/config/config.hpp b/src/common/config/config.hpp similarity index 100% rename from src/config/config.hpp rename to src/common/config/config.hpp diff --git a/src/config/defaults.cpp.in b/src/common/config/defaults.cpp.in similarity index 100% rename from src/config/defaults.cpp.in rename to src/common/config/defaults.cpp.in diff --git a/src/config/defaults.hpp b/src/common/config/defaults.hpp similarity index 100% rename from src/config/defaults.hpp rename to src/common/config/defaults.hpp diff --git a/src/config/file_options.yml b/src/common/config/file_options.yml similarity index 100% rename from src/config/file_options.yml rename to src/common/config/file_options.yml diff --git a/src/config/genopts.yml.in b/src/common/config/genopts.yml.in similarity index 100% rename from src/config/genopts.yml.in rename to src/common/config/genopts.yml.in diff --git a/src/config/parsers.cpp b/src/common/config/parsers.cpp similarity index 99% rename from src/config/parsers.cpp rename to src/common/config/parsers.cpp index 0fdc97067b37c6929534ebd0b609cb89baacbc30..a45d853a1aa0eac0508f4c1a3aa09e5aa12ace55 100644 --- a/src/config/parsers.cpp +++ b/src/common/config/parsers.cpp @@ -29,7 +29,7 @@ #include #include #include -#include +#include "parsers.hpp" namespace fs = std::filesystem; diff --git a/src/config/parsers.hpp b/src/common/config/parsers.hpp similarity index 100% rename from src/config/parsers.hpp rename to src/common/config/parsers.hpp diff --git a/src/config/settings.cpp b/src/common/config/settings.cpp similarity index 100% rename from src/config/settings.cpp rename to src/common/config/settings.cpp diff --git a/src/config/settings.hpp b/src/common/config/settings.hpp similarity index 100% rename from src/config/settings.hpp rename to src/common/config/settings.hpp diff --git a/src/logger/CMakeLists.txt b/src/common/logger/CMakeLists.txt similarity index 89% rename from src/logger/CMakeLists.txt rename to src/common/logger/CMakeLists.txt index a56254f66145ba311d2b712169d101a516a9bd3a..6836b1244fbdc87d6692341bdc6c1f6220971e20 100644 --- a/src/logger/CMakeLists.txt +++ b/src/common/logger/CMakeLists.txt @@ -22,10 +22,8 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_library(logger INTERFACE) +add_library(_logger INTERFACE) -target_include_directories(logger INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) +target_sources(_logger INTERFACE logger.hpp) -target_sources(logger INTERFACE logger.hpp) - -target_link_libraries(logger INTERFACE spdlog::spdlog fmt::fmt) +target_link_libraries(_logger INTERFACE spdlog::spdlog fmt::fmt) diff --git a/src/logger/logger.hpp b/src/common/logger/logger.hpp similarity index 100% rename from src/logger/logger.hpp rename to src/common/logger/logger.hpp diff --git a/src/network/CMakeLists.txt b/src/common/network/CMakeLists.txt similarity index 82% rename from src/network/CMakeLists.txt rename to src/common/network/CMakeLists.txt index 38e7e5b68570b232d120dd4792399cf18697f54c..5aa5930d8f503db018442e559298c0468453c441 100644 --- a/src/network/CMakeLists.txt +++ b/src/common/network/CMakeLists.txt @@ -22,16 +22,24 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_library(network_engine STATIC) +add_library(_network_engine STATIC) target_sources( - network_engine + _network_engine INTERFACE engine.hpp - PRIVATE rpcs.hpp rpcs.cpp detail/address.hpp + PRIVATE detail/address.hpp ) -target_include_directories(network_engine INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries( - network_engine PUBLIC logger transport_library Mercury::Mercury + _network_engine PUBLIC common::logger transport_library Mercury::Mercury Argobots::Argobots Margo::Margo ) -set_property(TARGET network_engine PROPERTY POSITION_INDEPENDENT_CODE ON) +set_property(TARGET _network_engine PROPERTY POSITION_INDEPENDENT_CODE ON) + +add_library(_rpc_server STATIC) +target_sources( + _rpc_server + INTERFACE server.hpp + PRIVATE server.cpp +) + +target_link_libraries(_rpc_server PUBLIC common::config _network_engine) diff --git a/src/network/detail/address.hpp b/src/common/network/detail/address.hpp similarity index 100% rename from src/network/detail/address.hpp rename to src/common/network/detail/address.hpp diff --git a/src/common/network/engine.hpp b/src/common/network/engine.hpp new file mode 100644 index 0000000000000000000000000000000000000000..79c36d8ea72a6d05c963571250f7fa93fec945c2 --- /dev/null +++ b/src/common/network/engine.hpp @@ -0,0 +1,282 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_NETWORK_ENGINE_HPP +#define SCORD_NETWORK_ENGINE_HPP + +#include +#include +#include +#include +#include +#include "detail/address.hpp" + + +namespace scord::network { + +namespace detail { + +#define REGISTER_RPC(__engine, __func_name, __in_t, __out_t, __handler, \ + requires_response) \ + { \ + REGISTER_RPC_IMPL((__engine)->m_context->m_mid, \ + (__engine)->m_context->m_rpc_names, __func_name, \ + __in_t, __out_t, __handler, requires_response); \ + } + +#define REGISTER_RPC_IMPL(__mid, __rpc_names, __func_name, __in_t, __out_t, \ + __handler, requires_response) \ + { \ + hg_id_t id = margo_provider_register_name( \ + __mid, __func_name, BOOST_PP_CAT(hg_proc_, __in_t), \ + BOOST_PP_CAT(hg_proc_, __out_t), _handler_for_##__handler, \ + MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL); \ + (__rpc_names).emplace(__func_name, id); \ + if(!(requires_response)) { \ + ::margo_registered_disable_response(__mid, id, HG_TRUE); \ + } \ + } + + +struct margo_context { + + explicit margo_context(::margo_instance_id mid) : m_mid(mid) {} + + margo_instance_id m_mid; + std::unordered_map m_rpc_names; +}; + +} // namespace detail + +// forward declarations +struct endpoint; + +struct engine { + + enum class execution_mode : bool { + server = MARGO_SERVER_MODE, + client = MARGO_CLIENT_MODE + }; + + explicit engine(std::string_view address, + execution_mode = execution_mode::client) { + struct margo_init_info info = MARGO_INIT_INFO_INITIALIZER; + + m_context = std::make_shared( + margo_init_ext(address.data(), MARGO_SERVER_MODE, &info)); + + if(m_context->m_mid == MARGO_INSTANCE_NULL) { + throw std::runtime_error("Margo initialization failed"); + } + } + + ~engine() { + if(m_context) { + ::margo_finalize(m_context->m_mid); + } + } + + void + listen() const { + + /* NOTE: there isn't anything else for the server to do at this point + * except wait for itself to be shut down. The + * margo_wait_for_finalize() call here yields to let Margo drive + * progress until that happens. + */ + ::margo_wait_for_finalize(m_context->m_mid); + } + + void + stop() { + ::margo_finalize(m_context->m_mid); + + // It is not safe to access m_margo_context->m_mid after the + // margo_finalize() call. Make sure that no other threads can do a + // double margo_finalize() (e.g when calling ~engine()) by resetting + // m_margo_context. + m_context.reset(); + } + + endpoint + lookup(const std::string& address) const; + + std::shared_ptr m_context; +}; + +struct endpoint { +private: + // Endpoints should only be created by calling engine::lookup() + friend class engine; + + endpoint(std::shared_ptr context, + std::shared_ptr address) + : m_margo_context(std::move(context)), m_address(std::move(address)) {} + +public: + endpoint(const endpoint& /*other*/) = default; + endpoint& + operator=(const endpoint& /*other*/) = default; + endpoint(endpoint&& /*rhs*/) = default; + endpoint& + operator=(endpoint&& /*rhs*/) = default; + + template + void + call(const std::string& id, Args&&... args) { + + const auto it = m_margo_context->m_rpc_names.find(id); + + if(it == m_margo_context->m_rpc_names.end()) { + throw std::runtime_error( + fmt::format("Unknown remote procedure: {}", id)); + } + + hg_handle_t handle; + auto ret = ::margo_create(m_margo_context->m_mid, + m_address->mercury_address(), it->second, + &handle); + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during endpoint::call(): {}", + ::HG_Error_to_string(ret))); + } + + ret = ::margo_forward(handle, nullptr); + + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during endpoint::call(): {}", + ::HG_Error_to_string(ret))); + } + + ret = ::margo_destroy(handle); + + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during endpoint::call(): {}", + ::HG_Error_to_string(ret))); + } + } + + /** + * Deprecated call, used to support Margo directly + * + **/ + template + [[deprecated("It should be eventually replaced by a generic call")]] void + call(const std::string& id, T1 input = nullptr, T2 output = nullptr) { + + const auto it = m_margo_context->m_rpc_names.find(id); + + if(it == m_margo_context->m_rpc_names.end()) { + throw std::runtime_error( + fmt::format("Unknown remote procedure: {}", id)); + } + + hg_handle_t handle; + auto ret = ::margo_create(m_margo_context->m_mid, + m_address->mercury_address(), it->second, + &handle); + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during endpoint::call(): {}", + ::HG_Error_to_string(ret))); + } + + ret = ::margo_forward(handle, input); + + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during endpoint::call(): {}", + ::HG_Error_to_string(ret))); + } + + if(output != nullptr) { + ret = ::margo_get_output(handle, output); + } + + + ret = ::margo_destroy(handle); + + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during endpoint::call(): {}", + ::HG_Error_to_string(ret))); + } + } + +private: + std::shared_ptr m_margo_context; + std::shared_ptr m_address; +}; + +// now that we have the complete definition of engine and endpoint, we can +// finally define engine::lookup completely +inline endpoint +engine::lookup(const std::string& address) const { + + hg_addr_t svr_addr; + auto ret = + ::margo_addr_lookup(m_context->m_mid, address.c_str(), &svr_addr); + if(ret != HG_SUCCESS) { + throw std::runtime_error( + fmt::format("Error during engine::lookup(): {}", + ::HG_Error_to_string(ret))); + } + + return {m_context, std::make_shared( + ::margo_get_class(m_context->m_mid), svr_addr)}; +} + + +struct rpc_client : engine { + explicit rpc_client(const std::string& protocol) + : engine(protocol, execution_mode::client) {} + + template + rpc_client(const std::string& protocol, + Callback&& rpc_registration_callback) + : engine(protocol, execution_mode::client) { + rpc_registration_callback(this); + } +}; + +struct rpc_acceptor : engine { + + static std::string + format_address(const std::string& protocol, const std::string& address, + int port) { + return fmt::format("{}://{}:{}", protocol, address, port); + } + + rpc_acceptor(const std::string& protocol, const std::string& bind_address, + int port) + : engine(format_address(protocol, bind_address, port)) {} +}; + + +} // namespace scord::network + +#endif // SCORD_NETWORK_ENGINE_HPP diff --git a/src/server.cpp b/src/common/network/server.cpp similarity index 98% rename from src/server.cpp rename to src/common/network/server.cpp index c8899f6dc82a57665681e82a096fdea2fcd05707..1818650af3da629ec3073d616fc9eaf395fe936f 100644 --- a/src/server.cpp +++ b/src/common/network/server.cpp @@ -33,11 +33,11 @@ #include #include -#include #include #include -#include #include +#include "engine.hpp" +#include "server.hpp" namespace scord { @@ -264,8 +264,10 @@ server::install_rpc_handlers() { m_settings->transport_protocol(), m_settings->bind_address(), m_settings->remote_port()); - m_network_engine->register_rpcs(); -} // namespace scord + if(m_rpc_registration_callback) { + m_rpc_registration_callback(m_network_engine); + } +} void server::check_configuration() { diff --git a/src/server.hpp b/src/common/network/server.hpp similarity index 76% rename from src/server.hpp rename to src/common/network/server.hpp index 7c767323af01f0b824cf9599448bbf2c0c5f74dd..930cc84727e9b086feed88a4fa69270b1bc7eb40 100644 --- a/src/server.hpp +++ b/src/common/network/server.hpp @@ -27,7 +27,7 @@ #include #include -#include +#include "engine.hpp" namespace scord { @@ -46,6 +46,15 @@ public: ~server(); void configure(const config::settings& settings); + + template + void + configure(const config::settings& settings, + Callback rpc_registration_callback) { + configure(settings); + m_rpc_registration_callback = rpc_registration_callback; + } + config::settings get_configuration() const; int @@ -57,6 +66,18 @@ public: void teardown_and_exit(); + + template + void + install_rpc_handlers(Callable fun) { + + install_rpc_handlers(); + + // FIXME: improve network_engine so that we don't need to rely on + // calling a lambda here to register RPCs + fun(m_network_engine); + } + private: int daemonize(); @@ -82,6 +103,8 @@ private: std::unique_ptr m_settings; std::unique_ptr m_network_engine; std::unique_ptr m_signal_listener; + std::function&)> + m_rpc_registration_callback; }; diff --git a/src/utils/CMakeLists.txt b/src/common/utils/CMakeLists.txt similarity index 89% rename from src/utils/CMakeLists.txt rename to src/common/utils/CMakeLists.txt index 37360b29fe5f315157715be5b7a55569e8976cfa..576d08a1e36108f10cdcbf95288bd2b486ccb203 100644 --- a/src/utils/CMakeLists.txt +++ b/src/common/utils/CMakeLists.txt @@ -22,9 +22,8 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ -add_library(utils STATIC) +add_library(_utils STATIC) -target_include_directories(utils PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) -set_property(TARGET utils PROPERTY POSITION_INDEPENDENT_CODE ON) +set_property(TARGET _utils PROPERTY POSITION_INDEPENDENT_CODE ON) -target_sources(utils PRIVATE utils.hpp utils.cpp signal_listener.hpp) +target_sources(_utils PRIVATE utils.hpp utils.cpp signal_listener.hpp) diff --git a/src/utils/signal_listener.hpp b/src/common/utils/signal_listener.hpp similarity index 100% rename from src/utils/signal_listener.hpp rename to src/common/utils/signal_listener.hpp diff --git a/src/utils/utils.cpp b/src/common/utils/utils.cpp similarity index 99% rename from src/utils/utils.cpp rename to src/common/utils/utils.cpp index 05284e19fe5a8fe9fb6dfda57a154539546e867c..59d992f820435de8bb75ff730a0b45eaad85266b 100644 --- a/src/utils/utils.cpp +++ b/src/common/utils/utils.cpp @@ -31,7 +31,7 @@ #include #include -#include +#include "utils.hpp" namespace scord::utils { diff --git a/src/utils/utils.hpp b/src/common/utils/utils.hpp similarity index 100% rename from src/utils/utils.hpp rename to src/common/utils/utils.hpp diff --git a/src/api/CMakeLists.txt b/src/lib/CMakeLists.txt similarity index 80% rename from src/api/CMakeLists.txt rename to src/lib/CMakeLists.txt index e6a5eddb86126c00280e421cb1be2dc76b8f453c..94a90638ef9edbb8836414f768bf5f14aae7452e 100644 --- a/src/api/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -22,6 +22,16 @@ # SPDX-License-Identifier: GPL-3.0-or-later # ################################################################################ +# target for the public rpcs defined by the API that need to be shared by scord +add_library(api_rpcs STATIC) +target_sources(api_rpcs + PRIVATE rpcs/public.cpp rpcs/public.hpp) + +target_include_directories(api_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/rpcs) +target_link_libraries(api_rpcs PUBLIC common::logger Margo::Margo) +set_property(TARGET api_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON) + +# the client library implementing the actual API add_library(adm_iosched SHARED) target_sources(adm_iosched @@ -30,7 +40,8 @@ target_sources(adm_iosched target_include_directories(adm_iosched PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) -target_link_libraries(adm_iosched PRIVATE network_engine PUBLIC tl::expected) +target_link_libraries(adm_iosched PRIVATE common::network::engine api_rpcs PUBLIC + tl::expected) install( TARGETS adm_iosched diff --git a/src/api/admire.cpp b/src/lib/admire.cpp similarity index 73% rename from src/api/admire.cpp rename to src/lib/admire.cpp index 39113f4082d139d73eb371787b36e4e67f0876ec..ebc64a8ae54289ab172dbd9ba78b89b68025ca5a 100644 --- a/src/api/admire.cpp +++ b/src/lib/admire.cpp @@ -23,8 +23,9 @@ *****************************************************************************/ #include -#include -#include +#include +#include +#include "rpcs/public.hpp" #include "detail/impl.hpp" @@ -48,6 +49,105 @@ init_logger() { scord::logger::create_global_logger("libadm_iosched", "console color"); } +void +rpc_registration_cb(scord::network::rpc_client* client) { + + REGISTER_RPC(client, "ADM_ping", void, void, ADM_ping, false); + REGISTER_RPC(client, "ADM_input", ADM_input_in_t, ADM_input_out_t, + ADM_input, true); + + + REGISTER_RPC(client, "ADM_output", ADM_output_in_t, ADM_output_out_t, + ADM_output, true); + + REGISTER_RPC(client, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t, + ADM_inout, true); + + REGISTER_RPC(client, "ADM_adhoc_context", ADM_adhoc_context_in_t, + ADM_adhoc_context_out_t, ADM_adhoc_context, true); + + REGISTER_RPC(client, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t, + ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true); + + REGISTER_RPC(client, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t, + ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true); + + REGISTER_RPC(client, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t, + ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true); + + REGISTER_RPC(client, "ADM_adhoc_access", ADM_adhoc_access_in_t, + ADM_adhoc_access_out_t, ADM_adhoc_access, true); + + REGISTER_RPC(client, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t, + ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true); + + REGISTER_RPC(client, "ADM_adhoc_background_flush", + ADM_adhoc_background_flush_in_t, + ADM_adhoc_background_flush_out_t, ADM_adhoc_background_flush, + true); + + REGISTER_RPC(client, "ADM_in_situ_ops", ADM_in_situ_ops_in_t, + ADM_in_situ_ops_out_t, ADM_in_situ_ops, true); + + REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, + ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); + + REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, + ADM_transfer_dataset_out_t, ADM_transfer_dataset, true); + + REGISTER_RPC(client, "ADM_set_dataset_information", + ADM_set_dataset_information_in_t, + ADM_set_dataset_information_out_t, ADM_set_dataset_information, + true); + + REGISTER_RPC(client, "ADM_set_io_resources", ADM_set_io_resources_in_t, + ADM_set_io_resources_out_t, ADM_set_io_resources, true); + + REGISTER_RPC( + client, "ADM_get_transfer_priority", ADM_get_transfer_priority_in_t, + ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, true); + + REGISTER_RPC( + client, "ADM_set_transfer_priority", ADM_set_transfer_priority_in_t, + ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, true); + + REGISTER_RPC(client, "ADM_cancel_transfer", ADM_cancel_transfer_in_t, + ADM_cancel_transfer_out_t, ADM_cancel_transfer, true); + + REGISTER_RPC( + client, "ADM_get_pending_transfers", ADM_get_pending_transfers_in_t, + ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, true); + + REGISTER_RPC(client, "ADM_set_qos_constraints", + ADM_set_qos_constraints_in_t, ADM_set_qos_constraints_out_t, + ADM_set_qos_constraints, true); + + REGISTER_RPC(client, "ADM_get_qos_constraints", + ADM_get_qos_constraints_in_t, ADM_get_qos_constraints_out_t, + ADM_get_qos_constraints, true); + + REGISTER_RPC( + client, "ADM_define_data_operation", ADM_define_data_operation_in_t, + ADM_define_data_operation_out_t, ADM_define_data_operation, true); + + REGISTER_RPC(client, "ADM_connect_data_operation", + ADM_connect_data_operation_in_t, + ADM_connect_data_operation_out_t, ADM_connect_data_operation, + true); + + REGISTER_RPC(client, "ADM_finalize_data_operation", + ADM_finalize_data_operation_in_t, + ADM_finalize_data_operation_out_t, ADM_finalize_data_operation, + true); + + REGISTER_RPC(client, "ADM_link_transfer_to_data_operation", + ADM_link_transfer_to_data_operation_in_t, + ADM_link_transfer_to_data_operation_out_t, + ADM_link_transfer_to_data_operation, true); + + REGISTER_RPC(client, "ADM_get_statistics", ADM_get_statistics_in_t, + ADM_get_statistics_out_t, ADM_get_statistics, true); +} } // namespace @@ -83,8 +183,7 @@ update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs) { (void) job; (void) reqs; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -109,8 +208,7 @@ remove_job(const server& srv, ADM_job_t job) { (void) srv; (void) job; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -139,8 +237,7 @@ register_adhoc_storage(const server& srv, ADM_job_t job, (void) ctx; (void) adhoc_handle; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -168,8 +265,7 @@ update_adhoc_storage(const server& srv, ADM_job_t job, ADM_adhoc_context_t ctx, (void) ctx; (void) adhoc_handle; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -196,8 +292,7 @@ remove_adhoc_storage(const server& srv, ADM_job_t job, (void) job; (void) adhoc_handle; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -224,8 +319,7 @@ deploy_adhoc_storage(const server& srv, ADM_job_t job, (void) job; (void) adhoc_handle; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -258,8 +352,7 @@ transfer_dataset(const server& srv, ADM_job_t job, (void) mapping; (void) tx_handle; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -287,8 +380,7 @@ set_dataset_information(const server& srv, ADM_job_t job, (void) target; (void) info; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -316,8 +408,7 @@ set_io_resources(const server& srv, ADM_job_t job, ADM_storage_handle_t tier, (void) tier; (void) resources; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -346,8 +437,7 @@ get_transfer_priority(const server& srv, ADM_job_t job, (void) tx_handle; (void) priority; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -375,8 +465,7 @@ set_transfer_priority(const server& srv, ADM_job_t job, (void) tx_handle; (void) incr; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -403,8 +492,7 @@ cancel_transfer(const server& srv, ADM_job_t job, (void) job; (void) tx_handle; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -432,8 +520,7 @@ get_pending_transfers(const server& srv, ADM_job_t job, (void) job; (void) pending_transfers; - scord::network::rpc_client rpc_client{srv.m_protocol}; - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -460,9 +547,7 @@ set_qos_constraints(const server& srv, ADM_job_t job, ADM_limit_t limit) { (void) job; (void) limit; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -492,9 +577,7 @@ get_qos_constraints(const server& srv, ADM_job_t job, ADM_qos_scope_t scope, (void) entity; (void) limits; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -524,9 +607,7 @@ define_data_operation(const server& srv, ADM_job_t job, const char* path, (void) op; (void) args; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -558,9 +639,7 @@ connect_data_operation(const server& srv, ADM_job_t job, (void) should_stream; (void) args; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -590,9 +669,7 @@ finalize_data_operation(const server& srv, ADM_job_t job, (void) op; (void) status; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -623,9 +700,7 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job, (void) should_stream; (void) args; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); @@ -652,9 +727,7 @@ get_statistics(const server& srv, ADM_job_t job, ADM_job_stats_t** stats) { (void) job; (void) stats; - scord::network::rpc_client rpc_client{srv.m_protocol}; - - rpc_client.register_rpcs(); + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; auto endp = rpc_client.lookup(srv.m_address); diff --git a/src/api/admire.h b/src/lib/admire.h similarity index 100% rename from src/api/admire.h rename to src/lib/admire.h diff --git a/src/api/admire.hpp b/src/lib/admire.hpp similarity index 100% rename from src/api/admire.hpp rename to src/lib/admire.hpp diff --git a/src/api/c_wrapper.cpp b/src/lib/c_wrapper.cpp similarity index 99% rename from src/api/c_wrapper.cpp rename to src/lib/c_wrapper.cpp index 1a4e03c2dbdb17ab6069076146169abe32efbc77..1ffffe0678c1ea3bc599cb95eb444e08e7504b73 100644 --- a/src/api/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -24,7 +24,7 @@ #include #include -#include +#include #include "detail/impl.hpp" struct adm_server { diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp new file mode 100644 index 0000000000000000000000000000000000000000..116d2112ab9190a50f0f212d9e7030c2ec15dcd1 --- /dev/null +++ b/src/lib/detail/impl.cpp @@ -0,0 +1,171 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "rpcs/public.hpp" +#include "impl.hpp" + +void +rpc_registration_cb(scord::network::rpc_client* client) { + + REGISTER_RPC(client, "ADM_ping", void, void, ADM_ping, false); + REGISTER_RPC(client, "ADM_input", ADM_input_in_t, ADM_input_out_t, + ADM_input, true); + + + REGISTER_RPC(client, "ADM_output", ADM_output_in_t, ADM_output_out_t, + ADM_output, true); + + REGISTER_RPC(client, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t, + ADM_inout, true); + + REGISTER_RPC(client, "ADM_adhoc_context", ADM_adhoc_context_in_t, + ADM_adhoc_context_out_t, ADM_adhoc_context, true); + + REGISTER_RPC(client, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t, + ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true); + + REGISTER_RPC(client, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t, + ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true); + + REGISTER_RPC(client, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t, + ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true); + + REGISTER_RPC(client, "ADM_adhoc_access", ADM_adhoc_access_in_t, + ADM_adhoc_access_out_t, ADM_adhoc_access, true); + + REGISTER_RPC(client, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t, + ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true); + + REGISTER_RPC(client, "ADM_adhoc_background_flush", + ADM_adhoc_background_flush_in_t, + ADM_adhoc_background_flush_out_t, ADM_adhoc_background_flush, + true); + + REGISTER_RPC(client, "ADM_in_situ_ops", ADM_in_situ_ops_in_t, + ADM_in_situ_ops_out_t, ADM_in_situ_ops, true); + + REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, + ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); + + REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, + ADM_transfer_dataset_out_t, ADM_transfer_dataset, true); + + REGISTER_RPC(client, "ADM_set_dataset_information", + ADM_set_dataset_information_in_t, + ADM_set_dataset_information_out_t, ADM_set_dataset_information, + true); + + REGISTER_RPC(client, "ADM_set_io_resources", ADM_set_io_resources_in_t, + ADM_set_io_resources_out_t, ADM_set_io_resources, true); + + REGISTER_RPC( + client, "ADM_get_transfer_priority", ADM_get_transfer_priority_in_t, + ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, true); + + REGISTER_RPC( + client, "ADM_set_transfer_priority", ADM_set_transfer_priority_in_t, + ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, true); + + REGISTER_RPC(client, "ADM_cancel_transfer", ADM_cancel_transfer_in_t, + ADM_cancel_transfer_out_t, ADM_cancel_transfer, true); + + REGISTER_RPC( + client, "ADM_get_pending_transfers", ADM_get_pending_transfers_in_t, + ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, true); + + REGISTER_RPC(client, "ADM_set_qos_constraints", + ADM_set_qos_constraints_in_t, ADM_set_qos_constraints_out_t, + ADM_set_qos_constraints, true); + + REGISTER_RPC(client, "ADM_get_qos_constraints", + ADM_get_qos_constraints_in_t, ADM_get_qos_constraints_out_t, + ADM_get_qos_constraints, true); + + REGISTER_RPC( + client, "ADM_define_data_operation", ADM_define_data_operation_in_t, + ADM_define_data_operation_out_t, ADM_define_data_operation, true); + + REGISTER_RPC(client, "ADM_connect_data_operation", + ADM_connect_data_operation_in_t, + ADM_connect_data_operation_out_t, ADM_connect_data_operation, + true); + + REGISTER_RPC(client, "ADM_finalize_data_operation", + ADM_finalize_data_operation_in_t, + ADM_finalize_data_operation_out_t, ADM_finalize_data_operation, + true); + + REGISTER_RPC(client, "ADM_link_transfer_to_data_operation", + ADM_link_transfer_to_data_operation_in_t, + ADM_link_transfer_to_data_operation_out_t, + ADM_link_transfer_to_data_operation, true); + + REGISTER_RPC(client, "ADM_get_statistics", ADM_get_statistics_in_t, + ADM_get_statistics_out_t, ADM_get_statistics, true); +} + +namespace admire::detail { + +admire::error_code +ping(const server& srv) { + + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_ping()"); + endp.call("ADM_ping"); + + LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS); + return ADM_SUCCESS; +} + +tl::expected +register_job(const admire::server& srv, ADM_job_requirements_t reqs) { + (void) srv; + (void) reqs; + + scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb}; + + auto endp = rpc_client.lookup(srv.m_address); + + LOGGER_INFO("ADM_register_job(...)"); + + ADM_register_job_in_t in{}; + ADM_register_job_out_t out; + + endp.call("ADM_register_job", &in, &out); + + if(out.ret < 0) { + LOGGER_ERROR("ADM_register_job() = {}", out.ret); + return tl::make_unexpected(static_cast(out.ret)); + } + + LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS); + return admire::job{42}; +} + +} // namespace admire::detail diff --git a/src/api/detail/impl.hpp b/src/lib/detail/impl.hpp similarity index 100% rename from src/api/detail/impl.hpp rename to src/lib/detail/impl.hpp diff --git a/src/api/errors.c b/src/lib/errors.c similarity index 100% rename from src/api/errors.c rename to src/lib/errors.c diff --git a/src/network/rpcs.cpp b/src/lib/rpcs/public.cpp similarity index 99% rename from src/network/rpcs.cpp rename to src/lib/rpcs/public.cpp index 3f6dd76645eedd720db719821a7975ab92bf3ace..83dbfae8eabcfef4800ea1387a8ec6189ac421ce 100644 --- a/src/network/rpcs.cpp +++ b/src/lib/rpcs/public.cpp @@ -22,7 +22,7 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#include "rpcs.hpp" +#include "public.hpp" static void ADM_ping(hg_handle_t h) { diff --git a/src/network/rpcs.hpp b/src/lib/rpcs/public.hpp similarity index 98% rename from src/network/rpcs.hpp rename to src/lib/rpcs/public.hpp index 23931fb01ff420e0face78368fb591ed34e1127a..6b9f9d4f47baa243d91c8e4eae522430ec7d6e7b 100644 --- a/src/network/rpcs.hpp +++ b/src/lib/rpcs/public.hpp @@ -22,15 +22,15 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ -#ifndef SCORD_NETWORK_RPCS_HPP -#define SCORD_NETWORK_RPCS_HPP +// clang-format off +#ifndef SCORD_RPCS_PUBLIC_HPP +#define SCORD_RPCS_PUBLIC_HPP #include -#include #include #include #include - +#include // FIXME: cannot be in a namespace due to Margo limitations // namespace scord::network::rpc { @@ -342,4 +342,5 @@ DECLARE_MARGO_RPC_HANDLER(ADM_get_statistics); //} // namespace scord::network::rpc -#endif // SCORD_NETWORK_RPCS_HPP +#endif // SCORD_RPCS_PUBLIC_HPP +// clang-format on diff --git a/src/network/engine.hpp b/src/network/engine.hpp deleted file mode 100644 index 3ecd59633d25bb948e2ed414c7cdf937c38131f1..0000000000000000000000000000000000000000 --- a/src/network/engine.hpp +++ /dev/null @@ -1,441 +0,0 @@ -/****************************************************************************** - * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain - * - * This software was partially supported by the EuroHPC-funded project ADMIRE - * (Project ID: 956748, https://www.admire-eurohpc.eu). - * - * This file is part of scord. - * - * scord is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * scord is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with scord. If not, see . - * - * SPDX-License-Identifier: GPL-3.0-or-later - *****************************************************************************/ - -#ifndef SCORD_NETWORK_ENGINE_HPP -#define SCORD_NETWORK_ENGINE_HPP - -#include -#include -#include -#include -#include -#include -#include "rpcs.hpp" - - -namespace scord::network { - -namespace detail { - -#define REGISTER_RPC(__mid, __m_rpc_names, __func_name, __in_t, __out_t, \ - __handler, requires_response) \ - { \ - hg_id_t id = margo_provider_register_name( \ - __mid, __func_name, BOOST_PP_CAT(hg_proc_, __in_t), \ - BOOST_PP_CAT(hg_proc_, __out_t), _handler_for_##__handler, \ - MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL); \ - __m_rpc_names.emplace(__func_name, id); \ - if(!requires_response) { \ - ::margo_registered_disable_response(__mid, id, HG_TRUE); \ - } \ - } - - -struct margo_context { - - explicit margo_context(::margo_instance_id mid) : m_mid(mid) {} - - void - register_rpc(const std::string& name, bool requires_response) { - auto id = MARGO_REGISTER(m_mid, name.c_str(), void, void, ADM_ping); - m_rpc_names.emplace(name, id); - - if(!requires_response) { - ::margo_registered_disable_response(m_mid, id, HG_TRUE); - } - } - - margo_instance_id m_mid; - std::unordered_map m_rpc_names; -}; - -} // namespace detail - -// forward declarations -struct endpoint; - -struct engine { - - enum class execution_mode : bool { - server = MARGO_SERVER_MODE, - client = MARGO_CLIENT_MODE - }; - - explicit engine(std::string_view address, - execution_mode = execution_mode::client) { - struct margo_init_info info = MARGO_INIT_INFO_INITIALIZER; - - m_context = std::make_shared( - margo_init_ext(address.data(), MARGO_SERVER_MODE, &info)); - - if(m_context->m_mid == MARGO_INSTANCE_NULL) { - throw std::runtime_error("Margo initialization failed"); - } - } - - ~engine() { - if(m_context) { - ::margo_finalize(m_context->m_mid); - } - } - - void - register_rpcs() { - - // register RPCs manually for now - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_ping", void, - void, ADM_ping, false); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_register_job", ADM_register_job_in_t, - ADM_register_job_out_t, ADM_register_job, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_update_job", - ADM_update_job_in_t, ADM_update_job_out_t, ADM_update_job, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_remove_job", - ADM_remove_job_in_t, ADM_remove_job_out_t, ADM_remove_job, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_register_adhoc_storage", - ADM_register_adhoc_storage_in_t, - ADM_register_adhoc_storage_out_t, - ADM_register_adhoc_storage, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_update_adhoc_storage", ADM_update_adhoc_storage_in_t, - ADM_update_adhoc_storage_out_t, ADM_update_adhoc_storage, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_remove_adhoc_storage", ADM_remove_adhoc_storage_in_t, - ADM_remove_adhoc_storage_out_t, ADM_remove_adhoc_storage, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_deploy_adhoc_storage", ADM_deploy_adhoc_storage_in_t, - ADM_deploy_adhoc_storage_out_t, ADM_deploy_adhoc_storage, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_input", - ADM_input_in_t, ADM_input_out_t, ADM_input, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_output", - ADM_output_in_t, ADM_output_out_t, ADM_output, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_inout", - ADM_inout_in_t, ADM_inout_out_t, ADM_inout, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_context", ADM_adhoc_context_in_t, - ADM_adhoc_context_out_t, ADM_adhoc_context, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t, - ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t, - ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t, - ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_access", ADM_adhoc_access_in_t, - ADM_adhoc_access_out_t, ADM_adhoc_access, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t, - ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_adhoc_background_flush", - ADM_adhoc_background_flush_in_t, - ADM_adhoc_background_flush_out_t, - ADM_adhoc_background_flush, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_in_situ_ops", ADM_in_situ_ops_in_t, - ADM_in_situ_ops_out_t, ADM_in_situ_ops, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_in_transit_ops", ADM_in_transit_ops_in_t, - ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_transfer_dataset", ADM_transfer_dataset_in_t, - ADM_transfer_dataset_out_t, ADM_transfer_dataset, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_set_dataset_information", - ADM_set_dataset_information_in_t, - ADM_set_dataset_information_out_t, - ADM_set_dataset_information, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_set_io_resources", ADM_set_io_resources_in_t, - ADM_set_io_resources_out_t, ADM_set_io_resources, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_get_transfer_priority", - ADM_get_transfer_priority_in_t, - ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_set_transfer_priority", - ADM_set_transfer_priority_in_t, - ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_cancel_transfer", ADM_cancel_transfer_in_t, - ADM_cancel_transfer_out_t, ADM_cancel_transfer, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_get_pending_transfers", - ADM_get_pending_transfers_in_t, - ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_set_qos_constraints", ADM_set_qos_constraints_in_t, - ADM_set_qos_constraints_out_t, ADM_set_qos_constraints, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_get_qos_constraints", ADM_get_qos_constraints_in_t, - ADM_get_qos_constraints_out_t, ADM_get_qos_constraints, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_define_data_operation", - ADM_define_data_operation_in_t, - ADM_define_data_operation_out_t, ADM_define_data_operation, - true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_connect_data_operation", - ADM_connect_data_operation_in_t, - ADM_connect_data_operation_out_t, - ADM_connect_data_operation, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_finalize_data_operation", - ADM_finalize_data_operation_in_t, - ADM_finalize_data_operation_out_t, - ADM_finalize_data_operation, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_link_transfer_to_data_operation", - ADM_link_transfer_to_data_operation_in_t, - ADM_link_transfer_to_data_operation_out_t, - ADM_link_transfer_to_data_operation, true); - - REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, - "ADM_get_statistics", ADM_get_statistics_in_t, - ADM_get_statistics_out_t, ADM_get_statistics, true); - } - - void - listen() const { - - /* NOTE: there isn't anything else for the server to do at this point - * except wait for itself to be shut down. The - * margo_wait_for_finalize() call here yields to let Margo drive - * progress until that happens. - */ - ::margo_wait_for_finalize(m_context->m_mid); - } - - void - stop() { - ::margo_finalize(m_context->m_mid); - - // It is not safe to access m_margo_context->m_mid after the - // margo_finalize() call. Make sure that no other threads can do a - // double margo_finalize() (e.g when calling ~engine()) by resetting - // m_margo_context. - m_context.reset(); - } - - endpoint - lookup(const std::string& address) const; - - std::shared_ptr m_context; -}; - -struct endpoint { -private: - // Endpoints should only be created by calling engine::lookup() - friend class engine; - - endpoint(std::shared_ptr context, - std::shared_ptr address) - : m_margo_context(std::move(context)), m_address(std::move(address)) {} - -public: - endpoint(const endpoint& /*other*/) = default; - endpoint& - operator=(const endpoint& /*other*/) = default; - endpoint(endpoint&& /*rhs*/) = default; - endpoint& - operator=(endpoint&& /*rhs*/) = default; - - template - void - call(const std::string& id, Args&&... args) { - - const auto it = m_margo_context->m_rpc_names.find(id); - - if(it == m_margo_context->m_rpc_names.end()) { - throw std::runtime_error( - fmt::format("Unknown remote procedure: {}", id)); - } - - hg_handle_t handle; - auto ret = ::margo_create(m_margo_context->m_mid, - m_address->mercury_address(), it->second, - &handle); - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during endpoint::call(): {}", - ::HG_Error_to_string(ret))); - } - - ret = ::margo_forward(handle, nullptr); - - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during endpoint::call(): {}", - ::HG_Error_to_string(ret))); - } - - ret = ::margo_destroy(handle); - - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during endpoint::call(): {}", - ::HG_Error_to_string(ret))); - } - } - - /** - * Deprecated call, used to support Margo directly - * - **/ - template - [[deprecated("It should be eventually replaced by a generic call")]] void - call(const std::string& id, T1 input = nullptr, T2 output = nullptr) { - - const auto it = m_margo_context->m_rpc_names.find(id); - - if(it == m_margo_context->m_rpc_names.end()) { - throw std::runtime_error( - fmt::format("Unknown remote procedure: {}", id)); - } - - hg_handle_t handle; - auto ret = ::margo_create(m_margo_context->m_mid, - m_address->mercury_address(), it->second, - &handle); - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during endpoint::call(): {}", - ::HG_Error_to_string(ret))); - } - - ret = ::margo_forward(handle, input); - - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during endpoint::call(): {}", - ::HG_Error_to_string(ret))); - } - - if(output != nullptr) { - ret = ::margo_get_output(handle, output); - } - - - ret = ::margo_destroy(handle); - - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during endpoint::call(): {}", - ::HG_Error_to_string(ret))); - } - } - -private: - std::shared_ptr m_margo_context; - std::shared_ptr m_address; -}; - -// now that we have the complete definition of engine and endpoint, we can -// finally define engine::lookup completely -inline endpoint -engine::lookup(const std::string& address) const { - - hg_addr_t svr_addr; - auto ret = - ::margo_addr_lookup(m_context->m_mid, address.c_str(), &svr_addr); - if(ret != HG_SUCCESS) { - throw std::runtime_error( - fmt::format("Error during engine::lookup(): {}", - ::HG_Error_to_string(ret))); - } - - return {m_context, std::make_shared( - ::margo_get_class(m_context->m_mid), svr_addr)}; -} - - -struct rpc_client : engine { - explicit rpc_client(const std::string& protocol) - : engine(protocol, execution_mode::client) {} -}; - -struct rpc_acceptor : engine { - - static std::string - format_address(const std::string& protocol, const std::string& address, - int port) { - return fmt::format("{}://{}:{}", protocol, address, port); - } - - rpc_acceptor(const std::string& protocol, const std::string& bind_address, - int port) - : engine(format_address(protocol, bind_address, port)) {} -}; - - -} // namespace scord::network - -#endif // SCORD_NETWORK_ENGINE_HPP diff --git a/src/scord-ctl/CMakeLists.txt b/src/scord-ctl/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..26dfc86b2fac4342ebd342c0736c427b141b8e8d --- /dev/null +++ b/src/scord-ctl/CMakeLists.txt @@ -0,0 +1,42 @@ +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +# import rpc definitions for scord-ctl +add_subdirectory(rpcs) + +# scord-ctl daemon +add_executable(scord-ctl scord-ctl.cpp) + +target_include_directories( + scord-ctl + PUBLIC ${CMAKE_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_BINARY_DIR}/src ${CMAKE_CURRENT_BINARY_DIR} +) + +target_link_libraries( + scord-ctl PRIVATE common::config common::logger common::network::rpc_server + scord_ctl_private_rpcs fmt::fmt Boost::program_options +) + +install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/scord-ctl/rpcs/CMakeLists.txt b/src/scord-ctl/rpcs/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ae011dd902d58b27184e3eae3df59649c7c09742 --- /dev/null +++ b/src/scord-ctl/rpcs/CMakeLists.txt @@ -0,0 +1,31 @@ +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +add_library(scord_private_rpcs STATIC) +target_sources(scord_private_rpcs + PRIVATE private.cpp private.hpp) + +target_include_directories(scord_private_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(scord_private_rpcs PUBLIC common::logger Margo::Margo) +set_property(TARGET scord_private_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/src/scord-ctl/rpcs/private.cpp b/src/scord-ctl/rpcs/private.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d472d44ec0dab2230738f7799449a82e628c1411 --- /dev/null +++ b/src/scord-ctl/rpcs/private.cpp @@ -0,0 +1,41 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include "private.hpp" + +static void +ADM_ping(hg_handle_t h) { + + hg_return_t ret; + + [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); + + LOGGER_INFO("PING(noargs)"); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_ping); diff --git a/src/scord-ctl/rpcs/private.hpp b/src/scord-ctl/rpcs/private.hpp new file mode 100644 index 0000000000000000000000000000000000000000..6429eade640b78d47aaa91700c55359616102aaf --- /dev/null +++ b/src/scord-ctl/rpcs/private.hpp @@ -0,0 +1,32 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_CTL_RPCS_PRIVATE_HPP +#define SCORD_CTL_RPCS_PRIVATE_HPP + +#include + +DECLARE_MARGO_RPC_HANDLER(ADM_ping); + +#endif // SCORD_CTL_RPCS_PRIVATE_HPP diff --git a/src/main.cpp b/src/scord-ctl/scord-ctl.cpp similarity index 93% rename from src/main.cpp rename to src/scord-ctl/scord-ctl.cpp index 8686cd3785af20360ea83bd1ef0bf9e84019db32..b99caff9aff2180422a09250cddf095eabf694aa 100644 --- a/src/main.cpp +++ b/src/scord-ctl/scord-ctl.cpp @@ -32,8 +32,10 @@ #include #include -#include +#include #include +#include +#include "rpcs/private.hpp" namespace fs = std::filesystem; namespace bpo = boost::program_options; @@ -150,7 +152,15 @@ main(int argc, char* argv[]) { try { scord::server daemon; - daemon.configure(cfg); + const auto rpc_registration_cb = [](auto&& ctx) { + LOGGER_INFO(" * Registering RPCs handlers..."); + + REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false); + + // TODO: add internal RPCs for communication with scord + }; + + daemon.configure(cfg, rpc_registration_cb); return daemon.run(); } catch(const std::exception& ex) { fmt::print(stderr, diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b7d7d6d134332229c77aa71e931696ac24c4f068 --- /dev/null +++ b/src/scord/CMakeLists.txt @@ -0,0 +1,72 @@ +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +# import rpc definitions for scord +add_subdirectory(rpcs) + +# scord daemon +add_executable(scord scord.cpp) + +target_include_directories( + scord + PUBLIC ${CMAKE_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR} + PRIVATE ${CMAKE_BINARY_DIR}/src ${CMAKE_CURRENT_BINARY_DIR} +) + +target_link_libraries( + scord + PRIVATE common::config + common::logger + common::network::rpc_server + api_rpcs + scord_private_rpcs + fmt::fmt + Boost::program_options +) + +install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/scord/rpcs/CMakeLists.txt b/src/scord/rpcs/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..38b0acf513f7b63de7ec76dc2e59d86999062793 --- /dev/null +++ b/src/scord/rpcs/CMakeLists.txt @@ -0,0 +1,31 @@ +################################################################################ +# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +add_library(scord_ctl_private_rpcs STATIC) +target_sources(scord_ctl_private_rpcs + PRIVATE private.cpp private.hpp) + +target_include_directories(scord_ctl_private_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(scord_ctl_private_rpcs PUBLIC common::logger Margo::Margo) +set_property(TARGET scord_ctl_private_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/src/scord/rpcs/private.cpp b/src/scord/rpcs/private.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d472d44ec0dab2230738f7799449a82e628c1411 --- /dev/null +++ b/src/scord/rpcs/private.cpp @@ -0,0 +1,41 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include "private.hpp" + +static void +ADM_ping(hg_handle_t h) { + + hg_return_t ret; + + [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); + + LOGGER_INFO("PING(noargs)"); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); +} + +DEFINE_MARGO_RPC_HANDLER(ADM_ping); diff --git a/src/scord/rpcs/private.hpp b/src/scord/rpcs/private.hpp new file mode 100644 index 0000000000000000000000000000000000000000..3af7df82cfa03fdc9c391de65758c7c53068b953 --- /dev/null +++ b/src/scord/rpcs/private.hpp @@ -0,0 +1,32 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_RPCS_PRIVATE_HPP +#define SCORD_RPCS_PRIVATE_HPP + +#include + +DECLARE_MARGO_RPC_HANDLER(ADM_ping); + +#endif // SCORD_RPCS_PRIVATE_HPP diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b1430ff99f678572c7f5a49a434a0627e448e52a --- /dev/null +++ b/src/scord/scord.cpp @@ -0,0 +1,277 @@ +/****************************************************************************** + * Copyright 2021, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include "rpcs/private.hpp" + +namespace fs = std::filesystem; +namespace bpo = boost::program_options; + +void +print_version(const std::string& progname) { + fmt::print("{} {}\n", progname, scord::version_string); +} + +void +print_help(const std::string& progname, + const bpo::options_description& opt_desc) { + fmt::print("Usage: {} [options]\n\n", progname); + fmt::print("{}", opt_desc); +} + + +int +main(int argc, char* argv[]) { + + scord::config::settings cfg; + + // define the command line options allowed + bpo::options_description opt_desc("Options"); + opt_desc.add_options() + // run in foreground + (",f", + bpo::bool_switch()->default_value(false)->notifier( + [&](const bool& flag_value) { + cfg.daemonize(!flag_value); + }), + "foreground operation") + + // force logging messages to the console + ("force-console,C", + bpo::value() + ->implicit_value("") + ->zero_tokens() + ->notifier([&](const std::string&) { + cfg.use_console(true); + }), + "override any logging options defined in configuration files and " + "send all daemon output to the console") + + // use provided configuration file instead of the system-wide + // configuration file defined when building the daemon + ("config-file,c", + bpo::value() + ->value_name("FILENAME") + ->implicit_value("") + ->notifier([&](const std::string& filename) { + cfg.config_file(filename); + }), + "override the system-wide configuration file with FILENAME") + + // print the daemon version + ("version,v", + bpo::value()->implicit_value("")->zero_tokens(), + "print version string") + + // print help + ("help,h", + bpo::value()->implicit_value("")->zero_tokens(), + "produce help message"); + + // parse the command line + bpo::variables_map vm; + + try { + bpo::store(bpo::parse_command_line(argc, argv, opt_desc), vm); + + // the --help and --version arguments are special, since we want + // to process them even if the global configuration file doesn't exist + if(vm.count("help")) { + print_help(cfg.progname(), opt_desc); + return EXIT_SUCCESS; + } + + if(vm.count("version")) { + print_version(cfg.progname()); + return EXIT_SUCCESS; + } + + const fs::path config_file = (vm.count("config-file") == 0) + ? cfg.config_file() + : vm["config-file"].as(); + + if(!fs::exists(config_file)) { + fmt::print(stderr, + "Failed to access daemon configuration file {}\n", + config_file); + return EXIT_FAILURE; + } + + try { + cfg.load_from_file(config_file); + } catch(const std::exception& ex) { + fmt::print(stderr, + "Failed reading daemon configuration file:\n" + " {}\n", + ex.what()); + return EXIT_FAILURE; + } + + // calling notify() here basically invokes all define notifiers, thus + // overriding any configuration loaded from the global configuration + // file with its command-line counterparts if provided (for those + // options where this is available) + bpo::notify(vm); + } catch(const bpo::error& ex) { + fmt::print(stderr, "ERROR: {}\n\n", ex.what()); + return EXIT_FAILURE; + } + + try { + scord::server daemon; + + const auto rpc_registration_cb = [](auto&& ctx) { + LOGGER_INFO(" * Registering RPCs handlers..."); + + REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false); + REGISTER_RPC(ctx, "ADM_input", ADM_input_in_t, ADM_input_out_t, + ADM_input, true); + + + REGISTER_RPC(ctx, "ADM_output", ADM_output_in_t, ADM_output_out_t, + ADM_output, true); + + REGISTER_RPC(ctx, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t, + ADM_inout, true); + + REGISTER_RPC(ctx, "ADM_adhoc_context", ADM_adhoc_context_in_t, + ADM_adhoc_context_out_t, ADM_adhoc_context, true); + + REGISTER_RPC(ctx, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t, + ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, + true); + + REGISTER_RPC(ctx, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t, + ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true); + + REGISTER_RPC(ctx, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t, + ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true); + + REGISTER_RPC(ctx, "ADM_adhoc_access", ADM_adhoc_access_in_t, + ADM_adhoc_access_out_t, ADM_adhoc_access, true); + + REGISTER_RPC( + ctx, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t, + ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true); + + REGISTER_RPC(ctx, "ADM_adhoc_background_flush", + ADM_adhoc_background_flush_in_t, + ADM_adhoc_background_flush_out_t, + ADM_adhoc_background_flush, true); + + REGISTER_RPC(ctx, "ADM_in_situ_ops", ADM_in_situ_ops_in_t, + ADM_in_situ_ops_out_t, ADM_in_situ_ops, true); + + REGISTER_RPC(ctx, "ADM_in_transit_ops", ADM_in_transit_ops_in_t, + ADM_in_transit_ops_out_t, ADM_in_transit_ops, true); + + REGISTER_RPC(ctx, "ADM_transfer_dataset", ADM_transfer_dataset_in_t, + ADM_transfer_dataset_out_t, ADM_transfer_dataset, + true); + + REGISTER_RPC(ctx, "ADM_set_dataset_information", + ADM_set_dataset_information_in_t, + ADM_set_dataset_information_out_t, + ADM_set_dataset_information, true); + + REGISTER_RPC(ctx, "ADM_set_io_resources", ADM_set_io_resources_in_t, + ADM_set_io_resources_out_t, ADM_set_io_resources, + true); + + REGISTER_RPC(ctx, "ADM_get_transfer_priority", + ADM_get_transfer_priority_in_t, + ADM_get_transfer_priority_out_t, + ADM_get_transfer_priority, true); + + REGISTER_RPC(ctx, "ADM_set_transfer_priority", + ADM_set_transfer_priority_in_t, + ADM_set_transfer_priority_out_t, + ADM_set_transfer_priority, true); + + REGISTER_RPC(ctx, "ADM_cancel_transfer", ADM_cancel_transfer_in_t, + ADM_cancel_transfer_out_t, ADM_cancel_transfer, true); + + REGISTER_RPC(ctx, "ADM_get_pending_transfers", + ADM_get_pending_transfers_in_t, + ADM_get_pending_transfers_out_t, + ADM_get_pending_transfers, true); + + REGISTER_RPC(ctx, "ADM_set_qos_constraints", + ADM_set_qos_constraints_in_t, + ADM_set_qos_constraints_out_t, ADM_set_qos_constraints, + true); + + REGISTER_RPC(ctx, "ADM_get_qos_constraints", + ADM_get_qos_constraints_in_t, + ADM_get_qos_constraints_out_t, ADM_get_qos_constraints, + true); + + REGISTER_RPC(ctx, "ADM_define_data_operation", + ADM_define_data_operation_in_t, + ADM_define_data_operation_out_t, + ADM_define_data_operation, true); + + REGISTER_RPC(ctx, "ADM_connect_data_operation", + ADM_connect_data_operation_in_t, + ADM_connect_data_operation_out_t, + ADM_connect_data_operation, true); + + REGISTER_RPC(ctx, "ADM_finalize_data_operation", + ADM_finalize_data_operation_in_t, + ADM_finalize_data_operation_out_t, + ADM_finalize_data_operation, true); + + REGISTER_RPC(ctx, "ADM_link_transfer_to_data_operation", + ADM_link_transfer_to_data_operation_in_t, + ADM_link_transfer_to_data_operation_out_t, + ADM_link_transfer_to_data_operation, true); + + REGISTER_RPC(ctx, "ADM_get_statistics", ADM_get_statistics_in_t, + ADM_get_statistics_out_t, ADM_get_statistics, true); + + // TODO: add internal RPCs for communication with scord-ctl + }; + + daemon.configure(cfg, rpc_registration_cb); + return daemon.run(); + } catch(const std::exception& ex) { + fmt::print(stderr, + "An unhandled exception reached the top of main(), " + "{} will exit:\n what(): {}\n", + cfg.progname(), ex.what()); + return EXIT_FAILURE; + } +}