Skip to content
Snippets Groups Projects
Commit 1d610a0b authored by Alberto Miranda's avatar Alberto Miranda :hotsprings:
Browse files

Merge branch...

Merge branch 'amiranda/82-improve-adhoc_storage_manager-in-scord-to-keep-information-about-registered-instances' into 'main'

Resolve "Improve `adhoc_storage_manager` in `scord` to keep information about registered instances"

Closes #82

See merge request !55
parents 8ca3dcef 9959831b
No related branches found
No related tags found
1 merge request!55Resolve "Improve `adhoc_storage_manager` in `scord` to keep information about registered instances"
Pipeline #3253 passed
......@@ -26,7 +26,7 @@
add_executable(scord)
target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp
env.hpp job_manager.hpp)
env.hpp job_manager.hpp adhoc_storage_manager.hpp)
target_include_directories(
scord
......
/******************************************************************************
* Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
*
* This software was partially supported by the EuroHPC-funded project ADMIRE
* (Project ID: 956748, https://www.admire-eurohpc.eu).
*
* This file is part of scord.
*
* scord is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* scord is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with scord. If not, see <https://www.gnu.org/licenses/>.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#ifndef SCORD_ADHOC_STORAGE_MANAGER_HPP
#define SCORD_ADHOC_STORAGE_MANAGER_HPP
#include <admire_types.hpp>
#include <utils/utils.hpp>
#include <unordered_map>
#include <abt_cxx/shared_mutex.hpp>
#include <tl/expected.hpp>
#include <atomic>
#include <logger/logger.hpp>
namespace scord {
struct adhoc_storage_manager : scord::utils::singleton<adhoc_storage_manager> {
tl::expected<admire::adhoc_storage, admire::error_code>
create(enum admire::adhoc_storage::type type, const std::string& name,
const admire::adhoc_storage::ctx& ctx) {
static std::atomic_uint64_t current_id;
admire::job_id id = current_id++;
abt::unique_lock lock(m_adhoc_storages_mutex);
if(const auto it = m_adhoc_storages.find(id);
it == m_adhoc_storages.end()) {
const auto& [it_adhoc, inserted] = m_adhoc_storages.emplace(
id, admire::adhoc_storage{type, name, current_id++, ctx});
if(!inserted) {
LOGGER_ERROR("{}: Emplace failed", __FUNCTION__);
return tl::make_unexpected(ADM_ESNAFU);
}
return it_adhoc->second;
}
LOGGER_ERROR("{}: Adhoc storage '{}' already exists", __FUNCTION__, id);
return tl::make_unexpected(ADM_EEXISTS);
}
admire::error_code
update(std::uint64_t id, admire::adhoc_storage::ctx ctx) {
abt::unique_lock lock(m_adhoc_storages_mutex);
if(const auto it = m_adhoc_storages.find(id);
it != m_adhoc_storages.end()) {
const auto& current_adhoc = it->second;
it->second = admire::adhoc_storage{
current_adhoc.type(), current_adhoc.name(),
current_adhoc.id(), std::move(ctx)};
return ADM_SUCCESS;
}
LOGGER_ERROR("{}: Adhoc storage '{}' does not exist", __FUNCTION__, id);
return ADM_ENOENT;
}
tl::expected<admire::adhoc_storage, admire::error_code>
find(admire::job_id id) {
abt::shared_lock lock(m_adhoc_storages_mutex);
if(auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) {
return it->second;
}
LOGGER_ERROR("Adhoc storage '{}' was not registered or was already "
"deleted",
id);
return tl::make_unexpected(ADM_ENOENT);
}
admire::error_code
remove(admire::job_id id) {
abt::unique_lock lock(m_adhoc_storages_mutex);
if(m_adhoc_storages.count(id) != 0) {
m_adhoc_storages.erase(id);
return ADM_SUCCESS;
}
LOGGER_ERROR("Adhoc storage '{}' was not registered or was already "
"deleted",
id);
return ADM_ENOENT;
}
private:
friend class scord::utils::singleton<adhoc_storage_manager>;
adhoc_storage_manager() = default;
mutable abt::shared_mutex m_adhoc_storages_mutex;
std::unordered_map<std::uint64_t, admire::adhoc_storage> m_adhoc_storages;
};
} // namespace scord
#endif // SCORD_ADHOC_STORAGE_MANAGER_HPP
......@@ -29,6 +29,7 @@
#include <api/convert.hpp>
#include "rpc_handlers.hpp"
#include "job_manager.hpp"
#include "adhoc_storage_manager.hpp"
struct remote_procedure {
static std::uint64_t
......@@ -38,17 +39,6 @@ struct remote_procedure {
}
};
struct adhoc_storage_manager {
template <typename... Args>
static admire::adhoc_storage
create(enum admire::adhoc_storage::type type, const std::string& name,
const admire::adhoc_storage::ctx& ctx) {
static std::atomic_uint64_t current_id;
return admire::adhoc_storage(type, name, current_id++, ctx);
}
};
static void
ADM_ping(hg_handle_t h) {
......@@ -274,18 +264,29 @@ ADM_register_adhoc_storage(hg_handle_t h) {
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
name, type, ctx);
const auto adhoc_storage = adhoc_storage_manager::create(type, name, ctx);
auto& adhoc_manager = scord::adhoc_storage_manager::instance();
const auto rv = adhoc_manager.create(type, name, ctx);
admire::error_code rv = ADM_SUCCESS;
admire::error_code ec = ADM_SUCCESS;
out.op_id = rpc_id;
out.retval = rv;
out.id = adhoc_storage.id();
if(rv) {
const auto& adhoc_storage = rv.value();
out.op_id = rpc_id;
out.retval = ec;
out.id = adhoc_storage.id();
} else {
LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: "
"{}\"",
rpc_id, rv.error());
out.op_id = rpc_id;
out.retval = ec;
out.id = 0;
}
LOGGER_INFO("rpc id: {} name: {} to: {} => "
"body: {{retval: {}, id: {}}}",
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
rv, out.id);
ec, out.id);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment