From 9959831b169875ea1caad11971be230918e9bb5e Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Mon, 17 Oct 2022 12:51:25 +0200 Subject: [PATCH] scord: Improve adhoc_storage_manager --- src/scord/CMakeLists.txt | 2 +- src/scord/adhoc_storage_manager.hpp | 129 ++++++++++++++++++++++++++++ src/scord/rpc_handlers.cpp | 35 ++++---- 3 files changed, 148 insertions(+), 18 deletions(-) create mode 100644 src/scord/adhoc_storage_manager.hpp diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 26a4f33e..61d63df6 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -26,7 +26,7 @@ add_executable(scord) target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp - env.hpp job_manager.hpp) + env.hpp job_manager.hpp adhoc_storage_manager.hpp) target_include_directories( scord diff --git a/src/scord/adhoc_storage_manager.hpp b/src/scord/adhoc_storage_manager.hpp new file mode 100644 index 00000000..7f519f08 --- /dev/null +++ b/src/scord/adhoc_storage_manager.hpp @@ -0,0 +1,129 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + + +#ifndef SCORD_ADHOC_STORAGE_MANAGER_HPP +#define SCORD_ADHOC_STORAGE_MANAGER_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace scord { + +struct adhoc_storage_manager : scord::utils::singleton { + + tl::expected + create(enum admire::adhoc_storage::type type, const std::string& name, + const admire::adhoc_storage::ctx& ctx) { + + static std::atomic_uint64_t current_id; + admire::job_id id = current_id++; + + abt::unique_lock lock(m_adhoc_storages_mutex); + + if(const auto it = m_adhoc_storages.find(id); + it == m_adhoc_storages.end()) { + const auto& [it_adhoc, inserted] = m_adhoc_storages.emplace( + id, admire::adhoc_storage{type, name, current_id++, ctx}); + + if(!inserted) { + LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); + return tl::make_unexpected(ADM_ESNAFU); + } + + return it_adhoc->second; + } + + LOGGER_ERROR("{}: Adhoc storage '{}' already exists", __FUNCTION__, id); + return tl::make_unexpected(ADM_EEXISTS); + } + + admire::error_code + update(std::uint64_t id, admire::adhoc_storage::ctx ctx) { + + abt::unique_lock lock(m_adhoc_storages_mutex); + + if(const auto it = m_adhoc_storages.find(id); + it != m_adhoc_storages.end()) { + + const auto& current_adhoc = it->second; + + it->second = admire::adhoc_storage{ + current_adhoc.type(), current_adhoc.name(), + current_adhoc.id(), std::move(ctx)}; + return ADM_SUCCESS; + } + + LOGGER_ERROR("{}: Adhoc storage '{}' does not exist", __FUNCTION__, id); + return ADM_ENOENT; + } + + tl::expected + find(admire::job_id id) { + + abt::shared_lock lock(m_adhoc_storages_mutex); + + if(auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) { + return it->second; + } + + LOGGER_ERROR("Adhoc storage '{}' was not registered or was already " + "deleted", + id); + return tl::make_unexpected(ADM_ENOENT); + } + + admire::error_code + remove(admire::job_id id) { + + abt::unique_lock lock(m_adhoc_storages_mutex); + + if(m_adhoc_storages.count(id) != 0) { + m_adhoc_storages.erase(id); + return ADM_SUCCESS; + } + + LOGGER_ERROR("Adhoc storage '{}' was not registered or was already " + "deleted", + id); + + return ADM_ENOENT; + } + +private: + friend class scord::utils::singleton; + adhoc_storage_manager() = default; + + mutable abt::shared_mutex m_adhoc_storages_mutex; + std::unordered_map m_adhoc_storages; +}; + +} // namespace scord + +#endif // SCORD_ADHOC_STORAGE_MANAGER_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index afc8ae84..6a4987d6 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -29,6 +29,7 @@ #include #include "rpc_handlers.hpp" #include "job_manager.hpp" +#include "adhoc_storage_manager.hpp" struct remote_procedure { static std::uint64_t @@ -38,17 +39,6 @@ struct remote_procedure { } }; -struct adhoc_storage_manager { - - template - static admire::adhoc_storage - create(enum admire::adhoc_storage::type type, const std::string& name, - const admire::adhoc_storage::ctx& ctx) { - static std::atomic_uint64_t current_id; - return admire::adhoc_storage(type, name, current_id++, ctx); - } -}; - static void ADM_ping(hg_handle_t h) { @@ -274,18 +264,29 @@ ADM_register_adhoc_storage(hg_handle_t h) { rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), name, type, ctx); - const auto adhoc_storage = adhoc_storage_manager::create(type, name, ctx); + auto& adhoc_manager = scord::adhoc_storage_manager::instance(); + const auto rv = adhoc_manager.create(type, name, ctx); - admire::error_code rv = ADM_SUCCESS; + admire::error_code ec = ADM_SUCCESS; - out.op_id = rpc_id; - out.retval = rv; - out.id = adhoc_storage.id(); + if(rv) { + const auto& adhoc_storage = rv.value(); + out.op_id = rpc_id; + out.retval = ec; + out.id = adhoc_storage.id(); + } else { + LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: " + "{}\"", + rpc_id, rv.error()); + out.op_id = rpc_id; + out.retval = ec; + out.id = 0; + } LOGGER_INFO("rpc id: {} name: {} to: {} => " "body: {{retval: {}, id: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), - rv, out.id); + ec, out.id); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); -- GitLab