diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt
index c1e49ffb8a3c1bc2dc6b58c0486624c6917c76ee..4df33f0d8d0a1d61b2cbaf3c03ca2513158fdd3f 100644
--- a/examples/cxx/CMakeLists.txt
+++ b/examples/cxx/CMakeLists.txt
@@ -27,7 +27,8 @@ list(APPEND examples_cxx
ADM_register_job ADM_update_job ADM_remove_job
ADM_register_adhoc_storage ADM_update_adhoc_storage
ADM_remove_adhoc_storage ADM_deploy_adhoc_storage
- ADM_in_situ_ops ADM_in_transit_ops ADM_transfer_dataset
+ # ADM_in_situ_ops ADM_in_transit_ops
+ ADM_transfer_dataset
ADM_set_dataset_information ADM_set_io_resources ADM_get_transfer_priority
ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers
ADM_set_qos_constraints ADM_get_qos_constraints ADM_define_data_operation ADM_connect_data_operation
@@ -37,6 +38,6 @@ foreach (example IN LISTS examples_cxx)
add_executable(${example}_cxx)
target_sources(${example}_cxx PRIVATE ${example}.cpp)
target_link_libraries(${example}_cxx
- PUBLIC network_engine fmt::fmt adm_iosched)
+ PUBLIC common::network::engine fmt::fmt adm_iosched)
set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example})
endforeach()
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 84cf47cfb1b02364d6afd224fe0d7f900fafe27d..7c2edf2d5f402c014340e12fa6b728b4268d307f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -22,21 +22,9 @@
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
-add_subdirectory(config)
-add_subdirectory(utils)
-add_subdirectory(logger)
-add_subdirectory(network)
+add_subdirectory(common)
+add_subdirectory(scord)
+add_subdirectory(scord-ctl)
-add_executable(scord server.cpp main.cpp)
-
-target_include_directories(
- scord PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}
-)
-
-target_link_libraries(
- scord PRIVATE config logger network_engine fmt::fmt Boost::program_options
-)
-
-install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR})
-
-add_subdirectory(api)
+# public libraries
+add_subdirectory(lib)
diff --git a/src/api/detail/impl.cpp b/src/api/detail/impl.cpp
deleted file mode 100644
index ba09387a1460c967f9f24309d60aee0b45683067..0000000000000000000000000000000000000000
--- a/src/api/detail/impl.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/******************************************************************************
- * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
- *
- * This software was partially supported by the EuroHPC-funded project ADMIRE
- * (Project ID: 956748, https://www.admire-eurohpc.eu).
- *
- * This file is part of scord.
- *
- * scord is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * scord is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with scord. If not, see .
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *****************************************************************************/
-
-#include
-#include
-#include "impl.hpp"
-
-namespace admire::detail {
-
-admire::error_code
-ping(const server& srv) {
-
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
-
- auto endp = rpc_client.lookup(srv.m_address);
-
- LOGGER_INFO("ADM_ping()");
- endp.call("ADM_ping");
-
- LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS);
- return ADM_SUCCESS;
-}
-
-tl::expected
-register_job(const admire::server& srv, ADM_job_requirements_t reqs) {
- (void) srv;
- (void) reqs;
-
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
-
- auto endp = rpc_client.lookup(srv.m_address);
-
- LOGGER_INFO("ADM_register_job(...)");
-
- ADM_register_job_in_t in{};
- ADM_register_job_out_t out;
-
- endp.call("ADM_register_job", &in, &out);
-
- if(out.ret < 0) {
- LOGGER_ERROR("ADM_register_job() = {}", out.ret);
- return tl::make_unexpected(static_cast(out.ret));
- }
-
- LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS);
- return admire::job{42};
-}
-
-} // namespace admire::detail
diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..f3fbb8b22569acb1ebb1998ba0efddb01dd6bb25
--- /dev/null
+++ b/src/common/CMakeLists.txt
@@ -0,0 +1,46 @@
+################################################################################
+# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+add_library(common INTERFACE)
+target_include_directories(common INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+
+add_subdirectory(utils)
+target_include_directories(_utils INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+add_library(common::utils ALIAS _utils)
+
+add_subdirectory(config)
+target_include_directories(_config INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+add_library(common::config ALIAS _config)
+
+add_subdirectory(logger)
+target_include_directories(_logger INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+add_library(common::logger ALIAS _logger)
+
+add_subdirectory(network)
+target_include_directories(_network_engine INTERFACE
+ ${CMAKE_CURRENT_SOURCE_DIR})
+add_library(common::network::engine ALIAS _network_engine)
+target_include_directories(_rpc_server INTERFACE
+ ${CMAKE_CURRENT_SOURCE_DIR})
+add_library(common::network::rpc_server ALIAS _rpc_server)
diff --git a/src/config/CMakeLists.txt b/src/common/config/CMakeLists.txt
similarity index 94%
rename from src/config/CMakeLists.txt
rename to src/common/config/CMakeLists.txt
index 872dfb27cafa0d0cb064ea061a5578c5722b9e04..fa464f06961b2d49ad30770b4964386fa2a3a969 100644
--- a/src/config/CMakeLists.txt
+++ b/src/common/config/CMakeLists.txt
@@ -23,18 +23,18 @@
################################################################################
# Create a config target for all configuration code
-add_library(config STATIC)
+add_library(_config STATIC)
# Since some of the sources will be auto-generated, we need to search for
# includes in ${CMAKE_CURRENT_BINARY_DIR}
target_include_directories(
- config PRIVATE ${CMAKE_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR}
+ _config PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_BINARY_DIR}
)
-set_property(TARGET config PROPERTY POSITION_INDEPENDENT_CODE ON)
+set_property(TARGET _config PROPERTY POSITION_INDEPENDENT_CODE ON)
target_sources(
- config
+ _config
PRIVATE config.hpp
defaults.hpp
${CMAKE_CURRENT_BINARY_DIR}/defaults.cpp
@@ -46,7 +46,7 @@ target_sources(
settings.hpp
)
-target_link_libraries(config PRIVATE utils file_options::file_options)
+target_link_libraries(_config PRIVATE common::utils file_options::file_options)
# ##############################################################################
# Produce several auto-generated files for 'config'
diff --git a/src/config/config.hpp b/src/common/config/config.hpp
similarity index 100%
rename from src/config/config.hpp
rename to src/common/config/config.hpp
diff --git a/src/config/defaults.cpp.in b/src/common/config/defaults.cpp.in
similarity index 100%
rename from src/config/defaults.cpp.in
rename to src/common/config/defaults.cpp.in
diff --git a/src/config/defaults.hpp b/src/common/config/defaults.hpp
similarity index 100%
rename from src/config/defaults.hpp
rename to src/common/config/defaults.hpp
diff --git a/src/config/file_options.yml b/src/common/config/file_options.yml
similarity index 100%
rename from src/config/file_options.yml
rename to src/common/config/file_options.yml
diff --git a/src/config/genopts.yml.in b/src/common/config/genopts.yml.in
similarity index 100%
rename from src/config/genopts.yml.in
rename to src/common/config/genopts.yml.in
diff --git a/src/config/parsers.cpp b/src/common/config/parsers.cpp
similarity index 99%
rename from src/config/parsers.cpp
rename to src/common/config/parsers.cpp
index 0fdc97067b37c6929534ebd0b609cb89baacbc30..a45d853a1aa0eac0508f4c1a3aa09e5aa12ace55 100644
--- a/src/config/parsers.cpp
+++ b/src/common/config/parsers.cpp
@@ -29,7 +29,7 @@
#include
#include
#include
-#include
+#include "parsers.hpp"
namespace fs = std::filesystem;
diff --git a/src/config/parsers.hpp b/src/common/config/parsers.hpp
similarity index 100%
rename from src/config/parsers.hpp
rename to src/common/config/parsers.hpp
diff --git a/src/config/settings.cpp b/src/common/config/settings.cpp
similarity index 100%
rename from src/config/settings.cpp
rename to src/common/config/settings.cpp
diff --git a/src/config/settings.hpp b/src/common/config/settings.hpp
similarity index 100%
rename from src/config/settings.hpp
rename to src/common/config/settings.hpp
diff --git a/src/logger/CMakeLists.txt b/src/common/logger/CMakeLists.txt
similarity index 89%
rename from src/logger/CMakeLists.txt
rename to src/common/logger/CMakeLists.txt
index a56254f66145ba311d2b712169d101a516a9bd3a..6836b1244fbdc87d6692341bdc6c1f6220971e20 100644
--- a/src/logger/CMakeLists.txt
+++ b/src/common/logger/CMakeLists.txt
@@ -22,10 +22,8 @@
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
-add_library(logger INTERFACE)
+add_library(_logger INTERFACE)
-target_include_directories(logger INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+target_sources(_logger INTERFACE logger.hpp)
-target_sources(logger INTERFACE logger.hpp)
-
-target_link_libraries(logger INTERFACE spdlog::spdlog fmt::fmt)
+target_link_libraries(_logger INTERFACE spdlog::spdlog fmt::fmt)
diff --git a/src/logger/logger.hpp b/src/common/logger/logger.hpp
similarity index 100%
rename from src/logger/logger.hpp
rename to src/common/logger/logger.hpp
diff --git a/src/network/CMakeLists.txt b/src/common/network/CMakeLists.txt
similarity index 82%
rename from src/network/CMakeLists.txt
rename to src/common/network/CMakeLists.txt
index 38e7e5b68570b232d120dd4792399cf18697f54c..5aa5930d8f503db018442e559298c0468453c441 100644
--- a/src/network/CMakeLists.txt
+++ b/src/common/network/CMakeLists.txt
@@ -22,16 +22,24 @@
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
-add_library(network_engine STATIC)
+add_library(_network_engine STATIC)
target_sources(
- network_engine
+ _network_engine
INTERFACE engine.hpp
- PRIVATE rpcs.hpp rpcs.cpp detail/address.hpp
+ PRIVATE detail/address.hpp
)
-target_include_directories(network_engine INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(
- network_engine PUBLIC logger transport_library Mercury::Mercury
+ _network_engine PUBLIC common::logger transport_library Mercury::Mercury
Argobots::Argobots Margo::Margo
)
-set_property(TARGET network_engine PROPERTY POSITION_INDEPENDENT_CODE ON)
+set_property(TARGET _network_engine PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+add_library(_rpc_server STATIC)
+target_sources(
+ _rpc_server
+ INTERFACE server.hpp
+ PRIVATE server.cpp
+)
+
+target_link_libraries(_rpc_server PUBLIC common::config _network_engine)
diff --git a/src/network/detail/address.hpp b/src/common/network/detail/address.hpp
similarity index 100%
rename from src/network/detail/address.hpp
rename to src/common/network/detail/address.hpp
diff --git a/src/common/network/engine.hpp b/src/common/network/engine.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..79c36d8ea72a6d05c963571250f7fa93fec945c2
--- /dev/null
+++ b/src/common/network/engine.hpp
@@ -0,0 +1,282 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#ifndef SCORD_NETWORK_ENGINE_HPP
+#define SCORD_NETWORK_ENGINE_HPP
+
+#include
+#include
+#include
+#include
+#include
+#include "detail/address.hpp"
+
+
+namespace scord::network {
+
+namespace detail {
+
+#define REGISTER_RPC(__engine, __func_name, __in_t, __out_t, __handler, \
+ requires_response) \
+ { \
+ REGISTER_RPC_IMPL((__engine)->m_context->m_mid, \
+ (__engine)->m_context->m_rpc_names, __func_name, \
+ __in_t, __out_t, __handler, requires_response); \
+ }
+
+#define REGISTER_RPC_IMPL(__mid, __rpc_names, __func_name, __in_t, __out_t, \
+ __handler, requires_response) \
+ { \
+ hg_id_t id = margo_provider_register_name( \
+ __mid, __func_name, BOOST_PP_CAT(hg_proc_, __in_t), \
+ BOOST_PP_CAT(hg_proc_, __out_t), _handler_for_##__handler, \
+ MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL); \
+ (__rpc_names).emplace(__func_name, id); \
+ if(!(requires_response)) { \
+ ::margo_registered_disable_response(__mid, id, HG_TRUE); \
+ } \
+ }
+
+
+struct margo_context {
+
+ explicit margo_context(::margo_instance_id mid) : m_mid(mid) {}
+
+ margo_instance_id m_mid;
+ std::unordered_map m_rpc_names;
+};
+
+} // namespace detail
+
+// forward declarations
+struct endpoint;
+
+struct engine {
+
+ enum class execution_mode : bool {
+ server = MARGO_SERVER_MODE,
+ client = MARGO_CLIENT_MODE
+ };
+
+ explicit engine(std::string_view address,
+ execution_mode = execution_mode::client) {
+ struct margo_init_info info = MARGO_INIT_INFO_INITIALIZER;
+
+ m_context = std::make_shared(
+ margo_init_ext(address.data(), MARGO_SERVER_MODE, &info));
+
+ if(m_context->m_mid == MARGO_INSTANCE_NULL) {
+ throw std::runtime_error("Margo initialization failed");
+ }
+ }
+
+ ~engine() {
+ if(m_context) {
+ ::margo_finalize(m_context->m_mid);
+ }
+ }
+
+ void
+ listen() const {
+
+ /* NOTE: there isn't anything else for the server to do at this point
+ * except wait for itself to be shut down. The
+ * margo_wait_for_finalize() call here yields to let Margo drive
+ * progress until that happens.
+ */
+ ::margo_wait_for_finalize(m_context->m_mid);
+ }
+
+ void
+ stop() {
+ ::margo_finalize(m_context->m_mid);
+
+ // It is not safe to access m_margo_context->m_mid after the
+ // margo_finalize() call. Make sure that no other threads can do a
+ // double margo_finalize() (e.g when calling ~engine()) by resetting
+ // m_margo_context.
+ m_context.reset();
+ }
+
+ endpoint
+ lookup(const std::string& address) const;
+
+ std::shared_ptr m_context;
+};
+
+struct endpoint {
+private:
+ // Endpoints should only be created by calling engine::lookup()
+ friend class engine;
+
+ endpoint(std::shared_ptr context,
+ std::shared_ptr address)
+ : m_margo_context(std::move(context)), m_address(std::move(address)) {}
+
+public:
+ endpoint(const endpoint& /*other*/) = default;
+ endpoint&
+ operator=(const endpoint& /*other*/) = default;
+ endpoint(endpoint&& /*rhs*/) = default;
+ endpoint&
+ operator=(endpoint&& /*rhs*/) = default;
+
+ template
+ void
+ call(const std::string& id, Args&&... args) {
+
+ const auto it = m_margo_context->m_rpc_names.find(id);
+
+ if(it == m_margo_context->m_rpc_names.end()) {
+ throw std::runtime_error(
+ fmt::format("Unknown remote procedure: {}", id));
+ }
+
+ hg_handle_t handle;
+ auto ret = ::margo_create(m_margo_context->m_mid,
+ m_address->mercury_address(), it->second,
+ &handle);
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during endpoint::call(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+
+ ret = ::margo_forward(handle, nullptr);
+
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during endpoint::call(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+
+ ret = ::margo_destroy(handle);
+
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during endpoint::call(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+ }
+
+ /**
+ * Deprecated call, used to support Margo directly
+ *
+ **/
+ template
+ [[deprecated("It should be eventually replaced by a generic call")]] void
+ call(const std::string& id, T1 input = nullptr, T2 output = nullptr) {
+
+ const auto it = m_margo_context->m_rpc_names.find(id);
+
+ if(it == m_margo_context->m_rpc_names.end()) {
+ throw std::runtime_error(
+ fmt::format("Unknown remote procedure: {}", id));
+ }
+
+ hg_handle_t handle;
+ auto ret = ::margo_create(m_margo_context->m_mid,
+ m_address->mercury_address(), it->second,
+ &handle);
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during endpoint::call(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+
+ ret = ::margo_forward(handle, input);
+
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during endpoint::call(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+
+ if(output != nullptr) {
+ ret = ::margo_get_output(handle, output);
+ }
+
+
+ ret = ::margo_destroy(handle);
+
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during endpoint::call(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+ }
+
+private:
+ std::shared_ptr m_margo_context;
+ std::shared_ptr m_address;
+};
+
+// now that we have the complete definition of engine and endpoint, we can
+// finally define engine::lookup completely
+inline endpoint
+engine::lookup(const std::string& address) const {
+
+ hg_addr_t svr_addr;
+ auto ret =
+ ::margo_addr_lookup(m_context->m_mid, address.c_str(), &svr_addr);
+ if(ret != HG_SUCCESS) {
+ throw std::runtime_error(
+ fmt::format("Error during engine::lookup(): {}",
+ ::HG_Error_to_string(ret)));
+ }
+
+ return {m_context, std::make_shared(
+ ::margo_get_class(m_context->m_mid), svr_addr)};
+}
+
+
+struct rpc_client : engine {
+ explicit rpc_client(const std::string& protocol)
+ : engine(protocol, execution_mode::client) {}
+
+ template
+ rpc_client(const std::string& protocol,
+ Callback&& rpc_registration_callback)
+ : engine(protocol, execution_mode::client) {
+ rpc_registration_callback(this);
+ }
+};
+
+struct rpc_acceptor : engine {
+
+ static std::string
+ format_address(const std::string& protocol, const std::string& address,
+ int port) {
+ return fmt::format("{}://{}:{}", protocol, address, port);
+ }
+
+ rpc_acceptor(const std::string& protocol, const std::string& bind_address,
+ int port)
+ : engine(format_address(protocol, bind_address, port)) {}
+};
+
+
+} // namespace scord::network
+
+#endif // SCORD_NETWORK_ENGINE_HPP
diff --git a/src/server.cpp b/src/common/network/server.cpp
similarity index 98%
rename from src/server.cpp
rename to src/common/network/server.cpp
index c8899f6dc82a57665681e82a096fdea2fcd05707..1818650af3da629ec3073d616fc9eaf395fe936f 100644
--- a/src/server.cpp
+++ b/src/common/network/server.cpp
@@ -33,11 +33,11 @@
#include
#include
-#include
#include
#include
-#include
#include
+#include "engine.hpp"
+#include "server.hpp"
namespace scord {
@@ -264,8 +264,10 @@ server::install_rpc_handlers() {
m_settings->transport_protocol(), m_settings->bind_address(),
m_settings->remote_port());
- m_network_engine->register_rpcs();
-} // namespace scord
+ if(m_rpc_registration_callback) {
+ m_rpc_registration_callback(m_network_engine);
+ }
+}
void
server::check_configuration() {
diff --git a/src/server.hpp b/src/common/network/server.hpp
similarity index 76%
rename from src/server.hpp
rename to src/common/network/server.hpp
index 7c767323af01f0b824cf9599448bbf2c0c5f74dd..930cc84727e9b086feed88a4fa69270b1bc7eb40 100644
--- a/src/server.hpp
+++ b/src/common/network/server.hpp
@@ -27,7 +27,7 @@
#include
#include
-#include
+#include "engine.hpp"
namespace scord {
@@ -46,6 +46,15 @@ public:
~server();
void
configure(const config::settings& settings);
+
+ template
+ void
+ configure(const config::settings& settings,
+ Callback rpc_registration_callback) {
+ configure(settings);
+ m_rpc_registration_callback = rpc_registration_callback;
+ }
+
config::settings
get_configuration() const;
int
@@ -57,6 +66,18 @@ public:
void
teardown_and_exit();
+
+ template
+ void
+ install_rpc_handlers(Callable fun) {
+
+ install_rpc_handlers();
+
+ // FIXME: improve network_engine so that we don't need to rely on
+ // calling a lambda here to register RPCs
+ fun(m_network_engine);
+ }
+
private:
int
daemonize();
@@ -82,6 +103,8 @@ private:
std::unique_ptr m_settings;
std::unique_ptr m_network_engine;
std::unique_ptr m_signal_listener;
+ std::function&)>
+ m_rpc_registration_callback;
};
diff --git a/src/utils/CMakeLists.txt b/src/common/utils/CMakeLists.txt
similarity index 89%
rename from src/utils/CMakeLists.txt
rename to src/common/utils/CMakeLists.txt
index 37360b29fe5f315157715be5b7a55569e8976cfa..576d08a1e36108f10cdcbf95288bd2b486ccb203 100644
--- a/src/utils/CMakeLists.txt
+++ b/src/common/utils/CMakeLists.txt
@@ -22,9 +22,8 @@
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
-add_library(utils STATIC)
+add_library(_utils STATIC)
-target_include_directories(utils PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
-set_property(TARGET utils PROPERTY POSITION_INDEPENDENT_CODE ON)
+set_property(TARGET _utils PROPERTY POSITION_INDEPENDENT_CODE ON)
-target_sources(utils PRIVATE utils.hpp utils.cpp signal_listener.hpp)
+target_sources(_utils PRIVATE utils.hpp utils.cpp signal_listener.hpp)
diff --git a/src/utils/signal_listener.hpp b/src/common/utils/signal_listener.hpp
similarity index 100%
rename from src/utils/signal_listener.hpp
rename to src/common/utils/signal_listener.hpp
diff --git a/src/utils/utils.cpp b/src/common/utils/utils.cpp
similarity index 99%
rename from src/utils/utils.cpp
rename to src/common/utils/utils.cpp
index 05284e19fe5a8fe9fb6dfda57a154539546e867c..59d992f820435de8bb75ff730a0b45eaad85266b 100644
--- a/src/utils/utils.cpp
+++ b/src/common/utils/utils.cpp
@@ -31,7 +31,7 @@
#include
#include
-#include
+#include "utils.hpp"
namespace scord::utils {
diff --git a/src/utils/utils.hpp b/src/common/utils/utils.hpp
similarity index 100%
rename from src/utils/utils.hpp
rename to src/common/utils/utils.hpp
diff --git a/src/api/CMakeLists.txt b/src/lib/CMakeLists.txt
similarity index 80%
rename from src/api/CMakeLists.txt
rename to src/lib/CMakeLists.txt
index e6a5eddb86126c00280e421cb1be2dc76b8f453c..94a90638ef9edbb8836414f768bf5f14aae7452e 100644
--- a/src/api/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -22,6 +22,16 @@
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
+# target for the public rpcs defined by the API that need to be shared by scord
+add_library(api_rpcs STATIC)
+target_sources(api_rpcs
+ PRIVATE rpcs/public.cpp rpcs/public.hpp)
+
+target_include_directories(api_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/rpcs)
+target_link_libraries(api_rpcs PUBLIC common::logger Margo::Margo)
+set_property(TARGET api_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON)
+
+# the client library implementing the actual API
add_library(adm_iosched SHARED)
target_sources(adm_iosched
@@ -30,7 +40,8 @@ target_sources(adm_iosched
target_include_directories(adm_iosched PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
-target_link_libraries(adm_iosched PRIVATE network_engine PUBLIC tl::expected)
+target_link_libraries(adm_iosched PRIVATE common::network::engine api_rpcs PUBLIC
+ tl::expected)
install(
TARGETS adm_iosched
diff --git a/src/api/admire.cpp b/src/lib/admire.cpp
similarity index 73%
rename from src/api/admire.cpp
rename to src/lib/admire.cpp
index 39113f4082d139d73eb371787b36e4e67f0876ec..ebc64a8ae54289ab172dbd9ba78b89b68025ca5a 100644
--- a/src/api/admire.cpp
+++ b/src/lib/admire.cpp
@@ -23,8 +23,9 @@
*****************************************************************************/
#include
-#include
-#include
+#include
+#include
+#include "rpcs/public.hpp"
#include "detail/impl.hpp"
@@ -48,6 +49,105 @@ init_logger() {
scord::logger::create_global_logger("libadm_iosched", "console color");
}
+void
+rpc_registration_cb(scord::network::rpc_client* client) {
+
+ REGISTER_RPC(client, "ADM_ping", void, void, ADM_ping, false);
+ REGISTER_RPC(client, "ADM_input", ADM_input_in_t, ADM_input_out_t,
+ ADM_input, true);
+
+
+ REGISTER_RPC(client, "ADM_output", ADM_output_in_t, ADM_output_out_t,
+ ADM_output, true);
+
+ REGISTER_RPC(client, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t,
+ ADM_inout, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_context", ADM_adhoc_context_in_t,
+ ADM_adhoc_context_out_t, ADM_adhoc_context, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
+ ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
+ ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
+ ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_access", ADM_adhoc_access_in_t,
+ ADM_adhoc_access_out_t, ADM_adhoc_access, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
+ ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_background_flush",
+ ADM_adhoc_background_flush_in_t,
+ ADM_adhoc_background_flush_out_t, ADM_adhoc_background_flush,
+ true);
+
+ REGISTER_RPC(client, "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
+ ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);
+
+ REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
+ ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);
+
+ REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
+ ADM_transfer_dataset_out_t, ADM_transfer_dataset, true);
+
+ REGISTER_RPC(client, "ADM_set_dataset_information",
+ ADM_set_dataset_information_in_t,
+ ADM_set_dataset_information_out_t, ADM_set_dataset_information,
+ true);
+
+ REGISTER_RPC(client, "ADM_set_io_resources", ADM_set_io_resources_in_t,
+ ADM_set_io_resources_out_t, ADM_set_io_resources, true);
+
+ REGISTER_RPC(
+ client, "ADM_get_transfer_priority", ADM_get_transfer_priority_in_t,
+ ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, true);
+
+ REGISTER_RPC(
+ client, "ADM_set_transfer_priority", ADM_set_transfer_priority_in_t,
+ ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, true);
+
+ REGISTER_RPC(client, "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
+ ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);
+
+ REGISTER_RPC(
+ client, "ADM_get_pending_transfers", ADM_get_pending_transfers_in_t,
+ ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, true);
+
+ REGISTER_RPC(client, "ADM_set_qos_constraints",
+ ADM_set_qos_constraints_in_t, ADM_set_qos_constraints_out_t,
+ ADM_set_qos_constraints, true);
+
+ REGISTER_RPC(client, "ADM_get_qos_constraints",
+ ADM_get_qos_constraints_in_t, ADM_get_qos_constraints_out_t,
+ ADM_get_qos_constraints, true);
+
+ REGISTER_RPC(
+ client, "ADM_define_data_operation", ADM_define_data_operation_in_t,
+ ADM_define_data_operation_out_t, ADM_define_data_operation, true);
+
+ REGISTER_RPC(client, "ADM_connect_data_operation",
+ ADM_connect_data_operation_in_t,
+ ADM_connect_data_operation_out_t, ADM_connect_data_operation,
+ true);
+
+ REGISTER_RPC(client, "ADM_finalize_data_operation",
+ ADM_finalize_data_operation_in_t,
+ ADM_finalize_data_operation_out_t, ADM_finalize_data_operation,
+ true);
+
+ REGISTER_RPC(client, "ADM_link_transfer_to_data_operation",
+ ADM_link_transfer_to_data_operation_in_t,
+ ADM_link_transfer_to_data_operation_out_t,
+ ADM_link_transfer_to_data_operation, true);
+
+ REGISTER_RPC(client, "ADM_get_statistics", ADM_get_statistics_in_t,
+ ADM_get_statistics_out_t, ADM_get_statistics, true);
+}
} // namespace
@@ -83,8 +183,7 @@ update_job(const server& srv, ADM_job_t job, ADM_job_requirements_t reqs) {
(void) job;
(void) reqs;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -109,8 +208,7 @@ remove_job(const server& srv, ADM_job_t job) {
(void) srv;
(void) job;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -139,8 +237,7 @@ register_adhoc_storage(const server& srv, ADM_job_t job,
(void) ctx;
(void) adhoc_handle;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -168,8 +265,7 @@ update_adhoc_storage(const server& srv, ADM_job_t job, ADM_adhoc_context_t ctx,
(void) ctx;
(void) adhoc_handle;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -196,8 +292,7 @@ remove_adhoc_storage(const server& srv, ADM_job_t job,
(void) job;
(void) adhoc_handle;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -224,8 +319,7 @@ deploy_adhoc_storage(const server& srv, ADM_job_t job,
(void) job;
(void) adhoc_handle;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -258,8 +352,7 @@ transfer_dataset(const server& srv, ADM_job_t job,
(void) mapping;
(void) tx_handle;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -287,8 +380,7 @@ set_dataset_information(const server& srv, ADM_job_t job,
(void) target;
(void) info;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -316,8 +408,7 @@ set_io_resources(const server& srv, ADM_job_t job, ADM_storage_handle_t tier,
(void) tier;
(void) resources;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -346,8 +437,7 @@ get_transfer_priority(const server& srv, ADM_job_t job,
(void) tx_handle;
(void) priority;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -375,8 +465,7 @@ set_transfer_priority(const server& srv, ADM_job_t job,
(void) tx_handle;
(void) incr;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -403,8 +492,7 @@ cancel_transfer(const server& srv, ADM_job_t job,
(void) job;
(void) tx_handle;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -432,8 +520,7 @@ get_pending_transfers(const server& srv, ADM_job_t job,
(void) job;
(void) pending_transfers;
- scord::network::rpc_client rpc_client{srv.m_protocol};
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -460,9 +547,7 @@ set_qos_constraints(const server& srv, ADM_job_t job, ADM_limit_t limit) {
(void) job;
(void) limit;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -492,9 +577,7 @@ get_qos_constraints(const server& srv, ADM_job_t job, ADM_qos_scope_t scope,
(void) entity;
(void) limits;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -524,9 +607,7 @@ define_data_operation(const server& srv, ADM_job_t job, const char* path,
(void) op;
(void) args;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -558,9 +639,7 @@ connect_data_operation(const server& srv, ADM_job_t job,
(void) should_stream;
(void) args;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -590,9 +669,7 @@ finalize_data_operation(const server& srv, ADM_job_t job,
(void) op;
(void) status;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -623,9 +700,7 @@ link_transfer_to_data_operation(const server& srv, ADM_job_t job,
(void) should_stream;
(void) args;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
@@ -652,9 +727,7 @@ get_statistics(const server& srv, ADM_job_t job, ADM_job_stats_t** stats) {
(void) job;
(void) stats;
- scord::network::rpc_client rpc_client{srv.m_protocol};
-
- rpc_client.register_rpcs();
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
auto endp = rpc_client.lookup(srv.m_address);
diff --git a/src/api/admire.h b/src/lib/admire.h
similarity index 100%
rename from src/api/admire.h
rename to src/lib/admire.h
diff --git a/src/api/admire.hpp b/src/lib/admire.hpp
similarity index 100%
rename from src/api/admire.hpp
rename to src/lib/admire.hpp
diff --git a/src/api/c_wrapper.cpp b/src/lib/c_wrapper.cpp
similarity index 99%
rename from src/api/c_wrapper.cpp
rename to src/lib/c_wrapper.cpp
index 1a4e03c2dbdb17ab6069076146169abe32efbc77..1ffffe0678c1ea3bc599cb95eb444e08e7504b73 100644
--- a/src/api/c_wrapper.cpp
+++ b/src/lib/c_wrapper.cpp
@@ -24,7 +24,7 @@
#include
#include
-#include
+#include
#include "detail/impl.hpp"
struct adm_server {
diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..116d2112ab9190a50f0f212d9e7030c2ec15dcd1
--- /dev/null
+++ b/src/lib/detail/impl.cpp
@@ -0,0 +1,171 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include
+#include "rpcs/public.hpp"
+#include "impl.hpp"
+
+void
+rpc_registration_cb(scord::network::rpc_client* client) {
+
+ REGISTER_RPC(client, "ADM_ping", void, void, ADM_ping, false);
+ REGISTER_RPC(client, "ADM_input", ADM_input_in_t, ADM_input_out_t,
+ ADM_input, true);
+
+
+ REGISTER_RPC(client, "ADM_output", ADM_output_in_t, ADM_output_out_t,
+ ADM_output, true);
+
+ REGISTER_RPC(client, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t,
+ ADM_inout, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_context", ADM_adhoc_context_in_t,
+ ADM_adhoc_context_out_t, ADM_adhoc_context, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
+ ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
+ ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
+ ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_access", ADM_adhoc_access_in_t,
+ ADM_adhoc_access_out_t, ADM_adhoc_access, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
+ ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true);
+
+ REGISTER_RPC(client, "ADM_adhoc_background_flush",
+ ADM_adhoc_background_flush_in_t,
+ ADM_adhoc_background_flush_out_t, ADM_adhoc_background_flush,
+ true);
+
+ REGISTER_RPC(client, "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
+ ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);
+
+ REGISTER_RPC(client, "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
+ ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);
+
+ REGISTER_RPC(client, "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
+ ADM_transfer_dataset_out_t, ADM_transfer_dataset, true);
+
+ REGISTER_RPC(client, "ADM_set_dataset_information",
+ ADM_set_dataset_information_in_t,
+ ADM_set_dataset_information_out_t, ADM_set_dataset_information,
+ true);
+
+ REGISTER_RPC(client, "ADM_set_io_resources", ADM_set_io_resources_in_t,
+ ADM_set_io_resources_out_t, ADM_set_io_resources, true);
+
+ REGISTER_RPC(
+ client, "ADM_get_transfer_priority", ADM_get_transfer_priority_in_t,
+ ADM_get_transfer_priority_out_t, ADM_get_transfer_priority, true);
+
+ REGISTER_RPC(
+ client, "ADM_set_transfer_priority", ADM_set_transfer_priority_in_t,
+ ADM_set_transfer_priority_out_t, ADM_set_transfer_priority, true);
+
+ REGISTER_RPC(client, "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
+ ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);
+
+ REGISTER_RPC(
+ client, "ADM_get_pending_transfers", ADM_get_pending_transfers_in_t,
+ ADM_get_pending_transfers_out_t, ADM_get_pending_transfers, true);
+
+ REGISTER_RPC(client, "ADM_set_qos_constraints",
+ ADM_set_qos_constraints_in_t, ADM_set_qos_constraints_out_t,
+ ADM_set_qos_constraints, true);
+
+ REGISTER_RPC(client, "ADM_get_qos_constraints",
+ ADM_get_qos_constraints_in_t, ADM_get_qos_constraints_out_t,
+ ADM_get_qos_constraints, true);
+
+ REGISTER_RPC(
+ client, "ADM_define_data_operation", ADM_define_data_operation_in_t,
+ ADM_define_data_operation_out_t, ADM_define_data_operation, true);
+
+ REGISTER_RPC(client, "ADM_connect_data_operation",
+ ADM_connect_data_operation_in_t,
+ ADM_connect_data_operation_out_t, ADM_connect_data_operation,
+ true);
+
+ REGISTER_RPC(client, "ADM_finalize_data_operation",
+ ADM_finalize_data_operation_in_t,
+ ADM_finalize_data_operation_out_t, ADM_finalize_data_operation,
+ true);
+
+ REGISTER_RPC(client, "ADM_link_transfer_to_data_operation",
+ ADM_link_transfer_to_data_operation_in_t,
+ ADM_link_transfer_to_data_operation_out_t,
+ ADM_link_transfer_to_data_operation, true);
+
+ REGISTER_RPC(client, "ADM_get_statistics", ADM_get_statistics_in_t,
+ ADM_get_statistics_out_t, ADM_get_statistics, true);
+}
+
+namespace admire::detail {
+
+admire::error_code
+ping(const server& srv) {
+
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
+
+ auto endp = rpc_client.lookup(srv.m_address);
+
+ LOGGER_INFO("ADM_ping()");
+ endp.call("ADM_ping");
+
+ LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS);
+ return ADM_SUCCESS;
+}
+
+tl::expected
+register_job(const admire::server& srv, ADM_job_requirements_t reqs) {
+ (void) srv;
+ (void) reqs;
+
+ scord::network::rpc_client rpc_client{srv.m_protocol, rpc_registration_cb};
+
+ auto endp = rpc_client.lookup(srv.m_address);
+
+ LOGGER_INFO("ADM_register_job(...)");
+
+ ADM_register_job_in_t in{};
+ ADM_register_job_out_t out;
+
+ endp.call("ADM_register_job", &in, &out);
+
+ if(out.ret < 0) {
+ LOGGER_ERROR("ADM_register_job() = {}", out.ret);
+ return tl::make_unexpected(static_cast(out.ret));
+ }
+
+ LOGGER_INFO("ADM_register_job() = {}", ADM_SUCCESS);
+ return admire::job{42};
+}
+
+} // namespace admire::detail
diff --git a/src/api/detail/impl.hpp b/src/lib/detail/impl.hpp
similarity index 100%
rename from src/api/detail/impl.hpp
rename to src/lib/detail/impl.hpp
diff --git a/src/api/errors.c b/src/lib/errors.c
similarity index 100%
rename from src/api/errors.c
rename to src/lib/errors.c
diff --git a/src/network/rpcs.cpp b/src/lib/rpcs/public.cpp
similarity index 99%
rename from src/network/rpcs.cpp
rename to src/lib/rpcs/public.cpp
index 3f6dd76645eedd720db719821a7975ab92bf3ace..83dbfae8eabcfef4800ea1387a8ec6189ac421ce 100644
--- a/src/network/rpcs.cpp
+++ b/src/lib/rpcs/public.cpp
@@ -22,7 +22,7 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
-#include "rpcs.hpp"
+#include "public.hpp"
static void
ADM_ping(hg_handle_t h) {
diff --git a/src/network/rpcs.hpp b/src/lib/rpcs/public.hpp
similarity index 98%
rename from src/network/rpcs.hpp
rename to src/lib/rpcs/public.hpp
index 23931fb01ff420e0face78368fb591ed34e1127a..6b9f9d4f47baa243d91c8e4eae522430ec7d6e7b 100644
--- a/src/network/rpcs.hpp
+++ b/src/lib/rpcs/public.hpp
@@ -22,15 +22,15 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
-#ifndef SCORD_NETWORK_RPCS_HPP
-#define SCORD_NETWORK_RPCS_HPP
+// clang-format off
+#ifndef SCORD_RPCS_PUBLIC_HPP
+#define SCORD_RPCS_PUBLIC_HPP
#include
-#include
#include
#include
#include
-
+#include
// FIXME: cannot be in a namespace due to Margo limitations
// namespace scord::network::rpc {
@@ -342,4 +342,5 @@ DECLARE_MARGO_RPC_HANDLER(ADM_get_statistics);
//} // namespace scord::network::rpc
-#endif // SCORD_NETWORK_RPCS_HPP
+#endif // SCORD_RPCS_PUBLIC_HPP
+// clang-format on
diff --git a/src/network/engine.hpp b/src/network/engine.hpp
deleted file mode 100644
index 3ecd59633d25bb948e2ed414c7cdf937c38131f1..0000000000000000000000000000000000000000
--- a/src/network/engine.hpp
+++ /dev/null
@@ -1,441 +0,0 @@
-/******************************************************************************
- * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
- *
- * This software was partially supported by the EuroHPC-funded project ADMIRE
- * (Project ID: 956748, https://www.admire-eurohpc.eu).
- *
- * This file is part of scord.
- *
- * scord is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * scord is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with scord. If not, see .
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *****************************************************************************/
-
-#ifndef SCORD_NETWORK_ENGINE_HPP
-#define SCORD_NETWORK_ENGINE_HPP
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include "rpcs.hpp"
-
-
-namespace scord::network {
-
-namespace detail {
-
-#define REGISTER_RPC(__mid, __m_rpc_names, __func_name, __in_t, __out_t, \
- __handler, requires_response) \
- { \
- hg_id_t id = margo_provider_register_name( \
- __mid, __func_name, BOOST_PP_CAT(hg_proc_, __in_t), \
- BOOST_PP_CAT(hg_proc_, __out_t), _handler_for_##__handler, \
- MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL); \
- __m_rpc_names.emplace(__func_name, id); \
- if(!requires_response) { \
- ::margo_registered_disable_response(__mid, id, HG_TRUE); \
- } \
- }
-
-
-struct margo_context {
-
- explicit margo_context(::margo_instance_id mid) : m_mid(mid) {}
-
- void
- register_rpc(const std::string& name, bool requires_response) {
- auto id = MARGO_REGISTER(m_mid, name.c_str(), void, void, ADM_ping);
- m_rpc_names.emplace(name, id);
-
- if(!requires_response) {
- ::margo_registered_disable_response(m_mid, id, HG_TRUE);
- }
- }
-
- margo_instance_id m_mid;
- std::unordered_map m_rpc_names;
-};
-
-} // namespace detail
-
-// forward declarations
-struct endpoint;
-
-struct engine {
-
- enum class execution_mode : bool {
- server = MARGO_SERVER_MODE,
- client = MARGO_CLIENT_MODE
- };
-
- explicit engine(std::string_view address,
- execution_mode = execution_mode::client) {
- struct margo_init_info info = MARGO_INIT_INFO_INITIALIZER;
-
- m_context = std::make_shared(
- margo_init_ext(address.data(), MARGO_SERVER_MODE, &info));
-
- if(m_context->m_mid == MARGO_INSTANCE_NULL) {
- throw std::runtime_error("Margo initialization failed");
- }
- }
-
- ~engine() {
- if(m_context) {
- ::margo_finalize(m_context->m_mid);
- }
- }
-
- void
- register_rpcs() {
-
- // register RPCs manually for now
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_ping", void,
- void, ADM_ping, false);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_register_job", ADM_register_job_in_t,
- ADM_register_job_out_t, ADM_register_job, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_update_job",
- ADM_update_job_in_t, ADM_update_job_out_t, ADM_update_job,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_remove_job",
- ADM_remove_job_in_t, ADM_remove_job_out_t, ADM_remove_job,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_register_adhoc_storage",
- ADM_register_adhoc_storage_in_t,
- ADM_register_adhoc_storage_out_t,
- ADM_register_adhoc_storage, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_update_adhoc_storage", ADM_update_adhoc_storage_in_t,
- ADM_update_adhoc_storage_out_t, ADM_update_adhoc_storage,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_remove_adhoc_storage", ADM_remove_adhoc_storage_in_t,
- ADM_remove_adhoc_storage_out_t, ADM_remove_adhoc_storage,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_deploy_adhoc_storage", ADM_deploy_adhoc_storage_in_t,
- ADM_deploy_adhoc_storage_out_t, ADM_deploy_adhoc_storage,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_input",
- ADM_input_in_t, ADM_input_out_t, ADM_input, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_output",
- ADM_output_in_t, ADM_output_out_t, ADM_output, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names, "ADM_inout",
- ADM_inout_in_t, ADM_inout_out_t, ADM_inout, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_context", ADM_adhoc_context_in_t,
- ADM_adhoc_context_out_t, ADM_adhoc_context, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
- ADM_adhoc_context_id_out_t, ADM_adhoc_context_id, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
- ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
- ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_access", ADM_adhoc_access_in_t,
- ADM_adhoc_access_out_t, ADM_adhoc_access, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
- ADM_adhoc_distribution_out_t, ADM_adhoc_distribution,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_adhoc_background_flush",
- ADM_adhoc_background_flush_in_t,
- ADM_adhoc_background_flush_out_t,
- ADM_adhoc_background_flush, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
- ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
- ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
- ADM_transfer_dataset_out_t, ADM_transfer_dataset, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_set_dataset_information",
- ADM_set_dataset_information_in_t,
- ADM_set_dataset_information_out_t,
- ADM_set_dataset_information, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_set_io_resources", ADM_set_io_resources_in_t,
- ADM_set_io_resources_out_t, ADM_set_io_resources, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_get_transfer_priority",
- ADM_get_transfer_priority_in_t,
- ADM_get_transfer_priority_out_t, ADM_get_transfer_priority,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_set_transfer_priority",
- ADM_set_transfer_priority_in_t,
- ADM_set_transfer_priority_out_t, ADM_set_transfer_priority,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
- ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_get_pending_transfers",
- ADM_get_pending_transfers_in_t,
- ADM_get_pending_transfers_out_t, ADM_get_pending_transfers,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_set_qos_constraints", ADM_set_qos_constraints_in_t,
- ADM_set_qos_constraints_out_t, ADM_set_qos_constraints,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_get_qos_constraints", ADM_get_qos_constraints_in_t,
- ADM_get_qos_constraints_out_t, ADM_get_qos_constraints,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_define_data_operation",
- ADM_define_data_operation_in_t,
- ADM_define_data_operation_out_t, ADM_define_data_operation,
- true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_connect_data_operation",
- ADM_connect_data_operation_in_t,
- ADM_connect_data_operation_out_t,
- ADM_connect_data_operation, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_finalize_data_operation",
- ADM_finalize_data_operation_in_t,
- ADM_finalize_data_operation_out_t,
- ADM_finalize_data_operation, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_link_transfer_to_data_operation",
- ADM_link_transfer_to_data_operation_in_t,
- ADM_link_transfer_to_data_operation_out_t,
- ADM_link_transfer_to_data_operation, true);
-
- REGISTER_RPC(m_context->m_mid, m_context->m_rpc_names,
- "ADM_get_statistics", ADM_get_statistics_in_t,
- ADM_get_statistics_out_t, ADM_get_statistics, true);
- }
-
- void
- listen() const {
-
- /* NOTE: there isn't anything else for the server to do at this point
- * except wait for itself to be shut down. The
- * margo_wait_for_finalize() call here yields to let Margo drive
- * progress until that happens.
- */
- ::margo_wait_for_finalize(m_context->m_mid);
- }
-
- void
- stop() {
- ::margo_finalize(m_context->m_mid);
-
- // It is not safe to access m_margo_context->m_mid after the
- // margo_finalize() call. Make sure that no other threads can do a
- // double margo_finalize() (e.g when calling ~engine()) by resetting
- // m_margo_context.
- m_context.reset();
- }
-
- endpoint
- lookup(const std::string& address) const;
-
- std::shared_ptr m_context;
-};
-
-struct endpoint {
-private:
- // Endpoints should only be created by calling engine::lookup()
- friend class engine;
-
- endpoint(std::shared_ptr context,
- std::shared_ptr address)
- : m_margo_context(std::move(context)), m_address(std::move(address)) {}
-
-public:
- endpoint(const endpoint& /*other*/) = default;
- endpoint&
- operator=(const endpoint& /*other*/) = default;
- endpoint(endpoint&& /*rhs*/) = default;
- endpoint&
- operator=(endpoint&& /*rhs*/) = default;
-
- template
- void
- call(const std::string& id, Args&&... args) {
-
- const auto it = m_margo_context->m_rpc_names.find(id);
-
- if(it == m_margo_context->m_rpc_names.end()) {
- throw std::runtime_error(
- fmt::format("Unknown remote procedure: {}", id));
- }
-
- hg_handle_t handle;
- auto ret = ::margo_create(m_margo_context->m_mid,
- m_address->mercury_address(), it->second,
- &handle);
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during endpoint::call(): {}",
- ::HG_Error_to_string(ret)));
- }
-
- ret = ::margo_forward(handle, nullptr);
-
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during endpoint::call(): {}",
- ::HG_Error_to_string(ret)));
- }
-
- ret = ::margo_destroy(handle);
-
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during endpoint::call(): {}",
- ::HG_Error_to_string(ret)));
- }
- }
-
- /**
- * Deprecated call, used to support Margo directly
- *
- **/
- template
- [[deprecated("It should be eventually replaced by a generic call")]] void
- call(const std::string& id, T1 input = nullptr, T2 output = nullptr) {
-
- const auto it = m_margo_context->m_rpc_names.find(id);
-
- if(it == m_margo_context->m_rpc_names.end()) {
- throw std::runtime_error(
- fmt::format("Unknown remote procedure: {}", id));
- }
-
- hg_handle_t handle;
- auto ret = ::margo_create(m_margo_context->m_mid,
- m_address->mercury_address(), it->second,
- &handle);
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during endpoint::call(): {}",
- ::HG_Error_to_string(ret)));
- }
-
- ret = ::margo_forward(handle, input);
-
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during endpoint::call(): {}",
- ::HG_Error_to_string(ret)));
- }
-
- if(output != nullptr) {
- ret = ::margo_get_output(handle, output);
- }
-
-
- ret = ::margo_destroy(handle);
-
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during endpoint::call(): {}",
- ::HG_Error_to_string(ret)));
- }
- }
-
-private:
- std::shared_ptr m_margo_context;
- std::shared_ptr m_address;
-};
-
-// now that we have the complete definition of engine and endpoint, we can
-// finally define engine::lookup completely
-inline endpoint
-engine::lookup(const std::string& address) const {
-
- hg_addr_t svr_addr;
- auto ret =
- ::margo_addr_lookup(m_context->m_mid, address.c_str(), &svr_addr);
- if(ret != HG_SUCCESS) {
- throw std::runtime_error(
- fmt::format("Error during engine::lookup(): {}",
- ::HG_Error_to_string(ret)));
- }
-
- return {m_context, std::make_shared(
- ::margo_get_class(m_context->m_mid), svr_addr)};
-}
-
-
-struct rpc_client : engine {
- explicit rpc_client(const std::string& protocol)
- : engine(protocol, execution_mode::client) {}
-};
-
-struct rpc_acceptor : engine {
-
- static std::string
- format_address(const std::string& protocol, const std::string& address,
- int port) {
- return fmt::format("{}://{}:{}", protocol, address, port);
- }
-
- rpc_acceptor(const std::string& protocol, const std::string& bind_address,
- int port)
- : engine(format_address(protocol, bind_address, port)) {}
-};
-
-
-} // namespace scord::network
-
-#endif // SCORD_NETWORK_ENGINE_HPP
diff --git a/src/scord-ctl/CMakeLists.txt b/src/scord-ctl/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..26dfc86b2fac4342ebd342c0736c427b141b8e8d
--- /dev/null
+++ b/src/scord-ctl/CMakeLists.txt
@@ -0,0 +1,42 @@
+################################################################################
+# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+# import rpc definitions for scord-ctl
+add_subdirectory(rpcs)
+
+# scord-ctl daemon
+add_executable(scord-ctl scord-ctl.cpp)
+
+target_include_directories(
+ scord-ctl
+ PUBLIC ${CMAKE_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_BINARY_DIR}/src ${CMAKE_CURRENT_BINARY_DIR}
+)
+
+target_link_libraries(
+ scord-ctl PRIVATE common::config common::logger common::network::rpc_server
+ scord_ctl_private_rpcs fmt::fmt Boost::program_options
+)
+
+install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/scord-ctl/rpcs/CMakeLists.txt b/src/scord-ctl/rpcs/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..ae011dd902d58b27184e3eae3df59649c7c09742
--- /dev/null
+++ b/src/scord-ctl/rpcs/CMakeLists.txt
@@ -0,0 +1,31 @@
+################################################################################
+# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+add_library(scord_private_rpcs STATIC)
+target_sources(scord_private_rpcs
+ PRIVATE private.cpp private.hpp)
+
+target_include_directories(scord_private_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+target_link_libraries(scord_private_rpcs PUBLIC common::logger Margo::Margo)
+set_property(TARGET scord_private_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/src/scord-ctl/rpcs/private.cpp b/src/scord-ctl/rpcs/private.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..d472d44ec0dab2230738f7799449a82e628c1411
--- /dev/null
+++ b/src/scord-ctl/rpcs/private.cpp
@@ -0,0 +1,41 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include "private.hpp"
+
+static void
+ADM_ping(hg_handle_t h) {
+
+ hg_return_t ret;
+
+ [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);
+
+ LOGGER_INFO("PING(noargs)");
+
+ ret = margo_destroy(h);
+ assert(ret == HG_SUCCESS);
+}
+
+DEFINE_MARGO_RPC_HANDLER(ADM_ping);
diff --git a/src/scord-ctl/rpcs/private.hpp b/src/scord-ctl/rpcs/private.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..6429eade640b78d47aaa91700c55359616102aaf
--- /dev/null
+++ b/src/scord-ctl/rpcs/private.hpp
@@ -0,0 +1,32 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#ifndef SCORD_CTL_RPCS_PRIVATE_HPP
+#define SCORD_CTL_RPCS_PRIVATE_HPP
+
+#include
+
+DECLARE_MARGO_RPC_HANDLER(ADM_ping);
+
+#endif // SCORD_CTL_RPCS_PRIVATE_HPP
diff --git a/src/main.cpp b/src/scord-ctl/scord-ctl.cpp
similarity index 93%
rename from src/main.cpp
rename to src/scord-ctl/scord-ctl.cpp
index 8686cd3785af20360ea83bd1ef0bf9e84019db32..b99caff9aff2180422a09250cddf095eabf694aa 100644
--- a/src/main.cpp
+++ b/src/scord-ctl/scord-ctl.cpp
@@ -32,8 +32,10 @@
#include
#include
-#include
+#include
#include
+#include
+#include "rpcs/private.hpp"
namespace fs = std::filesystem;
namespace bpo = boost::program_options;
@@ -150,7 +152,15 @@ main(int argc, char* argv[]) {
try {
scord::server daemon;
- daemon.configure(cfg);
+ const auto rpc_registration_cb = [](auto&& ctx) {
+ LOGGER_INFO(" * Registering RPCs handlers...");
+
+ REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false);
+
+ // TODO: add internal RPCs for communication with scord
+ };
+
+ daemon.configure(cfg, rpc_registration_cb);
return daemon.run();
} catch(const std::exception& ex) {
fmt::print(stderr,
diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..b7d7d6d134332229c77aa71e931696ac24c4f068
--- /dev/null
+++ b/src/scord/CMakeLists.txt
@@ -0,0 +1,72 @@
+################################################################################
+# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+################################################################################
+# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+# import rpc definitions for scord
+add_subdirectory(rpcs)
+
+# scord daemon
+add_executable(scord scord.cpp)
+
+target_include_directories(
+ scord
+ PUBLIC ${CMAKE_SOURCE_DIR}/src ${CMAKE_CURRENT_SOURCE_DIR}
+ PRIVATE ${CMAKE_BINARY_DIR}/src ${CMAKE_CURRENT_BINARY_DIR}
+)
+
+target_link_libraries(
+ scord
+ PRIVATE common::config
+ common::logger
+ common::network::rpc_server
+ api_rpcs
+ scord_private_rpcs
+ fmt::fmt
+ Boost::program_options
+)
+
+install(TARGETS scord DESTINATION ${CMAKE_INSTALL_BINDIR})
diff --git a/src/scord/rpcs/CMakeLists.txt b/src/scord/rpcs/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..38b0acf513f7b63de7ec76dc2e59d86999062793
--- /dev/null
+++ b/src/scord/rpcs/CMakeLists.txt
@@ -0,0 +1,31 @@
+################################################################################
+# Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+add_library(scord_ctl_private_rpcs STATIC)
+target_sources(scord_ctl_private_rpcs
+ PRIVATE private.cpp private.hpp)
+
+target_include_directories(scord_ctl_private_rpcs INTERFACE ${CMAKE_CURRENT_SOURCE_DIR})
+target_link_libraries(scord_ctl_private_rpcs PUBLIC common::logger Margo::Margo)
+set_property(TARGET scord_ctl_private_rpcs PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/src/scord/rpcs/private.cpp b/src/scord/rpcs/private.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..d472d44ec0dab2230738f7799449a82e628c1411
--- /dev/null
+++ b/src/scord/rpcs/private.cpp
@@ -0,0 +1,41 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include "private.hpp"
+
+static void
+ADM_ping(hg_handle_t h) {
+
+ hg_return_t ret;
+
+ [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);
+
+ LOGGER_INFO("PING(noargs)");
+
+ ret = margo_destroy(h);
+ assert(ret == HG_SUCCESS);
+}
+
+DEFINE_MARGO_RPC_HANDLER(ADM_ping);
diff --git a/src/scord/rpcs/private.hpp b/src/scord/rpcs/private.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..3af7df82cfa03fdc9c391de65758c7c53068b953
--- /dev/null
+++ b/src/scord/rpcs/private.hpp
@@ -0,0 +1,32 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#ifndef SCORD_RPCS_PRIVATE_HPP
+#define SCORD_RPCS_PRIVATE_HPP
+
+#include
+
+DECLARE_MARGO_RPC_HANDLER(ADM_ping);
+
+#endif // SCORD_RPCS_PRIVATE_HPP
diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..b1430ff99f678572c7f5a49a434a0627e448e52a
--- /dev/null
+++ b/src/scord/scord.cpp
@@ -0,0 +1,277 @@
+/******************************************************************************
+ * Copyright 2021, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include "rpcs/private.hpp"
+
+namespace fs = std::filesystem;
+namespace bpo = boost::program_options;
+
+void
+print_version(const std::string& progname) {
+ fmt::print("{} {}\n", progname, scord::version_string);
+}
+
+void
+print_help(const std::string& progname,
+ const bpo::options_description& opt_desc) {
+ fmt::print("Usage: {} [options]\n\n", progname);
+ fmt::print("{}", opt_desc);
+}
+
+
+int
+main(int argc, char* argv[]) {
+
+ scord::config::settings cfg;
+
+ // define the command line options allowed
+ bpo::options_description opt_desc("Options");
+ opt_desc.add_options()
+ // run in foreground
+ (",f",
+ bpo::bool_switch()->default_value(false)->notifier(
+ [&](const bool& flag_value) {
+ cfg.daemonize(!flag_value);
+ }),
+ "foreground operation")
+
+ // force logging messages to the console
+ ("force-console,C",
+ bpo::value()
+ ->implicit_value("")
+ ->zero_tokens()
+ ->notifier([&](const std::string&) {
+ cfg.use_console(true);
+ }),
+ "override any logging options defined in configuration files and "
+ "send all daemon output to the console")
+
+ // use provided configuration file instead of the system-wide
+ // configuration file defined when building the daemon
+ ("config-file,c",
+ bpo::value()
+ ->value_name("FILENAME")
+ ->implicit_value("")
+ ->notifier([&](const std::string& filename) {
+ cfg.config_file(filename);
+ }),
+ "override the system-wide configuration file with FILENAME")
+
+ // print the daemon version
+ ("version,v",
+ bpo::value()->implicit_value("")->zero_tokens(),
+ "print version string")
+
+ // print help
+ ("help,h",
+ bpo::value()->implicit_value("")->zero_tokens(),
+ "produce help message");
+
+ // parse the command line
+ bpo::variables_map vm;
+
+ try {
+ bpo::store(bpo::parse_command_line(argc, argv, opt_desc), vm);
+
+ // the --help and --version arguments are special, since we want
+ // to process them even if the global configuration file doesn't exist
+ if(vm.count("help")) {
+ print_help(cfg.progname(), opt_desc);
+ return EXIT_SUCCESS;
+ }
+
+ if(vm.count("version")) {
+ print_version(cfg.progname());
+ return EXIT_SUCCESS;
+ }
+
+ const fs::path config_file = (vm.count("config-file") == 0)
+ ? cfg.config_file()
+ : vm["config-file"].as();
+
+ if(!fs::exists(config_file)) {
+ fmt::print(stderr,
+ "Failed to access daemon configuration file {}\n",
+ config_file);
+ return EXIT_FAILURE;
+ }
+
+ try {
+ cfg.load_from_file(config_file);
+ } catch(const std::exception& ex) {
+ fmt::print(stderr,
+ "Failed reading daemon configuration file:\n"
+ " {}\n",
+ ex.what());
+ return EXIT_FAILURE;
+ }
+
+ // calling notify() here basically invokes all define notifiers, thus
+ // overriding any configuration loaded from the global configuration
+ // file with its command-line counterparts if provided (for those
+ // options where this is available)
+ bpo::notify(vm);
+ } catch(const bpo::error& ex) {
+ fmt::print(stderr, "ERROR: {}\n\n", ex.what());
+ return EXIT_FAILURE;
+ }
+
+ try {
+ scord::server daemon;
+
+ const auto rpc_registration_cb = [](auto&& ctx) {
+ LOGGER_INFO(" * Registering RPCs handlers...");
+
+ REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false);
+ REGISTER_RPC(ctx, "ADM_input", ADM_input_in_t, ADM_input_out_t,
+ ADM_input, true);
+
+
+ REGISTER_RPC(ctx, "ADM_output", ADM_output_in_t, ADM_output_out_t,
+ ADM_output, true);
+
+ REGISTER_RPC(ctx, "ADM_inout", ADM_inout_in_t, ADM_inout_out_t,
+ ADM_inout, true);
+
+ REGISTER_RPC(ctx, "ADM_adhoc_context", ADM_adhoc_context_in_t,
+ ADM_adhoc_context_out_t, ADM_adhoc_context, true);
+
+ REGISTER_RPC(ctx, "ADM_adhoc_context_id", ADM_adhoc_context_id_in_t,
+ ADM_adhoc_context_id_out_t, ADM_adhoc_context_id,
+ true);
+
+ REGISTER_RPC(ctx, "ADM_adhoc_nodes", ADM_adhoc_nodes_in_t,
+ ADM_adhoc_nodes_out_t, ADM_adhoc_nodes, true);
+
+ REGISTER_RPC(ctx, "ADM_adhoc_walltime", ADM_adhoc_walltime_in_t,
+ ADM_adhoc_walltime_out_t, ADM_adhoc_walltime, true);
+
+ REGISTER_RPC(ctx, "ADM_adhoc_access", ADM_adhoc_access_in_t,
+ ADM_adhoc_access_out_t, ADM_adhoc_access, true);
+
+ REGISTER_RPC(
+ ctx, "ADM_adhoc_distribution", ADM_adhoc_distribution_in_t,
+ ADM_adhoc_distribution_out_t, ADM_adhoc_distribution, true);
+
+ REGISTER_RPC(ctx, "ADM_adhoc_background_flush",
+ ADM_adhoc_background_flush_in_t,
+ ADM_adhoc_background_flush_out_t,
+ ADM_adhoc_background_flush, true);
+
+ REGISTER_RPC(ctx, "ADM_in_situ_ops", ADM_in_situ_ops_in_t,
+ ADM_in_situ_ops_out_t, ADM_in_situ_ops, true);
+
+ REGISTER_RPC(ctx, "ADM_in_transit_ops", ADM_in_transit_ops_in_t,
+ ADM_in_transit_ops_out_t, ADM_in_transit_ops, true);
+
+ REGISTER_RPC(ctx, "ADM_transfer_dataset", ADM_transfer_dataset_in_t,
+ ADM_transfer_dataset_out_t, ADM_transfer_dataset,
+ true);
+
+ REGISTER_RPC(ctx, "ADM_set_dataset_information",
+ ADM_set_dataset_information_in_t,
+ ADM_set_dataset_information_out_t,
+ ADM_set_dataset_information, true);
+
+ REGISTER_RPC(ctx, "ADM_set_io_resources", ADM_set_io_resources_in_t,
+ ADM_set_io_resources_out_t, ADM_set_io_resources,
+ true);
+
+ REGISTER_RPC(ctx, "ADM_get_transfer_priority",
+ ADM_get_transfer_priority_in_t,
+ ADM_get_transfer_priority_out_t,
+ ADM_get_transfer_priority, true);
+
+ REGISTER_RPC(ctx, "ADM_set_transfer_priority",
+ ADM_set_transfer_priority_in_t,
+ ADM_set_transfer_priority_out_t,
+ ADM_set_transfer_priority, true);
+
+ REGISTER_RPC(ctx, "ADM_cancel_transfer", ADM_cancel_transfer_in_t,
+ ADM_cancel_transfer_out_t, ADM_cancel_transfer, true);
+
+ REGISTER_RPC(ctx, "ADM_get_pending_transfers",
+ ADM_get_pending_transfers_in_t,
+ ADM_get_pending_transfers_out_t,
+ ADM_get_pending_transfers, true);
+
+ REGISTER_RPC(ctx, "ADM_set_qos_constraints",
+ ADM_set_qos_constraints_in_t,
+ ADM_set_qos_constraints_out_t, ADM_set_qos_constraints,
+ true);
+
+ REGISTER_RPC(ctx, "ADM_get_qos_constraints",
+ ADM_get_qos_constraints_in_t,
+ ADM_get_qos_constraints_out_t, ADM_get_qos_constraints,
+ true);
+
+ REGISTER_RPC(ctx, "ADM_define_data_operation",
+ ADM_define_data_operation_in_t,
+ ADM_define_data_operation_out_t,
+ ADM_define_data_operation, true);
+
+ REGISTER_RPC(ctx, "ADM_connect_data_operation",
+ ADM_connect_data_operation_in_t,
+ ADM_connect_data_operation_out_t,
+ ADM_connect_data_operation, true);
+
+ REGISTER_RPC(ctx, "ADM_finalize_data_operation",
+ ADM_finalize_data_operation_in_t,
+ ADM_finalize_data_operation_out_t,
+ ADM_finalize_data_operation, true);
+
+ REGISTER_RPC(ctx, "ADM_link_transfer_to_data_operation",
+ ADM_link_transfer_to_data_operation_in_t,
+ ADM_link_transfer_to_data_operation_out_t,
+ ADM_link_transfer_to_data_operation, true);
+
+ REGISTER_RPC(ctx, "ADM_get_statistics", ADM_get_statistics_in_t,
+ ADM_get_statistics_out_t, ADM_get_statistics, true);
+
+ // TODO: add internal RPCs for communication with scord-ctl
+ };
+
+ daemon.configure(cfg, rpc_registration_cb);
+ return daemon.run();
+ } catch(const std::exception& ex) {
+ fmt::print(stderr,
+ "An unhandled exception reached the top of main(), "
+ "{} will exit:\n what(): {}\n",
+ cfg.progname(), ex.what());
+ return EXIT_FAILURE;
+ }
+}