diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 81604dcee5f3c5987bee3b6c3eb84f1f2e94cb29..01004e9624d9a807d0dc5a75ced1a95e29aa3e48 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -54,3 +54,8 @@ add_subdirectory(api) target_include_directories(_api_types INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}) add_library(common::api::types ALIAS _api_types) + +add_subdirectory(abt_cxx) +target_include_directories(_abt_cxx INTERFACE + ${CMAKE_CURRENT_SOURCE_DIR}) +add_library(common::abt_cxx ALIAS _abt_cxx) diff --git a/src/common/abt_cxx/CMakeLists.txt b/src/common/abt_cxx/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..9c3ac0975171dcf7c376e65ffd599540aaeb3a89 --- /dev/null +++ b/src/common/abt_cxx/CMakeLists.txt @@ -0,0 +1,34 @@ +################################################################################ +# Copyright 2021, Barcelona Supercomputing Center (BSC), Spain # +# # +# This software was partially supported by the EuroHPC-funded project ADMIRE # +# (Project ID: 956748, https://www.admire-eurohpc.eu). # +# # +# This file is part of scord. # +# # +# scord is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# scord is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with scord. If not, see . # +# # +# SPDX-License-Identifier: GPL-3.0-or-later # +################################################################################ + +add_library(_abt_cxx STATIC) +target_sources( + _abt_cxx + INTERFACE shared_mutex.hpp +) + +target_link_libraries( + _abt_cxx PUBLIC common::logger Argobots::Argobots +) +set_property(TARGET _abt_cxx PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/src/common/abt_cxx/shared_mutex.hpp b/src/common/abt_cxx/shared_mutex.hpp new file mode 100644 index 0000000000000000000000000000000000000000..59964fb20e6888e71b0b57ebe1cc9f0d3775b667 --- /dev/null +++ b/src/common/abt_cxx/shared_mutex.hpp @@ -0,0 +1,316 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include + +#ifndef SCORD_ABT_SHARED_MUTEX_HPP +#define SCORD_ABT_SHARED_MUTEX_HPP + +namespace scord::abt { + +#define ABT_RWLOCK_ASSERT(__expr) \ + { \ + if(const auto ret = (__expr); ret != ABT_SUCCESS) { \ + size_t n; \ + ABT_error_get_str(ret, NULL, &n); \ + std::vector tmp; \ + tmp.reserve(n + 1); \ + ABT_error_get_str(ret, tmp.data(), &n); \ + \ + throw std::runtime_error(fmt::format("{} failed: {} in {}:{}", \ + __FUNCTION__, tmp.data(), \ + ret, __FILE__, __LINE__)); \ + } \ + } + +class shared_mutex { +public: + explicit shared_mutex() { + ABT_RWLOCK_ASSERT(ABT_rwlock_create(&m_lock)); + } + + ~shared_mutex() noexcept { + ABT_rwlock_free(&m_lock); + } + + // copy constructor and copy assignment operator are disabled + shared_mutex(const shared_mutex&) = delete; + + shared_mutex& + operator=(const shared_mutex&) = delete; + + // Exclusive ownership + + void + lock() { + ABT_RWLOCK_ASSERT(ABT_rwlock_wrlock(m_lock)); + } + + void + unlock() { + ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock)); + } + + // Shared ownership + + void + lock_shared() { + ABT_RWLOCK_ASSERT(ABT_rwlock_rdlock(m_lock)); + } + + void + unlock_shared() { + ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock)); + } + +private: + ABT_rwlock m_lock = ABT_RWLOCK_NULL; +}; + +#undef ABT_RWLOCK_ASSERT + + +/// unique_lock +template +class unique_lock { +public: + typedef Mutex mutex_type; + + unique_lock() noexcept : m_device(0), m_owns(false) {} + + explicit unique_lock(mutex_type& m) + : m_device(std::__addressof(m)), m_owns(false) { + lock(); + m_owns = true; + } + + ~unique_lock() { + if(m_owns) + unlock(); + } + + unique_lock(const unique_lock&) = delete; + unique_lock& + operator=(const unique_lock&) = delete; + + unique_lock(unique_lock&& u) noexcept + : m_device(u.m_device), m_owns(u.m_owns) { + u.m_device = 0; + u.m_owns = false; + } + + unique_lock& + operator=(unique_lock&& u) noexcept { + if(m_owns) + unlock(); + + unique_lock(std::move(u)).swap(*this); + + u.m_device = 0; + u.m_owns = false; + + return *this; + } + + void + lock() { + if(!m_device) { + throw std::system_error(int(std::errc::operation_not_permitted), + std::system_category()); + } else if(m_owns) { + throw std::system_error( + int(std::errc::resource_deadlock_would_occur), + std::system_category()); + } else { + m_device->lock(); + m_owns = true; + } + } + + void + unlock() { + if(!m_owns) { + throw std::system_error(int(std::errc::operation_not_permitted), + std::system_category()); + } else if(m_device) { + m_device->unlock(); + m_owns = false; + } + } + + void + swap(unique_lock& u) noexcept { + std::swap(m_device, u.m_device); + std::swap(m_owns, u.m_owns); + } + + mutex_type* + release() noexcept { + mutex_type* ret = m_device; + m_device = 0; + m_owns = false; + return ret; + } + + bool + owns_lock() const noexcept { + return m_owns; + } + + explicit operator bool() const noexcept { + return owns_lock(); + } + + mutex_type* + mutex() const noexcept { + return m_device; + } + +private: + mutex_type* m_device; + bool m_owns; +}; + +/// Swap overload for unique_lock objects. +/// @relates unique_lock +template +inline void +swap(unique_lock& x, unique_lock& y) noexcept { + x.swap(y); +} + +/// shared_lock +template +class shared_lock { +public: + typedef Mutex mutex_type; + + // Shared locking + + shared_lock() noexcept : m_device(nullptr), m_owns(false) {} + + explicit shared_lock(mutex_type& m) + : m_device(std::__addressof(m)), m_owns(true) { + m.lock_shared(); + } + + ~shared_lock() { + if(m_owns) { + m_device->unlock_shared(); + } + } + + shared_lock(shared_lock const&) = delete; + shared_lock& + operator=(shared_lock const&) = delete; + + shared_lock(shared_lock&& sl) noexcept : shared_lock() { + swap(sl); + } + + shared_lock& + operator=(shared_lock&& sl) noexcept { + shared_lock(std::move(sl)).swap(*this); + return *this; + } + + void + lock() { + lockable(); + m_device->lock_shared(); + m_owns = true; + } + + void + unlock() { + if(!m_owns) { + throw std::system_error( + int(std::errc::resource_deadlock_would_occur), + std::system_category()); + } + m_device->unlock_shared(); + m_owns = false; + } + + // Setters + + void + swap(shared_lock& u) noexcept { + std::swap(m_device, u.m_device); + std::swap(m_owns, u.m_owns); + } + + mutex_type* + release() noexcept { + m_owns = false; + return std::__exchange(m_device, nullptr); + } + + // Getters + + bool + owns_lock() const noexcept { + return m_owns; + } + + explicit operator bool() const noexcept { + return m_owns; + } + + mutex_type* + mutex() const noexcept { + return m_device; + } + +private: + void + lockable() const { + if(m_device == nullptr) { + throw std::system_error(int(std::errc::operation_not_permitted), + std::system_category()); + } + if(m_owns) { + throw std::system_error( + int(std::errc::resource_deadlock_would_occur), + std::system_category()); + } + } + + mutex_type* m_device; + bool m_owns; +}; + +/// Swap specialization for shared_lock +/// @relates shared_mutex +template +void +swap(shared_lock& x, shared_lock& y) noexcept { + x.swap(y); +} + +} // namespace scord::abt + +#endif // SCORD_ABT_SHARED_MUTEX_HPP diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h index 37a61db1dcceac7c6579d7c3ad853e1e1ef5e9d1..cfb056a76dcb32d04f3bef894a06dff3d78c07a3 100644 --- a/src/common/api/admire_types.h +++ b/src/common/api/admire_types.h @@ -47,6 +47,8 @@ typedef enum { ADM_EBADARGS, ADM_ENOMEM, ADM_EOTHER, + ADM_EEXISTS, + ADM_ENOENT, ADM_ERR_MAX = 512 } ADM_return_t; diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp index 3ca979b32889464766d9380bd53450ecb1c84d8f..e8d60befeda3a3f674b016d4cb61472145519252 100644 --- a/src/common/api/admire_types.hpp +++ b/src/common/api/admire_types.hpp @@ -80,6 +80,8 @@ private: std::unique_ptr m_pimpl; }; +struct job_requirements; + struct job { struct resources { @@ -449,6 +451,12 @@ struct fmt::formatter : formatter { case ADM_EOTHER: name = "ADM_EOTHER"; break; + case ADM_EEXISTS: + name = "ADM_EEXISTS"; + break; + case ADM_ENOENT: + name = "ADM_ENOENT"; + break; default: break; } diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp index 644e06730f4ee769a173ec2b25d04eef09ba2ccb..6dfda147cc9b1e3b07886531c497106d1212abc9 100644 --- a/src/common/api/types.cpp +++ b/src/common/api/types.cpp @@ -1101,7 +1101,6 @@ class job::impl { public: explicit impl(job_id id) : m_id(id) {} - impl(const impl& rhs) = default; impl(impl&& rhs) = default; impl& diff --git a/src/common/utils/utils.hpp b/src/common/utils/utils.hpp index 06f76fbca9ab841ff6a5a97f3c2953727a941f4a..9a23ea50e794c8789deb895091216c3202510cb8 100644 --- a/src/common/utils/utils.hpp +++ b/src/common/utils/utils.hpp @@ -33,6 +33,28 @@ namespace scord::utils { +template +class singleton { +public: + static T& + instance() { + static T s_instance; + return s_instance; + }; + + singleton(const singleton&) = delete; + singleton(singleton&&) = delete; + singleton& + operator=(const singleton&) = delete; + singleton& + operator=(singleton&&) = delete; + +protected: + struct token {}; + singleton() = default; + ~singleton() = default; +}; + uint64_t parse_size(const std::string& str); diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 770db97ffa4a00f218ab3935d2697bd6e43c5180..3782210394a9c2263df11251faa430d114b76257 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -219,7 +219,7 @@ register_job(const admire::server& srv, const auto rpc = endp.call("ADM_register_job", &in, &out); - if(out.retval < 0) { + if(out.retval != ADM_SUCCESS) { LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {} [op_id: {}]", rpc_id, std::quoted("ADM_"s + __FUNCTION__), @@ -262,7 +262,7 @@ update_job(const server& srv, const job& job, const auto rpc = endp.call("ADM_update_job", &in, &out); - if(out.retval < 0) { + if(out.retval != ADM_SUCCESS) { const auto retval = static_cast(out.retval); LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", @@ -298,7 +298,7 @@ remove_job(const server& srv, const job& job) { const auto rpc = endp.call("ADM_remove_job", &in, &out); - if(out.retval < 0) { + if(out.retval != ADM_SUCCESS) { const auto retval = static_cast(out.retval); LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", @@ -338,7 +338,7 @@ register_adhoc_storage(const server& srv, const std::string& name, const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out); - if(out.retval < 0) { + if(out.retval != ADM_SUCCESS) { const auto retval = static_cast(out.retval); LOGGER_ERROR("rpc id: {} name: {} from: {} <= " "body: {{retval: {}}} [op_id: {}]", diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 6dca1f758270c0a72f659240bd024dcbf3585d76..26a4f33e0a64cc6de5c83abd8fe8bb0c68f9a483 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -25,7 +25,8 @@ # scord daemon add_executable(scord) -target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp) +target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp + env.hpp job_manager.hpp) target_include_directories( scord @@ -40,6 +41,9 @@ target_link_libraries( common::network::rpc_server common::network::rpc_types common::api::types + common::utils + common::abt_cxx + tl::expected fmt::fmt Boost::program_options RedisPlusPlus::RedisPlusPlus diff --git a/src/scord/job_manager.hpp b/src/scord/job_manager.hpp new file mode 100644 index 0000000000000000000000000000000000000000..214673960b64a90745ab9d9b6d84c8e6adc4417f --- /dev/null +++ b/src/scord/job_manager.hpp @@ -0,0 +1,152 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_JOB_MANAGER_HPP +#define SCORD_JOB_MANAGER_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace scord { + +struct job_info { + explicit job_info(admire::job job) : m_job(std::move(job)) {} + job_info(admire::job job, admire::job::resources resources, + admire::job_requirements requirements) + : m_job(std::move(job)), m_resources(std::move(resources)), + m_requirements(std::move(requirements)) {} + + admire::job + job() const { + return m_job; + } + + std::optional + resources() const { + return m_resources; + } + + std::optional + requirements() const { + return m_requirements; + } + + admire::job m_job; + std::optional m_resources; + std::optional m_requirements; +}; + +struct job_manager : scord::utils::singleton { + + + tl::expected + create(admire::job::resources job_resources, + admire::job_requirements job_requirements) { + + abt::unique_lock lock(m_jobs_mutex); + + static std::atomic_uint64_t current_id; + admire::job_id id = current_id++; + + if(const auto it = m_jobs.find(id); it == m_jobs.end()) { + const auto& [it_job, inserted] = m_jobs.emplace( + id, job_info{admire::job{id}, std::move(job_resources), + std::move(job_requirements)}); + + if(!inserted) { + LOGGER_ERROR("{}: Emplace failed", __FUNCTION__); + return tl::make_unexpected(ADM_ESNAFU); + } + + return it_job->second; + } + + LOGGER_ERROR("{}: Job '{}' already exists", __FUNCTION__, id); + return tl::make_unexpected(ADM_EEXISTS); + } + + admire::error_code + update(admire::job_id id, admire::job::resources job_resources, + admire::job_requirements job_requirements) { + + abt::unique_lock lock(m_jobs_mutex); + + if(const auto it = m_jobs.find(id); it != m_jobs.end()) { + it->second = job_info{admire::job{id}, std::move(job_resources), + std::move(job_requirements)}; + return ADM_SUCCESS; + } + + LOGGER_ERROR("{}: Job '{}' does not exist", __FUNCTION__, id); + return ADM_ENOENT; + } + + tl::expected + find(admire::job_id id) { + + abt::shared_lock lock(m_jobs_mutex); + + if(auto it = m_jobs.find(id); it != m_jobs.end()) { + return it->second; + } + + LOGGER_ERROR("Job '{}' was not registered or was already deleted", id); + return tl::make_unexpected(ADM_ENOENT); + } + + admire::error_code + remove(admire::job_id id) { + + abt::unique_lock lock(m_jobs_mutex); + + if(m_jobs.count(id) != 0) { + m_jobs.erase(id); + return ADM_SUCCESS; + } + + LOGGER_ERROR("Job '{}' was not registered or was already deleted", id); + + return ADM_ENOENT; + } + +private: + friend class scord::utils::singleton; + job_manager() = default; + + mutable std::shared_mutex m_jobs_mutex; + std::unordered_map m_jobs; +}; + +} // namespace scord + + +#endif // SCORD_JOB_MANAGER_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 2c65181fa922b7ebb44682f16046f214ab93c98b..497d728afa3d7550d256b92ff2969921f7b6a088 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -28,6 +28,7 @@ #include #include #include "rpc_handlers.hpp" +#include "job_manager.hpp" struct remote_procedure { static std::uint64_t @@ -99,24 +100,34 @@ ADM_register_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const admire::job::resources job_resources(in.job_resources); - const auto id = remote_procedure::new_id(); + const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job_resources: {}, job_requirements: {}}}", - id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job_resources, reqs); - const auto job = admire::job{42}; + admire::error_code ec = ADM_SUCCESS; - admire::error_code rv = ADM_SUCCESS; + auto& jm = scord::job_manager::instance(); + const auto rv = jm.create(job_resources, reqs); - out.op_id = id; - out.retval = rv; - out.job = admire::api::convert(job).release(); + if(rv) { + const auto& job = rv->job(); + out.op_id = rpc_id; + out.retval = ec; + out.job = admire::api::convert(job).release(); + } else { + LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc_id, + rv.error()); + out.op_id = rpc_id; + out.retval = rv.error(); + out.job = nullptr; + } LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}, job: {}}}", - id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv, - job); + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec, rv ? fmt::format("{}", rv->job()) : "none"); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -150,19 +161,27 @@ ADM_update_job(hg_handle_t h) { const admire::job::resources job_resources(in.job_resources); const admire::job_requirements reqs(&in.reqs); - const auto id = remote_procedure::new_id(); + const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}, job_resources: {}, job_requirements: {}}}", - id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job, - job_resources, reqs); + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + job, job_resources, reqs); - admire::error_code rv = ADM_SUCCESS; - out.op_id = id; - out.retval = rv; + auto& jm = scord::job_manager::instance(); + const auto ec = jm.update(job.id(), job_resources, reqs); + + if(ec != ADM_SUCCESS) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, + ec); + } + + out.op_id = rpc_id; + out.retval = ec; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", - id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv); + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -194,19 +213,27 @@ ADM_remove_job(hg_handle_t h) { const admire::job job(in.job); - const auto id = remote_procedure::new_id(); + const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " "body: {{job: {}}}", - id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job); - admire::error_code rv = ADM_SUCCESS; - out.op_id = id; - out.retval = rv; + auto& jm = scord::job_manager::instance(); + const auto ec = jm.remove(job.id()); + + if(ec != ADM_SUCCESS) { + LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, + ec); + } + + out.op_id = rpc_id; + out.retval = ec; LOGGER_INFO("rpc id: {} name: {} to: {} <= " "body: {{retval: {}}}", - id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv); + rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ec); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS);