Verified Commit 9959831b authored by Alberto Miranda's avatar Alberto Miranda ♨️
Browse files

scord: Improve adhoc_storage_manager

parent 8ca3dcef
Loading
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -26,7 +26,7 @@
add_executable(scord)

target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp
  env.hpp job_manager.hpp)
  env.hpp job_manager.hpp adhoc_storage_manager.hpp)

target_include_directories(
  scord
+129 −0
Original line number Diff line number Diff line
/******************************************************************************
 * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
 *
 * This software was partially supported by the EuroHPC-funded project ADMIRE
 *   (Project ID: 956748, https://www.admire-eurohpc.eu).
 *
 * This file is part of scord.
 *
 * scord is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * scord is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with scord.  If not, see <https://www.gnu.org/licenses/>.
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *****************************************************************************/


#ifndef SCORD_ADHOC_STORAGE_MANAGER_HPP
#define SCORD_ADHOC_STORAGE_MANAGER_HPP

#include <admire_types.hpp>
#include <utils/utils.hpp>
#include <unordered_map>
#include <abt_cxx/shared_mutex.hpp>
#include <tl/expected.hpp>
#include <atomic>
#include <logger/logger.hpp>

namespace scord {

/**
 * Process-wide registry of adhoc storage instances, keyed by a numeric id.
 *
 * Every public member function acquires `m_adhoc_storages_mutex` before
 * touching the underlying map: `find()` takes a shared lock, the mutating
 * operations take an exclusive one. Stored instances are handed back to
 * callers by value (copied into the returned `tl::expected`), so no
 * reference into the map escapes the lock.
 */
struct adhoc_storage_manager : scord::utils::singleton<adhoc_storage_manager> {

    /**
     * Register a new adhoc storage under a freshly generated id.
     *
     * @param type The type of adhoc storage to create.
     * @param name The name to give to the adhoc storage.
     * @param ctx  The execution context for the adhoc storage.
     * @return A copy of the registered adhoc storage on success, or
     *         ADM_EEXISTS if an entry with the generated id is already
     *         present in the registry.
     */
    tl::expected<admire::adhoc_storage, admire::error_code>
    create(enum admire::adhoc_storage::type type, const std::string& name,
           const admire::adhoc_storage::ctx& ctx) {

        static std::atomic_uint64_t current_id;

        // Generate the id exactly once: the same value must be used both as
        // the registry key and as the id stored in the adhoc storage itself,
        // otherwise the id reported to clients cannot be used to look the
        // instance up again.
        const std::uint64_t id = current_id++;

        abt::unique_lock lock(m_adhoc_storages_mutex);

        // emplace() already reports whether the key was present, so no
        // separate find() is required.
        const auto [it_adhoc, inserted] = m_adhoc_storages.emplace(
                id, admire::adhoc_storage{type, name, id, ctx});

        if(!inserted) {
            LOGGER_ERROR("{}: Adhoc storage '{}' already exists", __FUNCTION__,
                         id);
            return tl::make_unexpected(ADM_EEXISTS);
        }

        return it_adhoc->second;
    }

    /**
     * Replace the execution context of a registered adhoc storage. The
     * type, name and id of the existing entry are preserved.
     *
     * @param id  The id of the adhoc storage to update.
     * @param ctx The new execution context (taken by value and moved into
     *            the rebuilt entry).
     * @return ADM_SUCCESS if the entry was updated, ADM_ENOENT if no adhoc
     *         storage is registered under `id`.
     */
    admire::error_code
    update(std::uint64_t id, admire::adhoc_storage::ctx ctx) {

        abt::unique_lock lock(m_adhoc_storages_mutex);

        if(const auto it = m_adhoc_storages.find(id);
           it != m_adhoc_storages.end()) {

            const auto& current_adhoc = it->second;

            // The replacement is fully constructed from the current entry
            // before it is assigned over it.
            it->second = admire::adhoc_storage{
                    current_adhoc.type(), current_adhoc.name(),
                    current_adhoc.id(), std::move(ctx)};
            return ADM_SUCCESS;
        }

        LOGGER_ERROR("{}: Adhoc storage '{}' does not exist", __FUNCTION__, id);
        return ADM_ENOENT;
    }

    /**
     * Look up a registered adhoc storage.
     *
     * @param id The id of the adhoc storage to search for.
     * @return A copy of the registered adhoc storage, or ADM_ENOENT if no
     *         adhoc storage is registered under `id`.
     */
    tl::expected<admire::adhoc_storage, admire::error_code>
    find(admire::job_id id) {

        abt::shared_lock lock(m_adhoc_storages_mutex);

        if(auto it = m_adhoc_storages.find(id); it != m_adhoc_storages.end()) {
            return it->second;
        }

        LOGGER_ERROR("Adhoc storage '{}' was not registered or was already "
                     "deleted",
                     id);
        return tl::make_unexpected(ADM_ENOENT);
    }

    /**
     * Remove an adhoc storage from the registry.
     *
     * @param id The id of the adhoc storage to remove.
     * @return ADM_SUCCESS if an entry was removed, ADM_ENOENT if no adhoc
     *         storage is registered under `id`.
     */
    admire::error_code
    remove(admire::job_id id) {

        abt::unique_lock lock(m_adhoc_storages_mutex);

        // erase(key) returns the number of elements removed, which saves
        // the extra lookup of a count()/erase() pair.
        if(m_adhoc_storages.erase(id) != 0) {
            return ADM_SUCCESS;
        }

        LOGGER_ERROR("Adhoc storage '{}' was not registered or was already "
                     "deleted",
                     id);

        return ADM_ENOENT;
    }

private:
    // The singleton base needs access to the private constructor.
    friend class scord::utils::singleton<adhoc_storage_manager>;
    adhoc_storage_manager() = default;

    // Guards every access to m_adhoc_storages.
    mutable abt::shared_mutex m_adhoc_storages_mutex;
    // Registered adhoc storages indexed by id.
    std::unordered_map<std::uint64_t, admire::adhoc_storage> m_adhoc_storages;
};

} // namespace scord

#endif // SCORD_ADHOC_STORAGE_MANAGER_HPP
+18 −17
Original line number Diff line number Diff line
@@ -29,6 +29,7 @@
#include <api/convert.hpp>
#include "rpc_handlers.hpp"
#include "job_manager.hpp"
#include "adhoc_storage_manager.hpp"

struct remote_procedure {
    static std::uint64_t
@@ -38,17 +39,6 @@ struct remote_procedure {
    }
};

// File-local helper that builds adhoc storage instances with unique ids.
struct adhoc_storage_manager {

    // Construct an `admire::adhoc_storage` from `type`, `name` and `ctx`,
    // tagging it with the next value of a function-local atomic counter.
    template <typename... Args>
    static admire::adhoc_storage
    create(enum admire::adhoc_storage::type type, const std::string& name,
           const admire::adhoc_storage::ctx& ctx) {
        static std::atomic_uint64_t next_id;
        const auto assigned_id = next_id++;
        return admire::adhoc_storage(type, name, assigned_id, ctx);
    }
};

static void
ADM_ping(hg_handle_t h) {

@@ -274,18 +264,29 @@ ADM_register_adhoc_storage(hg_handle_t h) {
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                name, type, ctx);

    const auto adhoc_storage = adhoc_storage_manager::create(type, name, ctx);
    auto& adhoc_manager = scord::adhoc_storage_manager::instance();
    const auto rv = adhoc_manager.create(type, name, ctx);

    admire::error_code rv = ADM_SUCCESS;
    admire::error_code ec = ADM_SUCCESS;

    if(rv) {
        const auto& adhoc_storage = rv.value();
        out.op_id = rpc_id;
    out.retval = rv;
        out.retval = ec;
        out.id = adhoc_storage.id();
    } else {
        LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: "
                     "{}\"",
                     rpc_id, rv.error());
        out.op_id = rpc_id;
        out.retval = ec;
        out.id = 0;
    }

    LOGGER_INFO("rpc id: {} name: {} to: {} => "
                "body: {{retval: {}, id: {}}}",
                rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
                rv, out.id);
                ec, out.id);

    ret = margo_respond(h, &out);
    assert(ret == HG_SUCCESS);