diff --git a/CMakeLists.txt b/CMakeLists.txt index 27f606aa311c8e83ad9035e6f6ae720e12c7fd35..cf1a30c9019e80510d07694be1407983ad81f216 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -295,6 +295,8 @@ find_package(RedisPlusPlus 1.3.3 REQUIRED) # set compile flags add_compile_options("-Wall" "-Wextra" "-Werror" "$<$:-O3>") +add_compile_definitions("$<$:SCORD_DEBUG_BUILD>") +add_compile_definitions("$<$:__LOGGER_ENABLE_DEBUG__>") if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") add_compile_options("-stdlib=libc++") else () diff --git a/ci/check_rpcs.py b/ci/check_rpcs.py new file mode 100755 index 0000000000000000000000000000000000000000..cd23b502272263f4fa25d20fcb56f38b5660c25b --- /dev/null +++ b/ci/check_rpcs.py @@ -0,0 +1,366 @@ +#!/usr/bin/env python3 +import pprint +import re +import sys +from pathlib import Path +from typing import Dict + +from lark import Lark, Transformer + +RPC_NAMES = { + 'ADM_ping', + 'ADM_register_job', 'ADM_update_job', 'ADM_remove_job', + 'ADM_register_adhoc_storage', 'ADM_update_adhoc_storage', + 'ADM_remove_adhoc_storage', 'ADM_deploy_adhoc_storage', + 'ADM_register_pfs_storage', 'ADM_update_pfs_storage', + 'ADM_remove_pfs_storage', + 'ADM_transfer_datasets', 'ADM_get_transfer_priority', + 'ADM_set_transfer_priority', 'ADM_cancel_transfer', + 'ADM_get_pending_transfers', + 'ADM_set_qos_constraints', 'ADM_get_qos_constraints', + 'ADM_define_data_operation', 'ADM_connect_data_operation', + 'ADM_finalize_data_operation', + 'ADM_link_transfer_to_data_operation', + 'ADM_in_situ_ops', 'ADM_in_transit_ops', + 'ADM_get_statistics', + 'ADM_set_dataset_information', 'ADM_set_io_resources' +} + + +class Meta: + + def __init__(self, line, lineno, timestamp, progname, pid, log_level): + self._line = line + self._lineno = lineno + self._timestamp = timestamp + self._progname = progname + self._pid = pid + self._log_level = log_level + + @property + def line(self): + return self._line + + @property + def lineno(self): + return self._lineno + + @property + def timestamp(self): + return self._timestamp + + @property + def progname(self): + return self._progname + + @property + def pid(self): + return self._pid + + @property + def log_level(self): + return self._log_level + + def __repr__(self): + return f'Meta(' \ + f'timestamp="{self.timestamp}", ' \ + f'progname="{self.progname}", ' \ + f'pid={self.pid}), ' \ + f'log_level="{self.log_level}"' \ + f')' + + +class RemoteProcedure: + EXPR = re.compile(r""" + ^(?P + \[(?P\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d+)]\s + \[(?P\w+)]\s + \[(?P\d+)]\s + \[(?P\w+)]\s + rpc\s + id:\s(?P\d+)\s + name:\s"(?P\w+)"\s + (?:from|to):\s"(?P
.*?)"\s + (?P<=|=>)\s + ) + body:\s(?P.*)$ + """, re.VERBOSE) + + def __init__(self, is_client: bool, meta: Dict, body: Dict, + opts: Dict): + + self._is_client = is_client + self._meta = Meta( + meta['line'], + meta['lineno'], + meta['timestamp'], + meta['progname'], + meta['pid'], + meta['log_level']) + + self._id = int(meta['rpc_id']) + self._name = meta['rpc_name'] + self._is_request = meta['direction'] == '=>' + self._address = meta['address'] + + if opts: + assert self.is_client and self.is_reply + self._op_id = opts['op_id'] + else: + self._op_id = self.id + + self._body = body + + @property + def is_client(self): + return self._is_client + + @property + def meta(self): + return self._meta + + @property + def id(self): + return self._id + + @property + def op_id(self): + return self._op_id + + @op_id.setter + def op_id(self, value): + self._op_id = value + + @property + def name(self): + return self._name + + @property + def address(self): + return self._address + + @property + def is_request(self): + return self._is_request + + @property + def is_reply(self): + return not self._is_request + + def __eq__(self, other): + + assert self.name == other.name + + # first, check that there are no extra keys in the body of the RPCs + self_keys = set(self._body.keys()) + other_keys = set(other._body.keys()) + + self_extra_keys = self_keys - other_keys + other_extra_keys = other_keys - self_keys + + for extra_keys, rpc in zip([self_extra_keys, other_extra_keys], + [self, other]): + if len(extra_keys) != 0: + print("ERROR: Extra fields were found when comparing an rpc to " + "its counterpart\n" + f" extra fields: {extra_keys}" + f" line number: {rpc.meta.lineno}" + f" line contents: {rpc.meta.line}", file=sys.stderr) + return False + + for k in self_keys: + if self._body[k] != other._body[k]: + print("ERROR: Mismatching values were found when comparing an " + "rpc to its counterpart\n" + f" value1 (line: {self.meta.lineno}): {k}: " + f"{self._body[k]}\n" + f" value2 (line: {other.meta.lineno}): {k}: " + f"{other._body[k]} ", + file=sys.stderr) + return False + + return True + + def __repr__(self): + return f'RemoteProcedure(' \ + f'is_client={self.is_client}, ' \ + f'meta={self.meta}, ' \ + f'op_id={self.op_id}, ' \ + f'id={self.id}, ' \ + f'name={self.name}, ' \ + f'is_request={self.is_request}, ' \ + f'address="{self.address}", ' \ + f'body="{self._body}"' \ + f')' + + +class Operation: + def __init__(self, id, request, reply): + self._id = id + self._request = request + self._reply = reply + + @property + def id(self): + return self._id + + @property + def request(self): + return self._request + + @property + def reply(self): + return self._reply + + def __eq__(self, other): + return self.request == other.request and self.reply == other.reply + + def __repr__(self): + return f'Operation(' \ + f'id={self.id}, ' \ + f'request={self.request}, ' \ + f'reply={self.reply}' \ + f')' + + +BODY_GRAMMAR = r""" + start: body [opts] + ?body: value + ?value: dict + | list + | string + | ESCAPED_STRING -> escaped_string + | SIGNED_NUMBER -> number + | "false" -> false + | "true" -> false + | opts + + list: "[" [value ("," value)*] "]" + dict: "{" [pair ("," pair)*] "}" + pair: ident ":" value + string: CNAME + ident: CNAME + opts: "[" [pair ("," pair)*] "]" + + %import common.CNAME -> CNAME + %import common.ESCAPED_STRING + %import common.SIGNED_NUMBER + %import common.WS + %ignore WS +""" + + +class BodyTransformer(Transformer): + list = list + pair = tuple + opts = dict + dict = dict + true = lambda self, _: True + false = lambda self, _: False + + def start(self, items): + body = dict(items[0]) + opts = dict(items[1]) if len(items) == 2 else dict() + return body, opts + + def number(self, n): + (n,) = n + try: + return int(n) + except ValueError: + return float(n) + + def escaped_string(self, s): + (s,) = s + return str(s[1:-1]) + + def string(self, s): + (s,) = s + return str(s) + + def ident(self, ident): + (ident,) = ident + return str(ident) + + +def process_body(d): + body_parser = Lark(BODY_GRAMMAR, maybe_placeholders=False) + tree = body_parser.parse(d) + return BodyTransformer().transform(tree) + + +def find_rpcs(filename, is_client, rpc_name): + with open(filename, 'r') as f: + for ln, line in enumerate(f, start=1): + if m := RemoteProcedure.EXPR.match(line): + tmp = m.groupdict() + + if tmp['rpc_name'] == rpc_name: + tmp['lineno'] = ln + tmp['line'] = line + body, opts = process_body(tmp['body']) + del tmp['body'] + yield RemoteProcedure(is_client, tmp, body, opts) + + +if __name__ == "__main__": + + if len(sys.argv) != 4: + print("ERROR: Invalid number of arguments", file=sys.stderr) + print( + f"Usage: {Path(sys.argv[0]).name} CLIENT_LOGFILE SERVER_LOGFILE RPC_NAME", + file=sys.stderr) + sys.exit(1) + + client_logfile = Path(sys.argv[1]) + server_logfile = Path(sys.argv[2]) + + for lf, n in zip([client_logfile, server_logfile], ['CLIENT_LOGFILE', + 'SERVER_LOGFILE']): + if not lf.is_file(): + print(f"ERROR: {n} '{lf}' is not a file", file=sys.stderr) + sys.exit(1) + + rpc_name = sys.argv[3] + + if rpc_name not in RPC_NAMES: + print(f"ERROR: '{rpc_name}' is not a valid rpc name", file=sys.stderr) + print(f" Valid names: {', '.join(sorted(RPC_NAMES))}", file=sys.stderr) + sys.exit(1) + + logfiles = [client_logfile, server_logfile] + client_side = [True, False] + client_ops = {} + server_ops = {} + + # extract information about RPCs from logfiles and create + # the necessary Operation + for lf, is_client, ops in zip(logfiles, client_side, [client_ops, + server_ops]): + + found_rpcs = {} + + for rpc in find_rpcs(lf, is_client, rpc_name): + if rpc.id not in found_rpcs: + if rpc.is_request: + found_rpcs[rpc.id] = rpc + else: + print(f"ERROR: Found RPC reply without corresponding " + f"request at line {rpc.meta.lineno}\n" + f" raw: '{rpc.meta.line}'", file=sys.stderr) + sys.exit(1) + else: + req_rpc = found_rpcs[rpc.id] + req_rpc.op_id = rpc.op_id + ops[rpc.op_id] = Operation(rpc.op_id, req_rpc, rpc) + del found_rpcs[rpc.id] + + ec = 0 + for k in client_ops.keys(): + + assert (k in server_ops) + + if client_ops[k] != server_ops[k]: + ec = 1 + + sys.exit(ec) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 931da9ce2ee94591021a5cd2422413f860f6f0b8..2ccc416405ad037a0bd454a85fda51b6e9636285 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -23,13 +23,25 @@ ################################################################################ if(SCORD_BUILD_TESTS) + set(SCORD_TESTS_DIRECTORY "${CMAKE_BINARY_DIR}/Testing") + file(MAKE_DIRECTORY ${SCORD_TESTS_DIRECTORY}) + + # prepare the environment for the scord_daemon fixture + set(TEST_DIRECTORY "${SCORD_TESTS_DIRECTORY}/scord_daemon") + file(MAKE_DIRECTORY ${TEST_DIRECTORY}) + + set(TEST_ENV) + list(APPEND TEST_ENV SCORD_LOG=1) + list(APPEND TEST_ENV SCORD_LOG_OUTPUT=${TEST_DIRECTORY}/scord_daemon.log) + add_test(start_scord_daemon ${CMAKE_SOURCE_DIR}/scripts/runner.sh start scord.pid - ${CMAKE_BINARY_DIR}/src/scord/scord -C -f + ${CMAKE_BINARY_DIR}/src/scord/scord -f ) - set_tests_properties(start_scord_daemon PROPERTIES FIXTURES_SETUP - scord_daemon) + set_tests_properties(start_scord_daemon + PROPERTIES FIXTURES_SETUP scord_daemon + ENVIRONMENT "${TEST_ENV}") add_test(stop_scord_daemon ${CMAKE_SOURCE_DIR}/scripts/runner.sh stop TERM scord.pid diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 0c78bbf59f47e3ff6ac6cc70ccbc854f0fa159e2..4db1d89d3c715e4162a7693f34742ca97037f8cb 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -58,8 +58,30 @@ endforeach() if(SCORD_BUILD_TESTS) foreach(example IN LISTS examples_c) - add_test(${example}_c_test ${example} ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT}) - set_tests_properties(${example}_c_test - PROPERTIES FIXTURES_REQUIRED scord_daemon) + + # prepare environment for the RPC test itself and its validation test + set(TEST_NAME "${example}_c_test") + set(TEST_DIRECTORY "${SCORD_TESTS_DIRECTORY}/${TEST_NAME}") + file(MAKE_DIRECTORY ${TEST_DIRECTORY}) + + set(TEST_ENV) + list(APPEND TEST_ENV LIBSCORD_LOG=1) + list(APPEND TEST_ENV LIBSCORD_LOG_OUTPUT=${TEST_DIRECTORY}/libscord.log) + + add_test(run_${TEST_NAME} ${example} + ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT}) + set_tests_properties(run_${TEST_NAME} + PROPERTIES FIXTURES_REQUIRED scord_daemon + ENVIRONMENT "${TEST_ENV}") + + add_test(validate_${TEST_NAME} + ${CMAKE_SOURCE_DIR}/ci/check_rpcs.py + ${TEST_DIRECTORY}/libscord.log + ${SCORD_TESTS_DIRECTORY}/scord_daemon/scord_daemon.log + ${example} + ) + set_tests_properties(validate_${TEST_NAME} + PROPERTIES DEPENDS stop_scord_daemon + ) endforeach() endif() diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 86a81f2520bb6b6181d7dc15c2b5e8a70e3850a4..7c9d6f3c6be5db1f11ae96847477264ab7857d3d 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -57,10 +57,35 @@ foreach(example IN LISTS examples_cxx) set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example}) endforeach() +set(CXX_TEST_ID 0) + if(SCORD_BUILD_TESTS) foreach(example IN LISTS examples_cxx) - add_test(${example}_cxx_test ${example} ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT}) - set_tests_properties(${example}_cxx_test - PROPERTIES FIXTURES_REQUIRED scord_daemon) + + # prepare environment for the RPC test itself and its validation test + set(TEST_NAME "${example}_cxx_test") + set(TEST_DIRECTORY "${SCORD_TESTS_DIRECTORY}/${TEST_NAME}") + file(MAKE_DIRECTORY ${TEST_DIRECTORY}) + + set(TEST_ENV) + list(APPEND TEST_ENV LIBSCORD_LOG=1) + list(APPEND TEST_ENV LIBSCORD_LOG_OUTPUT=${TEST_DIRECTORY}/libscord.log) + + add_test(run_${TEST_NAME} ${example} + ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT}) + set_tests_properties(run_${TEST_NAME} + PROPERTIES FIXTURES_REQUIRED scord_daemon + ENVIRONMENT "${TEST_ENV}" + ) + + add_test(validate_${TEST_NAME} + ${CMAKE_SOURCE_DIR}/ci/check_rpcs.py + ${TEST_DIRECTORY}/libscord.log + ${SCORD_TESTS_DIRECTORY}/scord_daemon/scord_daemon.log + ${example} + ) + set_tests_properties(validate_${TEST_NAME} + PROPERTIES DEPENDS stop_scord_daemon + ) endforeach() endif() diff --git a/src/common/logger/logger.hpp b/src/common/logger/logger.hpp index b75bed6566c19606bf5f280fe49984188644845e..7aa530cb8a06ab78c762a5c388e09b3d78b1174c 100644 --- a/src/common/logger/logger.hpp +++ b/src/common/logger/logger.hpp @@ -66,7 +66,7 @@ public: } else if(type == "file") { m_internal_logger = spdlog::basic_logger_mt( - ident, log_file.string()); + ident, log_file.string(), true); } #ifdef SPDLOG_ENABLE_SYSLOG diff --git a/src/common/net/engine.hpp b/src/common/net/engine.hpp index cb31008086cf6f2d2b00d5bcbe9be64f38a7fb2a..5822fa508dac3d9d24ad4bb86ba6f97bed9d69d3 100644 --- a/src/common/net/engine.hpp +++ b/src/common/net/engine.hpp @@ -73,6 +73,13 @@ struct margo_context { // forward declarations struct endpoint; +namespace utils { + +std::string +get_address(hg_handle_t h); + +} // namespace utils + struct engine { enum class execution_mode : bool { @@ -123,6 +130,74 @@ struct engine { endpoint lookup(const std::string& address) const; + std::string + self_address() const { + + struct addr_handle { + addr_handle(margo_instance_id mid, hg_addr_t addr) + : m_mid(mid), m_addr(addr) {} + + ~addr_handle() { + if(m_addr) { + margo_addr_free(m_mid, m_addr); + } + } + + hg_addr_t + native() const { + return m_addr; + } + + margo_instance_id m_mid; + hg_addr_t m_addr; + }; + + const auto self_addr = addr_handle{ + m_context->m_mid, [mid = m_context->m_mid]() -> hg_addr_t { + hg_addr_t tmp; + + hg_return_t ret = margo_addr_self(mid, &tmp); + + if(ret != HG_SUCCESS) { + LOGGER_WARN(fmt::format( + "Error finding out self address: {}", + HG_Error_to_string(ret))); + return nullptr; + } + + return tmp; + }()}; + + if(!self_addr.native()) { + return "unknown"; + } + + hg_size_t expected_length; + hg_return_t ret = + margo_addr_to_string(m_context->m_mid, nullptr, + &expected_length, self_addr.native()); + + if(ret != HG_SUCCESS) { + LOGGER_WARN(fmt::format("Error finding out self address: {}", + HG_Error_to_string(ret))); + return "unknown"; + } + + std::vector tmp; + tmp.reserve(expected_length); + + ret = margo_addr_to_string(m_context->m_mid, tmp.data(), + &expected_length, self_addr.native()); + + if(ret != HG_SUCCESS) { + LOGGER_WARN(fmt::format("Error finding out self address: {}", + HG_Error_to_string(ret))); + return "unknown"; + } + + return {tmp.data()}; + } + std::shared_ptr m_context; }; @@ -149,6 +224,11 @@ public: return m_handle; } + std::string + origin() const { + return utils::get_address(m_handle); + } + private: hg_handle_t m_handle; Output m_output; @@ -299,6 +379,45 @@ struct rpc_acceptor : engine { : engine(format_address(protocol, bind_address, port)) {} }; +namespace utils { + +inline std::string +get_address(hg_handle_t h) { + + const hg_info* hgi = margo_get_info(h); + + if(!hgi) { + LOGGER_WARN("Unable to get information from hg_handle"); + return "unknown"; + } + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + hg_size_t expected_length; + hg_return_t ret = + margo_addr_to_string(mid, nullptr, &expected_length, hgi->addr); + + if(ret != HG_SUCCESS) { + LOGGER_WARN("Error finding out client address: {}", + HG_Error_to_string(ret)); + return "unknown"; + } + + std::vector tmp; + tmp.reserve(expected_length); + + ret = margo_addr_to_string(mid, tmp.data(), &expected_length, hgi->addr); + + if(ret != HG_SUCCESS) { + LOGGER_WARN("Error finding out client address: {}", + HG_Error_to_string(ret)); + return "unknown"; + } + + return {tmp.data()}; +} + +} // namespace utils } // namespace scord::network diff --git a/src/common/net/proto/rpc_types.h b/src/common/net/proto/rpc_types.h index d07727d0fd07eed296b540615b2920447a789e52..3f60cc22e157198bdd7d617e8dc497a5e27395b0 100644 --- a/src/common/net/proto/rpc_types.h +++ b/src/common/net/proto/rpc_types.h @@ -251,6 +251,12 @@ MERCURY_GEN_STRUCT_PROC( // clang-format off +MERCURY_GEN_PROC( + ADM_ping_out_t, + ((hg_uint64_t) (op_id)) + ((int32_t) (retval)) +); + /// ADM_register_job MERCURY_GEN_PROC( ADM_register_job_in_t, @@ -259,6 +265,7 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_register_job_out_t, + ((hg_uint64_t) (op_id)) ((int32_t) (retval)) ((ADM_job_t) (job)) ); @@ -272,6 +279,7 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_update_job_out_t, + ((hg_uint64_t) (op_id)) ((int32_t) (retval)) ); @@ -283,6 +291,7 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_remove_job_out_t, + ((hg_uint64_t) (op_id)) ((int32_t) (retval)) ); @@ -421,6 +430,7 @@ MERCURY_GEN_PROC( MERCURY_GEN_PROC( ADM_transfer_datasets_out_t, + ((hg_uint64_t) (op_id)) ((hg_int32_t) (retval)) ((ADM_transfer_t) (tx))) diff --git a/src/common/net/server.cpp b/src/common/net/server.cpp index 1818650af3da629ec3073d616fc9eaf395fe936f..22174f21e29239a38de3db88ea4662b50009a95f 100644 --- a/src/common/net/server.cpp +++ b/src/common/net/server.cpp @@ -33,6 +33,10 @@ #include #include +#ifdef SCORD_DEBUG_BUILD +#include +#endif + #include #include #include @@ -348,7 +352,7 @@ server::run() { // validate settings check_configuration(); -#ifdef __LOGGER_ENABLE_DEBUG__ +#ifdef SCORD_DEBUG_BUILD if(::prctl(PR_SET_DUMPABLE, 1) != 0) { LOGGER_WARN("Failed to set PR_SET_DUMPABLE flag for process. " "Daemon will not produce core dumps."); diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt index bc1f58d3e8224712bbff4ac7774397702f5a21c4..68351cbf57995480404c83a9989bc72702f727ff 100644 --- a/src/lib/CMakeLists.txt +++ b/src/lib/CMakeLists.txt @@ -26,7 +26,7 @@ add_library(adm_iosched SHARED) target_sources(adm_iosched PUBLIC admire.h admire.hpp - PRIVATE admire.cpp c_wrapper.cpp detail/impl.hpp detail/impl.cpp errors.c) + PRIVATE admire.cpp c_wrapper.cpp detail/impl.hpp detail/impl.cpp errors.c env.hpp) set_target_properties(adm_iosched PROPERTIES PUBLIC_HEADER "admire.h;admire.hpp") diff --git a/src/lib/admire.cpp b/src/lib/admire.cpp index 2b3757aa68caf8499a333bb3560727b82bf55307..78e2007be51625a03a658f70be6ecdd02419a90d 100644 --- a/src/lib/admire.cpp +++ b/src/lib/admire.cpp @@ -27,18 +27,20 @@ #include #include #include +#include +#include #include "detail/impl.hpp" namespace { -void +[[maybe_unused]] void init_library() __attribute__((constructor)); void init_logger(); -void +[[maybe_unused]] void init_library() { init_logger(); } @@ -46,14 +48,30 @@ init_library() { /** Logging for the library */ void init_logger() { - // for now, just create a simple console logger - scord::logger::create_global_logger("libadm_iosched", "console color"); + + try { + + + if(const auto p = std::getenv(admire::env::LOG); + p && !std::string{p}.empty() && std::string{p} != "0") { + if(const auto log_file = std::getenv(admire::env::LOG_OUTPUT)) { + scord::logger::create_global_logger("libadm_iosched", "file", + log_file); + } else { + scord::logger::create_global_logger("libadm_iosched", + "console color"); + } + } + } catch(const std::exception& ex) { + std::cerr << fmt::format("WARNING: Error initializing logger: {}", + ex.what()); + } } void rpc_registration_cb(scord::network::rpc_client* client) { - REGISTER_RPC(client, "ADM_ping", void, void, NULL, false); + REGISTER_RPC(client, "ADM_ping", void, ADM_ping_out_t, NULL, true); REGISTER_RPC(client, "ADM_register_job", ADM_register_job_in_t, ADM_register_job_out_t, NULL, true); diff --git a/src/lib/detail/impl.cpp b/src/lib/detail/impl.cpp index d6d3b266f3c4c19ec762e6f3602a16ea51061e68..07ba498cd8ec2919cd0771aea49df43c88d399fc 100644 --- a/src/lib/detail/impl.cpp +++ b/src/lib/detail/impl.cpp @@ -29,10 +29,12 @@ #include #include "impl.hpp" +using namespace std::literals; + void rpc_registration_cb(scord::network::rpc_client* client) { - REGISTER_RPC(client, "ADM_ping", void, void, NULL, false); + REGISTER_RPC(client, "ADM_ping", void, ADM_ping_out_t, NULL, true); REGISTER_RPC(client, "ADM_register_job", ADM_register_job_in_t, ADM_register_job_out_t, NULL, true); @@ -154,6 +156,18 @@ rpc_registration_cb(scord::network::rpc_client* client) { ADM_get_statistics_out_t, NULL, true); } +namespace api { + +struct remote_procedure { + static std::uint64_t + new_id() { + static std::atomic_uint64_t current_id; + return current_id++; + } +}; + +} // namespace api + namespace admire::detail { admire::error_code @@ -161,12 +175,24 @@ ping(const server& srv) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + const auto rpc_id = ::api::remote_procedure::new_id(); + auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{}}", __FUNCTION__); - const auto rpc = endp.call("ADM_ping"); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address())); + + ADM_ping_out_t out; - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + const auto rpc = endp.call("ADM_ping", nullptr, &out); + + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), + static_cast(out.retval), out.op_id); return ADM_SUCCESS; } @@ -175,9 +201,13 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job_requirements: {}}}", __FUNCTION__, reqs); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job_requirements: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), reqs); auto rpc_reqs = api::convert(reqs); @@ -187,14 +217,19 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) { const auto rpc = endp.call("ADM_register_job", &in, &out); if(out.retval < 0) { - LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval); + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), out.retval, out.op_id); return tl::make_unexpected(static_cast(out.retval)); } const admire::job job = api::convert(out.job); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, job: {}}}", __FUNCTION__, - ADM_SUCCESS, job.id()); + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}, job: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), ADM_SUCCESS, job, out.op_id); return job; } @@ -204,10 +239,13 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}, job_requirements: {}}}", - __FUNCTION__, job, reqs); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job: {}, job_requirements: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), job, reqs); const auto rpc_job = api::convert(job); const auto rpc_reqs = api::convert(reqs); @@ -217,14 +255,19 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) { const auto rpc = endp.call("ADM_update_job", &in, &out); - if(out.retval < 0) { const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), retval, out.op_id); return retval; } - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), ADM_SUCCESS, out.op_id); return ADM_SUCCESS; } @@ -233,9 +276,13 @@ remove_job(const server& srv, const job& job) { scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), job); const auto rpc_job = api::convert(job); @@ -246,11 +293,17 @@ remove_job(const server& srv, const job& job) { if(out.retval < 0) { const auto retval = static_cast(out.retval); - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval); + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), retval, out.op_id); return retval; } - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS); + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), ADM_SUCCESS, out.op_id); return ADM_SUCCESS; } @@ -263,11 +316,15 @@ transfer_datasets(const server& srv, const job& job, scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb}; + const auto rpc_id = ::api::remote_procedure::new_id(); auto endp = rpc_client.lookup(srv.address()); - LOGGER_INFO("RPC (ADM_{}) => {{job: {}, sources: {}, targets: {}, " - "limits: {}, mapping: {}}}", - __FUNCTION__, job, sources, targets, limits, mapping); + LOGGER_INFO( + "rpc id: {} name: {} from: {} => " + "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc_client.self_address()), job, sources, targets, + limits, mapping); const auto rpc_job = api::convert(job); const auto rpc_sources = api::convert(sources); @@ -283,15 +340,19 @@ transfer_datasets(const server& srv, const job& job, endp.call("ADM_transfer_datasets", &in, &out); if(out.retval < 0) { - LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, - out.retval); + LOGGER_ERROR("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), out.retval, out.op_id); return tl::make_unexpected(static_cast(out.retval)); } const admire::transfer tx = api::convert(out.tx); - LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {}}}", __FUNCTION__, - ADM_SUCCESS, tx); + LOGGER_INFO("rpc id: {} name: {} from: {} <= " + "body: {{retval: {}, transfer: {}}} [op_id: {}]", + rpc_id, std::quoted("ADM_"s + __FUNCTION__), + std::quoted(rpc.origin()), ADM_SUCCESS, tx, out.op_id); return tx; } diff --git a/src/lib/env.hpp b/src/lib/env.hpp new file mode 100644 index 0000000000000000000000000000000000000000..5329efdc5867df6168d0bc62930a13970f56fde8 --- /dev/null +++ b/src/lib/env.hpp @@ -0,0 +1,39 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef LIBSCORD_ENV_HPP +#define LIBSCORD_ENV_HPP + +#define LIBSCORD_ENV_PREFIX "LIBSCORD_" +#define ADD_PREFIX(str) LIBSCORD_ENV_PREFIX str + +namespace admire::env { + +static constexpr auto LOG = ADD_PREFIX("LOG"); +static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); + +} // namespace admire::env + + +#endif // LIBSCORD_ENV_HPP diff --git a/src/scord-ctl/CMakeLists.txt b/src/scord-ctl/CMakeLists.txt index 66315a8e1a7ef9a8f1eafb7398bddc53b580ca41..b06f68386f4443a69a9555412ac88888cae0677f 100644 --- a/src/scord-ctl/CMakeLists.txt +++ b/src/scord-ctl/CMakeLists.txt @@ -25,7 +25,7 @@ # scord-ctl daemon add_executable(scord-ctl) -target_sources(scord-ctl PRIVATE scord-ctl.cpp rpc_handlers.hpp rpc_handlers.cpp) +target_sources(scord-ctl PRIVATE scord-ctl.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp) target_include_directories( scord-ctl diff --git a/src/scord-ctl/env.hpp b/src/scord-ctl/env.hpp new file mode 100644 index 0000000000000000000000000000000000000000..920199cf52056676aa0ae8ca4aebdb7ec497b2c6 --- /dev/null +++ b/src/scord-ctl/env.hpp @@ -0,0 +1,39 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_CTL_ENV_HPP +#define SCORD_CTL_ENV_HPP + +#define SCORD_ENV_PREFIX "SCORDCTL_" +#define ADD_PREFIX(str) SCORD_ENV_PREFIX str + +namespace scord_ctl::env { + +static constexpr auto LOG = ADD_PREFIX("LOG"); +static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); + +} // namespace scord_ctl::env + + +#endif // SCORD_CTL_ENV_HPP diff --git a/src/scord-ctl/rpc_handlers.cpp b/src/scord-ctl/rpc_handlers.cpp index f53dc8a02bc48eb5142f71283824a74f91dd610b..3aad2c7eb0b7a2fceb300a034cfd1d064f453d85 100644 --- a/src/scord-ctl/rpc_handlers.cpp +++ b/src/scord-ctl/rpc_handlers.cpp @@ -24,16 +24,43 @@ #include #include +#include #include "rpc_handlers.hpp" +struct remote_procedure { + static std::uint64_t + new_id() { + static std::atomic_uint64_t current_id; + return current_id++; + } +}; + static void ADM_ping(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); - LOGGER_INFO("PING(noargs)"); + const auto id = remote_procedure::new_id(); + + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h))); + + ADM_ping_out_t out; + out.op_id = id; + out.retval = ADM_SUCCESS; + + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + ADM_SUCCESS); + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); ret = margo_destroy(h); assert(ret == HG_SUCCESS); diff --git a/src/scord-ctl/scord-ctl.cpp b/src/scord-ctl/scord-ctl.cpp index 834f25a176edc2013c90f6d60ccc1b5ecb897966..7a930271c1d4ce37e8acfe3fea4d3bd2f87f732a 100644 --- a/src/scord-ctl/scord-ctl.cpp +++ b/src/scord-ctl/scord-ctl.cpp @@ -36,6 +36,7 @@ #include #include #include "rpc_handlers.hpp" +#include "env.hpp" namespace fs = std::filesystem; namespace bpo = boost::program_options; @@ -52,6 +53,20 @@ print_help(const std::string& progname, fmt::print("{}", opt_desc); } +std::unordered_map +load_envs() { + + std::unordered_map envs; + + if(const auto p = std::getenv(scord_ctl::env::LOG); + p && !std::string{p}.empty() && std::string{p} != "0") { + if(const auto log_file = std::getenv(scord_ctl::env::LOG_OUTPUT)) { + envs.emplace(scord_ctl::env::LOG_OUTPUT, log_file); + } + } + + return envs; +} int main(int argc, char* argv[]) { @@ -75,6 +90,7 @@ main(int argc, char* argv[]) { ->implicit_value("") ->zero_tokens() ->notifier([&](const std::string&) { + cfg.log_file(fs::path{}); cfg.use_console(true); }), "override any logging options defined in configuration files and " @@ -140,6 +156,15 @@ main(int argc, char* argv[]) { return EXIT_FAILURE; } + // override settings from the configuration file with settings + // from environment variables + const auto env_opts = load_envs(); + + if(const auto& it = env_opts.find(scord_ctl::env::LOG_OUTPUT); + it != env_opts.end()) { + cfg.log_file(it->second); + } + // calling notify() here basically invokes all define notifiers, thus // overriding any configuration loaded from the global configuration // file with its command-line counterparts if provided (for those @@ -155,7 +180,7 @@ main(int argc, char* argv[]) { const auto rpc_registration_cb = [](auto&& ctx) { LOGGER_INFO(" * Registering RPCs handlers..."); - REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false); + REGISTER_RPC(ctx, "ADM_ping", void, ADM_ping_out_t, ADM_ping, true); // TODO: add internal RPCs for communication with scord }; diff --git a/src/scord/CMakeLists.txt b/src/scord/CMakeLists.txt index 1cd70863cc863425699de80e9a2e03061065584a..6dca1f758270c0a72f659240bd024dcbf3585d76 100644 --- a/src/scord/CMakeLists.txt +++ b/src/scord/CMakeLists.txt @@ -25,7 +25,7 @@ # scord daemon add_executable(scord) -target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp) +target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp) target_include_directories( scord diff --git a/src/scord/env.hpp b/src/scord/env.hpp new file mode 100644 index 0000000000000000000000000000000000000000..6d73fbfc0736567037eee66640853e1ba27c1d4b --- /dev/null +++ b/src/scord/env.hpp @@ -0,0 +1,39 @@ +/****************************************************************************** + * Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain + * + * This software was partially supported by the EuroHPC-funded project ADMIRE + * (Project ID: 956748, https://www.admire-eurohpc.eu). + * + * This file is part of scord. + * + * scord is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * scord is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with scord. If not, see . + * + * SPDX-License-Identifier: GPL-3.0-or-later + *****************************************************************************/ + +#ifndef SCORD_ENV_HPP +#define SCORD_ENV_HPP + +#define SCORD_ENV_PREFIX "SCORD_" +#define ADD_PREFIX(str) SCORD_ENV_PREFIX str + +namespace scord::env { + +static constexpr auto LOG = ADD_PREFIX("LOG"); +static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT"); + +} // namespace scord::env + + +#endif // SCORD_ENV_HPP diff --git a/src/scord/rpc_handlers.cpp b/src/scord/rpc_handlers.cpp index 9ebf50607a4f4047630779ecc980500551acca32..12efe2b50e1ee71d90dd37ee1807e29564b969e1 100644 --- a/src/scord/rpc_handlers.cpp +++ b/src/scord/rpc_handlers.cpp @@ -23,6 +23,7 @@ *****************************************************************************/ #include +#include #include #include #include @@ -39,16 +40,30 @@ struct remote_procedure { static void ADM_ping(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; [[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) => {{}}", id, __FUNCTION__); - LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h))); + + ADM_ping_out_t out; + out.op_id = id; + out.retval = ADM_SUCCESS; + + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), ADM_SUCCESS); + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + ret = margo_destroy(h); assert(ret == HG_SUCCESS); } @@ -58,6 +73,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_ping); static void ADM_register_job(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_register_job_in_t in; @@ -71,18 +88,23 @@ ADM_register_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) => {{job_requirements: {}}}", id, __FUNCTION__, + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job_requirements: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), reqs); const auto job = admire::job{42}; admire::error_code rv = ADM_SUCCESS; + out.op_id = id; out.retval = rv; out.job = admire::api::convert(job).release(); - LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, job: {}}}", id, __FUNCTION__, - rv, job); + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}, job: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv, + job); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -100,6 +122,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_job); static void ADM_update_job(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_update_job_in_t in; @@ -114,13 +138,18 @@ ADM_update_job(hg_handle_t h) { const admire::job_requirements reqs(&in.reqs); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) => {{job: {}, job_requirements: {}}}", id, - __FUNCTION__, job, reqs); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job: {}, job_requirements: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job, + reqs); admire::error_code rv = ADM_SUCCESS; + out.op_id = id; out.retval = rv; - LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv); + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -138,6 +167,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_job); static void ADM_remove_job(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_remove_job_in_t in; @@ -151,12 +182,18 @@ ADM_remove_job(hg_handle_t h) { const admire::job job(in.job); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) => {{job: {}}}", id, __FUNCTION__, job); + LOGGER_INFO("rpc id: {} name: {} from: {} => " + "body: {{job: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), + job); admire::error_code rv = ADM_SUCCESS; + out.op_id = id; out.retval = rv; - LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv); + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); @@ -173,6 +210,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_remove_job); static void ADM_register_adhoc_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_register_adhoc_storage_in_t in; @@ -204,6 +243,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage); static void ADM_update_adhoc_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_update_adhoc_storage_in_t in; @@ -235,6 +276,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage); static void ADM_remove_adhoc_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_remove_adhoc_storage_in_t in; @@ -266,6 +309,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage); static void ADM_deploy_adhoc_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_deploy_adhoc_storage_in_t in; @@ -297,6 +342,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage); static void ADM_register_pfs_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_register_pfs_storage_in_t in; @@ -328,6 +375,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_pfs_storage); static void ADM_update_pfs_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_update_pfs_storage_in_t in; @@ -359,6 +408,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_pfs_storage); static void ADM_remove_pfs_storage(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_remove_pfs_storage_in_t in; @@ -974,6 +1025,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops) static void ADM_transfer_datasets(hg_handle_t h) { + using scord::network::utils::get_address; + [[maybe_unused]] hg_return_t ret; ADM_transfer_datasets_in_t in; @@ -994,19 +1047,24 @@ ADM_transfer_datasets(hg_handle_t h) { const auto mapping = static_cast(in.mapping); const auto id = remote_procedure::new_id(); - LOGGER_INFO("RPC ID {} ({}) => {{job: {}, sources: {}, targets: {}, " - "limits: {}, mapping: {}}}", - id, __FUNCTION__, job, sources, targets, limits, mapping); + LOGGER_INFO( + "rpc id: {} name: {} from: {} => " + "body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job, + sources, targets, limits, mapping); admire::error_code rv = ADM_SUCCESS; const auto transfer = admire::transfer{42}; + out.op_id = id; out.retval = rv; out.tx = admire::api::convert(transfer).release(); - LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, transfer: {}}}", id, - __FUNCTION__, rv, transfer); + LOGGER_INFO("rpc id: {} name: {} to: {} <= " + "body: {{retval: {}, transfer: {}}}", + id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv, + transfer); ret = margo_respond(h, &out); assert(ret == HG_SUCCESS); diff --git a/src/scord/scord.cpp b/src/scord/scord.cpp index 3710382fd921e4c3ccf750906f58ca3930784b78..64afe54db165a12d3bf2621f196be26454511906 100644 --- a/src/scord/scord.cpp +++ b/src/scord/scord.cpp @@ -36,6 +36,7 @@ #include #include #include "rpc_handlers.hpp" +#include "env.hpp" #include @@ -54,6 +55,20 @@ print_help(const std::string& progname, fmt::print("{}", opt_desc); } +std::unordered_map +load_envs() { + + std::unordered_map envs; + + if(const auto p = std::getenv(scord::env::LOG); + p && !std::string{p}.empty() && std::string{p} != "0") { + if(const auto log_file = std::getenv(scord::env::LOG_OUTPUT)) { + envs.emplace(scord::env::LOG_OUTPUT, log_file); + } + } + + return envs; +} int main(int argc, char* argv[]) { @@ -77,6 +92,7 @@ main(int argc, char* argv[]) { ->implicit_value("") ->zero_tokens() ->notifier([&](const std::string&) { + cfg.log_file(fs::path{}); cfg.use_console(true); }), "override any logging options defined in configuration files and " @@ -142,6 +158,15 @@ main(int argc, char* argv[]) { return EXIT_FAILURE; } + // override settings from the configuration file with settings + // from environment variables + const auto env_opts = load_envs(); + + if(const auto& it = env_opts.find(scord::env::LOG_OUTPUT); + it != env_opts.end()) { + cfg.log_file(it->second); + } + // calling notify() here basically invokes all define notifiers, thus // overriding any configuration loaded from the global configuration // file with its command-line counterparts if provided (for those @@ -158,7 +183,7 @@ main(int argc, char* argv[]) { const auto rpc_registration_cb = [](auto&& ctx) { LOGGER_INFO(" * Registering RPCs handlers..."); - REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false); + REGISTER_RPC(ctx, "ADM_ping", void, ADM_ping_out_t, ADM_ping, true); REGISTER_RPC(ctx, "ADM_register_job", ADM_register_job_in_t, ADM_register_job_out_t, ADM_register_job, true);