From 72fe3341cfd486941a329a7e257db8e1b6201e1f Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Tue, 24 Oct 2023 15:01:46 +0200 Subject: [PATCH 1/5] Added Redis set --- CMakeLists.txt | 11 ++++++ etc/scord.conf.in | 3 ++ examples/scord.conf.in | 3 ++ src/lib/scord/types.hpp | 10 +++-- src/lib/types.cpp | 17 +++++++++ src/scord/CMakeLists.txt | 1 + src/scord/rpc_server.cpp | 82 ++++++++++++++++++++++++++++++++++++++-- src/scord/rpc_server.hpp | 12 ++++-- src/scord/scord.cpp | 6 ++- 9 files changed, 132 insertions(+), 13 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f2b5c05e..1a11f4ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -167,6 +167,15 @@ option(SCORD_BUILD_EXAMPLES "Build examples (disabled by default)" OFF) option(SCORD_BUILD_TESTS "Build tests (disabled by default)" OFF) +### REDIS_ADDRESS +set(REDIS_ADDRESS + "tcp://127.0.0.1:6379" + CACHE STRING + "Define a redis address (default: tcp://127.0.0.1:6379)" +) + +message(STATUS "[${PROJECT_NAME}] Redis address: ${REDIS_ADDRESS}") + set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # ############################################################################## @@ -305,6 +314,8 @@ find_package(RedisPlusPlus 1.3.3 REQUIRED) message(STATUS "[${PROJECT_NAME}] Checking for Cargo") find_package(Cargo 0.3.1 REQUIRED) +message(STATUS "[${PROJECT_NAME}] Checking for Hiredis") +find_package(hiredis REQUIRED) # ############################################################################## # Process subdirectories diff --git a/etc/scord.conf.in b/etc/scord.conf.in index 0efad292..bd63b41a 100644 --- a/etc/scord.conf.in +++ b/etc/scord.conf.in @@ -11,3 +11,6 @@ global_settings: # address to bind to address: "@SCORD_TRANSPORT_PROTOCOL@://@SCORD_BIND_ADDRESS@:@SCORD_BIND_PORT@" + + # redis connection + redisaddress : "@REDIS_ADDRESS@" \ No newline at end of file diff --git a/examples/scord.conf.in b/examples/scord.conf.in index ca76279a..76118ac7 100644 --- a/examples/scord.conf.in +++ b/examples/scord.conf.in @@ -11,3 +11,6 @@ global_settings: # address to bind to address: "@SCORD_TRANSPORT_PROTOCOL@://@SCORD_BIND_ADDRESS@:@SCORD_BIND_PORT@" + + # redis connection + redisaddress : "@REDIS_ADDRESS@" \ No newline at end of file diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index ec767ccb..08309b89 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -106,7 +106,7 @@ struct error_code { template void serialize(Archive&& ar) { - ar & m_value; + ar& m_value; } private: @@ -231,7 +231,7 @@ struct adhoc_storage { template void serialize(Archive&& ar) { - ar & m_nodes; + ar& m_nodes; } private: @@ -303,6 +303,8 @@ struct adhoc_storage { name() const; type type() const; + std::string + type_tostr() const; std::uint64_t id() const; adhoc_storage::ctx const& @@ -350,7 +352,7 @@ struct pfs_storage { template void serialize(Archive&& ar) { - ar & m_mount_point; + ar& m_mount_point; } private: @@ -412,7 +414,7 @@ struct job { template void serialize(Archive&& ar) { - ar & m_nodes; + ar& m_nodes; } private: diff --git a/src/lib/types.cpp b/src/lib/types.cpp index a889d336..a1b89acb 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -867,6 +867,23 @@ adhoc_storage::type() const { return m_pimpl->type(); } +std::string +adhoc_storage::type_tostr() const { + switch(m_pimpl->type()) { + case adhoc_storage::type::hercules: + return "Hercules"; + case adhoc_storage::type::dataclay: + return "DataClay"; + case adhoc_storage::type::gekkofs: + return "GekkoFS"; + case adhoc_storage::type::expand: + return "Expand"; + default: + return "Unknown"; + } + return "Unknown"; +} + std::uint64_t adhoc_storage::id() const { return m_pimpl->id(); diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 839e283a..af2e1fd6 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -50,6 +50,7 @@ target_link_libraries( fmt::fmt CLI11::CLI11 RedisPlusPlus::RedisPlusPlus + hiredis::hiredis ryml::ryml cargo::cargo ) diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 98d6a62c..9d62ed39 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -71,15 +71,14 @@ dataset_process(std::string id) { namespace scord { rpc_server::rpc_server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir) + std::filesystem::path rundir, std::string redis_address) : server::server(std::move(name), std::move(address), std::move(daemonize), std::move(rundir)), provider::provider(m_network_engine, 0), m_scheduler_ess(thallium::xstream::create()), m_scheduler_ult( - m_scheduler_ess->make_thread([this]() { scheduler_update(); })) { - - ; + m_scheduler_ess->make_thread([this]() { scheduler_update(); })), + m_redis_address(std::move(redis_address)) { #define EXPAND(rpc_name) "ADM_" #rpc_name##s, &rpc_server::rpc_name @@ -111,6 +110,15 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, #define RPC_NAME() ("ADM_"s + __FUNCTION__) +void +rpc_server::init_redis() { + try { + m_redis = sw::redis::Redis(m_redis_address); + } catch(const sw::redis::Error& e) { + LOGGER_ERROR("Redis not initialized at address {}", m_redis_address); + m_redis = {}; + } +} void rpc_server::ping(const network::request& req) { @@ -223,6 +231,32 @@ rpc_server::register_job(const network::request& req, } job_id = job_metadata_ptr->job().id(); + + if(m_redis and job_id) { + const auto adhoc_id = job_requirements.adhoc_storage()->id(); + auto ec = m_adhoc_manager.find(adhoc_id); + const auto timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + auto name = ec->get()->adhoc_storage().name(); + auto type = ec->get()->adhoc_storage().type_tostr(); + + std::unordered_map m = { + {"timestamp", std::to_string(timestamp)}, + {"job_id", std::to_string(job_id.value())}, + {"AdhocID", std::to_string(adhoc_id)}, + {"AdhocName", name}, + {"Type", type}, // Lustre // Gekko + {"Deployed", "No"}, // No // Yes + {"StartTime", ""}, + {"EndTime", ""}, // Or Running + {"Policies", ""} // + + }; + + m_redis.value().hmset(std::to_string(slurm_id.value()), m.begin(), + m.end()); + } + } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc.id(), jm_result.error()); @@ -548,6 +582,26 @@ rpc_server::deploy_adhoc_storage(const network::request& req, LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:<} body: {{retval: {}, adhoc_dir: {}}}", rpc, resp.error_code(), resp.value()); + + if(m_redis and resp.error_code() == error_code::success) { + const auto job_id = m_adhoc_manager.find(adhoc_id) + .value() + .get() + ->client_info() + .get() + ->job() + .id(); + + const auto timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + std::unordered_map m = { + {"Deployed", "Yes"}, + {"StartTime", std::to_string(timestamp)}, + {"EndTime", "Running"}}; + + m_redis.value().hmset(std::to_string(job_id), m.begin(), m.end()); + } req.respond(resp); } @@ -618,6 +672,26 @@ rpc_server::terminate_adhoc_storage(const network::request& req, const auto resp = response_type{rpc.id(), ec}; LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, scord::error_code::success); + + + if(m_redis and resp.error_code() == error_code::success) { + const auto job_id = m_adhoc_manager.find(adhoc_id) + .value() + .get() + ->client_info() + .get() + ->job() + .id(); + + const auto timestamp = + std::chrono::system_clock::now().time_since_epoch().count(); + + std::unordered_map m = { + {"Deployed", "Yes"}, + {"EndTime", std::to_string(timestamp)}}; + m_redis.value().hmset(std::to_string(job_id), m.begin(), m.end()); + } + req.respond(resp); } diff --git a/src/scord/rpc_server.hpp b/src/scord/rpc_server.hpp index 08788fcc..e4257081 100644 --- a/src/scord/rpc_server.hpp +++ b/src/scord/rpc_server.hpp @@ -33,11 +33,13 @@ #include "adhoc_storage_manager.hpp" #include "pfs_storage_manager.hpp" #include "transfer_manager.hpp" +#include namespace cargo { class transfer; } + namespace scord { class rpc_server : public network::server, @@ -45,7 +47,9 @@ class rpc_server : public network::server, public: rpc_server(std::string name, std::string address, bool daemonize, - std::filesystem::path rundir); + std::filesystem::path rundir, std::string redis_address); + void + init_redis(); private: void @@ -106,7 +110,6 @@ private: const std::vector& limits, enum scord::transfer::mapping mapping); - job_manager m_job_manager; adhoc_storage_manager m_adhoc_manager; pfs_storage_manager m_pfs_manager; @@ -120,8 +123,7 @@ private: public: /** - * @brief Generates scheduling information, a set of pairs (contact point, - * and action) + * @brief Generates scheduling information, * * It causes a lock-unlock of the transfer_manager structure. * Is a thread @@ -130,6 +132,8 @@ public: */ void scheduler_update(); + std::string m_redis_address; + std::optional m_redis; }; } // namespace scord diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp index f4cbdac6..b5acc4bf 100644 --- a/src/scord/scord.cpp +++ b/src/scord/scord.cpp @@ -115,6 +115,7 @@ main(int argc, char* argv[]) { std::optional output_file; std::optional rundir; std::optional address; + std::optional redis_address; } cli_args; const auto progname = fs::path{argv[0]}.filename().string(); @@ -171,6 +172,8 @@ main(int argc, char* argv[]) { global_settings->add_option("--rundir", cli_args.rundir); global_settings->add_option("--address", cli_args.address); + global_settings->add_option("--redisaddress", cli_args.redis_address); + CLI11_PARSE(app, argc, argv); // ->required(true) doesn't work with configurable subcommands @@ -184,8 +187,9 @@ main(int argc, char* argv[]) { try { scord::rpc_server srv(progname, *cli_args.address, !cli_args.foreground, - cli_args.rundir.value_or(fs::current_path())); + cli_args.rundir.value_or(fs::current_path()), *cli_args.redis_address); srv.configure_logger(cli_args.log_type, cli_args.output_file); + srv.init_redis(); return srv.run(); } catch(const std::exception& ex) { fmt::print(stderr, -- GitLab From 215dc49309fb6a2a75f054d7f411abd5e661fddd Mon Sep 17 00:00:00 2001 From: rnou Date: Wed, 25 Oct 2023 10:38:18 +0200 Subject: [PATCH 2/5] updated docker image --- src/lib/scord/types.hpp | 2 -- src/lib/types.cpp | 17 ----------------- src/scord/rpc_server.cpp | 2 +- 3 files changed, 1 insertion(+), 20 deletions(-) diff --git a/src/lib/scord/types.hpp b/src/lib/scord/types.hpp index 08309b89..6ec23f13 100644 --- a/src/lib/scord/types.hpp +++ b/src/lib/scord/types.hpp @@ -303,8 +303,6 @@ struct adhoc_storage { name() const; type type() const; - std::string - type_tostr() const; std::uint64_t id() const; adhoc_storage::ctx const& diff --git a/src/lib/types.cpp b/src/lib/types.cpp index a1b89acb..a889d336 100644 --- a/src/lib/types.cpp +++ b/src/lib/types.cpp @@ -867,23 +867,6 @@ adhoc_storage::type() const { return m_pimpl->type(); } -std::string -adhoc_storage::type_tostr() const { - switch(m_pimpl->type()) { - case adhoc_storage::type::hercules: - return "Hercules"; - case adhoc_storage::type::dataclay: - return "DataClay"; - case adhoc_storage::type::gekkofs: - return "GekkoFS"; - case adhoc_storage::type::expand: - return "Expand"; - default: - return "Unknown"; - } - return "Unknown"; -} - std::uint64_t adhoc_storage::id() const { return m_pimpl->id(); diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 9d62ed39..8d687868 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -238,7 +238,7 @@ rpc_server::register_job(const network::request& req, const auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto name = ec->get()->adhoc_storage().name(); - auto type = ec->get()->adhoc_storage().type_tostr(); + std::string type = fmt::format("{}",ec->get()->adhoc_storage().type()); std::unordered_map m = { {"timestamp", std::to_string(timestamp)}, -- GitLab From b953169968d19900b8b8831ecbe0cd6cfa3d2538 Mon Sep 17 00:00:00 2001 From: rnou Date: Wed, 25 Oct 2023 13:09:37 +0200 Subject: [PATCH 3/5] Added code to disable redis on error --- src/scord/rpc_server.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 8d687868..22e06097 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -112,11 +112,14 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, void rpc_server::init_redis() { + try { m_redis = sw::redis::Redis(m_redis_address); + // Try the connection + m_redis.value().dbsize(); } catch(const sw::redis::Error& e) { - LOGGER_ERROR("Redis not initialized at address {}", m_redis_address); - m_redis = {}; + LOGGER_CRITICAL("Redis not initialized at address {}", m_redis_address); + m_redis = std::nullopt; } } void @@ -238,7 +241,8 @@ rpc_server::register_job(const network::request& req, const auto timestamp = std::chrono::system_clock::now().time_since_epoch().count(); auto name = ec->get()->adhoc_storage().name(); - std::string type = fmt::format("{}",ec->get()->adhoc_storage().type()); + std::string type = + fmt::format("{}", ec->get()->adhoc_storage().type()); std::unordered_map m = { {"timestamp", std::to_string(timestamp)}, -- GitLab From 0d8ecb944745483661fed0686d43eb74277015db Mon Sep 17 00:00:00 2001 From: rnou Date: Thu, 26 Oct 2023 12:09:13 +0200 Subject: [PATCH 4/5] Added UUID --- src/scord/rpc_server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 22e06097..eebb02ac 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -248,6 +248,7 @@ rpc_server::register_job(const network::request& req, {"timestamp", std::to_string(timestamp)}, {"job_id", std::to_string(job_id.value())}, {"AdhocID", std::to_string(adhoc_id)}, + {"AdhocUUID", ec->get()->uuid()}, {"AdhocName", name}, {"Type", type}, // Lustre // Gekko {"Deployed", "No"}, // No // Yes -- GitLab From 1ca93ad045a32e47b4aecd76c3fc4dec39190b6a Mon Sep 17 00:00:00 2001 From: Ramon Nou Date: Mon, 20 Nov 2023 10:33:05 +0100 Subject: [PATCH 5/5] Change job id to slurm id --- src/scord/rpc_server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index eebb02ac..a883c0e0 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -258,7 +258,7 @@ rpc_server::register_job(const network::request& req, }; - m_redis.value().hmset(std::to_string(slurm_id.value()), m.begin(), + m_redis.value().hmset(std::to_string(slurm_id), m.begin(), m.end()); } -- GitLab