From 421c5ed1678e3c898f3295a8565ba19f042cc94f Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Tue, 6 Jun 2023 12:46:53 +0200 Subject: [PATCH 1/3] Heavy refactor to `logger` module - Move as much code as possible to `logger.cpp` - Create `logger_base` with common code - Create `logger_sync` and `logger_async` classes to allow creating synchronous and asynchronous loggers as needed. - Replace all mentions to `global_logger` by `default_logger` --- CMakeLists.txt | 2 +- src/common/logger/logger.cpp | 145 ++++++++++++++++++- src/common/logger/logger.hpp | 262 ++++++++++++++++++++--------------- src/common/logger/macros.h | 44 +++--- src/common/net/server.cpp | 32 +---- src/common/net/server.hpp | 6 +- src/lib/libscord.cpp | 17 +-- 7 files changed, 323 insertions(+), 185 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 95aa757e..c7fa1f14 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -299,7 +299,7 @@ find_package(RedisPlusPlus 1.3.3 REQUIRED) # set compile flags add_compile_options("-Wall" "-Wextra" "-Werror" "$<$:-O3>") add_compile_definitions("$<$:SCORD_DEBUG_BUILD>") -add_compile_definitions("$<$:__LOGGER_ENABLE_DEBUG__>") +add_compile_definitions("$<$:LOGGER_ENABLE_DEBUG>") add_subdirectory(etc) add_subdirectory(src) diff --git a/src/common/logger/logger.cpp b/src/common/logger/logger.cpp index 391dd300..f6171bfd 100644 --- a/src/common/logger/logger.cpp +++ b/src/common/logger/logger.cpp @@ -22,13 +22,25 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ +#include +#include +#include +#include +#include +#include +#include #include #include #include "logger.hpp" #include "logger.h" +//////////////////////////////////////////////////////////////////////////////// +// Function implementations for the C API +//////////////////////////////////////////////////////////////////////////////// + void logger_setup(const char* ident, logger_type type, const char* log_file) { + constexpr auto get_cxx_type = [](logger_type t) { switch(t) { case CONSOLE_LOGGER: @@ -43,15 +55,17 @@ logger_setup(const char* ident, logger_type type, const char* log_file) { return logger::logger_type::console; } }; - logger::create_global_logger(ident, get_cxx_type(type), log_file); + + logger::create_default_logger( + logger::logger_config{ident, get_cxx_type(type), log_file}); } void logger_log(enum logger_level level, const char* fmt, ...) { - using logger::logger; + using logger::logger_base; - if(const auto logger = logger::get_global_logger(); logger) { + if(const auto logger = logger_base::get_default_logger(); logger) { std::array msg; // NOLINT va_list args; @@ -81,9 +95,128 @@ logger_log(enum logger_level level, const char* fmt, ...) { void logger_destroy() { - using logger::logger; + using logger::logger_base; + + if(logger_base::get_default_logger()) { + ::logger::destroy_default_logger(); + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Function implementations for the C++ API +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +/** + * @brief Creates a logger of the given type. + * + * @tparam Logger Type of the logger to create. + * @param config Configuration for the logger. + * @return std::shared_ptr Pointer to the created logger. + */ +template +std::shared_ptr +create_logger(const logger::logger_config& config) + requires(std::is_same_v || + std::is_same_v) { + + const auto create_helper = [&config]() { + switch(config.type()) { + case logger::console: { + if constexpr(std::is_same_v) { + return spdlog::stdout_logger_st(config.ident()); + } + return spdlog::stdout_logger_mt( + config.ident()); + } + case logger::console_color: + if constexpr(std::is_same_v) { + return spdlog::stdout_color_st(config.ident()); + } + return spdlog::stdout_color_mt( + config.ident()); + case logger::file: + if constexpr(std::is_same_v) { + return spdlog::basic_logger_st( + config.ident(), config.log_file().value_or(""), + true); + } + return spdlog::basic_logger_mt( + config.ident(), config.log_file().value_or(""), true); + case logger::syslog: + if constexpr(std::is_same_v) { + return spdlog::syslog_logger_st("syslog", config.ident(), + LOG_PID); + } + return spdlog::syslog_logger_mt("syslog", config.ident(), + LOG_PID); + default: + throw std::invalid_argument("Unknown logger type"); + } + }; - if(logger::get_global_logger()) { - ::logger::destroy_global_logger(); + try { + auto logger = create_helper(); + assert(logger != nullptr); + logger->set_pattern(logger::default_pattern); + +#ifdef LOGGER_ENABLE_DEBUG + logger->set_level(spdlog::level::debug); +#endif + spdlog::drop_all(); + return logger; + } catch(const spdlog::spdlog_ex& ex) { + throw std::runtime_error("logger initialization failed: " + + std::string(ex.what())); } } + +} // namespace + +namespace logger { + +logger_base::logger_base(logger::logger_config config) + : m_config(std::move(config)), + m_internal_logger(::create_logger(m_config)) {} + +const logger_config& +logger_base::config() const { + return m_config; +} + +std::shared_ptr& +logger_base::get_default_logger() { + static std::shared_ptr s_global_logger; + return s_global_logger; +} + +void +logger_base::enable_debug() const { + m_internal_logger->set_level(spdlog::level::debug); +} + +void +logger_base::flush() { + m_internal_logger->flush(); +} + +async_logger::async_logger(const logger_config& config) : logger_base(config) { + try { + m_internal_logger = ::create_logger(config); + } catch(const spdlog::spdlog_ex& ex) { + throw std::runtime_error("logger initialization failed: " + + std::string(ex.what())); + } +} + +sync_logger::sync_logger(const logger_config& config) : logger_base(config) { + try { + m_internal_logger = ::create_logger(config); + } catch(const spdlog::spdlog_ex& ex) { + throw std::runtime_error("logger initialization failed: " + + std::string(ex.what())); + } +} + +} // namespace logger diff --git a/src/common/logger/logger.hpp b/src/common/logger/logger.hpp index 09611fb2..7730a821 100644 --- a/src/common/logger/logger.hpp +++ b/src/common/logger/logger.hpp @@ -25,12 +25,7 @@ #ifndef SCORD_LOGGER_HPP #define SCORD_LOGGER_HPP -#include -#include -#include -#include -#include -#include +#include #include #include #include @@ -48,8 +43,6 @@ ptr(const T* p) { } // namespace fmt #endif // FMT_VERSION -namespace fs = std::filesystem; - namespace logger { enum logger_type { @@ -65,7 +58,7 @@ public: logger_config() = default; explicit logger_config(std::string ident, logger_type type, - std::optional log_file = {}) + std::optional log_file = {}) : m_ident(std::move(ident)), m_type(type), m_log_file(std::move(log_file)) {} @@ -79,7 +72,7 @@ public: return m_type; } - const std::optional& + const std::optional& log_file() const { return m_log_file; } @@ -87,101 +80,91 @@ public: private: std::string m_ident; logger_type m_type = console_color; - std::optional m_log_file; + std::optional m_log_file; }; -class logger { + +/** + * @brief The default log pattern + * + * This is the default log pattern used by the logger. + * It can be used to create new loggers with the same + * configuration. + * + * The default log pattern is: + * + * @code + * %^[%Y-%m-%d %T.%f] [%n] [%t] [%l]%$ %v + * @endcode + * + * The output of the default log pattern is: + * + * @code + * [2021-01-01 00:00:00.000000] [scord] [12345] [info] Message + * @endcode + * + * Where: + * - 2021-01-01 00:00:00.000000 is the current date and time + * - scord is the name of the logger + * - 12345 is the thread id + * - info is the log level + * - Message is the log message + * + * The following format specifiers are available: + * %Y - Year in 4 digits + * %m - month 1-12 + * %d - day 1-31 + * %T - ISO 8601 time format (HH:MM:SS) + * %f - microsecond part of the current second + * %E - epoch (microseconds precision) + * %n - logger's name + * %t - thread id + * %l - log level + * %v - message + */ +static constexpr auto default_pattern = + "%^[%Y-%m-%d %T.%f] [%n] [%t] [%l]%$ %v"; + +/** + * @brief The logger_base class + * + * This class is a wrapper around spdlog::logger, and provides common + * functionality to all the logger implementations. It also provides a + * default logger that can be used by the rest of the code by using the + * static member function get_default_logger(). + * + * @note This class should not be used directly. It is intended to serve as a + * base class for the different logger implementations. + */ +class logger_base { + +protected: + logger_base() = default; + explicit logger_base(logger_config config); public: - logger(const std::string& ident, logger_type type, - const fs::path& log_file = "") { - - try { - - switch(type) { - case console: - m_internal_logger = - spdlog::stdout_logger_mt( - ident); - break; - case console_color: - m_internal_logger = - spdlog::stdout_color_mt( - ident); - break; - case file: - m_internal_logger = - spdlog::basic_logger_mt( - ident, log_file.string(), true); - break; - case syslog: - m_internal_logger = - spdlog::syslog_logger_mt("syslog", ident, LOG_PID); - break; - default: - throw std::invalid_argument("Unknown logger type"); - } - - assert(m_internal_logger != nullptr); - - // %Y - Year in 4 digits - // %m - month 1-12 - // %d - day 1-31 - // %T - ISO 8601 time format (HH:MM:SS) - // %f - microsecond part of the current second - // %E - epoch (microseconds precision) - // %n - logger's name - // %t - thread id - // %l - log level - // %v - message - // m_internal_logger->set_pattern("[%Y-%m-%d %T.%f] [%E] [%n] [%t] - // [%l] %v"); - m_internal_logger->set_pattern( - "%^[%Y-%m-%d %T.%f] [%n] [%t] [%l]%$ %v"); - -#ifdef __LOGGER_ENABLE_DEBUG__ - enable_debug(); -#endif - - spdlog::drop_all(); - - // globally register the logger so that it can be accessed - // using spdlog::get(logger_name) - // spdlog::register_logger(m_internal_logger); - } catch(const spdlog::spdlog_ex& ex) { - throw std::runtime_error("logger initialization failed: " + - std::string(ex.what())); - } - } + logger_base(const logger_base& /*rhs*/) = delete; + logger_base& + operator=(const logger_base& /*rhs*/) = delete; - logger(const logger& /*rhs*/) = delete; - logger& - operator=(const logger& /*rhs*/) = delete; - logger(logger&& /*other*/) = default; - logger& - operator=(logger&& /*other*/) = default; +protected: + logger_base(logger_base&& /*other*/) = default; + logger_base& + operator=(logger_base&& /*other*/) = default; + ~logger_base() = default; - ~logger() { - spdlog::shutdown(); - } +public: + const logger_config& + config() const; - static std::shared_ptr& - get_global_logger() { - static std::shared_ptr s_global_logger; - return s_global_logger; - } + static std::shared_ptr& + get_default_logger(); - // the following member functions can be used to interact - // with a specific logger instance - inline void - enable_debug() const { - m_internal_logger->set_level(spdlog::level::debug); - } + void + enable_debug() const; - inline void - flush() { - m_internal_logger->flush(); - } + void + flush(); template inline void @@ -207,7 +190,7 @@ public: m_internal_logger->error(fmt, std::forward(args)...); } - static inline std::string + [[maybe_unused]] static inline std::string errno_message(int errno_value) { // 1024 should be more than enough for most locales constexpr const std::size_t MAX_ERROR_MSG = 1024; @@ -272,7 +255,7 @@ public: } template - static inline std::string + [[deprecated]] [[maybe_unused]] static inline std::string build_message(Args&&... args) { // see: @@ -289,37 +272,88 @@ public: return ss.str(); } -private: +protected: + logger_config m_config; std::shared_ptr m_internal_logger; - std::string m_type; +}; + +/** + * @brief Synchronous logger implementation + * + * This class is a wrapper around spdlog::logger. It provides + * a synchronous interface to the spdlog logger. + */ +class sync_logger : public logger_base { +public: + explicit sync_logger(const logger_config& config); +}; + +/** + * @brief Asynchronous logger implementation + * + * This class is a wrapper around spdlog::async_logger. It + * provides an asynchronous interface to the spdlog logger. + */ +class async_logger : public logger_base { +public: + explicit async_logger(const logger_config& config); }; // the following static functions can be used to interact -// with a globally registered logger instance +// with a globally registered async logger instance +/** + * @brief Create a default logger instance + * + * @tparam Args variadic template parameter pack for the logger constructor + * arguments + * @param args arguments for the logger constructor + */ template static inline void -create_global_logger(Args&&... args) { - logger::get_global_logger() = std::make_shared(args...); +create_default_logger(Args&&... args) { + async_logger::get_default_logger() = + std::make_shared(args...); } -static inline void -register_global_logger(logger&& lg) { - logger::get_global_logger() = std::make_shared(std::move(lg)); +/** + * @brief Register an existing logger instance as the default logger + * + * @param config logger configuration + */ +[[maybe_unused]] static inline void +set_default_logger(async_logger&& lg) { + async_logger::get_default_logger() = + std::make_shared(std::move(lg)); } -static inline void -destroy_global_logger() { - logger::get_global_logger().reset(); +/** + * @brief Destroy the default logger instance + */ +[[maybe_unused]] static inline void +destroy_default_logger() { + async_logger::get_default_logger().reset(); } -static inline void -flush_global_logger() { - if(logger::get_global_logger()) { - logger::get_global_logger()->flush(); - } +/** + * @brief Get the default logger instance + * + * @return A shared pointer to the default logger instance + */ +[[maybe_unused]] static inline auto +get_default_logger() { + return async_logger::get_default_logger(); } +/** + * @brief Flush the default logger instance + */ +[[maybe_unused]] static inline void +flush_default_logger() { + if(auto lg = async_logger::get_default_logger(); lg) { + lg->flush(); + } +} } // namespace logger diff --git a/src/common/logger/macros.h b/src/common/logger/macros.h index 1da2c6d3..3871761b 100644 --- a/src/common/logger/macros.h +++ b/src/common/logger/macros.h @@ -31,32 +31,29 @@ #define LOGGER_INFO(...) \ do { \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->info(__VA_ARGS__); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->info(__VA_ARGS__); \ } \ } while(0); -#ifdef __LOGGER_ENABLE_DEBUG__ +#ifdef LOGGER_ENABLE_DEBUG #define LOGGER_DEBUG(...) \ do { \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->debug(__VA_ARGS__); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->debug(__VA_ARGS__); \ } \ } while(0); #define LOGGER_FLUSH() \ do { \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->flush(); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->flush(); \ } \ } while(0); -#else // ! __LOGGER_ENABLE_DEBUG__ +#else // ! LOGGER_ENABLE_DEBUG #define LOGGER_DEBUG(...) \ do { \ @@ -65,38 +62,33 @@ do { \ } while(0); -#endif // __LOGGER_ENABLE_DEBUG__ +#endif // LOGGER_ENABLE_DEBUG #define LOGGER_WARN(...) \ do { \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->warn(__VA_ARGS__); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->warn(__VA_ARGS__); \ } \ } while(0); #define LOGGER_ERROR(...) \ do { \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->error(__VA_ARGS__); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->error(__VA_ARGS__); \ } \ } while(0); #define LOGGER_ERRNO(...) \ do { \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->error_errno(__VA_ARGS__); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->error_errno(__VA_ARGS__); \ } \ } while(0); #define LOGGER_CRITICAL(...) \ do { \ - using logger::logger; \ - using logger::logger; \ - if(logger::get_global_logger()) { \ - logger::get_global_logger()->critical(__VA_ARGS__); \ + if(logger::get_default_logger()) { \ + logger::get_default_logger()->critical(__VA_ARGS__); \ } \ } while(0); @@ -136,7 +128,7 @@ #define LOGGER_INFO(fmt, ...) LOGGER_LOG(info, fmt, ##__VA_ARGS__); -#ifdef __LOGGER_ENABLE_DEBUG__ +#ifdef LOGGER_ENABLE_DEBUG #define LOGGER_DEBUG(fmt, ...) LOGGER_LOG(debug, fmt, ##__VA_ARGS__); #endif diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp index c7261744..27ddbf24 100644 --- a/src/common/net/server.cpp +++ b/src/common/net/server.cpp @@ -47,7 +47,7 @@ using namespace std::literals; namespace network { server::server(std::string name, std::string address, bool daemonize, - fs::path rundir) + std::filesystem::path rundir) : m_name(std::move(name)), m_address(std::move(address)), m_daemonize(daemonize), m_rundir(std::move(rundir)), m_pidfile(daemonize ? std::make_optional(m_rundir / (m_name + ".pid")) @@ -88,7 +88,7 @@ server::daemonize() { // this (and since we want to be able to output messages from all // processes), we destroy it now and recreate it post-fork() both in the // parent process and in the child. - logger::destroy_global_logger(); + logger::destroy_default_logger(); /* Fork off the parent process */ m_signal_listener.notify_fork(fork_event::fork_prepare); @@ -213,7 +213,7 @@ server::signal_handler(int signum) { case SIGHUP: LOGGER_WARN("A signal (SIGHUP) occurred."); - logger::flush_global_logger(); + logger::flush_default_logger(); break; default: @@ -223,29 +223,7 @@ server::signal_handler(int signum) { void server::init_logger() { - - switch(m_logger_config.type()) { - case logger::logger_type::console_color: - logger::create_global_logger(m_logger_config.ident(), - logger::logger_type::console_color); - break; - case logger::logger_type::syslog: - logger::create_global_logger(m_logger_config.ident(), - logger::logger_type::syslog); - break; - case logger::logger_type::file: - if(m_logger_config.log_file().has_value()) { - logger::create_global_logger(m_logger_config.ident(), - logger::logger_type::file, - *m_logger_config.log_file()); - break; - } - [[fallthrough]]; - case logger::logger_type::console: - logger::create_global_logger(m_logger_config.ident(), - logger::logger_type::console); - break; - } + logger::create_default_logger(m_logger_config); } void @@ -382,7 +360,7 @@ server::teardown() { LOGGER_INFO("* Removing pidfile..."); std::error_code ec; - fs::remove(*m_pidfile, ec); + std::filesystem::remove(*m_pidfile, ec); if(ec) { LOGGER_ERROR("Failed to remove pidfile {}: {}", *m_pidfile, diff --git a/src/common/net/server.hpp b/src/common/net/server.hpp index 6b7787d9..7f577469 100644 --- a/src/common/net/server.hpp +++ b/src/common/net/server.hpp @@ -46,7 +46,7 @@ class server { public: server(std::string name, std::string address, bool daemonize, - fs::path rundir); + std::filesystem::path rundir); ~server(); @@ -103,8 +103,8 @@ private: std::string m_name; std::string m_address; bool m_daemonize; - fs::path m_rundir; - std::optional m_pidfile; + std::filesystem::path m_rundir; + std::optional m_pidfile; logger::logger_config m_logger_config; protected: diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index 909c270c..cb62cf06 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -48,17 +48,18 @@ void init_logger() { try { - - if(const auto p = std::getenv(scord::env::LOG); p && !std::string{p}.empty() && std::string{p} != "0") { - if(const auto log_file = std::getenv(scord::env::LOG_OUTPUT)) { - logger::create_global_logger( - "libscord", logger::logger_type::file, log_file); - } else { - logger::create_global_logger( - "libscord", logger::logger_type::console_color); + + if(const auto log_file = std::getenv(scord::env::LOG_OUTPUT); + log_file) { + logger::create_default_logger(logger::logger_config{ + "libscord", logger::logger_type::file, log_file}); + return; } + + logger::create_default_logger(logger::logger_config{ + "libscord", logger::logger_type::console_color}); } } catch(const std::exception& ex) { std::cerr << fmt::format("WARNING: Error initializing logger: {}", -- GitLab From 9f4134ac5faca2e92f26b8fbb367522243c40f8e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 31 May 2023 11:28:35 +0200 Subject: [PATCH 2/3] Update `ADM_deploy_adhoc_storage` RPC - scord-ctl: The new implementation for the RPC handler makes use of the new configuration files to construct the command line for deployment. The RPC handler will now reply with the generated path for the adhoc storage system (`adhoc_dir`) in addition to an appropriate error code. - scord: The RPC handler has been refactored and will now propagate `adhoc_dir` to the client API. - API: The newly generated `adhoc_dir` is propagated to client code. - examples: Updated to account for the new `adhoc_storage_path` argument in the APIs. foo --- examples/c/ADM_deploy_adhoc_storage.c | 11 +- examples/c/ADM_tear_down_adhoc_storage.c | 5 +- examples/cxx/ADM_deploy_adhoc_storage.cpp | 5 +- src/common/net/request.hpp | 5 + src/lib/c_wrapper.cpp | 25 ++++- src/lib/detail/impl.cpp | 21 ++-- src/lib/detail/impl.hpp | 2 +- src/lib/errors.c | 7 +- src/lib/libscord.cpp | 16 +-- src/lib/scord/scord.h | 10 +- src/lib/scord/scord.hpp | 2 +- src/lib/scord/types.h | 7 +- src/lib/scord/types.hpp | 63 ++++++----- src/scord-ctl/command.cpp | 103 +++++++++++++++++ src/scord-ctl/command.hpp | 5 + src/scord-ctl/rpc_server.cpp | 130 +++++++++++----------- src/scord-ctl/rpc_server.hpp | 3 +- src/scord/adhoc_storage_manager.hpp | 36 ++++++ src/scord/internal_types.cpp | 12 +- src/scord/internal_types.hpp | 8 +- src/scord/rpc_server.cpp | 87 +++++++++------ 21 files changed, 398 insertions(+), 165 deletions(-) diff --git a/examples/c/ADM_deploy_adhoc_storage.c b/examples/c/ADM_deploy_adhoc_storage.c index 4d6d91dd..b5317eb1 100644 --- a/examples/c/ADM_deploy_adhoc_storage.c +++ b/examples/c/ADM_deploy_adhoc_storage.c @@ -57,6 +57,7 @@ main(int argc, char* argv[]) { ADM_adhoc_context_t adhoc_ctx = NULL; ADM_adhoc_context_t new_adhoc_ctx = NULL; ADM_adhoc_storage_t adhoc_storage = NULL; + char* adhoc_storage_path = NULL; // Let's prepare all the information required by the API calls. @@ -101,9 +102,9 @@ main(int argc, char* argv[]) { } // 2. Register the adhoc storage - if(ADM_register_adhoc_storage( - server, adhoc_name, ADM_ADHOC_STORAGE_DATACLAY, adhoc_ctx, - adhoc_resources, &adhoc_storage) != ADM_SUCCESS) { + if(ADM_register_adhoc_storage(server, adhoc_name, ADM_ADHOC_STORAGE_GEKKOFS, + adhoc_ctx, adhoc_resources, + &adhoc_storage) != ADM_SUCCESS) { fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n", ADM_strerror(ret)); goto cleanup; @@ -124,7 +125,8 @@ main(int argc, char* argv[]) { } // We can now request the deployment to the server - if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) { + if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage, + &adhoc_storage_path)) != ADM_SUCCESS) { fprintf(stderr, "ADM_deploy_adhoc_storage() failed: %s\n", ADM_strerror(ret)); goto cleanup; @@ -144,6 +146,7 @@ main(int argc, char* argv[]) { cleanup: + free(adhoc_storage_path); ADM_server_destroy(server); ADM_adhoc_context_destroy(new_adhoc_ctx); ADM_adhoc_context_destroy(adhoc_ctx); diff --git a/examples/c/ADM_tear_down_adhoc_storage.c b/examples/c/ADM_tear_down_adhoc_storage.c index 996d03e1..d18b99f4 100644 --- a/examples/c/ADM_tear_down_adhoc_storage.c +++ b/examples/c/ADM_tear_down_adhoc_storage.c @@ -57,6 +57,7 @@ main(int argc, char* argv[]) { ADM_adhoc_context_t adhoc_ctx = NULL; ADM_adhoc_context_t new_adhoc_ctx = NULL; ADM_adhoc_storage_t adhoc_storage = NULL; + char* adhoc_storage_path = NULL; // Let's prepare all the information required by the API calls. @@ -124,7 +125,8 @@ main(int argc, char* argv[]) { } // We can now request the deployment to the server - if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) { + if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage, + &adhoc_storage_path)) != ADM_SUCCESS) { fprintf(stderr, "ADM_deploy_adhoc_storage() failed: %s\n", ADM_strerror(ret)); goto cleanup; @@ -152,6 +154,7 @@ main(int argc, char* argv[]) { cleanup: + free(adhoc_storage_path); ADM_server_destroy(server); ADM_adhoc_context_destroy(new_adhoc_ctx); ADM_adhoc_context_destroy(adhoc_ctx); diff --git a/examples/cxx/ADM_deploy_adhoc_storage.cpp b/examples/cxx/ADM_deploy_adhoc_storage.cpp index ac37c49f..93030a27 100644 --- a/examples/cxx/ADM_deploy_adhoc_storage.cpp +++ b/examples/cxx/ADM_deploy_adhoc_storage.cpp @@ -57,14 +57,15 @@ main(int argc, char* argv[]) { try { const auto adhoc_storage = scord::register_adhoc_storage( - server, name, scord::adhoc_storage::type::dataclay, + server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); fmt::print(stdout, "ADM_register_adhoc_storage() remote procedure completed " "successfully\n"); - scord::deploy_adhoc_storage(server, adhoc_storage); + [[maybe_unused]] const auto adhoc_storage_path = + scord::deploy_adhoc_storage(server, adhoc_storage); } catch(const std::exception& e) { fmt::print(stderr, diff --git a/src/common/net/request.hpp b/src/common/net/request.hpp index 3154db26..109e34f2 100644 --- a/src/common/net/request.hpp +++ b/src/common/net/request.hpp @@ -83,6 +83,11 @@ public: return m_value.value(); } + constexpr auto + value_or(Value&& default_value) const noexcept { + return m_value.value_or(std::move(default_value)); + } + constexpr auto has_value() const noexcept { return m_value.has_value(); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index e383a86e..c830be21 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -152,13 +152,32 @@ ADM_remove_adhoc_storage(ADM_server_t server, } ADM_return_t -ADM_deploy_adhoc_storage(ADM_server_t server, - ADM_adhoc_storage_t adhoc_storage) { +ADM_deploy_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage, + char** adhoc_storage_path) { const scord::server srv{server}; - return scord::detail::deploy_adhoc_storage( + const auto rv = scord::detail::deploy_adhoc_storage( srv, scord::adhoc_storage{adhoc_storage}); + + if(!rv) { + *adhoc_storage_path = nullptr; + return rv.error(); + } + + const auto s = rv.value().string(); + char* buf = static_cast(std::malloc(s.size() + 1)); + + if(!buf) { + *adhoc_storage_path = nullptr; + return ADM_ENOMEM; + } + + s.copy(buf, s.size()); + buf[s.size()] = '\0'; + *adhoc_storage_path = buf; + + return ADM_SUCCESS; } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 4ffb890d..ad1be8ff 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -381,9 +381,11 @@ remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage) { return scord::error_code::other; } -scord::error_code +tl::expected deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { + using response_type = network::response_with_value; + network::client rpc_client{srv.protocol()}; const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); @@ -397,18 +399,23 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { if(const auto& call_rv = endp.call(rpc.name(), adhoc_storage.id()); call_rv.has_value()) { - const network::generic_response resp{call_rv.value()}; + const response_type resp{call_rv.value()}; - LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, - resp.error_code(), resp.op_id()); + LOGGER_EVAL( + resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}, adhoc_dir: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); - return resp.error_code(); + if(!resp.error_code()) { + return tl::make_unexpected(resp.error_code()); + } + + return resp.value(); } } LOGGER_ERROR("rpc call failed"); - return scord::error_code::other; + return tl::make_unexpected(scord::error_code::other); } scord::error_code diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index d8753908..7d7f7a9f 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -59,7 +59,7 @@ update_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage, scord::error_code remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); -scord::error_code +tl::expected deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); scord::error_code diff --git a/src/lib/errors.c b/src/lib/errors.c index 5c4b2bd8..e4ca5c87 100644 --- a/src/lib/errors.c +++ b/src/lib/errors.c @@ -32,10 +32,13 @@ const char* const adm_errlist[ADM_ERR_MAX + 1] = { [ADM_EEXISTS] = "Entity already exists", [ADM_ENOENT] = "Entity does not exist", [ADM_EADHOC_BUSY] = "Adhoc storage already in use", - + [ADM_EADHOC_TYPE_UNSUPPORTED] = "Unsupported adhoc storage type", + [ADM_EADHOC_DIR_CREATE_FAILED] = + "Cannot create adhoc storage directory", + [ADM_EADHOC_DIR_EXISTS] = "Adhoc storage directory already exists", + [ADM_ESUBPROCESS_ERROR] = "Subprocess error", [ADM_EOTHER] = "Undetermined error", - /* fallback */ [ADM_ERR_MAX] = "Unknown error", diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index cb62cf06..ad892944 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -289,15 +289,15 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } } -void +std::string deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { - - const auto ec = detail::deploy_adhoc_storage(srv, adhoc_storage); - - if(!ec) { - throw std::runtime_error(fmt::format( - "ADM_deploy_adhoc_storage() error: {}", ec.message())); - } + return detail::deploy_adhoc_storage(srv, adhoc_storage) + .or_else([](auto ec) { + throw std::runtime_error(fmt::format( + "ADM_deploy_adhoc_storage() error: {}", ec.message())); + }) + .transform([](auto&& path) { return path.string(); }) + .value(); } void diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h index 4733624a..0c024893 100644 --- a/src/lib/scord/scord.h +++ b/src/lib/scord/scord.h @@ -148,15 +148,19 @@ ADM_remove_adhoc_storage(ADM_server_t server, /** * Initiate the deployment of an adhoc storage system instance. * + * @remark The caller is responsible for freeing the path returned in + * adhoc_storage_path. + * * @param[in] server The server to which the request is directed - * @param[in] job An ADM_JOB identifying the originating job. * @param[in] adhoc_storage An ADM_STORAGE referring to the adhoc storage * instance of interest. + * @param[out] adhoc_storage_path A dynamically-allocated string that will + * be set to the path where the adhoc storage system data will be stored. * @return Returns ADM_SUCCESS if the remote procedure has completed */ ADM_return_t -ADM_deploy_adhoc_storage(ADM_server_t server, - ADM_adhoc_storage_t adhoc_storage); +ADM_deploy_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage, + char** adhoc_storage_path); /** * Tear down a previously deployed adhoc storage system instance diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index 1325b5c6..e5e07b43 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -70,7 +70,7 @@ update_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage, void remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); -void +std::string deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); void diff --git a/src/lib/scord/types.h b/src/lib/scord/types.h index badda31d..5d14e7dd 100644 --- a/src/lib/scord/types.h +++ b/src/lib/scord/types.h @@ -49,6 +49,10 @@ typedef enum { ADM_EEXISTS, ADM_ENOENT, ADM_EADHOC_BUSY, + ADM_EADHOC_TYPE_UNSUPPORTED, + ADM_EADHOC_DIR_CREATE_FAILED, + ADM_EADHOC_DIR_EXISTS, + ADM_ESUBPROCESS_ERROR, ADM_EOTHER, ADM_ERR_MAX = 512 } ADM_return_t; @@ -495,8 +499,7 @@ ADM_adhoc_resources_destroy(ADM_adhoc_resources_t res); * @return A valid ADM_ADHOC_CONTEXT if successful. NULL otherwise. */ ADM_adhoc_context_t -ADM_adhoc_context_create(const char* ctl_address, - ADM_adhoc_mode_t exec_mode, +ADM_adhoc_context_create(const char* ctl_address, ADM_adhoc_mode_t exec_mode, ADM_adhoc_access_t access_type, uint32_t walltime, bool should_flush); diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index 910d4835..e9c5ac5e 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -46,6 +46,10 @@ struct error_code { static const error_code entity_exists; static const error_code no_such_entity; static const error_code adhoc_in_use; + static const error_code adhoc_type_unsupported; + static const error_code adhoc_dir_create_failed; + static const error_code adhoc_dir_exists; + static const error_code subprocess_error; static const error_code other; constexpr error_code() : m_value(ADM_SUCCESS) {} @@ -68,26 +72,27 @@ struct error_code { constexpr std::string_view name() const { + // clang-format off +#define ADM_ERROR_CASE(x) case x: return #x +#define ADM_ERROR_DEFAULT_MSG(x) default: return x + // clang-format on switch(m_value) { - case ADM_SUCCESS: - return "ADM_SUCCESS"; - case ADM_ESNAFU: - return "ADM_ESNAFU"; - case ADM_EBADARGS: - return "ADM_EBADARGS"; - case ADM_ENOMEM: - return "ADM_ENOMEM"; - case ADM_EEXISTS: - return "ADM_EEXISTS"; - case ADM_ENOENT: - return "ADM_ENOENT"; - case ADM_EADHOC_BUSY: - return "ADM_EADHOC_BUSY"; - case ADM_EOTHER: - return "ADM_EOTHER"; - default: - return "INVALID_ERROR_VALUE"; + ADM_ERROR_CASE(ADM_SUCCESS); + ADM_ERROR_CASE(ADM_ESNAFU); + ADM_ERROR_CASE(ADM_EBADARGS); + ADM_ERROR_CASE(ADM_ENOMEM); + ADM_ERROR_CASE(ADM_EEXISTS); + ADM_ERROR_CASE(ADM_ENOENT); + ADM_ERROR_CASE(ADM_EADHOC_BUSY); + ADM_ERROR_CASE(ADM_EADHOC_TYPE_UNSUPPORTED); + ADM_ERROR_CASE(ADM_EADHOC_DIR_CREATE_FAILED); + ADM_ERROR_CASE(ADM_EADHOC_DIR_EXISTS); + ADM_ERROR_CASE(ADM_ESUBPROCESS_ERROR); + ADM_ERROR_CASE(ADM_EOTHER); + ADM_ERROR_DEFAULT_MSG("INVALID_ERROR_VALUE"); } +#undef ADM_ERROR_CASE +#undef ADM_ERROR_DEFAULT_MSG } std::string_view @@ -103,14 +108,20 @@ private: ADM_return_t m_value; }; -constexpr error_code error_code::success = error_code{ADM_SUCCESS}; -constexpr error_code error_code::snafu = error_code{ADM_ESNAFU}; -constexpr error_code error_code::bad_args = error_code{ADM_EBADARGS}; -constexpr error_code error_code::out_of_memory = error_code{ADM_ENOMEM}; -constexpr error_code error_code::entity_exists = error_code{ADM_EEXISTS}; -constexpr error_code error_code::no_such_entity = error_code{ADM_ENOENT}; -constexpr error_code error_code::adhoc_in_use = error_code{ADM_EADHOC_BUSY}; -constexpr error_code error_code::other = error_code{ADM_EOTHER}; +constexpr error_code error_code::success{ADM_SUCCESS}; +constexpr error_code error_code::snafu{ADM_ESNAFU}; +constexpr error_code error_code::bad_args{ADM_EBADARGS}; +constexpr error_code error_code::out_of_memory{ADM_ENOMEM}; +constexpr error_code error_code::entity_exists{ADM_EEXISTS}; +constexpr error_code error_code::no_such_entity{ADM_ENOENT}; +constexpr error_code error_code::adhoc_in_use{ADM_EADHOC_BUSY}; +constexpr error_code error_code::adhoc_type_unsupported{ + ADM_EADHOC_TYPE_UNSUPPORTED}; +constexpr error_code error_code::adhoc_dir_create_failed{ + ADM_EADHOC_DIR_CREATE_FAILED}; +constexpr error_code error_code::adhoc_dir_exists{ADM_EADHOC_DIR_EXISTS}; +constexpr error_code error_code::subprocess_error{ADM_ESUBPROCESS_ERROR}; +constexpr error_code error_code::other{ADM_EOTHER}; using job_id = std::uint64_t; using slurm_job_id = std::uint64_t; diff --git a/src/scord-ctl/command.cpp b/src/scord-ctl/command.cpp index 55327b5b..65f282fb 100644 --- a/src/scord-ctl/command.cpp +++ b/src/scord-ctl/command.cpp @@ -1,8 +1,40 @@ #include #include #include +#include +#include +#include #include "command.hpp" + +namespace { +/** + * @brief Convert a vector of strings into a vector of C strings. + * The returned vector is null-terminated. + * + * @note The const char* stored into the resulting vector are valid only as + * long as the input vector is not modified or destroyed. The caller is + * responsible for ensuring this. + * + * @param v Vector of strings. + * + * @return Vector of C strings. + */ +[[nodiscard]] std::unique_ptr +as_char_array(const std::vector& v) { + + auto tmp = std::make_unique(v.size() + 1); + tmp[v.size()] = nullptr; + + for(const auto i : std::views::iota(0u, v.size())) { + tmp[i] = v[i].c_str(); + } + + return tmp; +} + +} // namespace + namespace scord_ctl { void @@ -118,4 +150,75 @@ command::as_vector() const { return tmp; } +void +command::exec() const { + + const auto envs = m_env.value_or(environment{}).as_vector(); + const auto args = this->as_vector(); + + const auto argv = ::as_char_array(args); + const auto envp = ::as_char_array(envs); + + switch(const auto pid = ::fork()) { + case 0: { + ::execvpe(argv[0], const_cast(argv.get()), + const_cast(envp.get())); + // We cannot use the default logger in the child process because it + // is not fork-safe, and even though we received a copy of the + // global logger, it is not valid because the child process does + // not have the same threads as the parent process. + // Instead, we create a new logger with the same configuration as + // the global logger, and use it to log the error. This new logger + // must be synchronous to avoid issues with accessing the + // (existing but invalid) internal thread pool of the global logger. + const auto msg = fmt::format("Failed to execute command: {}", + ::strerror(errno)); + if(const auto logger = logger::get_default_logger(); logger) { + auto cfg = logger->config(); + logger::sync_logger lg{cfg}; + lg.error("{}", msg); + } else { + fmt::print(stderr, "{}", msg); + } + + // We cannot call ::exit() here because it will attempt to destruct + // the global logger, and as we mentioned the global logger in + // the child process contains the same information as the global + // logger of the parent process, but without the proper threads + // (since fork() only copies the calling thread). Thus, attempting + // to call its destructor will end up hanging the process while it + // tries to join() a non-existing thread. Instead, we call ::_exit() + // which will terminate the process without calling any functions + // registered with atexit() or similar. This is safe because we know + // the process is about to be destroyed anyway... + ::_exit(EXIT_FAILURE); + } + case -1: + throw std::runtime_error{fmt::format( + "Failed to create subprocess: {}", ::strerror(errno))}; + default: { + int wstatus = 0; + do { + if(const auto retwait = ::waitpid(pid, &wstatus, 0); + retwait == -1) { + throw std::runtime_error{ + fmt::format("Failed to wait for subprocess: {}", + ::strerror(errno))}; + } + + if(!WIFEXITED(wstatus)) { + throw std::runtime_error{ + "Subprocess did not exit normally"}; + } + + if(WEXITSTATUS(wstatus) != 0) { + throw std::runtime_error{ + fmt::format("Subprocess exited with status {}", + WEXITSTATUS(wstatus))}; + } + } while(!WIFEXITED(wstatus)); + } + } +} + } // namespace scord_ctl diff --git a/src/scord-ctl/command.hpp b/src/scord-ctl/command.hpp index 9ed7f6d0..73ada0e8 100644 --- a/src/scord-ctl/command.hpp +++ b/src/scord-ctl/command.hpp @@ -162,6 +162,11 @@ public: std::vector as_vector() const; + /** + * @brief Execute the command and wait for it to finish. + */ + void + exec() const; private: std::string m_cmdline; diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index f9b3b988..a1d86555 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -22,14 +22,12 @@ * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ +#include #include #include #include #include "rpc_server.hpp" -// required for ::waitpid() -#include -#include using namespace std::literals; @@ -113,81 +111,85 @@ rpc_server::ping(const network::request& req) { void rpc_server::deploy_adhoc_storage( - const network::request& req, - const enum scord::adhoc_storage::type adhoc_type, - const scord::adhoc_storage::ctx& adhoc_ctx, + const network::request& req, const std::string& adhoc_uuid, + enum scord::adhoc_storage::type adhoc_type, const scord::adhoc_storage::resources& adhoc_resources) { - using network::generic_response; using network::get_address; + using network::response_with_value; using network::rpc_info; const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + std::optional adhoc_dir; - LOGGER_INFO("rpc {:>} body: {{type: {}, ctx: {}, resources: {}}}", rpc, - adhoc_type, adhoc_ctx, adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}, resources: {}}}", rpc, + std::quoted(adhoc_uuid), adhoc_type, adhoc_resources); auto ec = scord::error_code::success; - if(adhoc_type == scord::adhoc_storage::type::gekkofs) { - /* Number of nodes */ - const std::string nodes = - std::to_string(adhoc_resources.nodes().size()); - - /* Walltime */ - const std::string walltime = std::to_string(adhoc_ctx.walltime()); - - /* Launch script */ - switch(const auto pid = fork()) { - case 0: { - std::vector args; - args.push_back("gkfs"); - // args.push_back("-c"); - // args.push_back("gkfs.conf"); - args.push_back("-n"); - args.push_back(nodes.c_str()); - // args.push_back("-w"); - // args.push_back(walltime.c_str()); - args.push_back("--srun"); - args.push_back("start"); - args.push_back(NULL); - std::vector env; - env.push_back(NULL); - - execvpe("gkfs", const_cast(args.data()), - const_cast(env.data())); - LOGGER_INFO("ADM_deploy_adhoc_storage() script didn't execute"); - exit(EXIT_FAILURE); - break; - } - case -1: { - ec = scord::error_code::other; - LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, ec); - break; - } - default: { - int wstatus = 0; - pid_t retwait = ::waitpid(pid, &wstatus, 0); - if(retwait == -1) { - LOGGER_ERROR( - "rpc id: {} error_msg: \"Error waitpid code: {}\"", - rpc.id(), retwait); - ec = scord::error_code::other; - } else { - if(WEXITSTATUS(wstatus) != 0) { - ec = scord::error_code::other; - } else { - ec = scord::error_code::success; - } - } - break; - } + if(!m_config.has_value() || m_config->adhoc_storage_configs().empty()) { + LOGGER_WARN("No adhoc storage configurations available"); + ec = scord::error_code::snafu; + goto respond; + } + + if(const auto it = m_config->adhoc_storage_configs().find(adhoc_type); + it != m_config->adhoc_storage_configs().end()) { + const auto& adhoc_cfg = it->second; + + LOGGER_DEBUG("deploy \"{:e}\" (ID: {})", adhoc_type, adhoc_uuid); + + // 1. Create a working directory for the adhoc storage instance + adhoc_dir = adhoc_cfg.working_directory() / adhoc_uuid; + + LOGGER_DEBUG("[{}] mkdir {}", adhoc_uuid, adhoc_dir); + std::error_code err; + + if(exists(*adhoc_dir)) { + LOGGER_ERROR("[{}] Adhoc directory {} already exists", adhoc_uuid, + adhoc_cfg.working_directory()); + ec = scord::error_code::adhoc_dir_exists; + goto respond; + } + + if(!create_directories(*adhoc_dir, err)) { + LOGGER_ERROR("[{}] Failed to create adhoc directory {}: {}", + adhoc_uuid, adhoc_cfg.working_directory(), + err.message()); + ec = scord::error_code::adhoc_dir_create_failed; + goto respond; + } + + // 3. Construct the startup command for the adhoc storage instance + std::vector hostnames; + std::ranges::transform( + adhoc_resources.nodes(), std::back_inserter(hostnames), + [](const auto& node) { return node.hostname(); }); + + const auto cmd = adhoc_cfg.startup_command().eval( + adhoc_uuid, *adhoc_dir, hostnames); + + // 4. Execute the startup command + try { + LOGGER_DEBUG("[{}] exec: {}", adhoc_uuid, cmd); + cmd.exec(); + } catch(const std::exception& ex) { + LOGGER_ERROR("[{}] Failed to execute startup command: {}", + adhoc_uuid, ex.what()); + ec = scord::error_code::subprocess_error; } + } else { + LOGGER_WARN( + "Failed to find adhoc storage configuration for type '{:e}'", + adhoc_type); + ec = scord::error_code::adhoc_type_unsupported; } - const auto resp = generic_response{rpc.id(), ec}; +respond: + const auto resp = response_with_value(rpc.id(), ec, adhoc_dir); - LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}, adhoc_dir: {}}}", rpc, ec, + adhoc_dir.value_or(std::filesystem::path{})); req.respond(resp); } diff --git a/src/scord-ctl/rpc_server.hpp b/src/scord-ctl/rpc_server.hpp index e466001e..2be18c56 100644 --- a/src/scord-ctl/rpc_server.hpp +++ b/src/scord-ctl/rpc_server.hpp @@ -51,9 +51,8 @@ private: void deploy_adhoc_storage( - const network::request& req, + const network::request& req, const std::string& adhoc_uuid, enum scord::adhoc_storage::type adhoc_type, - const scord::adhoc_storage::ctx& adhoc_ctx, const scord::adhoc_storage::resources& adhoc_resources); std::optional m_config; diff --git a/src/scord/adhoc_storage_manager.hpp b/src/scord/adhoc_storage_manager.hpp index 848248b2..c504151b 100644 --- a/src/scord/adhoc_storage_manager.hpp +++ b/src/scord/adhoc_storage_manager.hpp @@ -32,9 +32,44 @@ #include #include #include +#include #include #include "internal_types.hpp" +namespace { + +[[nodiscard]] std::string +generate_adhoc_uuid(enum scord::adhoc_storage::type adhoc_type) { + + using namespace std::literals; + + /** + * @brief Generate a random string of the given length. + * + * @param length The length of the string to generate. + * + * @return A random string of the given length. + */ + const auto generate_random_string = [](int length = 32) -> std::string { + constexpr auto dice = []() { + constexpr auto chars = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"sv; + thread_local std::mt19937 generator{std::random_device{}()}; + std::uniform_int_distribution<> distribution(0, chars.length() - 1); + return chars.at(distribution(generator)); + }; + + std::string result; + result.reserve(length); + std::ranges::generate_n(std::back_inserter(result), length, dice); + return result; + }; + + return fmt::format("{:e}-{}", adhoc_type, generate_random_string()); +} + +} // namespace + namespace scord { struct adhoc_storage_manager { @@ -54,6 +89,7 @@ struct adhoc_storage_manager { it == m_adhoc_storages.end()) { const auto& [it_adhoc, inserted] = m_adhoc_storages.emplace( id, std::make_shared( + ::generate_adhoc_uuid(type), scord::adhoc_storage{type, name, id, ctx, resources})); diff --git a/src/scord/internal_types.cpp b/src/scord/internal_types.cpp index 599edae1..2a87141f 100644 --- a/src/scord/internal_types.cpp +++ b/src/scord/internal_types.cpp @@ -49,14 +49,20 @@ job_info::update(scord::job::resources resources) { m_resources = std::move(resources); } -adhoc_storage_info::adhoc_storage_info(scord::adhoc_storage adhoc_storage) - : m_adhoc_storage(std::move(adhoc_storage)) {} +adhoc_storage_info::adhoc_storage_info(std::string uuid, + scord::adhoc_storage adhoc_storage) + : m_uuid(std::move(uuid)), m_adhoc_storage(std::move(adhoc_storage)) {} -scord::adhoc_storage +scord::adhoc_storage const& adhoc_storage_info::adhoc_storage() const { return m_adhoc_storage; } +std::string const& +adhoc_storage_info::uuid() const { + return m_uuid; +} + void adhoc_storage_info::update(scord::adhoc_storage::resources new_resources) { m_adhoc_storage.update(std::move(new_resources)); diff --git a/src/scord/internal_types.hpp b/src/scord/internal_types.hpp index f6b3948f..f141a81e 100644 --- a/src/scord/internal_types.hpp +++ b/src/scord/internal_types.hpp @@ -58,11 +58,14 @@ struct job_info { struct adhoc_storage_info { - explicit adhoc_storage_info(scord::adhoc_storage adhoc_storage); + adhoc_storage_info(std::string uuid, scord::adhoc_storage adhoc_storage); - scord::adhoc_storage + scord::adhoc_storage const& adhoc_storage() const; + std::string const& + uuid() const; + void update(scord::adhoc_storage::resources new_resources); @@ -75,6 +78,7 @@ struct adhoc_storage_info { std::shared_ptr client_info() const; + std::string m_uuid; scord::adhoc_storage m_adhoc_storage; std::shared_ptr m_client_info; mutable scord::abt::shared_mutex m_info_mutex; diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index fd63ea01..37c25d4d 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -292,59 +292,78 @@ void rpc_server::deploy_adhoc_storage(const network::request& req, std::uint64_t adhoc_id) { - using network::generic_response; using network::get_address; + using network::response_with_value; using network::rpc_info; + using response_type = response_with_value; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{adhoc_id: {}}}", rpc, adhoc_id); - auto ec = scord::error_code::success; - - // contact the adhoc controller and ask it to deploy the adhoc storage - if(const auto am_result = m_adhoc_manager.find(adhoc_id); - am_result.has_value()) { - const auto& adhoc_info = am_result.value(); + /** + * @brief Helper lambda to contact the adhoc controller and prompt it to + * deploy an adhoc storage instance + * @param adhoc_storage The relevant `adhoc_storage` object with + * information about the instance to deploy. + * @return + */ + const auto deploy_helper = [&](const auto& adhoc_info) + -> tl::expected { + assert(adhoc_info); const auto adhoc_storage = adhoc_info->adhoc_storage(); + const auto endp = lookup(adhoc_storage.context().controller_address()); + + if(!endp) { + LOGGER_ERROR("endpoint lookup failed"); + return tl::make_unexpected(error_code::snafu); + } - if(const auto lookup_rv = - lookup(adhoc_storage.context().controller_address()); - lookup_rv.has_value()) { - const auto& endp = lookup_rv.value(); + const auto child_rpc = + rpc.add_child(adhoc_storage.context().controller_address()); - const auto child_rpc = - rpc.add_child(adhoc_storage.context().controller_address()); + LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}, resources: {}}}", + child_rpc, std::quoted(adhoc_info->uuid()), + adhoc_storage.type(), adhoc_storage.get_resources()); - LOGGER_INFO("rpc {:<} body: {{type: {}, ctx: {}, resources: {}}}", - child_rpc, adhoc_storage.type(), - adhoc_storage.context(), adhoc_storage.get_resources()); + if(const auto call_rv = endp->call(rpc.name(), adhoc_info->uuid(), + adhoc_storage.type(), + adhoc_storage.get_resources()); + call_rv.has_value()) { - if(const auto call_rv = endp.call(rpc.name(), adhoc_storage.type(), - adhoc_storage.context(), - adhoc_storage.get_resources()); - call_rv.has_value()) { + const response_type resp{call_rv.value()}; - const network::generic_response resp{call_rv.value()}; - ec = resp.error_code(); + LOGGER_EVAL( + resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}, adhoc_dir: {}}} [op_id: {}]", + child_rpc, resp.error_code(), resp.value_or({}), + resp.op_id()); - LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc {:>} body: {{retval: {}}} [op_id: {}]", - child_rpc, ec, resp.op_id()); - } else { - ec = error_code::snafu; - LOGGER_ERROR("rpc call failed"); + if(const auto ec = resp.error_code(); !ec) { + return tl::make_unexpected(ec); } + return resp.value(); } - } else { - ec = am_result.error(); - LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, ec); - } - const auto resp = generic_response{rpc.id(), ec}; + LOGGER_ERROR("rpc call failed"); + return tl::make_unexpected(error_code::snafu); + }; - LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); + const auto rv = + m_adhoc_manager.find(adhoc_id) + .or_else([](auto&&) { + LOGGER_ERROR("adhoc storage instance not found"); + }) + .and_then(deploy_helper); + + const response_type resp{rpc.id(), + rv.has_value() ? error_code::success : rv.error(), + rv.value_or(std::filesystem::path{})}; + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:<} body: {{retval: {}, adhoc_dir: {}}}", rpc, + resp.error_code(), resp.value()); req.respond(resp); } -- GitLab From 74025603a83a3b6c8add1684548cbe4721c805e1 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Thu, 8 Jun 2023 14:02:14 +0200 Subject: [PATCH 3/3] Implment `ADM_terminate_adhoc_storage` RPC - Rename `ADM_tear_down_adhoc_storage` RPC and related functions to `ADM_terminate_adhoc_storage`. - scord-ctl: The new implementation for the RPC handler makes use of the new configuration files to construct the command line for termination. --- ci/check_rpcs.py | 2 +- ...torage.c => ADM_terminate_adhoc_storage.c} | 10 ++-- examples/c/CMakeLists.txt | 2 +- ...ge.cpp => ADM_terminate_adhoc_storage.cpp} | 4 +- examples/cxx/CMakeLists.txt | 2 +- src/common/net/request.hpp | 9 +++ src/lib/c_wrapper.cpp | 4 +- src/lib/detail/impl.cpp | 2 +- src/lib/detail/impl.hpp | 2 +- src/lib/libscord.cpp | 4 +- src/lib/scord/scord.h | 4 +- src/lib/scord/scord.hpp | 2 +- src/scord-ctl/rpc_server.cpp | 57 +++++++++++++++++++ src/scord-ctl/rpc_server.hpp | 5 ++ src/scord/rpc_server.cpp | 56 ++++++++++++++++-- src/scord/rpc_server.hpp | 2 +- 16 files changed, 142 insertions(+), 25 deletions(-) rename examples/c/{ADM_tear_down_adhoc_storage.c => ADM_terminate_adhoc_storage.c} (94%) rename examples/cxx/{ADM_tear_down_adhoc_storage.cpp => ADM_terminate_adhoc_storage.cpp} (95%) diff --git a/ci/check_rpcs.py b/ci/check_rpcs.py index fed1f1de..444bf155 100755 --- a/ci/check_rpcs.py +++ b/ci/check_rpcs.py @@ -14,7 +14,7 @@ RPC_NAMES = { 'ADM_register_job', 'ADM_update_job', 'ADM_remove_job', 'ADM_register_adhoc_storage', 'ADM_update_adhoc_storage', 'ADM_remove_adhoc_storage', 'ADM_deploy_adhoc_storage', - 'ADM_tear_down_adhoc_storage', + 'ADM_terminate_adhoc_storage', 'ADM_register_pfs_storage', 'ADM_update_pfs_storage', 'ADM_remove_pfs_storage', 'ADM_transfer_datasets', 'ADM_get_transfer_priority', diff --git a/examples/c/ADM_tear_down_adhoc_storage.c b/examples/c/ADM_terminate_adhoc_storage.c similarity index 94% rename from examples/c/ADM_tear_down_adhoc_storage.c rename to examples/c/ADM_terminate_adhoc_storage.c index d18b99f4..dafd05e8 100644 --- a/examples/c/ADM_tear_down_adhoc_storage.c +++ b/examples/c/ADM_terminate_adhoc_storage.c @@ -102,9 +102,9 @@ main(int argc, char* argv[]) { } // 2. Register the adhoc storage - if(ADM_register_adhoc_storage( - server, adhoc_name, ADM_ADHOC_STORAGE_DATACLAY, adhoc_ctx, - adhoc_resources, &adhoc_storage) != ADM_SUCCESS) { + if(ADM_register_adhoc_storage(server, adhoc_name, ADM_ADHOC_STORAGE_GEKKOFS, + adhoc_ctx, adhoc_resources, + &adhoc_storage) != ADM_SUCCESS) { fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n", ADM_strerror(ret)); goto cleanup; @@ -133,9 +133,9 @@ main(int argc, char* argv[]) { } // We can noe request the tear down of the adhoc storage - if((ret = ADM_tear_down_adhoc_storage(server, adhoc_storage)) != + if((ret = ADM_terminate_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) { - fprintf(stderr, "ADM_tear_down_adhoc_storage() failed: %s\n", + fprintf(stderr, "ADM_terminate_adhoc_storage() failed: %s\n", ADM_strerror(ret)); goto cleanup; } diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index cbc590b5..22004b2f 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -27,7 +27,7 @@ list(APPEND c_examples_with_controller ADM_register_job ADM_update_job ADM_remove_job # adhoc storage ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage - ADM_deploy_adhoc_storage ADM_tear_down_adhoc_storage + ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers diff --git a/examples/cxx/ADM_tear_down_adhoc_storage.cpp b/examples/cxx/ADM_terminate_adhoc_storage.cpp similarity index 95% rename from examples/cxx/ADM_tear_down_adhoc_storage.cpp rename to examples/cxx/ADM_terminate_adhoc_storage.cpp index 561676ed..e7b26594 100644 --- a/examples/cxx/ADM_tear_down_adhoc_storage.cpp +++ b/examples/cxx/ADM_terminate_adhoc_storage.cpp @@ -57,7 +57,7 @@ main(int argc, char* argv[]) { try { const auto adhoc_storage = scord::register_adhoc_storage( - server, name, scord::adhoc_storage::type::dataclay, + server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); fmt::print(stdout, @@ -66,7 +66,7 @@ main(int argc, char* argv[]) { scord::deploy_adhoc_storage(server, adhoc_storage); - scord::tear_down_adhoc_storage(server, adhoc_storage); + scord::terminate_adhoc_storage(server, adhoc_storage); } catch(const std::exception& e) { fmt::print(stderr, diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 9b828a7a..cadabb2e 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -27,7 +27,7 @@ list(APPEND cxx_examples_with_controller ADM_register_job ADM_update_job ADM_remove_job # adhoc storage ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage - ADM_deploy_adhoc_storage ADM_tear_down_adhoc_storage + ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers diff --git a/src/common/net/request.hpp b/src/common/net/request.hpp index 109e34f2..a6570e76 100644 --- a/src/common/net/request.hpp +++ b/src/common/net/request.hpp @@ -74,6 +74,15 @@ class response_with_value : public generic_response { public: constexpr response_with_value() noexcept = default; + constexpr response_with_value(std::uint64_t op_id, + scord::error_code ec) noexcept + : generic_response(op_id, ec) {} + + constexpr response_with_value(std::uint64_t op_id, scord::error_code ec, + Value value) noexcept + : generic_response(op_id, ec), m_value(std::move(value)) {} + + constexpr response_with_value(std::uint64_t op_id, scord::error_code ec, std::optional value) noexcept : generic_response(op_id, ec), m_value(std::move(value)) {} diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index c830be21..4ba8fda5 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -181,10 +181,10 @@ ADM_deploy_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage, } ADM_return_t -ADM_tear_down_adhoc_storage(ADM_server_t server, +ADM_terminate_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage) { - return scord::detail::tear_down_adhoc_storage( + return scord::detail::terminate_adhoc_storage( scord::server{server}, scord::adhoc_storage{adhoc_storage}); } diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index ad1be8ff..f664d4ab 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -419,7 +419,7 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } scord::error_code -tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { +terminate_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { network::client rpc_client{srv.protocol()}; diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 7d7f7a9f..e102d958 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -63,7 +63,7 @@ tl::expected deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); scord::error_code -tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +terminate_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); tl::expected register_pfs_storage(const server& srv, const std::string& name, diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index ad892944..48fe4eac 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -301,9 +301,9 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } void -tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { +terminate_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { - const auto ec = detail::tear_down_adhoc_storage(srv, adhoc_storage); + const auto ec = detail::terminate_adhoc_storage(srv, adhoc_storage); if(!ec) { throw std::runtime_error(fmt::format( diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h index 0c024893..8bfcdfe7 100644 --- a/src/lib/scord/scord.h +++ b/src/lib/scord/scord.h @@ -163,7 +163,7 @@ ADM_deploy_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage, char** adhoc_storage_path); /** - * Tear down a previously deployed adhoc storage system instance + * Terminate a previously deployed adhoc storage system instance * * @param[in] server The server to which the request is directed * @param[in] adhoc_storage An ADM_STORAGE referring to the adhoc storage @@ -171,7 +171,7 @@ ADM_deploy_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage, * @return Returns ADM_SUCCESS if the remote procedure has completed */ ADM_return_t -ADM_tear_down_adhoc_storage(ADM_server_t server, +ADM_terminate_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage); /** diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index e5e07b43..841da3a8 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -74,7 +74,7 @@ std::string deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); void -tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +terminate_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); scord::pfs_storage register_pfs_storage(const server& srv, const std::string& name, diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index a1d86555..442be085 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -43,6 +43,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(ping)); provider::define(EXPAND(deploy_adhoc_storage)); + provider::define(EXPAND(terminate_adhoc_storage)); #undef EXPAND } @@ -194,4 +195,60 @@ respond: req.respond(resp); } +void +rpc_server::terminate_adhoc_storage( + const network::request& req, const std::string& adhoc_uuid, + enum scord::adhoc_storage::type adhoc_type) { + + using network::generic_response; + using network::get_address; + using network::rpc_info; + + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); + + LOGGER_INFO("rpc {:>} body: {{uuid: {}, type: {}}}", rpc, + std::quoted(adhoc_uuid), adhoc_type); + + auto ec = scord::error_code::success; + + if(!m_config.has_value() || m_config->adhoc_storage_configs().empty()) { + LOGGER_WARN("No adhoc storage configurations available"); + ec = scord::error_code::snafu; + goto respond; + } + + if(const auto it = m_config->adhoc_storage_configs().find(adhoc_type); + it != m_config->adhoc_storage_configs().end()) { + + const auto& adhoc_cfg = it->second; + const auto adhoc_dir = adhoc_cfg.working_directory() / adhoc_uuid; + + // 1. Construct the shutdown command for the adhoc storage instance + const auto cmd = + adhoc_cfg.shutdown_command().eval(adhoc_uuid, adhoc_dir, {}); + + // 2. Execute the shutdown command + try { + LOGGER_DEBUG("[{}] exec: {}", adhoc_uuid, cmd); + cmd.exec(); + } catch(const std::exception& ex) { + LOGGER_ERROR("[{}] Failed to execute shutdown command: {}", + adhoc_uuid, ex.what()); + ec = scord::error_code::subprocess_error; + } + + } else { + LOGGER_WARN( + "Failed to find adhoc storage configuration for type '{:e}'", + adhoc_type); + ec = scord::error_code::adhoc_type_unsupported; + goto respond; + } + +respond: + const generic_response resp{rpc.id(), ec}; + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, resp.error_code()); + req.respond(resp); +} + } // namespace scord_ctl diff --git a/src/scord-ctl/rpc_server.hpp b/src/scord-ctl/rpc_server.hpp index 2be18c56..ca617eb1 100644 --- a/src/scord-ctl/rpc_server.hpp +++ b/src/scord-ctl/rpc_server.hpp @@ -55,6 +55,11 @@ private: enum scord::adhoc_storage::type adhoc_type, const scord::adhoc_storage::resources& adhoc_resources); + void + terminate_adhoc_storage(const network::request& req, + const std::string& adhoc_uuid, + enum scord::adhoc_storage::type adhoc_type); + std::optional m_config; }; diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 37c25d4d..6e9bbd99 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -49,7 +49,7 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, provider::define(EXPAND(update_adhoc_storage)); provider::define(EXPAND(remove_adhoc_storage)); provider::define(EXPAND(deploy_adhoc_storage)); - provider::define(EXPAND(tear_down_adhoc_storage)); + provider::define(EXPAND(terminate_adhoc_storage)); provider::define(EXPAND(register_pfs_storage)); provider::define(EXPAND(update_pfs_storage)); provider::define(EXPAND(remove_pfs_storage)); @@ -368,24 +368,70 @@ rpc_server::deploy_adhoc_storage(const network::request& req, } void -rpc_server::tear_down_adhoc_storage(const network::request& req, +rpc_server::terminate_adhoc_storage(const network::request& req, std::uint64_t adhoc_id) { using network::generic_response; using network::get_address; using network::rpc_info; + using response_type = generic_response; + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); LOGGER_INFO("rpc {:>} body: {{adhoc_id: {}}}", rpc, adhoc_id); - // TODO: actually tear down the adhoc storage instance + /** + * @brief Helper lambda to contact the adhoc controller and prompt it to + * terminate an adhoc storage instance + * @param adhoc_storage The relevant `adhoc_storage` object with + * information about the instance to terminate. + * @return + */ + const auto terminate_helper = [&](const auto& adhoc_info) -> error_code { + assert(adhoc_info); + const auto adhoc_storage = adhoc_info->adhoc_storage(); + const auto endp = lookup(adhoc_storage.context().controller_address()); - const auto resp = generic_response{rpc.id(), scord::error_code::success}; + if(!endp) { + LOGGER_ERROR("endpoint lookup failed"); + return error_code::snafu; + } + + const auto child_rpc = + rpc.add_child(adhoc_storage.context().controller_address()); + + LOGGER_INFO("rpc {:<} body: {{uuid: {}, type: {}}}", child_rpc, + std::quoted(adhoc_info->uuid()), adhoc_storage.type()); + if(const auto call_rv = endp->call(rpc.name(), adhoc_info->uuid(), + adhoc_storage.type()); + call_rv.has_value()) { + + const response_type resp{call_rv.value()}; + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}}} [op_id: {}]", child_rpc, + resp.error_code(), resp.op_id()); + + return resp.error_code(); + } + + LOGGER_ERROR("rpc call failed"); + return error_code::snafu; + }; + + const error_code ec = + m_adhoc_manager.find(adhoc_id) + .or_else([](auto&&) { + LOGGER_ERROR("adhoc storage instance not found"); + }) + .transform(terminate_helper) + .value(); + + const auto resp = response_type{rpc.id(), ec}; LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, scord::error_code::success); - req.respond(resp); } diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 5a0df6f9..0254328f 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -74,7 +74,7 @@ private: deploy_adhoc_storage(const network::request& req, std::uint64_t adhoc_id); void - tear_down_adhoc_storage(const network::request& req, + terminate_adhoc_storage(const network::request& req, std::uint64_t adhoc_id); void -- GitLab