diff --git a/ci/check_rpcs.py b/ci/check_rpcs.py index cd23b502272263f4fa25d20fcb56f38b5660c25b..5055a07e363837f328eb6722a1b7b874282c0e24 100755 --- a/ci/check_rpcs.py +++ b/ci/check_rpcs.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -import pprint +from loguru import logger import re import sys from pathlib import Path @@ -12,6 +12,7 @@ RPC_NAMES = { 'ADM_register_job', 'ADM_update_job', 'ADM_remove_job', 'ADM_register_adhoc_storage', 'ADM_update_adhoc_storage', 'ADM_remove_adhoc_storage', 'ADM_deploy_adhoc_storage', + 'ADM_tear_down_adhoc_storage', 'ADM_register_pfs_storage', 'ADM_update_pfs_storage', 'ADM_remove_pfs_storage', 'ADM_transfer_datasets', 'ADM_get_transfer_priority', @@ -161,22 +162,23 @@ class RemoteProcedure: for extra_keys, rpc in zip([self_extra_keys, other_extra_keys], [self, other]): if len(extra_keys) != 0: - print("ERROR: Extra fields were found when comparing an rpc to " - "its counterpart\n" - f" extra fields: {extra_keys}" - f" line number: {rpc.meta.lineno}" - f" line contents: {rpc.meta.line}", file=sys.stderr) + logger.error( + "\nExtra fields were found when comparing an rpc " + "to its counterpart\n" + f" extra fields: {extra_keys}" + f" line number: {rpc.meta.lineno}" + f" line contents: {rpc.meta.line}", file=sys.stderr) return False for k in self_keys: if self._body[k] != other._body[k]: - print("ERROR: Mismatching values were found when comparing an " - "rpc to its counterpart\n" - f" value1 (line: {self.meta.lineno}): {k}: " - f"{self._body[k]}\n" - f" value2 (line: {other.meta.lineno}): {k}: " - f"{other._body[k]} ", - file=sys.stderr) + logger.error("\nMismatching values were found when " + "comparing an rpc to its counterpart\n" + f" value1 (line: {self.meta.lineno}): {k}: " + f"{self._body[k]}\n" + f" value2 (line: {other.meta.lineno}): {k}: " + f"{other._body[k]} ", + file=sys.stderr) return False return True @@ -318,14 +320,16 @@ if __name__ == "__main__": for lf, n in zip([client_logfile, server_logfile], ['CLIENT_LOGFILE', 'SERVER_LOGFILE']): if not lf.is_file(): - print(f"ERROR: {n} '{lf}' is not a file", file=sys.stderr) + logger.error(f"{n} '{lf}' is not a file", file=sys.stderr) sys.exit(1) rpc_name = sys.argv[3] if rpc_name not in RPC_NAMES: - print(f"ERROR: '{rpc_name}' is not a valid rpc name", file=sys.stderr) - print(f" Valid names: {', '.join(sorted(RPC_NAMES))}", file=sys.stderr) + logger.error(f"'{rpc_name}' is not a valid rpc name", + file=sys.stderr) + logger.error(f" Valid names: {', '.join(sorted(RPC_NAMES))}", + file=sys.stderr) sys.exit(1) logfiles = [client_logfile, server_logfile] @@ -345,9 +349,10 @@ if __name__ == "__main__": if rpc.is_request: found_rpcs[rpc.id] = rpc else: - print(f"ERROR: Found RPC reply without corresponding " - f"request at line {rpc.meta.lineno}\n" - f" raw: '{rpc.meta.line}'", file=sys.stderr) + logger.error(f"\nFound server reply for RPC without " + f"a corresponding client request at line" + f" {rpc.meta.lineno}\n" + f" raw: '{rpc.meta.line}'", file=sys.stderr) sys.exit(1) else: req_rpc = found_rpcs[rpc.id] @@ -358,7 +363,11 @@ if __name__ == "__main__": ec = 0 for k in client_ops.keys(): - assert (k in server_ops) + if k not in server_ops: + logger.error( + f"Operation ID '{k}' found in client log but missing " + f"in server log") + ec = 1 if client_ops[k] != server_ops[k]: ec = 1 diff --git a/examples/c/ADM_tear_down_adhoc_storage.c b/examples/c/ADM_tear_down_adhoc_storage.c new file mode 100644 index 0000000000000000000000000000000000000000..10e6546c46c279b03fc29f7fdd4e7493066a3d99 --- /dev/null +++ b/examples/c/ADM_tear_down_adhoc_storage.c @@ -0,0 +1,154 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include +#include "common.h" + +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 + +int +main(int argc, char* argv[]) { + + if(argc != 2) { + fprintf(stderr, "ERROR: no location provided\n"); + fprintf(stderr, "Usage: ADM_deploy_adhoc_storage \n"); + exit(EXIT_FAILURE); + } + + int exit_status = EXIT_FAILURE; + ADM_return_t ret = ADM_SUCCESS; + ADM_server_t server = NULL; + + // adhoc information + const char* adhoc_name = "adhoc_storage_42"; + + ADM_node_t* adhoc_nodes = NULL; + ADM_adhoc_resources_t adhoc_resources = NULL; + ADM_adhoc_context_t adhoc_ctx = NULL; + ADM_adhoc_context_t new_adhoc_ctx = NULL; + ADM_adhoc_storage_t adhoc_storage = NULL; + + + // Let's prepare all the information required by the API calls. + // ADM_update_adhoc_storage() obviously requires an adhoc storage to have + // been registered onto the system, so let's prepare first the data required + // to call ADM_register_adhoc_storage(): + + // 1. the jobs required by the associated adhoc storage + adhoc_nodes = prepare_nodes(NADHOC_NODES); + + if(adhoc_nodes == NULL) { + fprintf(stderr, "Fatal error preparing adhoc nodes\n"); + goto cleanup; + } + + // 2. the adhoc storage resources + adhoc_resources = ADM_adhoc_resources_create(adhoc_nodes, NADHOC_NODES); + + if(adhoc_resources == NULL) { + fprintf(stderr, "Fatal error preparing adhoc resources\n"); + goto cleanup; + } + + // 3. the adhoc storage execution context + adhoc_ctx = ADM_adhoc_context_create(ADM_ADHOC_MODE_SEPARATE_NEW, + ADM_ADHOC_ACCESS_RDWR, 100, false); + + if(adhoc_ctx == NULL) { + fprintf(stderr, "Fatal error preparing adhoc context\n"); + goto cleanup; + } + + + // All the information required by the ADM_register_adhoc_storage() API is + // now ready. Let's actually contact the server: + + // 1. Find the server endpoint + if((server = ADM_server_create("tcp", argv[1])) == NULL) { + fprintf(stderr, "Fatal error creating server\n"); + goto cleanup; + } + + // 2. Register the adhoc storage + if(ADM_register_adhoc_storage( + server, adhoc_name, ADM_ADHOC_STORAGE_DATACLAY, adhoc_ctx, + adhoc_resources, &adhoc_storage) != ADM_SUCCESS) { + fprintf(stderr, "ADM_register_adhoc_storage() failed: %s\n", + ADM_strerror(ret)); + goto cleanup; + } + + + // Now that we have an existing adhoc storage registered into the + // system, let's prepare a new execution context for the adhoc + // storage system + + new_adhoc_ctx = ADM_adhoc_context_create(ADM_ADHOC_MODE_SEPARATE_NEW, + ADM_ADHOC_ACCESS_RDWR, 200, false); + + if(new_adhoc_ctx == NULL) { + fprintf(stderr, "Fatal error preparing new adhoc context\n"); + goto cleanup; + } + + // We can now request the deployment to the server + if((ret = ADM_deploy_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_deploy_adhoc_storage() failed: %s\n", + ADM_strerror(ret)); + goto cleanup; + } + + // We can noe request the tear down of the adhoc storage + if((ret = ADM_tear_down_adhoc_storage(server, adhoc_storage)) != + ADM_SUCCESS) { + fprintf(stderr, "ADM_tear_down_adhoc_storage() failed: %s\n", + ADM_strerror(ret)); + goto cleanup; + } + + // At this point, the adhoc storage has been torn down... + exit_status = EXIT_SUCCESS; + + // Once the adhoc storage is no longer required we need to notify the server + if((ret = ADM_remove_adhoc_storage(server, adhoc_storage)) != ADM_SUCCESS) { + fprintf(stderr, "ADM_remove_adhoc_storage() failed: %s\n", + ADM_strerror(ret)); + adhoc_storage = NULL; + exit_status = EXIT_FAILURE; + // intentionally fall through... + } + + +cleanup: + ADM_server_destroy(server); + ADM_adhoc_context_destroy(new_adhoc_ctx); + ADM_adhoc_context_destroy(adhoc_ctx); + ADM_adhoc_resources_destroy(adhoc_resources); + destroy_nodes(adhoc_nodes, NADHOC_NODES); + exit(exit_status); +} diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 61cf9d67225e192ec87f37830de19ca8d7b56717..ce29ffbdc3845c92ff2102ce4c67569cb1acac09 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -29,7 +29,7 @@ list(APPEND examples_c ADM_register_job ADM_update_job ADM_remove_job # adhoc storage ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage - ADM_deploy_adhoc_storage + ADM_deploy_adhoc_storage ADM_tear_down_adhoc_storage # pfs storage ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage # transfers diff --git a/examples/cxx/ADM_tear_down_adhoc_storage.cpp b/examples/cxx/ADM_tear_down_adhoc_storage.cpp new file mode 100644 index 0000000000000000000000000000000000000000..229242744f1c6e001891e10401d09d3a01eef293 --- /dev/null +++ b/examples/cxx/ADM_tear_down_adhoc_storage.cpp @@ -0,0 +1,79 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#include +#include +#include "common.hpp" + +#define NJOB_NODES 50 +#define NADHOC_NODES 25 +#define NINPUTS 10 +#define NOUTPUTS 5 + +int +main(int argc, char* argv[]) { + + if(argc != 2) { + fmt::print(stderr, "ERROR: no location provided\n"); + fmt::print(stderr, + "Usage: ADM_tear_down_adhoc_storage \n"); + exit(EXIT_FAILURE); + } + + scord::server server{"tcp", argv[1]}; + + const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); + const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); + const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); + + std::string name = "adhoc_storage_42"; + const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ + scord::adhoc_storage::execution_mode::separate_new, + scord::adhoc_storage::access_type::read_write, 100, false}; + const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; + + try { + const auto adhoc_storage = scord::register_adhoc_storage( + server, name, scord::adhoc_storage::type::dataclay, + adhoc_storage_ctx, adhoc_resources); + + fmt::print(stdout, + "ADM_register_adhoc_storage() remote procedure completed " + "successfully\n"); + + scord::deploy_adhoc_storage(server, adhoc_storage); + + scord::tear_down_adhoc_storage(server, adhoc_storage); + + } catch(const std::exception& e) { + fmt::print(stderr, + "FATAL: ADM_register_adhoc_storage() or " + "ADM_deploy_adhoc_storage() failed: {}\n", + e.what()); + exit(EXIT_FAILURE); + } + + fmt::print(stdout, "ADM_deploy_adhoc_storage() remote procedure completed " + "successfully\n"); +} diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index ead7d2515034d2eec8eff6e63d9ac87387c2f84f..0b4ca7469cc0a3a3a06ac4c5d61be2320d7707fe 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -29,7 +29,7 @@ list(APPEND examples_cxx ADM_register_job ADM_update_job ADM_remove_job # adhoc storage ADM_register_adhoc_storage ADM_update_adhoc_storage ADM_remove_adhoc_storage - ADM_deploy_adhoc_storage + ADM_deploy_adhoc_storage ADM_tear_down_adhoc_storage # pfs storage ADM_register_pfs_storage ADM_update_pfs_storage ADM_remove_pfs_storage # transfers diff --git a/src/lib/c_wrapper.cpp b/src/lib/c_wrapper.cpp index 1074843a97d52ad4cab2aedd280ac8bfe6819cb5..e383a86e47de1f32794d4d3102492da6b98c4cf1 100644 --- a/src/lib/c_wrapper.cpp +++ b/src/lib/c_wrapper.cpp @@ -161,6 +161,14 @@ ADM_deploy_adhoc_storage(ADM_server_t server, srv, scord::adhoc_storage{adhoc_storage}); } +ADM_return_t +ADM_tear_down_adhoc_storage(ADM_server_t server, + ADM_adhoc_storage_t adhoc_storage) { + + return scord::detail::tear_down_adhoc_storage( + scord::server{server}, scord::adhoc_storage{adhoc_storage}); +} + ADM_return_t ADM_register_pfs_storage(ADM_server_t server, const char* name, ADM_pfs_storage_type_t type, ADM_pfs_context_t ctx, diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 6f3bc40d82e359ebc7cc73e8c4153d92885ebf56..ec28bcb799929b100b8e22d959948da019917fb5 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -479,6 +479,43 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { return scord::error_code::other; } +scord::error_code +tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { + + scord::network::client rpc_client{srv.protocol()}; + + const auto rpc_id = ::api::remote_procedure::new_id(); + + if(const auto lookup_rv = rpc_client.lookup(srv.address()); + lookup_rv.has_value()) { + const auto& endp = lookup_rv.value(); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_id: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), adhoc_storage.id()); + + if(const auto call_rv = + endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); + call_rv.has_value()) { + + const scord::network::generic_response resp{call_rv.value()}; + + LOGGER_EVAL(resp.error_code(), INFO, ERROR, + "rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(endp.address()), resp.error_code(), + resp.op_id()); + + return resp.error_code(); + } + } + + LOGGER_ERROR("rpc call failed"); + return scord::error_code::other; +} + tl::expected transfer_datasets(const server& srv, const job& job, const std::vector& sources, diff --git a/src/lib/detail/impl.hpp b/src/lib/detail/impl.hpp index 40bb30c9132521941f7cec8b19f180a1db53a805..d87539086ac90ef9a2f4fa15948fdf9bfd2c5297 100644 --- a/src/lib/detail/impl.hpp +++ b/src/lib/detail/impl.hpp @@ -62,6 +62,9 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); scord::error_code deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +scord::error_code +tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); + tl::expected register_pfs_storage(const server& srv, const std::string& name, enum pfs_storage::type type, const pfs_storage::ctx& ctx); diff --git a/src/lib/libscord.cpp b/src/lib/libscord.cpp index cbe3b47c37195774ca3bbe2c88132c66d27f42c8..a73c7738fdd19cc7211b16c8660fcb6638e07ec9 100644 --- a/src/lib/libscord.cpp +++ b/src/lib/libscord.cpp @@ -299,6 +299,17 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { } } +void +tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { + + const auto ec = detail::tear_down_adhoc_storage(srv, adhoc_storage); + + if(!ec) { + throw std::runtime_error(fmt::format( + "ADM_deploy_adhoc_storage() error: {}", ec.message())); + } +} + scord::pfs_storage register_pfs_storage(const server& srv, const std::string& name, enum pfs_storage::type type, const pfs_storage::ctx& ctx) { diff --git a/src/lib/scord/scord.h b/src/lib/scord/scord.h index 1693a3ba5a2c20112add750d5d1728812cca3e61..4733624aab84c149ec7e32db369b165241b59418 100644 --- a/src/lib/scord/scord.h +++ b/src/lib/scord/scord.h @@ -158,6 +158,18 @@ ADM_return_t ADM_deploy_adhoc_storage(ADM_server_t server, ADM_adhoc_storage_t adhoc_storage); +/** + * Tear down a previously deployed adhoc storage system instance + * + * @param[in] server The server to which the request is directed + * @param[in] adhoc_storage An ADM_STORAGE referring to the adhoc storage + * instance of interest. + * @return Returns ADM_SUCCESS if the remote procedure has completed + */ +ADM_return_t +ADM_tear_down_adhoc_storage(ADM_server_t server, + ADM_adhoc_storage_t adhoc_storage); + /** * Register a PFS storage tier. * diff --git a/src/lib/scord/scord.hpp b/src/lib/scord/scord.hpp index 1a9a5c560b18a6df818a26b69f6b261ef48cee3b..1325b5c617b95919e608864d1ce9d6c73d498a66 100644 --- a/src/lib/scord/scord.hpp +++ b/src/lib/scord/scord.hpp @@ -73,6 +73,9 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); void deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); +void +tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage); + scord::pfs_storage register_pfs_storage(const server& srv, const std::string& name, enum scord::pfs_storage::type type, diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 9c4045bc00bd88af52cf23c50cb377bc41747285..329f4c716b9eaeb6ce9e932257e1f0ea4601432d 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -401,6 +401,32 @@ deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id) { req.respond(resp); } +void +tear_down_adhoc_storage(const request& req, std::uint64_t adhoc_id) { + + using scord::network::generic_response; + using scord::network::get_address; + + const auto rpc_name = "ADM_"s + __FUNCTION__; + const auto rpc_id = remote_procedure::new_id(); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{adhoc_id: {}}}", + rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), + adhoc_id); + + // TODO: actually tear down the adhoc storage instance + + const auto resp = generic_response{rpc_id, scord::error_code::success}; + + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), + scord::error_code::success); + + req.respond(resp); +} + void register_pfs_storage(const request& req, const std::string& name, enum scord::pfs_storage::type type, diff --git a/src/scord/rpc_handlers.hpp b/src/scord/rpc_handlers.hpp index c0bc39638518673053a2ed16633b4b461e3c827b..1ccc493dcfd43be900b74a4f70e13fed7b41509d 100644 --- a/src/scord/rpc_handlers.hpp +++ b/src/scord/rpc_handlers.hpp @@ -62,6 +62,9 @@ remove_adhoc_storage(const request& req, std::uint64_t adhoc_id); void deploy_adhoc_storage(const request& req, std::uint64_t adhoc_id); +void +tear_down_adhoc_storage(const request& req, std::uint64_t adhoc_id); + void register_pfs_storage(const request& req, const std::string& name, enum scord::pfs_storage::type type, diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp index 3615a8bfeb16647eb5add0acbd15cb28faf8863d..bdace7a7180bcd592f61174b258376700251795f 100644 --- a/src/scord/scord.cpp +++ b/src/scord/scord.cpp @@ -192,6 +192,7 @@ main(int argc, char* argv[]) { daemon.set_handler(EXPAND(update_adhoc_storage)); daemon.set_handler(EXPAND(remove_adhoc_storage)); daemon.set_handler(EXPAND(deploy_adhoc_storage)); + daemon.set_handler(EXPAND(tear_down_adhoc_storage)); daemon.set_handler(EXPAND(register_pfs_storage)); daemon.set_handler(EXPAND(update_pfs_storage)); daemon.set_handler(EXPAND(remove_pfs_storage));