diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt
index 81604dcee5f3c5987bee3b6c3eb84f1f2e94cb29..01004e9624d9a807d0dc5a75ced1a95e29aa3e48 100644
--- a/src/common/CMakeLists.txt
+++ b/src/common/CMakeLists.txt
@@ -54,3 +54,8 @@ add_subdirectory(api)
target_include_directories(_api_types INTERFACE
${CMAKE_CURRENT_SOURCE_DIR})
add_library(common::api::types ALIAS _api_types)
+
+add_subdirectory(abt_cxx)
+target_include_directories(_abt_cxx INTERFACE
+ ${CMAKE_CURRENT_SOURCE_DIR})
+add_library(common::abt_cxx ALIAS _abt_cxx)
diff --git a/src/common/abt_cxx/CMakeLists.txt b/src/common/abt_cxx/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..9c3ac0975171dcf7c376e65ffd599540aaeb3a89
--- /dev/null
+++ b/src/common/abt_cxx/CMakeLists.txt
@@ -0,0 +1,34 @@
+################################################################################
+# Copyright 2021, Barcelona Supercomputing Center (BSC), Spain #
+# #
+# This software was partially supported by the EuroHPC-funded project ADMIRE #
+# (Project ID: 956748, https://www.admire-eurohpc.eu). #
+# #
+# This file is part of scord. #
+# #
+# scord is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# scord is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with scord. If not, see . #
+# #
+# SPDX-License-Identifier: GPL-3.0-or-later #
+################################################################################
+
+add_library(_abt_cxx STATIC)
+target_sources(
+ _abt_cxx
+ INTERFACE shared_mutex.hpp
+)
+
+target_link_libraries(
+ _abt_cxx PUBLIC common::logger Argobots::Argobots
+)
+set_property(TARGET _abt_cxx PROPERTY POSITION_INDEPENDENT_CODE ON)
diff --git a/src/common/abt_cxx/shared_mutex.hpp b/src/common/abt_cxx/shared_mutex.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..59964fb20e6888e71b0b57ebe1cc9f0d3775b667
--- /dev/null
+++ b/src/common/abt_cxx/shared_mutex.hpp
@@ -0,0 +1,316 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#include
+#include
+#include
+
+#ifndef SCORD_ABT_SHARED_MUTEX_HPP
+#define SCORD_ABT_SHARED_MUTEX_HPP
+
+namespace scord::abt {
+
+#define ABT_RWLOCK_ASSERT(__expr) \
+ { \
+ if(const auto ret = (__expr); ret != ABT_SUCCESS) { \
+ size_t n; \
+ ABT_error_get_str(ret, NULL, &n); \
+ std::vector tmp; \
+ tmp.reserve(n + 1); \
+ ABT_error_get_str(ret, tmp.data(), &n); \
+ \
+ throw std::runtime_error(fmt::format("{} failed: {} in {}:{}", \
+ __FUNCTION__, tmp.data(), \
+ ret, __FILE__, __LINE__)); \
+ } \
+ }
+
+class shared_mutex {
+public:
+ explicit shared_mutex() {
+ ABT_RWLOCK_ASSERT(ABT_rwlock_create(&m_lock));
+ }
+
+ ~shared_mutex() noexcept {
+ ABT_rwlock_free(&m_lock);
+ }
+
+ // copy constructor and copy assignment operator are disabled
+ shared_mutex(const shared_mutex&) = delete;
+
+ shared_mutex&
+ operator=(const shared_mutex&) = delete;
+
+ // Exclusive ownership
+
+ void
+ lock() {
+ ABT_RWLOCK_ASSERT(ABT_rwlock_wrlock(m_lock));
+ }
+
+ void
+ unlock() {
+ ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock));
+ }
+
+ // Shared ownership
+
+ void
+ lock_shared() {
+ ABT_RWLOCK_ASSERT(ABT_rwlock_rdlock(m_lock));
+ }
+
+ void
+ unlock_shared() {
+ ABT_RWLOCK_ASSERT(ABT_rwlock_unlock(m_lock));
+ }
+
+private:
+ ABT_rwlock m_lock = ABT_RWLOCK_NULL;
+};
+
+#undef ABT_RWLOCK_ASSERT
+
+
+/// unique_lock
+template
+class unique_lock {
+public:
+ typedef Mutex mutex_type;
+
+ unique_lock() noexcept : m_device(0), m_owns(false) {}
+
+ explicit unique_lock(mutex_type& m)
+ : m_device(std::__addressof(m)), m_owns(false) {
+ lock();
+ m_owns = true;
+ }
+
+ ~unique_lock() {
+ if(m_owns)
+ unlock();
+ }
+
+ unique_lock(const unique_lock&) = delete;
+ unique_lock&
+ operator=(const unique_lock&) = delete;
+
+ unique_lock(unique_lock&& u) noexcept
+ : m_device(u.m_device), m_owns(u.m_owns) {
+ u.m_device = 0;
+ u.m_owns = false;
+ }
+
+ unique_lock&
+ operator=(unique_lock&& u) noexcept {
+ if(m_owns)
+ unlock();
+
+ unique_lock(std::move(u)).swap(*this);
+
+ u.m_device = 0;
+ u.m_owns = false;
+
+ return *this;
+ }
+
+ void
+ lock() {
+ if(!m_device) {
+ throw std::system_error(int(std::errc::operation_not_permitted),
+ std::system_category());
+ } else if(m_owns) {
+ throw std::system_error(
+ int(std::errc::resource_deadlock_would_occur),
+ std::system_category());
+ } else {
+ m_device->lock();
+ m_owns = true;
+ }
+ }
+
+ void
+ unlock() {
+ if(!m_owns) {
+ throw std::system_error(int(std::errc::operation_not_permitted),
+ std::system_category());
+ } else if(m_device) {
+ m_device->unlock();
+ m_owns = false;
+ }
+ }
+
+ void
+ swap(unique_lock& u) noexcept {
+ std::swap(m_device, u.m_device);
+ std::swap(m_owns, u.m_owns);
+ }
+
+ mutex_type*
+ release() noexcept {
+ mutex_type* ret = m_device;
+ m_device = 0;
+ m_owns = false;
+ return ret;
+ }
+
+ bool
+ owns_lock() const noexcept {
+ return m_owns;
+ }
+
+ explicit operator bool() const noexcept {
+ return owns_lock();
+ }
+
+ mutex_type*
+ mutex() const noexcept {
+ return m_device;
+ }
+
+private:
+ mutex_type* m_device;
+ bool m_owns;
+};
+
+/// Swap overload for unique_lock objects.
+/// @relates unique_lock
+template
+inline void
+swap(unique_lock& x, unique_lock& y) noexcept {
+ x.swap(y);
+}
+
+/// shared_lock
+template
+class shared_lock {
+public:
+ typedef Mutex mutex_type;
+
+ // Shared locking
+
+ shared_lock() noexcept : m_device(nullptr), m_owns(false) {}
+
+ explicit shared_lock(mutex_type& m)
+ : m_device(std::__addressof(m)), m_owns(true) {
+ m.lock_shared();
+ }
+
+ ~shared_lock() {
+ if(m_owns) {
+ m_device->unlock_shared();
+ }
+ }
+
+ shared_lock(shared_lock const&) = delete;
+ shared_lock&
+ operator=(shared_lock const&) = delete;
+
+ shared_lock(shared_lock&& sl) noexcept : shared_lock() {
+ swap(sl);
+ }
+
+ shared_lock&
+ operator=(shared_lock&& sl) noexcept {
+ shared_lock(std::move(sl)).swap(*this);
+ return *this;
+ }
+
+ void
+ lock() {
+ lockable();
+ m_device->lock_shared();
+ m_owns = true;
+ }
+
+ void
+ unlock() {
+ if(!m_owns) {
+ throw std::system_error(
+ int(std::errc::resource_deadlock_would_occur),
+ std::system_category());
+ }
+ m_device->unlock_shared();
+ m_owns = false;
+ }
+
+ // Setters
+
+ void
+ swap(shared_lock& u) noexcept {
+ std::swap(m_device, u.m_device);
+ std::swap(m_owns, u.m_owns);
+ }
+
+ mutex_type*
+ release() noexcept {
+ m_owns = false;
+ return std::__exchange(m_device, nullptr);
+ }
+
+ // Getters
+
+ bool
+ owns_lock() const noexcept {
+ return m_owns;
+ }
+
+ explicit operator bool() const noexcept {
+ return m_owns;
+ }
+
+ mutex_type*
+ mutex() const noexcept {
+ return m_device;
+ }
+
+private:
+ void
+ lockable() const {
+ if(m_device == nullptr) {
+ throw std::system_error(int(std::errc::operation_not_permitted),
+ std::system_category());
+ }
+ if(m_owns) {
+ throw std::system_error(
+ int(std::errc::resource_deadlock_would_occur),
+ std::system_category());
+ }
+ }
+
+ mutex_type* m_device;
+ bool m_owns;
+};
+
+/// Swap specialization for shared_lock
+/// @relates shared_mutex
+template
+void
+swap(shared_lock& x, shared_lock& y) noexcept {
+ x.swap(y);
+}
+
+} // namespace scord::abt
+
+#endif // SCORD_ABT_SHARED_MUTEX_HPP
diff --git a/src/common/api/admire_types.h b/src/common/api/admire_types.h
index 37a61db1dcceac7c6579d7c3ad853e1e1ef5e9d1..cfb056a76dcb32d04f3bef894a06dff3d78c07a3 100644
--- a/src/common/api/admire_types.h
+++ b/src/common/api/admire_types.h
@@ -47,6 +47,8 @@ typedef enum {
ADM_EBADARGS,
ADM_ENOMEM,
ADM_EOTHER,
+ ADM_EEXISTS,
+ ADM_ENOENT,
ADM_ERR_MAX = 512
} ADM_return_t;
diff --git a/src/common/api/admire_types.hpp b/src/common/api/admire_types.hpp
index 3ca979b32889464766d9380bd53450ecb1c84d8f..e8d60befeda3a3f674b016d4cb61472145519252 100644
--- a/src/common/api/admire_types.hpp
+++ b/src/common/api/admire_types.hpp
@@ -80,6 +80,8 @@ private:
std::unique_ptr m_pimpl;
};
+struct job_requirements;
+
struct job {
struct resources {
@@ -449,6 +451,12 @@ struct fmt::formatter : formatter {
case ADM_EOTHER:
name = "ADM_EOTHER";
break;
+ case ADM_EEXISTS:
+ name = "ADM_EEXISTS";
+ break;
+ case ADM_ENOENT:
+ name = "ADM_ENOENT";
+ break;
default:
break;
}
diff --git a/src/common/api/types.cpp b/src/common/api/types.cpp
index 644e06730f4ee769a173ec2b25d04eef09ba2ccb..6dfda147cc9b1e3b07886531c497106d1212abc9 100644
--- a/src/common/api/types.cpp
+++ b/src/common/api/types.cpp
@@ -1101,7 +1101,6 @@ class job::impl {
public:
explicit impl(job_id id) : m_id(id) {}
-
impl(const impl& rhs) = default;
impl(impl&& rhs) = default;
impl&
diff --git a/src/common/utils/utils.hpp b/src/common/utils/utils.hpp
index 06f76fbca9ab841ff6a5a97f3c2953727a941f4a..9a23ea50e794c8789deb895091216c3202510cb8 100644
--- a/src/common/utils/utils.hpp
+++ b/src/common/utils/utils.hpp
@@ -33,6 +33,28 @@
namespace scord::utils {
+template
+class singleton {
+public:
+ static T&
+ instance() {
+ static T s_instance;
+ return s_instance;
+ };
+
+ singleton(const singleton&) = delete;
+ singleton(singleton&&) = delete;
+ singleton&
+ operator=(const singleton&) = delete;
+ singleton&
+ operator=(singleton&&) = delete;
+
+protected:
+ struct token {};
+ singleton() = default;
+ ~singleton() = default;
+};
+
uint64_t
parse_size(const std::string& str);
diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp
index 770db97ffa4a00f218ab3935d2697bd6e43c5180..3782210394a9c2263df11251faa430d114b76257 100644
--- a/src/lib/detail/impl.cpp
+++ b/src/lib/detail/impl.cpp
@@ -219,7 +219,7 @@ register_job(const admire::server& srv,
const auto rpc = endp.call("ADM_register_job", &in, &out);
- if(out.retval < 0) {
+ if(out.retval != ADM_SUCCESS) {
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
@@ -262,7 +262,7 @@ update_job(const server& srv, const job& job,
const auto rpc = endp.call("ADM_update_job", &in, &out);
- if(out.retval < 0) {
+ if(out.retval != ADM_SUCCESS) {
const auto retval = static_cast(out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
@@ -298,7 +298,7 @@ remove_job(const server& srv, const job& job) {
const auto rpc = endp.call("ADM_remove_job", &in, &out);
- if(out.retval < 0) {
+ if(out.retval != ADM_SUCCESS) {
const auto retval = static_cast(out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
@@ -338,7 +338,7 @@ register_adhoc_storage(const server& srv, const std::string& name,
const auto rpc = endp.call("ADM_register_adhoc_storage", &in, &out);
- if(out.retval < 0) {
+ if(out.retval != ADM_SUCCESS) {
const auto retval = static_cast(out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt
index 6dca1f758270c0a72f659240bd024dcbf3585d76..26a4f33e0a64cc6de5c83abd8fe8bb0c68f9a483 100644
--- a/src/scord/CMakeLists.txt
+++ b/src/scord/CMakeLists.txt
@@ -25,7 +25,8 @@
# scord daemon
add_executable(scord)
-target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp)
+target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp
+ env.hpp job_manager.hpp)
target_include_directories(
scord
@@ -40,6 +41,9 @@ target_link_libraries(
common::network::rpc_server
common::network::rpc_types
common::api::types
+ common::utils
+ common::abt_cxx
+ tl::expected
fmt::fmt
Boost::program_options
RedisPlusPlus::RedisPlusPlus
diff --git a/src/scord/job_manager.hpp b/src/scord/job_manager.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..214673960b64a90745ab9d9b6d84c8e6adc4417f
--- /dev/null
+++ b/src/scord/job_manager.hpp
@@ -0,0 +1,152 @@
+/******************************************************************************
+ * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
+ *
+ * This software was partially supported by the EuroHPC-funded project ADMIRE
+ * (Project ID: 956748, https://www.admire-eurohpc.eu).
+ *
+ * This file is part of scord.
+ *
+ * scord is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * scord is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with scord. If not, see .
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *****************************************************************************/
+
+#ifndef SCORD_JOB_MANAGER_HPP
+#define SCORD_JOB_MANAGER_HPP
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace scord {
+
+struct job_info {
+ explicit job_info(admire::job job) : m_job(std::move(job)) {}
+ job_info(admire::job job, admire::job::resources resources,
+ admire::job_requirements requirements)
+ : m_job(std::move(job)), m_resources(std::move(resources)),
+ m_requirements(std::move(requirements)) {}
+
+ admire::job
+ job() const {
+ return m_job;
+ }
+
+ std::optional
+ resources() const {
+ return m_resources;
+ }
+
+ std::optional
+ requirements() const {
+ return m_requirements;
+ }
+
+ admire::job m_job;
+ std::optional m_resources;
+ std::optional m_requirements;
+};
+
+struct job_manager : scord::utils::singleton {
+
+
+ tl::expected
+ create(admire::job::resources job_resources,
+ admire::job_requirements job_requirements) {
+
+ abt::unique_lock lock(m_jobs_mutex);
+
+ static std::atomic_uint64_t current_id;
+ admire::job_id id = current_id++;
+
+ if(const auto it = m_jobs.find(id); it == m_jobs.end()) {
+ const auto& [it_job, inserted] = m_jobs.emplace(
+ id, job_info{admire::job{id}, std::move(job_resources),
+ std::move(job_requirements)});
+
+ if(!inserted) {
+ LOGGER_ERROR("{}: Emplace failed", __FUNCTION__);
+ return tl::make_unexpected(ADM_ESNAFU);
+ }
+
+ return it_job->second;
+ }
+
+ LOGGER_ERROR("{}: Job '{}' already exists", __FUNCTION__, id);
+ return tl::make_unexpected(ADM_EEXISTS);
+ }
+
+ admire::error_code
+ update(admire::job_id id, admire::job::resources job_resources,
+ admire::job_requirements job_requirements) {
+
+ abt::unique_lock lock(m_jobs_mutex);
+
+ if(const auto it = m_jobs.find(id); it != m_jobs.end()) {
+ it->second = job_info{admire::job{id}, std::move(job_resources),
+ std::move(job_requirements)};
+ return ADM_SUCCESS;
+ }
+
+ LOGGER_ERROR("{}: Job '{}' does not exist", __FUNCTION__, id);
+ return ADM_ENOENT;
+ }
+
+ tl::expected
+ find(admire::job_id id) {
+
+ abt::shared_lock lock(m_jobs_mutex);
+
+ if(auto it = m_jobs.find(id); it != m_jobs.end()) {
+ return it->second;
+ }
+
+ LOGGER_ERROR("Job '{}' was not registered or was already deleted", id);
+ return tl::make_unexpected(ADM_ENOENT);
+ }
+
+ admire::error_code
+ remove(admire::job_id id) {
+
+ abt::unique_lock lock(m_jobs_mutex);
+
+ if(m_jobs.count(id) != 0) {
+ m_jobs.erase(id);
+ return ADM_SUCCESS;
+ }
+
+ LOGGER_ERROR("Job '{}' was not registered or was already deleted", id);
+
+ return ADM_ENOENT;
+ }
+
+private:
+ friend class scord::utils::singleton;
+ job_manager() = default;
+
+ mutable std::shared_mutex m_jobs_mutex;
+ std::unordered_map m_jobs;
+};
+
+} // namespace scord
+
+
+#endif // SCORD_JOB_MANAGER_HPP
diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp
index 2c65181fa922b7ebb44682f16046f214ab93c98b..497d728afa3d7550d256b92ff2969921f7b6a088 100644
--- a/src/scord/rpc_handlers.cpp
+++ b/src/scord/rpc_handlers.cpp
@@ -28,6 +28,7 @@
#include
#include
#include "rpc_handlers.hpp"
+#include "job_manager.hpp"
struct remote_procedure {
static std::uint64_t
@@ -99,24 +100,34 @@ ADM_register_job(hg_handle_t h) {
const admire::job_requirements reqs(&in.reqs);
const admire::job::resources job_resources(in.job_resources);
- const auto id = remote_procedure::new_id();
+ const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job_resources: {}, job_requirements: {}}}",
- id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
+ rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
job_resources, reqs);
- const auto job = admire::job{42};
+ admire::error_code ec = ADM_SUCCESS;
- admire::error_code rv = ADM_SUCCESS;
+ auto& jm = scord::job_manager::instance();
+ const auto rv = jm.create(job_resources, reqs);
- out.op_id = id;
- out.retval = rv;
- out.job = admire::api::convert(job).release();
+ if(rv) {
+ const auto& job = rv->job();
+ out.op_id = rpc_id;
+ out.retval = ec;
+ out.job = admire::api::convert(job).release();
+ } else {
+ LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc_id,
+ rv.error());
+ out.op_id = rpc_id;
+ out.retval = rv.error();
+ out.job = nullptr;
+ }
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}, job: {}}}",
- id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
- job);
+ rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
+ ec, rv ? fmt::format("{}", rv->job()) : "none");
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
@@ -150,19 +161,27 @@ ADM_update_job(hg_handle_t h) {
const admire::job::resources job_resources(in.job_resources);
const admire::job_requirements reqs(&in.reqs);
- const auto id = remote_procedure::new_id();
+ const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}, job_resources: {}, job_requirements: {}}}",
- id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job,
- job_resources, reqs);
+ rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
+ job, job_resources, reqs);
- admire::error_code rv = ADM_SUCCESS;
- out.op_id = id;
- out.retval = rv;
+ auto& jm = scord::job_manager::instance();
+ const auto ec = jm.update(job.id(), job_resources, reqs);
+
+ if(ec != ADM_SUCCESS) {
+ LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id,
+ ec);
+ }
+
+ out.op_id = rpc_id;
+ out.retval = ec;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
- id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv);
+ rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
+ ec);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
@@ -194,19 +213,27 @@ ADM_remove_job(hg_handle_t h) {
const admire::job job(in.job);
- const auto id = remote_procedure::new_id();
+ const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}}}",
- id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
+ rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
job);
- admire::error_code rv = ADM_SUCCESS;
- out.op_id = id;
- out.retval = rv;
+ auto& jm = scord::job_manager::instance();
+ const auto ec = jm.remove(job.id());
+
+ if(ec != ADM_SUCCESS) {
+ LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id,
+ ec);
+ }
+
+ out.op_id = rpc_id;
+ out.retval = ec;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
- id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv);
+ rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
+ ec);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);