Commit 82eb26d3 authored by Ramon Nou's avatar Ramon Nou
Browse files

Merge branch 'rnou/redis' into 'main'

Update redis with information about adhocfs

Closes #153

See merge request !116
parents 7944ed3e 1ca93ad0
Loading
Loading
Loading
Loading
Loading
+11 −0
Original line number Diff line number Diff line
@@ -167,6 +167,15 @@ option(SCORD_BUILD_EXAMPLES "Build examples (disabled by default)" OFF)

option(SCORD_BUILD_TESTS "Build tests (disabled by default)" OFF)

### REDIS_ADDRESS
set(REDIS_ADDRESS
  "tcp://127.0.0.1:6379"
  CACHE STRING
  "Define a redis address (default: tcp://127.0.0.1:6379)"
)

message(STATUS "[${PROJECT_NAME}] Redis address: ${REDIS_ADDRESS}")

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# ##############################################################################
@@ -305,6 +314,8 @@ find_package(RedisPlusPlus 1.3.3 REQUIRED)
message(STATUS "[${PROJECT_NAME}] Checking for Cargo")
find_package(Cargo 0.3.1 REQUIRED)

message(STATUS "[${PROJECT_NAME}] Checking for Hiredis")
find_package(hiredis REQUIRED)

# ##############################################################################
# Process subdirectories
+3 −0
Original line number Diff line number Diff line
@@ -11,3 +11,6 @@ global_settings:

  # address to bind to
  address: "@SCORD_TRANSPORT_PROTOCOL@://@SCORD_BIND_ADDRESS@:@SCORD_BIND_PORT@"

  # redis connection
  redisaddress : "@REDIS_ADDRESS@"
 No newline at end of file
+3 −0
Original line number Diff line number Diff line
@@ -11,3 +11,6 @@ global_settings:

  # address to bind to
  address: "@SCORD_TRANSPORT_PROTOCOL@://@SCORD_BIND_ADDRESS@:@SCORD_BIND_PORT@"

  # redis connection
  redisaddress : "@REDIS_ADDRESS@"
 No newline at end of file
+1 −0
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ target_link_libraries(
  fmt::fmt
  CLI11::CLI11
  RedisPlusPlus::RedisPlusPlus
  hiredis::hiredis
  ryml::ryml
  cargo::cargo
)
+83 −4
Original line number Diff line number Diff line
@@ -71,15 +71,14 @@ dataset_process(std::string id) {
namespace scord {

rpc_server::rpc_server(std::string name, std::string address, bool daemonize,
                       std::filesystem::path rundir)
                       std::filesystem::path rundir, std::string redis_address)
    : server::server(std::move(name), std::move(address), std::move(daemonize),
                     std::move(rundir)),
      provider::provider(m_network_engine, 0),
      m_scheduler_ess(thallium::xstream::create()),
      m_scheduler_ult(
              m_scheduler_ess->make_thread([this]() { scheduler_update(); })) {

    ;
              m_scheduler_ess->make_thread([this]() { scheduler_update(); })),
      m_redis_address(std::move(redis_address)) {


#define EXPAND(rpc_name) "ADM_" #rpc_name##s, &rpc_server::rpc_name
@@ -111,6 +110,18 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize,

#define RPC_NAME() ("ADM_"s + __FUNCTION__)

void
rpc_server::init_redis() {

    try {
        m_redis = sw::redis::Redis(m_redis_address);
        // Try the connection
        m_redis.value().dbsize();
    } catch(const sw::redis::Error& e) {
        LOGGER_CRITICAL("Redis not initialized at address {}", m_redis_address);
        m_redis = std::nullopt;
    }
}
void
rpc_server::ping(const network::request& req) {

@@ -223,6 +234,34 @@ rpc_server::register_job(const network::request& req,
        }

        job_id = job_metadata_ptr->job().id();

        if(m_redis and job_id) {
            const auto adhoc_id = job_requirements.adhoc_storage()->id();
            auto ec = m_adhoc_manager.find(adhoc_id);
            const auto timestamp =
                    std::chrono::system_clock::now().time_since_epoch().count();
            auto name = ec->get()->adhoc_storage().name();
            std::string type =
                    fmt::format("{}", ec->get()->adhoc_storage().type());

            std::unordered_map<std::string, std::string> m = {
                    {"timestamp", std::to_string(timestamp)},
                    {"job_id", std::to_string(job_id.value())},
                    {"AdhocID", std::to_string(adhoc_id)},
                    {"AdhocUUID", ec->get()->uuid()},
                    {"AdhocName", name},
                    {"Type", type},     // Lustre // Gekko
                    {"Deployed", "No"}, // No // Yes
                    {"StartTime", ""},
                    {"EndTime", ""}, // Or Running
                    {"Policies", ""} //

            };

            m_redis.value().hmset(std::to_string(slurm_id), m.begin(),
                                  m.end());
        }

    } else {
        LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"",
                     rpc.id(), jm_result.error());
@@ -548,6 +587,26 @@ rpc_server::deploy_adhoc_storage(const network::request& req,
    LOGGER_EVAL(resp.error_code(), INFO, ERROR,
                "rpc {:<} body: {{retval: {}, adhoc_dir: {}}}", rpc,
                resp.error_code(), resp.value());

    if(m_redis and resp.error_code() == error_code::success) {
        const auto job_id = m_adhoc_manager.find(adhoc_id)
                                    .value()
                                    .get()
                                    ->client_info()
                                    .get()
                                    ->job()
                                    .id();

        const auto timestamp =
                std::chrono::system_clock::now().time_since_epoch().count();

        std::unordered_map<std::string, std::string> m = {
                {"Deployed", "Yes"},
                {"StartTime", std::to_string(timestamp)},
                {"EndTime", "Running"}};

        m_redis.value().hmset(std::to_string(job_id), m.begin(), m.end());
    }
    req.respond(resp);
}

@@ -618,6 +677,26 @@ rpc_server::terminate_adhoc_storage(const network::request& req,
    const auto resp = response_type{rpc.id(), ec};
    LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc,
                scord::error_code::success);


    if(m_redis and resp.error_code() == error_code::success) {
        const auto job_id = m_adhoc_manager.find(adhoc_id)
                                    .value()
                                    .get()
                                    ->client_info()
                                    .get()
                                    ->job()
                                    .id();

        const auto timestamp =
                std::chrono::system_clock::now().time_since_epoch().count();

        std::unordered_map<std::string, std::string> m = {
                {"Deployed", "Yes"},
                {"EndTime", std::to_string(timestamp)}};
        m_redis.value().hmset(std::to_string(job_id), m.begin(), m.end());
    }

    req.respond(resp);
}

Loading