Skip to content
Snippets Groups Projects
Commit 88d5862a authored by Alberto Miranda's avatar Alberto Miranda :hotsprings:
Browse files

Merge branch...

Merge branch 'amanzano/38-refactor-library-rpc-implementation-of-admire-remove_adhoc_storage' into 'main'

Resolve "Refactor library RPC implementation of `admire::remove_adhoc_storage`"

This MR moves the implementation of the RPC construction for
`ADM_remove_adhoc_storage` to `admire::detail` in `impl.[ch]pp`
similarly to other RPCs already refactored. This includes changing
the C ADM_types to native C++ types.

Closes #38

See merge request !62
parents 3454a626 eba88555
No related branches found
No related tags found
1 merge request!62Resolve "Refactor library RPC implementation of `admire::remove_adhoc_storage`"
Pipeline #3351 passed
......@@ -22,9 +22,14 @@
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#include "fmt/format.h"
#include "admire.hpp"
#include <fmt/format.h>
#include <admire.hpp>
#include "common.hpp"
#define NJOB_NODES 50
#define NADHOC_NODES 25
#define NINPUTS 10
#define NOUTPUTS 5
int
main(int argc, char* argv[]) {
......@@ -38,24 +43,28 @@ main(int argc, char* argv[]) {
admire::server server{"tcp", argv[1]};
ADM_storage_t adhoc_storage{};
ADM_return_t ret = ADM_SUCCESS;
const auto adhoc_nodes = prepare_nodes(NADHOC_NODES);
const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS);
const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS);
std::string name = "adhoc_storage_42";
const auto adhoc_storage_ctx = admire::adhoc_storage::ctx{
admire::adhoc_storage::execution_mode::separate_new,
admire::adhoc_storage::access_type::read_write,
admire::adhoc_storage::resources{adhoc_nodes}, 100, false};
try {
ret = admire::remove_adhoc_storage(server, adhoc_storage);
const auto adhoc_storage = admire::register_adhoc_storage(
server, name, admire::storage::type::gekkofs,
adhoc_storage_ctx);
admire::remove_adhoc_storage(server, adhoc_storage);
fmt::print(stdout,
"ADM_remove_adhoc_storage() remote procedure completed "
"successfully\n");
exit(EXIT_SUCCESS);
} catch(const std::exception& e) {
fmt::print(stderr, "FATAL: ADM_remove_adhoc_storage() failed: {}\n",
e.what());
exit(EXIT_FAILURE);
}
if(ret != ADM_SUCCESS) {
fmt::print(stdout,
"ADM_remove_adhoc_storage() remote procedure not completed "
"successfully\n");
exit(EXIT_FAILURE);
}
fmt::print(stdout, "ADM_remove_adhoc_storage() remote procedure completed "
"successfully\n");
}
......@@ -327,7 +327,7 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(
ADM_remove_job_out_t,
((hg_uint64_t) (op_id))
((int32_t) (retval))
((hg_int32_t) (retval))
);
/// ADM_register_adhoc_storage
......@@ -341,8 +341,8 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(
ADM_register_adhoc_storage_out_t,
((hg_uint64_t) (op_id))
((int32_t) (retval))
((uint64_t) (id))
((hg_int32_t) (retval))
((hg_uint64_t) (id))
);
/// ADM_update_adhoc_storage
......@@ -359,9 +359,16 @@ MERCURY_GEN_PROC(
);
/// ADM_remove_adhoc_storage
MERCURY_GEN_PROC(ADM_remove_adhoc_storage_in_t, ((int32_t) (reqs)))
MERCURY_GEN_PROC(
ADM_remove_adhoc_storage_in_t,
((hg_uint64_t) (server_id))
);
MERCURY_GEN_PROC(ADM_remove_adhoc_storage_out_t, ((int32_t) (ret)))
MERCURY_GEN_PROC(
ADM_remove_adhoc_storage_out_t,
((hg_uint64_t) (op_id))
((hg_int32_t) (retval))
);
/// ADM_deploy_adhoc_storage
MERCURY_GEN_PROC(ADM_deploy_adhoc_storage_in_t, ((int32_t) (reqs)))
......
......@@ -255,30 +255,9 @@ update_adhoc_storage(const server& srv,
return detail::update_adhoc_storage(srv, adhoc_storage_ctx, adhoc_storage);
}
ADM_return_t
remove_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage) {
(void) srv;
(void) adhoc_storage;
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("ADM_remove_adhoc_storage(...)");
ADM_remove_adhoc_storage_in_t in{};
ADM_remove_adhoc_storage_out_t out;
const auto rpc = endp.call("ADM_remove_adhoc_storage", &in, &out);
if(out.ret < 0) {
LOGGER_ERROR("ADM_remove_adhoc_storage() = {}", out.ret);
return static_cast<ADM_return_t>(out.ret);
}
LOGGER_INFO("ADM_remove_adhoc_storage() = {}", ADM_SUCCESS);
return ADM_SUCCESS;
admire::error_code
remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {
return detail::remove_adhoc_storage(srv, adhoc_storage);
}
ADM_return_t
......
......@@ -67,8 +67,8 @@ update_adhoc_storage(const server& srv,
const adhoc_storage::ctx& adhoc_storage_ctx,
const adhoc_storage& adhoc_storage);
ADM_return_t
remove_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage);
admire::error_code
remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage);
ADM_return_t
deploy_adhoc_storage(const server& srv, ADM_storage_t adhoc_storage);
......
......@@ -116,7 +116,8 @@ ADM_remove_adhoc_storage(ADM_server_t server, ADM_storage_t adhoc_storage) {
const admire::server srv{server};
return admire::remove_adhoc_storage(srv, adhoc_storage);
return admire::detail::remove_adhoc_storage(
srv, admire::adhoc_storage{adhoc_storage});
}
ADM_return_t
......
......@@ -446,4 +446,37 @@ update_adhoc_storage(const server& srv,
return admire::error_code::success;
}
admire::error_code
remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) {
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{adhoc_storage_id: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), adhoc_storage.id());
ADM_remove_adhoc_storage_in_t in{adhoc_storage.id()};
ADM_remove_adhoc_storage_out_t out;
const auto rpc = endp.call("ADM_remove_adhoc_storage", &in, &out);
if(const auto rv = admire::error_code{out.retval}; !rv) {
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), rv, out.op_id);
return rv;
}
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), admire::error_code::success,
out.op_id);
return admire::error_code::success;
}
} // namespace admire::detail
......@@ -62,6 +62,9 @@ update_adhoc_storage(const server& srv,
const adhoc_storage::ctx& adhoc_storage_ctx,
const adhoc_storage& adhoc_storage);
admire::error_code
remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage);
} // namespace admire::detail
#endif // SCORD_ADMIRE_IMPL_HPP
......@@ -389,11 +389,28 @@ ADM_remove_adhoc_storage(hg_handle_t h) {
ret = margo_get_input(h, &in);
assert(ret == HG_SUCCESS);
out.ret = -1;
const auto rpc_id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{adhoc_storage_id: {}}}",
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
in.server_id);
LOGGER_INFO("ADM_remove_adhoc_storage()");
out.ret = 0;
auto& adhoc_manager = scord::adhoc_storage_manager::instance();
admire::error_code ec = adhoc_manager.remove(in.server_id);
if(!ec) {
LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id,
in.server_id);
}
out.op_id = rpc_id;
out.retval = ec;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
rpc_id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ec);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment