Skip to content
Snippets Groups Projects

Resolve "adhoc_storage instances should keep track of jobs using them"

2 files
+ 16
10
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 24
42
@@ -26,6 +26,7 @@
#define SCORD_JOB_MANAGER_HPP
#include <admire_types.hpp>
#include <internal_types.hpp>
#include <atomic>
#include <utility>
#include <utils/utils.hpp>
@@ -36,37 +37,11 @@
namespace scord {
struct job_info {
explicit job_info(admire::job job) : m_job(std::move(job)) {}
job_info(admire::job job, admire::job::resources resources,
admire::job_requirements requirements)
: m_job(std::move(job)), m_resources(std::move(resources)),
m_requirements(std::move(requirements)) {}
admire::job
job() const {
return m_job;
}
std::optional<admire::job::resources>
resources() const {
return m_resources;
}
std::optional<admire::job_requirements>
requirements() const {
return m_requirements;
}
admire::job m_job;
std::optional<admire::job::resources> m_resources;
std::optional<admire::job_requirements> m_requirements;
};
struct job_manager : scord::utils::singleton<job_manager> {
tl::expected<job_info, admire::error_code>
tl::expected<std::shared_ptr<admire::internal::job_info>,
admire::error_code>
create(admire::slurm_job_id slurm_id, admire::job::resources job_resources,
admire::job_requirements job_requirements) {
@@ -76,10 +51,11 @@ struct job_manager : scord::utils::singleton<job_manager> {
abt::unique_lock lock(m_jobs_mutex);
if(const auto it = m_jobs.find(id); it == m_jobs.end()) {
const auto& [it_job, inserted] =
m_jobs.emplace(id, job_info{admire::job{id, slurm_id},
std::move(job_resources),
std::move(job_requirements)});
const auto& [it_job, inserted] = m_jobs.emplace(
id,
std::make_shared<admire::internal::job_info>(
admire::job{id, slurm_id}, std::move(job_resources),
std::move(job_requirements)));
if(!inserted) {
LOGGER_ERROR("{}: Emplace failed", __FUNCTION__);
@@ -102,9 +78,11 @@ struct job_manager : scord::utils::singleton<job_manager> {
if(const auto it = m_jobs.find(id); it != m_jobs.end()) {
const auto& current_job_info = it->second;
it->second =
job_info{current_job_info.job(), std::move(job_resources),
std::move(job_requirements)};
const auto new_job_info = admire::internal::job_info{
current_job_info->job(), std::move(job_resources),
std::move(job_requirements)};
*it->second = new_job_info;
return ADM_SUCCESS;
}
@@ -112,7 +90,8 @@ struct job_manager : scord::utils::singleton<job_manager> {
return ADM_ENOENT;
}
tl::expected<job_info, admire::error_code>
tl::expected<std::shared_ptr<admire::internal::job_info>,
admire::error_code>
find(admire::job_id id) {
abt::shared_lock lock(m_jobs_mutex);
@@ -125,19 +104,20 @@ struct job_manager : scord::utils::singleton<job_manager> {
return tl::make_unexpected(ADM_ENOENT);
}
admire::error_code
tl::expected<std::shared_ptr<admire::internal::job_info>,
admire::error_code>
remove(admire::job_id id) {
abt::unique_lock lock(m_jobs_mutex);
if(m_jobs.count(id) != 0) {
m_jobs.erase(id);
return ADM_SUCCESS;
if(const auto it = m_jobs.find(id); it != m_jobs.end()) {
auto nh = m_jobs.extract(it);
return nh.mapped();
}
LOGGER_ERROR("Job '{}' was not registered or was already deleted", id);
return ADM_ENOENT;
return tl::make_unexpected(ADM_ENOENT);
}
private:
@@ -145,7 +125,9 @@ private:
job_manager() = default;
mutable abt::shared_mutex m_jobs_mutex;
std::unordered_map<admire::job_id, job_info> m_jobs;
std::unordered_map<admire::job_id,
std::shared_ptr<admire::internal::job_info>>
m_jobs;
};
} // namespace scord
Loading