Verified commit 5efa8129, authored by Alberto Miranda

scord: Add job_manager singleton object

parent c94bd4d4
Merge request !54: Resolve "Add `job_manager` component to scord daemon"
Pipeline #3228 passed
@@ -54,3 +54,8 @@ add_subdirectory(api)
target_include_directories(_api_types INTERFACE
${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::api::types ALIAS _api_types)
add_subdirectory(abt_cxx)
target_include_directories(_abt_cxx INTERFACE
${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::abt_cxx ALIAS _abt_cxx)
################################################################################
# Copyright 2021, Barcelona Supercomputing Center (BSC), Spain #
# #
# This software was partially supported by the EuroHPC-funded project ADMIRE #
# (Project ID: 956748, https://www.admire-eurohpc.eu). #
# #
# This file is part of scord. #
# #
# scord is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# scord is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with scord. If not, see <https://www.gnu.org/licenses/>. #
# #
# SPDX-License-Identifier: GPL-3.0-or-later #
################################################################################
add_library(_abt_cxx STATIC)
target_sources(
_abt_cxx
INTERFACE shared_mutex.hpp
)
target_link_libraries(
_abt_cxx PUBLIC common::logger Argobots::Argobots
)
set_property(TARGET _abt_cxx PROPERTY POSITION_INDEPENDENT_CODE ON)
/******************************************************************************
* Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
*
* This software was partially supported by the EuroHPC-funded project ADMIRE
* (Project ID: 956748, https://www.admire-eurohpc.eu).
*
* This file is part of scord.
*
* scord is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* scord is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with scord. If not, see <https://www.gnu.org/licenses/>.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#ifndef SCORD_ABT_SHARED_MUTEX_HPP
#define SCORD_ABT_SHARED_MUTEX_HPP

#include <abt.h>
#include <fmt/format.h>
#include <stdexcept>
#include <system_error>
#include <vector>
namespace scord::abt {
#define ABT_RWLOCK_ASSERT(__expr)                                             \
    {                                                                         \
        if(const auto ret = (__expr); ret != ABT_SUCCESS) {                   \
            /* query the required buffer size, then fetch the error string */ \
            size_t n;                                                         \
            ABT_error_get_str(ret, NULL, &n);                                 \
            std::vector<char> tmp(n + 1);                                     \
            ABT_error_get_str(ret, tmp.data(), &n);                           \
                                                                              \
            throw std::runtime_error(                                         \
                    fmt::format("{} failed: {} ({}) in {}:{}", __FUNCTION__,  \
                                tmp.data(), ret, __FILE__, __LINE__));        \
        }                                                                     \
    }
class shared_mutex {
public:
explicit shared_mutex() {
ABT_RWLOCK_ASSERT(ABT_rwlock_create(&m_lock));
}
~shared_mutex() noexcept {
ABT_rwlock_free(&m_lock);
}
// copy constructor and copy assignment operator are disabled
shared_mutex(const shared_mutex&) = delete;
shared_mutex&
operator=(const shared_mutex&) = delete;
// Exclusive ownership
void
lock() {
ABT_RWLOCK_ASSERT(ABT_rwlock_wrlock(m_lock));
}
void
unlock() {
ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock));
}
// Shared ownership
void
lock_shared() {
ABT_RWLOCK_ASSERT(ABT_rwlock_rdlock(m_lock));
}
void
unlock_shared() {
ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock));
}
private:
ABT_rwlock m_lock = ABT_RWLOCK_NULL;
};
#undef ABT_RWLOCK_ASSERT
/// unique_lock
template <typename Mutex>
class unique_lock {
public:
typedef Mutex mutex_type;
unique_lock() noexcept : m_device(0), m_owns(false) {}
explicit unique_lock(mutex_type& m)
: m_device(std::__addressof(m)), m_owns(false) {
lock();
m_owns = true;
}
~unique_lock() {
if(m_owns)
unlock();
}
unique_lock(const unique_lock&) = delete;
unique_lock&
operator=(const unique_lock&) = delete;
unique_lock(unique_lock&& u) noexcept
: m_device(u.m_device), m_owns(u.m_owns) {
u.m_device = 0;
u.m_owns = false;
}
unique_lock&
operator=(unique_lock&& u) noexcept {
if(m_owns)
unlock();
unique_lock(std::move(u)).swap(*this);
u.m_device = 0;
u.m_owns = false;
return *this;
}
void
lock() {
if(!m_device) {
throw std::system_error(int(std::errc::operation_not_permitted),
std::system_category());
} else if(m_owns) {
throw std::system_error(
int(std::errc::resource_deadlock_would_occur),
std::system_category());
} else {
m_device->lock();
m_owns = true;
}
}
void
unlock() {
if(!m_owns) {
throw std::system_error(int(std::errc::operation_not_permitted),
std::system_category());
} else if(m_device) {
m_device->unlock();
m_owns = false;
}
}
void
swap(unique_lock& u) noexcept {
std::swap(m_device, u.m_device);
std::swap(m_owns, u.m_owns);
}
mutex_type*
release() noexcept {
mutex_type* ret = m_device;
m_device = 0;
m_owns = false;
return ret;
}
bool
owns_lock() const noexcept {
return m_owns;
}
explicit operator bool() const noexcept {
return owns_lock();
}
mutex_type*
mutex() const noexcept {
return m_device;
}
private:
mutex_type* m_device;
bool m_owns;
};
/// Swap overload for unique_lock objects.
/// @relates unique_lock
template <typename Mutex>
inline void
swap(unique_lock<Mutex>& x, unique_lock<Mutex>& y) noexcept {
x.swap(y);
}
/// shared_lock
template <typename Mutex>
class shared_lock {
public:
typedef Mutex mutex_type;
// Shared locking
shared_lock() noexcept : m_device(nullptr), m_owns(false) {}
explicit shared_lock(mutex_type& m)
: m_device(std::__addressof(m)), m_owns(true) {
m.lock_shared();
}
~shared_lock() {
if(m_owns) {
m_device->unlock_shared();
}
}
shared_lock(shared_lock const&) = delete;
shared_lock&
operator=(shared_lock const&) = delete;
shared_lock(shared_lock&& sl) noexcept : shared_lock() {
swap(sl);
}
shared_lock&
operator=(shared_lock&& sl) noexcept {
shared_lock(std::move(sl)).swap(*this);
return *this;
}
void
lock() {
lockable();
m_device->lock_shared();
m_owns = true;
}
void
unlock() {
if(!m_owns) {
throw std::system_error(
int(std::errc::resource_deadlock_would_occur),
std::system_category());
}
m_device->unlock_shared();
m_owns = false;
}
// Setters
void
swap(shared_lock& u) noexcept {
std::swap(m_device, u.m_device);
std::swap(m_owns, u.m_owns);
}
mutex_type*
release() noexcept {
m_owns = false;
return std::__exchange(m_device, nullptr);
}
// Getters
bool
owns_lock() const noexcept {
return m_owns;
}
explicit operator bool() const noexcept {
return m_owns;
}
mutex_type*
mutex() const noexcept {
return m_device;
}
private:
void
lockable() const {
if(m_device == nullptr) {
throw std::system_error(int(std::errc::operation_not_permitted),
std::system_category());
}
if(m_owns) {
throw std::system_error(
int(std::errc::resource_deadlock_would_occur),
std::system_category());
}
}
mutex_type* m_device;
bool m_owns;
};
/// Swap specialization for shared_lock
/// @relates shared_mutex
template <typename Mutex>
void
swap(shared_lock<Mutex>& x, shared_lock<Mutex>& y) noexcept {
x.swap(y);
}
} // namespace scord::abt
#endif // SCORD_ABT_SHARED_MUTEX_HPP
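For context, a minimal usage sketch of the wrappers added above, assuming Argobots has already been initialized by the enclosing runtime (e.g. margo). The `name_registry` type and its members are hypothetical and only illustrate the intended pattern: `abt::shared_mutex` plus the RAII `unique_lock`/`shared_lock` wrappers mirror the `std::shared_mutex`/`std::unique_lock`/`std::shared_lock` interfaces, so waiting happens at ULT level rather than blocking the OS-level execution stream.

```cpp
#include <abt_cxx/shared_mutex.hpp>

#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

// hypothetical component, not part of this commit; it only shows how the
// wrappers are meant to be combined
class name_registry {
public:
    void
    insert(std::uint64_t id, std::string name) {
        // exclusive ownership while mutating the map
        scord::abt::unique_lock lock(m_mutex);
        m_names.emplace(id, std::move(name));
    }

    std::optional<std::string>
    find(std::uint64_t id) const {
        // shared ownership for read-only access
        scord::abt::shared_lock lock(m_mutex);
        if(const auto it = m_names.find(id); it != m_names.end()) {
            return it->second;
        }
        return std::nullopt;
    }

private:
    // mutable so that read-only members can still take the shared lock
    mutable scord::abt::shared_mutex m_mutex;
    std::unordered_map<std::uint64_t, std::string> m_names;
};
```

The `job_manager` registry further down in this commit follows the same locking pattern.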
@@ -47,6 +47,8 @@ typedef enum {
ADM_EBADARGS,
ADM_ENOMEM,
ADM_EOTHER,
ADM_EEXISTS,
ADM_ENOENT,
ADM_ERR_MAX = 512
} ADM_return_t;
......
@@ -80,6 +80,8 @@ private:
std::unique_ptr<impl> m_pimpl;
};
struct job_requirements;
struct job {
struct resources {
@@ -449,6 +451,12 @@ struct fmt::formatter<admire::error_code> : formatter<std::string_view> {
case ADM_EOTHER:
name = "ADM_EOTHER";
break;
case ADM_EEXISTS:
name = "ADM_EEXISTS";
break;
case ADM_ENOENT:
name = "ADM_ENOENT";
break;
default:
break;
}
......
@@ -1101,7 +1101,6 @@ class job::impl {
public:
explicit impl(job_id id) : m_id(id) {}
impl(const impl& rhs) = default;
impl(impl&& rhs) = default;
impl&
......
@@ -33,6 +33,28 @@
namespace scord::utils {
template <typename T>
class singleton {
public:
static T&
instance() {
static T s_instance;
return s_instance;
};
singleton(const singleton&) = delete;
singleton(singleton&&) = delete;
singleton&
operator=(const singleton&) = delete;
singleton&
operator=(singleton&&) = delete;
protected:
struct token {};
singleton() = default;
~singleton() = default;
};
uint64_t
parse_size(const std::string& str);
......
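The `singleton<T>` helper above hands out a function-local static instance, which C++11 guarantees is initialized exactly once even under concurrent first use, and it deletes copying and moving. A minimal sketch of how a component adopts it, using a hypothetical `id_generator` (the `friend` declaration is what lets the base construct the otherwise-private type, exactly as `job_manager` does below):

```cpp
#include <atomic>
#include <cstdint>

#include <utils/utils.hpp>

// hypothetical component, not part of this commit
class id_generator : public scord::utils::singleton<id_generator> {

    // allow singleton<id_generator>::instance() to call the private ctor
    friend class scord::utils::singleton<id_generator>;

public:
    std::uint64_t
    next() {
        return m_next++;
    }

private:
    id_generator() = default;
    std::atomic_uint64_t m_next{0};
};

// every caller observes the same instance:
//   const auto id = id_generator::instance().next();
```

`job_manager` follows exactly this shape: it derives from `utils::singleton<job_manager>`, befriends the base, and keeps its constructor private.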
@@ -219,7 +219,7 @@ register_job(const admire::server& srv,
const auto rpc = endp.call("ADM_register_job", &in, &out);
if(out.retval < 0) {
if(out.retval != ADM_SUCCESS) {
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
@@ -262,7 +262,7 @@ update_job(const server& srv, const job& job,
const auto rpc = endp.call("ADM_update_job", &in, &out);
if(out.retval < 0) {
if(out.retval != ADM_SUCCESS) {
const auto retval = static_cast<admire::error_code>(out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
@@ -298,7 +298,7 @@ remove_job(const server& srv, const job& job) {
const auto rpc = endp.call("ADM_remove_job", &in, &out);
if(out.retval < 0) {
if(out.retval != ADM_SUCCESS) {
const auto retval = static_cast<admire::error_code>(out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
@@ -338,7 +338,7 @@ register_adhoc_storage(const server& srv, const std::string& name,
const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out);
if(out.retval < 0) {
if(out.retval != ADM_SUCCESS) {
const auto retval = static_cast<admire::error_code>(out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
......
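In the client-side hunks above, the error checks change from `out.retval < 0` to `out.retval != ADM_SUCCESS`: the `ADM_return_t` enumerators (including the new `ADM_EEXISTS` and `ADM_ENOENT`) are presumably non-negative, so a sign test would misclassify every failure as success. A minimal sketch of the resulting pattern, with the surrounding RPC plumbing omitted; `check_retval` is a hypothetical helper and `rpc_out_t` stands in for the generated output structs, which all expose a `retval` field:

```cpp
#include <admire.hpp>
#include <logger/logger.hpp>

// sketch only, not part of the commit
template <typename rpc_out_t>
admire::error_code
check_retval(const rpc_out_t& out) {
    if(out.retval != ADM_SUCCESS) {
        const auto retval = static_cast<admire::error_code>(out.retval);
        // the real call sites also log rpc id, name, origin address and op_id
        LOGGER_ERROR("remote operation failed: {}", retval);
        return retval;
    }
    return ADM_SUCCESS;
}
```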
@@ -25,7 +25,8 @@
# scord daemon
add_executable(scord)
target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp)
target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp
env.hpp job_manager.hpp)
target_include_directories(
scord
@@ -40,6 +41,9 @@ target_link_libraries(
common::network::rpc_server
common::network::rpc_types
common::api::types
common::utils
common::abt_cxx
tl::expected
fmt::fmt
Boost::program_options
RedisPlusPlus::RedisPlusPlus
......
/******************************************************************************
* Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
*
* This software was partially supported by the EuroHPC-funded project ADMIRE
* (Project ID: 956748, https://www.admire-eurohpc.eu).
*
* This file is part of scord.
*
* scord is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* scord is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with scord. If not, see <https://www.gnu.org/licenses/>.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#ifndef SCORD_JOB_MANAGER_HPP
#define SCORD_JOB_MANAGER_HPP
#include <admire_types.hpp>
#include <atomic>
#include <optional>
#include <utility>
#include <utils/utils.hpp>
#include <unordered_map>
#include <mutex>
#include <shared_mutex>
#include <tl/expected.hpp>
#include <logger/logger.hpp>
#include <abt_cxx/shared_mutex.hpp>
namespace scord {
struct job_info {
explicit job_info(admire::job job) : m_job(std::move(job)) {}
job_info(admire::job job, admire::job::resources resources,
admire::job_requirements requirements)
: m_job(std::move(job)), m_resources(std::move(resources)),
m_requirements(std::move(requirements)) {}
admire::job
job() const {
return m_job;
}
std::optional<admire::job::resources>
resources() const {
return m_resources;
}
std::optional<admire::job_requirements>
requirements() const {
return m_requirements;
}
admire::job m_job;
std::optional<admire::job::resources> m_resources;
std::optional<admire::job_requirements> m_requirements;
};
struct job_manager : scord::utils::singleton<job_manager> {
tl::expected<job_info, admire::error_code>
create(admire::job::resources job_resources,
admire::job_requirements job_requirements) {
abt::unique_lock lock(m_jobs_mutex);
static std::atomic_uint64_t current_id;
admire::job_id id = current_id++;
if(const auto it = m_jobs.find(id); it == m_jobs.end()) {
const auto& [it_job, inserted] = m_jobs.emplace(
id, job_info{admire::job{id}, std::move(job_resources),
std::move(job_requirements)});
if(!inserted) {
LOGGER_ERROR("{}: Emplace failed", __FUNCTION__);
return tl::make_unexpected(ADM_ESNAFU);
}
return it_job->second;
}
LOGGER_ERROR("{}: Job '{}' already exists", __FUNCTION__, id);
return tl::make_unexpected(ADM_EEXISTS);
}
admire::error_code
update(admire::job_id id, admire::job::resources job_resources,
admire::job_requirements job_requirements) {
abt::unique_lock lock(m_jobs_mutex);
if(const auto it = m_jobs.find(id); it != m_jobs.end()) {
it->second = job_info{admire::job{id}, std::move(job_resources),
std::move(job_requirements)};
return ADM_SUCCESS;
}
LOGGER_ERROR("{}: Job '{}' does not exist", __FUNCTION__, id);
return ADM_ENOENT;
}
tl::expected<job_info, admire::error_code>
find(admire::job_id id) {
abt::shared_lock lock(m_jobs_mutex);
if(auto it = m_jobs.find(id); it != m_jobs.end()) {
return it->second;
}
LOGGER_ERROR("Job '{}' was not registered or was already deleted", id);
return tl::make_unexpected(ADM_ENOENT);
}
admire::error_code
remove(admire::job_id id) {
abt::unique_lock lock(m_jobs_mutex);
if(m_jobs.count(id) != 0) {
m_jobs.erase(id);
return ADM_SUCCESS;
}
LOGGER_ERROR("Job '{}' was not registered or was already deleted", id);
return ADM_ENOENT;
}
private:
friend class scord::utils::singleton<job_manager>;
job_manager() = default;
    // ULT-aware rwlock (abt::shared_mutex) so that waiting RPC handlers yield
    // their ULT instead of blocking the underlying execution stream
    mutable abt::shared_mutex m_jobs_mutex;
std::unordered_map<admire::job_id, job_info> m_jobs;
};
} // namespace scord
#endif // SCORD_JOB_MANAGER_HPP
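A minimal sketch of the `job_manager` contract from a caller's perspective; the RPC handlers in `rpc_handlers.cpp` below do essentially this. The function name and flow are illustrative only, and the `resources`/`requirements` arguments are assumed to come from the decoded RPC input:

```cpp
#include "job_manager.hpp"

// sketch only: in the daemon the arguments are built from the RPC input
// structs rather than passed in like this
admire::error_code
register_and_forget(admire::job::resources res,
                    admire::job_requirements reqs) {

    auto& jm = scord::job_manager::instance();

    // create() assigns a fresh job id and returns the stored job_info,
    // or ADM_EEXISTS / ADM_ESNAFU wrapped in tl::unexpected on failure
    const auto r = jm.create(std::move(res), std::move(reqs));

    if(!r) {
        return r.error();
    }

    const auto id = r->job().id();

    // lookups and removals are keyed by the assigned id; both report
    // ADM_ENOENT if the job is unknown
    if(const auto found = jm.find(id); found) {
        // ... inspect found->resources() / found->requirements() ...
    }

    return jm.remove(id);
}
```

Because `job_manager` derives from `utils::singleton`, every handler that calls `scord::job_manager::instance()` operates on the same registry, with the rwlock keeping concurrent handler ULTs consistent.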
@@ -28,6 +28,7 @@
#include <admire.hpp>
#include <api/convert.hpp>
#include "rpc_handlers.hpp"
#include "job_manager.hpp"
struct remote_procedure {
static std::uint64_t
@@ -99,24 +100,34 @@ ADM_register_job(hg_handle_t h) {
const admire::job_requirements reqs(&in.reqs);
const admire::job::resources job_resources(in.job_resources);
const auto id = remote_procedure::new_id();
const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job_resources: {}, job_requirements: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
job_resources, reqs);
const auto job = admire::job{42};
admire::error_code ec = ADM_SUCCESS;
admire::error_code rv = ADM_SUCCESS;
auto& jm = scord::job_manager::instance();
const auto rv = jm.create(job_resources, reqs);
out.op_id = id;
out.retval = rv;
out.job = admire::api::convert(job).release();
if(rv) {
const auto& job = rv->job();
out.op_id = rpc_id;
out.retval = ec;
out.job = admire::api::convert(job).release();
} else {
    LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc_id,
                 rv.error());

    // keep `ec` in sync so the response log below reports the real error
    ec = rv.error();

    out.op_id = rpc_id;
    out.retval = ec;
    out.job = nullptr;
}
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}, job: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
job);
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ec, rv ? fmt::format("{}", rv->job()) : "none");
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
@@ -150,19 +161,27 @@ ADM_update_job(hg_handle_t h) {
const admire::job::resources job_resources(in.job_resources);
const admire::job_requirements reqs(&in.reqs);
const auto id = remote_procedure::new_id();
const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}, job_resources: {}, job_requirements: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job,
job_resources, reqs);
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
job, job_resources, reqs);
admire::error_code rv = ADM_SUCCESS;
out.op_id = id;
out.retval = rv;
auto& jm = scord::job_manager::instance();
const auto ec = jm.update(job.id(), job_resources, reqs);
if(ec != ADM_SUCCESS) {
LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id,
ec);
}
out.op_id = rpc_id;
out.retval = ec;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv);
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ec);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
@@ -194,19 +213,27 @@ ADM_remove_job(hg_handle_t h) {
const admire::job job(in.job);
const auto id = remote_procedure::new_id();
const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
job);
admire::error_code rv = ADM_SUCCESS;
out.op_id = id;
out.retval = rv;
auto& jm = scord::job_manager::instance();
const auto ec = jm.remove(job.id());
if(ec != ADM_SUCCESS) {
LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id,
ec);
}
out.op_id = rpc_id;
out.retval = ec;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv);
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ec);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......