Loading examples/cxx/ADM_transfer_update.cpp 0 → 100644 +89 −0 Original line number Diff line number Diff line /****************************************************************************** * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). * * This file is part of scord. * * scord is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * scord is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with scord. If not, see <https://www.gnu.org/licenses/>. * * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ #include <fmt/format.h> #include <scord/scord.hpp> #include "common.hpp" #define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 #define NSOURCES 5 #define NTARGETS 5 #define NLIMITS 4 int main(int argc, char* argv[]) { test_info test_info{ .name = TESTNAME, .requires_server = true, .requires_controller = true, }; const auto cli_args = process_args(argc, argv, test_info); scord::server server{"tcp", cli_args.server_address}; const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); const auto qos_limits = prepare_qos_limits(NLIMITS); const auto mapping = scord::transfer::mapping::n_to_n; std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, scord::adhoc_storage::execution_mode::separate_new, scord::adhoc_storage::access_type::read_write, 100, false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { const auto adhoc_storage = scord::register_adhoc_storage( server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); scord::job::requirements reqs(inputs, outputs, adhoc_storage); const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); const auto transfer = scord::transfer_datasets( server, job, sources, targets, qos_limits, mapping); scord::transfer_update(server, transfer, 10.0f); fmt::print(stdout, "ADM_transfer_update remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_transfer_update() failed: {}\n", e.what()); exit(EXIT_FAILURE); } } examples/cxx/CMakeLists.txt +1 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update # qos ADM_set_qos_constraints ADM_get_qos_constraints # data operations Loading src/lib/c_wrapper.cpp +9 −0 Original line number Diff line number Diff line Loading @@ -245,6 +245,15 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, return ADM_SUCCESS; } ADM_return_t ADM_transfer_update(ADM_server_t server, ADM_transfer_t transfer, float obtained_bw) { return scord::detail::transfer_update( scord::server{server}, scord::transfer{transfer}, obtained_bw); } ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { Loading src/lib/detail/impl.cpp +32 −0 Original line number Diff line number Diff line Loading @@ -530,4 +530,36 @@ transfer_datasets(const server& srv, const job& job, return tl::make_unexpected(scord::error_code::other); } scord::error_code transfer_update(const server& srv, scord::transfer transfer, float obtained_bw) { network::client rpc_client{srv.protocol()}; const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc {:<} body: {{transfer_id: {}}}", rpc, transfer.id()); if(const auto& call_rv = endp.call(rpc.name(), transfer, obtained_bw); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_ERROR("rpc call failed"); return scord::error_code::other; } } // namespace scord::detail src/lib/detail/impl.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector<qos::limit>& limits, transfer::mapping mapping); scord::error_code transfer_update(const server& srv, scord::transfer transfer, float obtained_bw); } // namespace scord::detail #endif // SCORD_ADMIRE_IMPL_HPP Loading
examples/cxx/ADM_transfer_update.cpp 0 → 100644 +89 −0 Original line number Diff line number Diff line /****************************************************************************** * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain * * This software was partially supported by the EuroHPC-funded project ADMIRE * (Project ID: 956748, https://www.admire-eurohpc.eu). * * This file is part of scord. * * scord is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * scord is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with scord. If not, see <https://www.gnu.org/licenses/>. * * SPDX-License-Identifier: GPL-3.0-or-later *****************************************************************************/ #include <fmt/format.h> #include <scord/scord.hpp> #include "common.hpp" #define NJOB_NODES 50 #define NADHOC_NODES 25 #define NINPUTS 10 #define NOUTPUTS 5 #define NSOURCES 5 #define NTARGETS 5 #define NLIMITS 4 int main(int argc, char* argv[]) { test_info test_info{ .name = TESTNAME, .requires_server = true, .requires_controller = true, }; const auto cli_args = process_args(argc, argv, test_info); scord::server server{"tcp", cli_args.server_address}; const auto job_nodes = prepare_nodes(NJOB_NODES); const auto adhoc_nodes = prepare_nodes(NADHOC_NODES); const auto inputs = prepare_datasets("input-dataset-{}", NINPUTS); const auto outputs = prepare_datasets("output-dataset-{}", NOUTPUTS); const auto sources = prepare_datasets("source-dataset-{}", NSOURCES); const auto targets = prepare_datasets("target-dataset-{}", NTARGETS); const auto qos_limits = prepare_qos_limits(NLIMITS); const auto mapping = scord::transfer::mapping::n_to_n; std::string name = "adhoc_storage_42"; const auto adhoc_storage_ctx = scord::adhoc_storage::ctx{ cli_args.controller_address, scord::adhoc_storage::execution_mode::separate_new, scord::adhoc_storage::access_type::read_write, 100, false}; const auto adhoc_resources = scord::adhoc_storage::resources{adhoc_nodes}; try { const auto adhoc_storage = scord::register_adhoc_storage( server, name, scord::adhoc_storage::type::gekkofs, adhoc_storage_ctx, adhoc_resources); scord::job::requirements reqs(inputs, outputs, adhoc_storage); const auto job = scord::register_job( server, scord::job::resources{job_nodes}, reqs, 0); const auto transfer = scord::transfer_datasets( server, job, sources, targets, qos_limits, mapping); scord::transfer_update(server, transfer, 10.0f); fmt::print(stdout, "ADM_transfer_update remote procedure completed " "successfully\n"); exit(EXIT_SUCCESS); } catch(const std::exception& e) { fmt::print(stderr, "FATAL: ADM_transfer_update() failed: {}\n", e.what()); exit(EXIT_FAILURE); } }
examples/cxx/CMakeLists.txt +1 −1 Original line number Diff line number Diff line Loading @@ -30,7 +30,7 @@ list(APPEND cxx_examples_with_controller ADM_deploy_adhoc_storage ADM_terminate_adhoc_storage # transfers ADM_transfer_datasets ADM_get_transfer_priority ADM_set_transfer_priority ADM_cancel_transfer ADM_get_pending_transfers ADM_cancel_transfer ADM_get_pending_transfers ADM_transfer_update # qos ADM_set_qos_constraints ADM_get_qos_constraints # data operations Loading
src/lib/c_wrapper.cpp +9 −0 Original line number Diff line number Diff line Loading @@ -245,6 +245,15 @@ ADM_transfer_datasets(ADM_server_t server, ADM_job_t job, return ADM_SUCCESS; } ADM_return_t ADM_transfer_update(ADM_server_t server, ADM_transfer_t transfer, float obtained_bw) { return scord::detail::transfer_update( scord::server{server}, scord::transfer{transfer}, obtained_bw); } ADM_return_t ADM_set_dataset_information(ADM_server_t server, ADM_job_t job, ADM_dataset_t target, ADM_dataset_info_t info) { Loading
src/lib/detail/impl.cpp +32 −0 Original line number Diff line number Diff line Loading @@ -530,4 +530,36 @@ transfer_datasets(const server& srv, const job& job, return tl::make_unexpected(scord::error_code::other); } scord::error_code transfer_update(const server& srv, scord::transfer transfer, float obtained_bw) { network::client rpc_client{srv.protocol()}; const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); LOGGER_INFO("rpc {:<} body: {{transfer_id: {}}}", rpc, transfer.id()); if(const auto& call_rv = endp.call(rpc.name(), transfer, obtained_bw); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, resp.error_code(), resp.op_id()); return resp.error_code(); } } LOGGER_ERROR("rpc call failed"); return scord::error_code::other; } } // namespace scord::detail
src/lib/detail/impl.hpp +4 −0 Original line number Diff line number Diff line Loading @@ -86,6 +86,10 @@ transfer_datasets(const server& srv, const job& job, const std::vector<qos::limit>& limits, transfer::mapping mapping); scord::error_code transfer_update(const server& srv, scord::transfer transfer, float obtained_bw); } // namespace scord::detail #endif // SCORD_ADMIRE_IMPL_HPP