From 8bb6da6626cd493750d28340b6829c3ecdc0b574 Mon Sep 17 00:00:00 2001 From: Alberto Miranda Date: Wed, 19 Oct 2022 17:34:59 +0200 Subject: [PATCH] ADM_update_job: job_requirements can no longer be updated --- examples/c/ADM_update_job.c | 28 ++++++---------------------- examples/cxx/ADM_update_job.cpp | 5 ++--- src/common/api/internal_types.hpp | 5 +++++ src/common/net/proto/rpc_types.h | 1 - src/lib/admire.cpp | 4 ++-- src/lib/admire.h | 14 +++++++++++++- src/lib/admire.hpp | 3 +-- src/lib/c_wrapper.cpp | 5 ++--- src/lib/detail/impl.cpp | 11 ++++------- src/lib/detail/impl.hpp | 2 +- src/scord/job_manager.hpp | 10 ++-------- src/scord/rpc_handlers.cpp | 7 +++---- 12 files changed, 41 insertions(+), 54 deletions(-) diff --git a/examples/c/ADM_update_job.c b/examples/c/ADM_update_job.c index a9ecf2c1..d3a876fe 100644 --- a/examples/c/ADM_update_job.c +++ b/examples/c/ADM_update_job.c @@ -99,30 +99,14 @@ main(int argc, char* argv[]) { goto cleanup; } - ADM_dataset_t new_inputs[NINPUTS]; + ADM_node_t* new_job_nodes = prepare_nodes(NJOB_NODES * 2); + assert(new_job_nodes); - for(int i = 0; i < NINPUTS; ++i) { - const char* pattern = "input-new-dataset-%d"; - size_t n = snprintf(NULL, 0, pattern, i); - char* id = (char*) alloca(n + 1); - snprintf(id, n + 1, pattern, i); - new_inputs[i] = ADM_dataset_create(id); - } - - ADM_dataset_t new_outputs[NOUTPUTS]; - - for(int i = 0; i < NOUTPUTS; ++i) { - const char* pattern = "output-new-dataset-%d"; - size_t n = snprintf(NULL, 0, pattern, i); - char* id = (char*) alloca(n + 1); - snprintf(id, n + 1, pattern, i); - new_outputs[i] = ADM_dataset_create(id); - } - - ADM_job_requirements_t new_reqs = ADM_job_requirements_create( - new_inputs, NINPUTS, new_outputs, NOUTPUTS, adhoc_storage); + ADM_job_resources_t new_job_resources = + ADM_job_resources_create(new_job_nodes, NJOB_NODES * 2); + assert(job_resources); - ret = ADM_update_job(server, job, job_resources, new_reqs); + ret = ADM_update_job(server, job, new_job_resources); if(ret != ADM_SUCCESS) { fprintf(stderr, diff --git a/examples/cxx/ADM_update_job.cpp b/examples/cxx/ADM_update_job.cpp index bb26ec3e..223bd33a 100644 --- a/examples/cxx/ADM_update_job.cpp +++ b/examples/cxx/ADM_update_job.cpp @@ -43,6 +43,7 @@ main(int argc, char* argv[]) { admire::server server{"tcp", argv[1]}; const auto job_nodes = prepare_nodes(NJOB_NODES); + const auto new_job_nodes = prepare_nodes(NJOB_NODES * 2); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); @@ -60,14 +61,12 @@ main(int argc, char* argv[]) { const auto new_outputs = prepare_datasets("output-new-dataset-{}", NOUTPUTS); - admire::job_requirements new_reqs{new_inputs, new_outputs, gkfs_storage}; - try { [[maybe_unused]] const auto job = admire::register_job( server, admire::job::resources{job_nodes}, reqs, 0); [[maybe_unused]] ADM_return_t ret = admire::update_job( - server, job, admire::job::resources{job_nodes}, new_reqs); + server, job, admire::job::resources{new_job_nodes}); fmt::print( stdout, diff --git a/src/common/api/internal_types.hpp b/src/common/api/internal_types.hpp index a1c1fdde..0140a62f 100644 --- a/src/common/api/internal_types.hpp +++ b/src/common/api/internal_types.hpp @@ -53,6 +53,11 @@ struct job_info { return m_requirements; } + void + update(admire::job::resources resources) { + m_resources = std::move(resources); + } + admire::job m_job; std::optional m_resources; std::optional m_requirements; diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index a3a7e2c6..4161ef5d 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -310,7 +310,6 @@ MERCURY_GEN_PROC( ADM_update_job_in_t, ((ADM_job_t) (job)) ((ADM_job_resources_t) (job_resources)) - ((adm_job_requirements) (reqs)) ); MERCURY_GEN_PROC( diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 183db2dc..482421f5 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -224,8 +224,8 @@ register_job(const server& srv, const job::resources& resources, ADM_return_t update_job(const server& srv, const job& job, - const job::resources& job_resources, const job_requirements& reqs) { - return detail::update_job(srv, job, job_resources, reqs); + const job::resources& job_resources) { + return detail::update_job(srv, job, job_resources); } ADM_return_t diff --git a/src/lib/admire.h b/src/lib/admire.h index e54c5430..d0bf7cbb 100644 --- a/src/lib/admire.h +++ b/src/lib/admire.h @@ -80,9 +80,21 @@ ADM_register_job(ADM_server_t server, ADM_job_resources_t res, ADM_job_requirements_t reqs, uint64_t slurm_id, ADM_job_t* job); +/** + * Update a registered job resources. + * + * @remark The returned ADM_JOB will be freed when passed to + * ADM_remove_job(). + * + * @param[in] server The server to which the request is directed + * @param[in] job An ADM_JOB identifying the target job to update. + * @param[in] job_resources The new resources for the job. + * @return Returns ADM_SUCCESS if the remote procedure has completed + * successfully. + */ ADM_return_t ADM_update_job(ADM_server_t server, ADM_job_t job, - ADM_job_resources_t job_resources, ADM_job_requirements_t reqs); + ADM_job_resources_t job_resources); ADM_return_t ADM_remove_job(ADM_server_t server, ADM_job_t job); diff --git a/src/lib/admire.hpp b/src/lib/admire.hpp index 56b01c58..68315fc0 100644 --- a/src/lib/admire.hpp +++ b/src/lib/admire.hpp @@ -52,8 +52,7 @@ register_job(const server& srv, const job::resources& job_resources, const job_requirements& reqs, admire::slurm_job_id slurm_id); ADM_return_t -update_job(const server& srv, const job&, const job::resources& job_resources, - const job_requirements& reqs); +update_job(const server& srv, const job&, const job::resources& job_resources); ADM_return_t remove_job(const server& srv, const job& job); diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 21d43f14..05faa3f7 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -64,13 +64,12 @@ ADM_register_job(ADM_server_t server, ADM_job_resources_t res, ADM_return_t ADM_update_job(ADM_server_t server, ADM_job_t job, - ADM_job_resources_t job_resources, ADM_job_requirements_t reqs) { + ADM_job_resources_t job_resources) { const admire::server srv{server}; return admire::update_job(srv, admire::job{job}, - admire::job::resources{job_resources}, - admire::job_requirements{reqs}); + admire::job::resources{job_resources}); } ADM_return_t diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 5bb433c0..73556145 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -241,7 +241,7 @@ register_job(const server& srv, const job::resources& job_resources, admire::error_code update_job(const server& srv, const job& job, - const job::resources& job_resources, const job_requirements& reqs) { + const job::resources& job_resources) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; @@ -249,17 +249,14 @@ update_job(const server& srv, const job& job, auto endp = rpc_client.lookup(srv.address()); LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job: {}, job_resources: {}, job_requirements: {}}}", + "body: {{job: {}, job_resources: {}}}", rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), job, job_resources, - reqs); + std::quoted(rpc_client.self_address()), job, job_resources); const auto rpc_job = api::convert(job); const auto rpc_job_resources = api::convert(job_resources); - const auto rpc_reqs = api::convert(reqs); - ADM_update_job_in_t in{rpc_job.get(), rpc_job_resources.get(), - *rpc_reqs.get()}; + ADM_update_job_in_t in{rpc_job.get(), rpc_job_resources.get()}; ADM_update_job_out_t out; const auto rpc = endp.call("ADM_update_job", &in, &out); diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 4a07acb6..613b960e 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -40,7 +40,7 @@ register_job(const server& srv, const job::resources& job_resources, admire::error_code update_job(const server& srv, const job& job, - const job::resources& job_resources, const job_requirements& reqs); + const job::resources& job_resources); admire::error_code remove_job(const server& srv, const job& job); diff --git a/src/scord/job_manager.hpp b/src/scord/job_manager.hpp index 186434e6..97fa25a5 100644 --- a/src/scord/job_manager.hpp +++ b/src/scord/job_manager.hpp @@ -70,19 +70,13 @@ struct job_manager : scord::utils::singleton { } admire::error_code - update(admire::job_id id, admire::job::resources job_resources, - admire::job_requirements job_requirements) { + update(admire::job_id id, admire::job::resources job_resources) { abt::unique_lock lock(m_jobs_mutex); if(const auto it = m_jobs.find(id); it != m_jobs.end()) { const auto& current_job_info = it->second; - - const auto new_job_info = admire::internal::job_info{ - current_job_info->job(), std::move(job_resources), - std::move(job_requirements)}; - - *it->second = new_job_info; + current_job_info->update(std::move(job_resources)); return ADM_SUCCESS; } diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index fdf084af..5f1d713c 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -161,16 +161,15 @@ ADM_update_job(hg_handle_t h) { const admire::job job(in.job); const admire::job::resources job_resources(in.job_resources); - const admire::job_requirements reqs(&in.reqs); const auto rpc_id = remote_procedure::new_id(); LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job: {}, job_resources: {}, job_requirements: {}}}", + "body: {{job: {}, job_resources: {}}}", rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), - job, job_resources, reqs); + job, job_resources); auto& jm = scord::job_manager::instance(); - const auto ec = jm.update(job.id(), job_resources, reqs); + const auto ec = jm.update(job.id(), job_resources); if(ec != ADM_SUCCESS) { LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, -- GitLab