diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt
index 26a4f33e0a64cc6de5c83abd8fe8bb0c68f9a483..61d63df6b5456241e4c81fb36f8446863ab49220 100644
--- a/src/scord/CMakeLists.txt
+++ b/src/scord/CMakeLists.txt
@@ -26,7 +26,7 @@
add_executable(scord)
target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp
- env.hpp job_manager.hpp)
+ env.hpp job_manager.hpp adhoc_storage_manager.hpp)
target_include_directories(
scord
diff --git a/src/scord/adhoc_storage_manager.hpp b/src/scord/adhoc_storage_manager.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..7f519f08505a403b4177fc3835fb85ae48120fe1
--- /dev/null
+++ b/src/scord/adhoc_storage_manager.hpp
@@ -0,0 +1,129 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+
+#ifndef SCORD_ADHOC_STORAGE_MANAGER_HPP
+#define SCORD_ADHOC_STORAGE_MANAGER_HPP
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace scord {
+
+struct adhoc_storage_manager : scord::utils::singleton {
+
+ tl::expected
+ create(enum admire::adhoc_storage::type type, const std::string& name,
+ const admire::adhoc_storage::ctx& ctx) {
+
+ static std::atomic_uint64_t current_id;
+ admire::job_id id = current_id++;
+
+ abt::unique_lock lock(m_adhoc_storages_mutex);
+
+ if(const auto it = m_adhoc_storages.find(id);
+ it == m_adhoc_storages.end()) {
+ const auto& [it_adhoc, inserted] = m_adhoc_storages.emplace(
+ id, admire::adhoc_storage{type, name, current_id++, ctx});
+
+ if(!inserted) {
+ LOGGER_ERROR("{}: Emplace failed", __FUNCTION__);
+ return tl::make_unexpected(ADM_ESNAFU);
+ }
+
+ return it_adhoc->second;
+ }
+
+ LOGGER_ERROR("{}: Adhoc storage '{}' already exists", __FUNCTION__, id);
+ return tl::make_unexpected(ADM_EEXISTS);
+ }
+
+ admire::error_code
+ update(std::uint64_t id, admire::adhoc_storage::ctx ctx) {
+
+ abt::unique_lock lock(m_adhoc_storages_mutex);
+
+ if(const auto it = m_adhoc_storages.find(id);
+ it != m_adhoc_storages.end()) {
+
+ const auto& current_adhoc = it->second;
+
+ it->second = admire::adhoc_storage{
+ current_adhoc.type(), current_adhoc.name(),
+ current_adhoc.id(), std::move(ctx)};
+ return ADM_SUCCESS;
+ }
+
+ LOGGER_ERROR("{}: Adhoc storage '{}' does not exist", __FUNCTION__, id);
+ return ADM_ENOENT;
+ }
+
+ tl::expected
+ find(admire::job_id id) {
+
+ abt::shared_lock lock(m_adhoc_storages_mutex);
+
+ if(auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) {
+ return it->second;
+ }
+
+ LOGGER_ERROR("Adhoc storage '{}' was not registered or was already "
+ "deleted",
+ id);
+ return tl::make_unexpected(ADM_ENOENT);
+ }
+
+ admire::error_code
+ remove(admire::job_id id) {
+
+ abt::unique_lock lock(m_adhoc_storages_mutex);
+
+ if(m_adhoc_storages.count(id) != 0) {
+ m_adhoc_storages.erase(id);
+ return ADM_SUCCESS;
+ }
+
+ LOGGER_ERROR("Adhoc storage '{}' was not registered or was already "
+ "deleted",
+ id);
+
+ return ADM_ENOENT;
+ }
+
+private:
+ friend class scord::utils::singleton;
+ adhoc_storage_manager() = default;
+
+ mutable abt::shared_mutex m_adhoc_storages_mutex;
+ std::unordered_map m_adhoc_storages;
+};
+
+} // namespace scord
+
+#endif // SCORD_ADHOC_STORAGE_MANAGER_HPP
diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp
index afc8ae844c546aeaf9d630499308eb2a9fe0bbb4..6a4987d6be9dcdfe5e1b276011639f477440a4b3 100644
--- a/src/scord/rpc_handlers.cpp
+++ b/src/scord/rpc_handlers.cpp
@@ -29,6 +29,7 @@
#include
#include "rpc_handlers.hpp"
#include "job_manager.hpp"
+#include "adhoc_storage_manager.hpp"
struct remote_procedure {
static std::uint64_t
@@ -38,17 +39,6 @@ struct remote_procedure {
}
};
-struct adhoc_storage_manager {
-
- template
- static admire::adhoc_storage
- create(enum admire::adhoc_storage::type type, const std::string& name,
- const admire::adhoc_storage::ctx& ctx) {
- static std::atomic_uint64_t current_id;
- return admire::adhoc_storage(type, name, current_id++, ctx);
- }
-};
-
static void
ADM_ping(hg_handle_t h) {
@@ -274,18 +264,29 @@ ADM_register_adhoc_storage(hg_handle_t h) {
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
name, type, ctx);
- const auto adhoc_storage = adhoc_storage_manager::create(type, name, ctx);
+ auto& adhoc_manager = scord::adhoc_storage_manager::instance();
+ const auto rv = adhoc_manager.create(type, name, ctx);
- admire::error_code rv = ADM_SUCCESS;
+ admire::error_code ec = ADM_SUCCESS;
- out.op_id = rpc_id;
- out.retval = rv;
- out.id = adhoc_storage.id();
+ if(rv) {
+ const auto& adhoc_storage = rv.value();
+ out.op_id = rpc_id;
+ out.retval = ec;
+ out.id = adhoc_storage.id();
+ } else {
+ LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: "
+ "{}\"",
+ rpc_id, rv.error());
+ out.op_id = rpc_id;
+ out.retval = ec;
+ out.id = 0;
+ }
LOGGER_INFO("rpc id: {} name: {} to: {} => "
"body: {{retval: {}, id: {}}}",
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
- rv, out.id);
+ ec, out.id);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);