From 2389ad6931703cac7a27a12450c9ae9414dc0db3 Mon Sep 17 00:00:00 2001 From: Alberto Miranda <alberto.miranda@bsc.es> Date: Mon, 22 May 2023 16:16:50 +0200 Subject: [PATCH] logging: Refactor RPC formatting --- src/common/net/CMakeLists.txt | 4 +- src/common/net/utilities.hpp | 135 ++++++++++++++++ src/lib/detail/impl.cpp | 245 ++++++++++------------------- src/scord-ctl/rpc_server.cpp | 50 ++---- src/scord/rpc_server.cpp | 281 +++++++++++++--------------------- 5 files changed, 339 insertions(+), 376 deletions(-) create mode 100644 src/common/net/utilities.hpp diff --git a/src/common/net/CMakeLists.txt b/src/common/net/CMakeLists.txt index f73ca886..983e7df3 100644 --- a/src/common/net/CMakeLists.txt +++ b/src/common/net/CMakeLists.txt @@ -25,7 +25,7 @@ add_library(_rpc_client STATIC) target_sources( _rpc_client - INTERFACE endpoint.hpp client.hpp request.hpp serialization.hpp + INTERFACE endpoint.hpp client.hpp request.hpp serialization.hpp utilities.hpp PRIVATE endpoint.cpp client.cpp ) @@ -35,7 +35,7 @@ set_property(TARGET _rpc_client PROPERTY POSITION_INDEPENDENT_CODE ON) add_library(_rpc_server STATIC) target_sources( _rpc_server - INTERFACE endpoint.hpp server.hpp request.hpp serialization.hpp + INTERFACE endpoint.hpp server.hpp request.hpp serialization.hpp utilities.hpp PRIVATE server.cpp endpoint.cpp ) diff --git a/src/common/net/utilities.hpp b/src/common/net/utilities.hpp new file mode 100644 index 00000000..cc1dda42 --- /dev/null +++ b/src/common/net/utilities.hpp @@ -0,0 +1,135 @@ +/****************************************************************************** + * Copyright 2021-2023, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see <https://www.gnu.org/licenses/>. + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + + +#ifndef NETWORK_UTILITIES_HPP +#define NETWORK_UTILITIES_HPP + +#include <cstdint> +#include <optional> +#include <string> +#include <string_view> +#include <atomic> +#include <fmt/format.h> + +namespace network { + +class rpc_info { +private: + static std::uint64_t + new_id() { + static std::atomic_uint64_t s_current_id; + return s_current_id++; + } + +public: + rpc_info(std::uint64_t id, std::string name, std::string address) + : m_id(id), m_children(0), m_name(std::move(name)), + m_address(std::move(address)) {} + + rpc_info(std::uint64_t id, std::uint64_t pid, std::string name, + std::string address) + : m_id(id), m_pid(pid), m_children(0), m_name(std::move(name)), + m_address(std::move(address)) {} + + template <typename... Args> + static rpc_info + create(Args&&... args) { + return {new_id(), std::forward<Args>(args)...}; + } + + template <typename... Args> + rpc_info + add_child(std::string address) const { + return {m_children, m_id, m_name, std::move(address)}; + } + + constexpr std::uint64_t + id() const { + return m_id; + } + + constexpr std::optional<std::uint64_t> + pid() const { + return m_pid; + } + + const std::string& + name() const { + return m_name; + } + + const std::string& + address() const { + return m_address; + } + +private: + std::uint64_t m_id; + std::optional<std::uint64_t> m_pid; + std::uint64_t m_children; + std::string m_name; + std::string m_address; +}; + +} // namespace network + +template <> +struct fmt::formatter<network::rpc_info> { + + // RPC direction format: + // '<': from self to target (outbound) + // '>': from target to self (inbound) + bool m_outbound = true; + + // Parses format specifications in the form: + constexpr auto + parse(format_parse_context& ctx) { + auto it = ctx.begin(), end = ctx.end(); + + if(it != end && (*it == '<' || *it == '>')) { + m_outbound = *it++ == '<'; + } + + if(it != end && *it != '}') { + ctx.on_error("invalid format"); + } + + return it; + } + + template <typename FormatContext> + constexpr auto + format(const network::rpc_info& rpc, FormatContext& ctx) const { + format_to(ctx.out(), "{}{} id: {} name: {} ", m_outbound ? "<=" : "=>", + rpc.pid() ? fmt::format(" pid: {}", *rpc.pid()) : "", + rpc.id(), std::quoted(rpc.name())); + return m_outbound ? format_to(ctx.out(), "to: {}", + std::quoted(rpc.address())) + : format_to(ctx.out(), "from: {}", + std::quoted(rpc.address())); + } +}; + +#endif // NETWORK_UTILITIES_HPP diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index 8cc989b0..4ffb890d 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -27,6 +27,7 @@ #include <net/endpoint.hpp> #include <net/request.hpp> #include <net/serialization.hpp> +#include <net/utilities.hpp> #include <scord/types.hpp> #include "impl.hpp" @@ -46,33 +47,28 @@ struct remote_procedure { namespace scord::detail { +#define RPC_NAME() ("ADM_"s + __FUNCTION__) + scord::error_code ping(const server& srv) { network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address())); + LOGGER_INFO("rpc {:<} body: {{}}", rpc); - if(const auto call_rv = endp.call("ADM_"s + __FUNCTION__); - call_rv.has_value()) { + if(const auto call_rv = endp.call(rpc.name()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -89,32 +85,25 @@ register_job(const server& srv, const job::resources& job_resources, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO( - "rpc id: {} name: {} from: {} => " - "body: {{job_resources: {}, job_requirements: {}, slurm_id: " - "{}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), job_resources, - job_requirements, slurm_id); + LOGGER_INFO("rpc {:<} body: {{job_resources: {}, job_requirements: {}, " + "slurm_id: {}}}", + rpc, job_resources, job_requirements, slurm_id); - if(const auto call_rv = endp.call("ADM_"s + __FUNCTION__, job_resources, + if(const auto call_rv = endp.call(rpc.name(), job_resources, job_requirements, slurm_id); call_rv.has_value()) { const network::response_with_id resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}, job_id: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.value(), resp.op_id()); + "rpc {:>} body: {{retval: {}, job_id: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); if(const auto ec = resp.error_code(); !ec) { return tl::make_unexpected(resp.error_code()); @@ -134,30 +123,23 @@ update_job(const server& srv, const job& job, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_id: {}, new_resources: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), job.id(), - new_resources); + LOGGER_INFO("rpc {:<} body: {{job_id: {}, new_resources: {}}}", rpc, + job.id(), new_resources); - if(const auto& call_rv = - endp.call("ADM_"s + __FUNCTION__, job.id(), new_resources); + if(const auto& call_rv = endp.call(rpc.name(), job.id(), new_resources); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } } @@ -171,28 +153,22 @@ remove_job(const server& srv, const job& job) { network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_id: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), job.id()); + LOGGER_INFO("rpc {:<} body: {{job_id: {}}}", rpc, job.id()); - if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id()); + if(const auto& call_rv = endp.call(rpc.name(), job.id()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } } @@ -209,31 +185,26 @@ register_adhoc_storage(const server& srv, const std::string& name, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{name: {}, type: {}, adhoc_ctx: {}, " + LOGGER_INFO("rpc {:<} body: {{name: {}, type: {}, adhoc_ctx: {}, " "adhoc_resources: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), name, type, ctx, - resources); + rpc, name, type, ctx, resources); - if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, name, type, - ctx, resources); + if(const auto& call_rv = + endp.call(rpc.name(), name, type, ctx, resources); call_rv.has_value()) { const network::response_with_id resp{call_rv.value()}; - LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}, adhoc_id: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.value(), resp.op_id()); + LOGGER_EVAL( + resp.error_code(), INFO, ERROR, + "rpc {:>} body: {{retval: {}, adhoc_id: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); if(const auto ec = resp.error_code(); !ec) { return tl::make_unexpected(ec); @@ -254,30 +225,24 @@ update_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}, new_resources: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), adhoc_storage.id(), - new_resources); + LOGGER_INFO("rpc {:<} body: {{adhoc_id: {}, new_resources: {}}}", rpc, + adhoc_storage.id(), new_resources); - if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, - adhoc_storage.id(), new_resources); + if(const auto& call_rv = + endp.call(rpc.name(), adhoc_storage.id(), new_resources); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -292,29 +257,22 @@ remove_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), adhoc_storage.id()); + LOGGER_INFO("rpc {:<} body: {{adhoc_id: {}}}", rpc, adhoc_storage.id()); - if(const auto& call_rv = - endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); + if(const auto& call_rv = endp.call(rpc.name(), adhoc_storage.id()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -330,29 +288,23 @@ register_pfs_storage(const server& srv, const std::string& name, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{name: {}, type: {}, pfs_ctx: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), name, type, ctx); + LOGGER_INFO("rpc {:<} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, + name, type, ctx); - if(const auto& call_rv = - endp.call("ADM_"s + __FUNCTION__, name, type, ctx); + if(const auto& call_rv = endp.call(rpc.name(), name, type, ctx); call_rv.has_value()) { const network::response_with_id resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}, pfs_id: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.value(), resp.op_id()); + "rpc {:>} body: {{retval: {}, pfs_id: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); if(const auto ec = resp.error_code(); !ec) { return tl::make_unexpected(ec); @@ -372,30 +324,24 @@ update_pfs_storage(const server& srv, const pfs_storage& pfs_storage, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{pfs_id: {}, new_ctx: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), pfs_storage.id(), - new_ctx); + LOGGER_INFO("rpc {:<} body: {{pfs_id: {}, new_ctx: {}}}", rpc, + pfs_storage.id(), new_ctx); if(const auto& call_rv = - endp.call("ADM_"s + __FUNCTION__, pfs_storage.id(), new_ctx); + endp.call(rpc.name(), pfs_storage.id(), new_ctx); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -410,29 +356,22 @@ remove_pfs_storage(const server& srv, const pfs_storage& pfs_storage) { network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{pfs_id: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), pfs_storage.id()); + LOGGER_INFO("rpc {:<} body: {{pfs_id: {}}}", rpc, pfs_storage.id()); - if(const auto& call_rv = - endp.call("ADM_"s + __FUNCTION__, pfs_storage.id()); + if(const auto& call_rv = endp.call(rpc.name(), pfs_storage.id()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -447,29 +386,22 @@ deploy_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), adhoc_storage.id()); + LOGGER_INFO("rpc {:<} body: {{adhoc_id: {}}}", rpc, adhoc_storage.id()); - if(const auto& call_rv = - endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); + if(const auto& call_rv = endp.call(rpc.name(), adhoc_storage.id()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -484,29 +416,22 @@ tear_down_adhoc_storage(const server& srv, const adhoc_storage& adhoc_storage) { network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), adhoc_storage.id()); + LOGGER_INFO("rpc {:<} body: {{adhoc_id: {}}}", rpc, adhoc_storage.id()); - if(const auto call_rv = - endp.call("ADM_"s + __FUNCTION__, adhoc_storage.id()); + if(const auto call_rv = endp.call(rpc.name(), adhoc_storage.id()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", rpc, + resp.error_code(), resp.op_id()); return resp.error_code(); } @@ -525,31 +450,25 @@ transfer_datasets(const server& srv, const job& job, network::client rpc_client{srv.protocol()}; - const auto rpc_id = ::api::remote_procedure::new_id(); + const auto rpc = network::rpc_info::create(RPC_NAME(), srv.address()); if(const auto& lookup_rv = rpc_client.lookup(srv.address()); lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_id: {}, sources: {}, targets: {}, limits: {}, " - "mapping: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(rpc_client.self_address()), job.id(), sources, - targets, limits, mapping); + LOGGER_INFO("rpc {:<} body: {{job_id: {}, sources: {}, targets: {}, " + "limits: {}, mapping: {}}}", + rpc, job.id(), sources, targets, limits, mapping); - if(const auto& call_rv = endp.call("ADM_"s + __FUNCTION__, job.id(), - sources, targets, limits, mapping); + if(const auto& call_rv = endp.call(rpc.name(), job.id(), sources, + targets, limits, mapping); call_rv.has_value()) { const network::response_with_id resp{call_rv.value()}; LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}, tx_id: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), resp.error_code(), - resp.value(), resp.op_id()); + "rpc {:>} body: {{retval: {}, tx_id: {}}} [op_id: {}]", + rpc, resp.error_code(), resp.value(), resp.op_id()); if(const auto ec = resp.error_code(); !ec) { return tl::make_unexpected(ec); diff --git a/src/scord-ctl/rpc_server.cpp b/src/scord-ctl/rpc_server.cpp index f8c9f4e4..bb7da914 100644 --- a/src/scord-ctl/rpc_server.cpp +++ b/src/scord-ctl/rpc_server.cpp @@ -24,6 +24,7 @@ #include <net/request.hpp> #include <net/serialization.hpp> +#include <net/utilities.hpp> #include "rpc_server.hpp" // required for ::waitpid() @@ -32,14 +33,6 @@ using namespace std::literals; -struct remote_procedure { - static std::uint64_t - new_id() { - static std::atomic_uint64_t current_id; - return current_id++; - } -}; - namespace scord_ctl { rpc_server::rpc_server(std::string name, std::string address, bool daemonize, @@ -56,24 +49,22 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, #undef EXPAND } +#define RPC_NAME() ("ADM_"s + __FUNCTION__) + void rpc_server::ping(const network::request& req) { using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req))); + LOGGER_INFO("rpc {:>} body: {{}}", rpc); - const auto resp = generic_response{rpc_id, scord::error_code::success}; + const auto resp = generic_response{rpc.id(), scord::error_code::success}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, scord::error_code::success); req.respond(resp); @@ -88,15 +79,12 @@ rpc_server::deploy_adhoc_storage( using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{type: {}, ctx: {}, resources: {}}}", - rpc_id, std::quoted(__FUNCTION__), - std::quoted(get_address(req)), adhoc_type, adhoc_ctx, - adhoc_resources); + LOGGER_INFO("rpc {:>} body: {{type: {}, ctx: {}, resources: {}}}", rpc, + adhoc_type, adhoc_ctx, adhoc_resources); auto ec = scord::error_code::success; @@ -133,10 +121,7 @@ rpc_server::deploy_adhoc_storage( } case -1: { ec = scord::error_code::other; - LOGGER_ERROR("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), - std::quoted(get_address(req)), ec); + LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, ec); break; } default: { @@ -145,7 +130,7 @@ rpc_server::deploy_adhoc_storage( if(retwait == -1) { LOGGER_ERROR( "rpc id: {} error_msg: \"Error waitpid code: {}\"", - rpc_id, retwait); + rpc.id(), retwait); ec = scord::error_code::other; } else { if(WEXITSTATUS(wstatus) != 0) { @@ -159,12 +144,9 @@ rpc_server::deploy_adhoc_storage( } } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } diff --git a/src/scord/rpc_server.cpp b/src/scord/rpc_server.cpp index 6055a7ad..fd63ea01 100644 --- a/src/scord/rpc_server.cpp +++ b/src/scord/rpc_server.cpp @@ -26,18 +26,11 @@ #include <net/request.hpp> #include <net/endpoint.hpp> #include <net/serialization.hpp> +#include <net/utilities.hpp> #include "rpc_server.hpp" using namespace std::literals; -struct remote_procedure { - static std::uint64_t - new_id() { - static std::atomic_uint64_t current_id; - return current_id++; - } -}; - namespace scord { rpc_server::rpc_server(std::string name, std::string address, bool daemonize, @@ -65,24 +58,22 @@ rpc_server::rpc_server(std::string name, std::string address, bool daemonize, #undef EXPAND } +#define RPC_NAME() ("ADM_"s + __FUNCTION__) + void rpc_server::ping(const network::request& req) { using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req))); + LOGGER_INFO("rpc {:>} body: {{}}", rpc); - const auto resp = generic_response{rpc_id, scord::error_code::success}; + const auto resp = generic_response{rpc.id(), scord::error_code::success}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, scord::error_code::success); req.respond(resp); @@ -96,15 +87,13 @@ rpc_server::register_job(const network::request& req, using network::get_address; using network::response_with_id; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_resources: {}, job_requirements: {}, slurm_id: " - "{}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - job_resources, job_requirements, slurm_id); + LOGGER_INFO("rpc {:>} body: {{job_resources: {}, job_requirements: {}, " + "slurm_id: {}}}", + rpc, job_resources, job_requirements, slurm_id); scord::error_code ec; std::optional<scord::job_id> job_id; @@ -128,18 +117,15 @@ rpc_server::register_job(const network::request& req, job_id = job_info->job().id(); } else { - LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", rpc_id, - jm_result.error()); + LOGGER_ERROR("rpc id: {} error_msg: \"Error creating job: {}\"", + rpc.id(), jm_result.error()); ec = jm_result.error(); } respond: - const auto resp = response_with_id{rpc_id, ec, job_id}; + const auto resp = response_with_id{rpc.id(), ec, job_id}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}, job_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec, job_id); + LOGGER_INFO("rpc {:<} body: {{retval: {}, job_id: {}}}", rpc, ec, job_id); req.respond(resp); } @@ -150,28 +136,23 @@ rpc_server::update_job(const network::request& req, scord::job_id job_id, using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_id: {}, new_resources: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - job_id, new_resources); + LOGGER_INFO("rpc {:>} body: {{job_id: {}, new_resources: {}}}", rpc, job_id, + new_resources); const auto ec = m_job_manager.update(job_id, new_resources); if(!ec) { - LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", rpc_id, - ec); + LOGGER_ERROR("rpc id: {} error_msg: \"Error updating job: {}\"", + rpc.id(), ec); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -181,14 +162,11 @@ rpc_server::remove_job(const network::request& req, scord::job_id job_id) { using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{job_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - job_id); + LOGGER_INFO("rpc {:>} body: {{job_id: {}}}", rpc, job_id); scord::error_code ec; const auto jm_result = m_job_manager.remove(job_id); @@ -203,17 +181,14 @@ rpc_server::remove_job(const network::request& req, scord::job_id job_id) { ec = m_adhoc_manager.remove_client_info(adhoc_storage->id()); } } else { - LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, - job_id); + LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", + rpc.id(), job_id); ec = jm_result.error(); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -227,15 +202,13 @@ rpc_server::register_adhoc_storage( using network::get_address; using network::response_with_id; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{name: {}, type: {}, adhoc_ctx: {}, " + LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, adhoc_ctx: {}, " "adhoc_resources: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - name, type, ctx, resources); + rpc, name, type, ctx, resources); scord::error_code ec; std::optional<std::uint64_t> adhoc_id; @@ -248,16 +221,14 @@ rpc_server::register_adhoc_storage( } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating adhoc_storage: " "{}\"", - rpc_id, am_result.error()); + rpc.id(), am_result.error()); ec = am_result.error(); } - const auto resp = response_with_id{rpc_id, ec, adhoc_id}; + const auto resp = response_with_id{rpc.id(), ec, adhoc_id}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}, adhoc_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec, adhoc_id); + LOGGER_INFO("rpc {:<} body: {{retval: {}, adhoc_id: {}}}", rpc, ec, + adhoc_id); req.respond(resp); } @@ -269,13 +240,11 @@ rpc_server::update_adhoc_storage( using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}, new_resources: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), + LOGGER_INFO("rpc {:>} body: {{adhoc_id: {}, new_resources: {}}}", rpc, adhoc_id, new_resources); const auto ec = m_adhoc_manager.update(adhoc_id, new_resources); @@ -283,15 +252,12 @@ rpc_server::update_adhoc_storage( if(!ec) { LOGGER_ERROR( "rpc id: {} error_msg: \"Error updating adhoc_storage: {}\"", - rpc_id, ec); + rpc.id(), ec); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -302,28 +268,22 @@ rpc_server::remove_adhoc_storage(const network::request& req, using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - adhoc_id); + LOGGER_INFO("rpc {:>} body: {{adhoc_id: {}}}", rpc, adhoc_id); scord::error_code ec = m_adhoc_manager.remove(adhoc_id); if(!ec) { - LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", rpc_id, - adhoc_id); + LOGGER_ERROR("rpc id: {} error_msg: \"Error removing job: {}\"", + rpc.id(), adhoc_id); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -334,14 +294,11 @@ rpc_server::deploy_adhoc_storage(const network::request& req, using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - adhoc_id); + LOGGER_INFO("rpc {:>} body: {{adhoc_id: {}}}", rpc, adhoc_id); auto ec = scord::error_code::success; @@ -356,41 +313,37 @@ rpc_server::deploy_adhoc_storage(const network::request& req, lookup_rv.has_value()) { const auto& endp = lookup_rv.value(); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{type: {}, ctx: {}, resources: {}}}", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(self_address()), adhoc_storage.type(), + const auto child_rpc = + rpc.add_child(adhoc_storage.context().controller_address()); + + LOGGER_INFO("rpc {:<} body: {{type: {}, ctx: {}, resources: {}}}", + child_rpc, adhoc_storage.type(), adhoc_storage.context(), adhoc_storage.get_resources()); - if(const auto call_rv = endp.call( - "ADM_"s + __FUNCTION__, adhoc_storage.type(), - adhoc_storage.context(), adhoc_storage.get_resources()); + if(const auto call_rv = endp.call(rpc.name(), adhoc_storage.type(), + adhoc_storage.context(), + adhoc_storage.get_resources()); call_rv.has_value()) { const network::generic_response resp{call_rv.value()}; ec = resp.error_code(); LOGGER_EVAL(resp.error_code(), INFO, ERROR, - "rpc id: {} name: {} from: {} <= " - "body: {{retval: {}}} [op_id: {}]", - rpc_id, std::quoted("ADM_"s + __FUNCTION__), - std::quoted(endp.address()), ec, resp.op_id()); + "rpc {:>} body: {{retval: {}}} [op_id: {}]", + child_rpc, ec, resp.op_id()); + } else { + ec = error_code::snafu; + LOGGER_ERROR("rpc call failed"); } } } else { ec = am_result.error(); - LOGGER_ERROR("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), - std::quoted(get_address(req)), ec); + LOGGER_ERROR("rpc {:<} body: {{retval: {}}}", rpc, ec); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -401,22 +354,17 @@ rpc_server::tear_down_adhoc_storage(const network::request& req, using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{adhoc_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - adhoc_id); + LOGGER_INFO("rpc {:>} body: {{adhoc_id: {}}}", rpc, adhoc_id); // TODO: actually tear down the adhoc storage instance - const auto resp = generic_response{rpc_id, scord::error_code::success}; + const auto resp = generic_response{rpc.id(), scord::error_code::success}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, scord::error_code::success); req.respond(resp); @@ -430,14 +378,12 @@ rpc_server::register_pfs_storage(const network::request& req, using network::get_address; using network::response_with_id; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{name: {}, type: {}, pfs_ctx: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - name, type, ctx); + LOGGER_INFO("rpc {:>} body: {{name: {}, type: {}, pfs_ctx: {}}}", rpc, name, + type, ctx); scord::error_code ec; std::optional<std::uint64_t> pfs_id = 0; @@ -448,16 +394,13 @@ rpc_server::register_pfs_storage(const network::request& req, pfs_id = adhoc_storage_info->pfs_storage().id(); } else { LOGGER_ERROR("rpc id: {} error_msg: \"Error creating pfs_storage: {}\"", - rpc_id, pm_result.error()); + rpc.id(), pm_result.error()); ec = pm_result.error(); } - const auto resp = response_with_id{rpc_id, ec, pfs_id}; + const auto resp = response_with_id{rpc.id(), ec, pfs_id}; - LOGGER_INFO("rpc id: {} name: {} to: {} => " - "body: {{retval: {}, pfs_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec, pfs_id); + LOGGER_INFO("rpc {:<} body: {{retval: {}, pfs_id: {}}}", rpc, ec, pfs_id); req.respond(resp); } @@ -469,28 +412,23 @@ rpc_server::update_pfs_storage(const network::request& req, using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{pfs_id: {}, new_ctx: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - pfs_id, new_ctx); + LOGGER_INFO("rpc {:>} body: {{pfs_id: {}, new_ctx: {}}}", rpc, pfs_id, + new_ctx); const auto ec = m_pfs_manager.update(pfs_id, new_ctx); if(!ec) { LOGGER_ERROR("rpc id: {} error_msg: \"Error updating pfs_storage: {}\"", - rpc_id, ec); + rpc.id(), ec); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} => " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -501,28 +439,22 @@ rpc_server::remove_pfs_storage(const network::request& req, using network::generic_response; using network::get_address; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO("rpc id: {} name: {} from: {} => " - "body: {{pfs_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - pfs_id); + LOGGER_INFO("rpc {:>} body: {{pfs_id: {}}}", rpc, pfs_id); scord::error_code ec = m_pfs_manager.remove(pfs_id); if(!ec) { LOGGER_ERROR("rpc id: {} error_msg: \"Error removing pfs storage: {}\"", - rpc_id, pfs_id); + rpc.id(), pfs_id); } - const auto resp = generic_response{rpc_id, ec}; + const auto resp = generic_response{rpc.id(), ec}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec); + LOGGER_INFO("rpc {:<} body: {{retval: {}}}", rpc, ec); req.respond(resp); } @@ -536,15 +468,13 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, using network::get_address; using network::response_with_id; + using network::rpc_info; - const auto rpc_name = "ADM_"s + __FUNCTION__; - const auto rpc_id = remote_procedure::new_id(); + const auto rpc = rpc_info::create(RPC_NAME(), get_address(req)); - LOGGER_INFO( - "rpc id: {} name: {} from: {} => " - "body: {{job_id: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - job_id, sources, targets, limits, mapping); + LOGGER_INFO("rpc {:>} body: {{job_id: {}, sources: {}, targets: {}, " + "limits: {}, mapping: {}}}", + rpc, job_id, sources, targets, limits, mapping); scord::error_code ec; @@ -554,12 +484,9 @@ rpc_server::transfer_datasets(const network::request& req, scord::job_id job_id, // actually request it tx_id = 42; - const auto resp = response_with_id{rpc_id, ec, tx_id}; + const auto resp = response_with_id{rpc.id(), ec, tx_id}; - LOGGER_INFO("rpc id: {} name: {} to: {} <= " - "body: {{retval: {}, tx_id: {}}}", - rpc_id, std::quoted(rpc_name), std::quoted(get_address(req)), - ec, tx_id); + LOGGER_INFO("rpc {:<} body: {{retval: {}, tx_id: {}}}", rpc, ec, tx_id); req.respond(resp); } -- GitLab