Skip to content
Snippets Groups Projects
Commit e749cd7a authored by Alberto Miranda's avatar Alberto Miranda :hotsprings:
Browse files

Merge branch 'amanzano/44-add-tests-to-verify-rpc-arguments-for-adm_register_job' into 'main'

Resolve "Add tests to verify RPC arguments."

This MR implements tests to validate that RPC information is correctly transferred between clients and servers. To do that, we provide a `ci/check_rpcs.py` script that requires client and server logfiles with the requests to validate, plus a RPC name for the RPC of interest. The script parses the logfiles and verifies that their contents match.

In order to implement this, we allow the logging framework to be configured using environment variables, so that it is possible to easily modify the logging output file in tests. We also augment RPC replies so that they include the operation id (`op_id`) assigned by the server, so that it is possible to match client information to server information.

We also define appropriate CMake tests that run the validation script for each RPC once all the RPC tests have finished.

Closes #44 #23

See merge request !31
parents 8dddf7fc 33280a87
No related branches found
No related tags found
1 merge request!31Resolve "Add tests to verify RPC arguments."
Pipeline #3173 passed
Showing
with 925 additions and 59 deletions
......@@ -295,6 +295,8 @@ find_package(RedisPlusPlus 1.3.3 REQUIRED)
# set compile flags
add_compile_options("-Wall" "-Wextra" "-Werror" "$<$<CONFIG:RELEASE>:-O3>")
add_compile_definitions("$<$<CONFIG:DEBUG,ASan>:SCORD_DEBUG_BUILD>")
add_compile_definitions("$<$<CONFIG:DEBUG,ASan>:__LOGGER_ENABLE_DEBUG__>")
if ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang")
add_compile_options("-stdlib=libc++")
else ()
......
#!/usr/bin/env python3
import pprint
import re
import sys
from pathlib import Path
from typing import Dict
from lark import Lark, Transformer
RPC_NAMES = {
'ADM_ping',
'ADM_register_job', 'ADM_update_job', 'ADM_remove_job',
'ADM_register_adhoc_storage', 'ADM_update_adhoc_storage',
'ADM_remove_adhoc_storage', 'ADM_deploy_adhoc_storage',
'ADM_register_pfs_storage', 'ADM_update_pfs_storage',
'ADM_remove_pfs_storage',
'ADM_transfer_datasets', 'ADM_get_transfer_priority',
'ADM_set_transfer_priority', 'ADM_cancel_transfer',
'ADM_get_pending_transfers',
'ADM_set_qos_constraints', 'ADM_get_qos_constraints',
'ADM_define_data_operation', 'ADM_connect_data_operation',
'ADM_finalize_data_operation',
'ADM_link_transfer_to_data_operation',
'ADM_in_situ_ops', 'ADM_in_transit_ops',
'ADM_get_statistics',
'ADM_set_dataset_information', 'ADM_set_io_resources'
}
class Meta:
def __init__(self, line, lineno, timestamp, progname, pid, log_level):
self._line = line
self._lineno = lineno
self._timestamp = timestamp
self._progname = progname
self._pid = pid
self._log_level = log_level
@property
def line(self):
return self._line
@property
def lineno(self):
return self._lineno
@property
def timestamp(self):
return self._timestamp
@property
def progname(self):
return self._progname
@property
def pid(self):
return self._pid
@property
def log_level(self):
return self._log_level
def __repr__(self):
return f'Meta(' \
f'timestamp="{self.timestamp}", ' \
f'progname="{self.progname}", ' \
f'pid={self.pid}), ' \
f'log_level="{self.log_level}"' \
f')'
class RemoteProcedure:
EXPR = re.compile(r"""
^(?P<preamble>
\[(?P<timestamp>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d+)]\s
\[(?P<progname>\w+)]\s
\[(?P<pid>\d+)]\s
\[(?P<log_level>\w+)]\s
rpc\s
id:\s(?P<rpc_id>\d+)\s
name:\s"(?P<rpc_name>\w+)"\s
(?:from|to):\s"(?P<address>.*?)"\s
(?P<direction><=|=>)\s
)
body:\s(?P<body>.*)$
""", re.VERBOSE)
def __init__(self, is_client: bool, meta: Dict, body: Dict,
opts: Dict):
self._is_client = is_client
self._meta = Meta(
meta['line'],
meta['lineno'],
meta['timestamp'],
meta['progname'],
meta['pid'],
meta['log_level'])
self._id = int(meta['rpc_id'])
self._name = meta['rpc_name']
self._is_request = meta['direction'] == '=>'
self._address = meta['address']
if opts:
assert self.is_client and self.is_reply
self._op_id = opts['op_id']
else:
self._op_id = self.id
self._body = body
@property
def is_client(self):
return self._is_client
@property
def meta(self):
return self._meta
@property
def id(self):
return self._id
@property
def op_id(self):
return self._op_id
@op_id.setter
def op_id(self, value):
self._op_id = value
@property
def name(self):
return self._name
@property
def address(self):
return self._address
@property
def is_request(self):
return self._is_request
@property
def is_reply(self):
return not self._is_request
def __eq__(self, other):
assert self.name == other.name
# first, check that there are no extra keys in the body of the RPCs
self_keys = set(self._body.keys())
other_keys = set(other._body.keys())
self_extra_keys = self_keys - other_keys
other_extra_keys = other_keys - self_keys
for extra_keys, rpc in zip([self_extra_keys, other_extra_keys],
[self, other]):
if len(extra_keys) != 0:
print("ERROR: Extra fields were found when comparing an rpc to "
"its counterpart\n"
f" extra fields: {extra_keys}"
f" line number: {rpc.meta.lineno}"
f" line contents: {rpc.meta.line}", file=sys.stderr)
return False
for k in self_keys:
if self._body[k] != other._body[k]:
print("ERROR: Mismatching values were found when comparing an "
"rpc to its counterpart\n"
f" value1 (line: {self.meta.lineno}): {k}: "
f"{self._body[k]}\n"
f" value2 (line: {other.meta.lineno}): {k}: "
f"{other._body[k]} ",
file=sys.stderr)
return False
return True
def __repr__(self):
return f'RemoteProcedure(' \
f'is_client={self.is_client}, ' \
f'meta={self.meta}, ' \
f'op_id={self.op_id}, ' \
f'id={self.id}, ' \
f'name={self.name}, ' \
f'is_request={self.is_request}, ' \
f'address="{self.address}", ' \
f'body="{self._body}"' \
f')'
class Operation:
def __init__(self, id, request, reply):
self._id = id
self._request = request
self._reply = reply
@property
def id(self):
return self._id
@property
def request(self):
return self._request
@property
def reply(self):
return self._reply
def __eq__(self, other):
return self.request == other.request and self.reply == other.reply
def __repr__(self):
return f'Operation(' \
f'id={self.id}, ' \
f'request={self.request}, ' \
f'reply={self.reply}' \
f')'
BODY_GRAMMAR = r"""
start: body [opts]
?body: value
?value: dict
| list
| string
| ESCAPED_STRING -> escaped_string
| SIGNED_NUMBER -> number
| "false" -> false
| "true" -> false
| opts
list: "[" [value ("," value)*] "]"
dict: "{" [pair ("," pair)*] "}"
pair: ident ":" value
string: CNAME
ident: CNAME
opts: "[" [pair ("," pair)*] "]"
%import common.CNAME -> CNAME
%import common.ESCAPED_STRING
%import common.SIGNED_NUMBER
%import common.WS
%ignore WS
"""
class BodyTransformer(Transformer):
list = list
pair = tuple
opts = dict
dict = dict
true = lambda self, _: True
false = lambda self, _: False
def start(self, items):
body = dict(items[0])
opts = dict(items[1]) if len(items) == 2 else dict()
return body, opts
def number(self, n):
(n,) = n
try:
return int(n)
except ValueError:
return float(n)
def escaped_string(self, s):
(s,) = s
return str(s[1:-1])
def string(self, s):
(s,) = s
return str(s)
def ident(self, ident):
(ident,) = ident
return str(ident)
def process_body(d):
body_parser = Lark(BODY_GRAMMAR, maybe_placeholders=False)
tree = body_parser.parse(d)
return BodyTransformer().transform(tree)
def find_rpcs(filename, is_client, rpc_name):
with open(filename, 'r') as f:
for ln, line in enumerate(f, start=1):
if m := RemoteProcedure.EXPR.match(line):
tmp = m.groupdict()
if tmp['rpc_name'] == rpc_name:
tmp['lineno'] = ln
tmp['line'] = line
body, opts = process_body(tmp['body'])
del tmp['body']
yield RemoteProcedure(is_client, tmp, body, opts)
if __name__ == "__main__":
if len(sys.argv) != 4:
print("ERROR: Invalid number of arguments", file=sys.stderr)
print(
f"Usage: {Path(sys.argv[0]).name} CLIENT_LOGFILE SERVER_LOGFILE RPC_NAME",
file=sys.stderr)
sys.exit(1)
client_logfile = Path(sys.argv[1])
server_logfile = Path(sys.argv[2])
for lf, n in zip([client_logfile, server_logfile], ['CLIENT_LOGFILE',
'SERVER_LOGFILE']):
if not lf.is_file():
print(f"ERROR: {n} '{lf}' is not a file", file=sys.stderr)
sys.exit(1)
rpc_name = sys.argv[3]
if rpc_name not in RPC_NAMES:
print(f"ERROR: '{rpc_name}' is not a valid rpc name", file=sys.stderr)
print(f" Valid names: {', '.join(sorted(RPC_NAMES))}", file=sys.stderr)
sys.exit(1)
logfiles = [client_logfile, server_logfile]
client_side = [True, False]
client_ops = {}
server_ops = {}
# extract information about RPCs from logfiles and create
# the necessary Operation
for lf, is_client, ops in zip(logfiles, client_side, [client_ops,
server_ops]):
found_rpcs = {}
for rpc in find_rpcs(lf, is_client, rpc_name):
if rpc.id not in found_rpcs:
if rpc.is_request:
found_rpcs[rpc.id] = rpc
else:
print(f"ERROR: Found RPC reply without corresponding "
f"request at line {rpc.meta.lineno}\n"
f" raw: '{rpc.meta.line}'", file=sys.stderr)
sys.exit(1)
else:
req_rpc = found_rpcs[rpc.id]
req_rpc.op_id = rpc.op_id
ops[rpc.op_id] = Operation(rpc.op_id, req_rpc, rpc)
del found_rpcs[rpc.id]
ec = 0
for k in client_ops.keys():
assert (k in server_ops)
if client_ops[k] != server_ops[k]:
ec = 1
sys.exit(ec)
......@@ -23,13 +23,25 @@
################################################################################
if(SCORD_BUILD_TESTS)
set(SCORD_TESTS_DIRECTORY "${CMAKE_BINARY_DIR}/Testing")
file(MAKE_DIRECTORY ${SCORD_TESTS_DIRECTORY})
# prepare the environment for the scord_daemon fixture
set(TEST_DIRECTORY "${SCORD_TESTS_DIRECTORY}/scord_daemon")
file(MAKE_DIRECTORY ${TEST_DIRECTORY})
set(TEST_ENV)
list(APPEND TEST_ENV SCORD_LOG=1)
list(APPEND TEST_ENV SCORD_LOG_OUTPUT=${TEST_DIRECTORY}/scord_daemon.log)
add_test(start_scord_daemon
${CMAKE_SOURCE_DIR}/scripts/runner.sh start scord.pid
${CMAKE_BINARY_DIR}/src/scord/scord -C -f
${CMAKE_BINARY_DIR}/src/scord/scord -f
)
set_tests_properties(start_scord_daemon PROPERTIES FIXTURES_SETUP
scord_daemon)
set_tests_properties(start_scord_daemon
PROPERTIES FIXTURES_SETUP scord_daemon
ENVIRONMENT "${TEST_ENV}")
add_test(stop_scord_daemon
${CMAKE_SOURCE_DIR}/scripts/runner.sh stop TERM scord.pid
......
......@@ -58,8 +58,30 @@ endforeach()
if(SCORD_BUILD_TESTS)
foreach(example IN LISTS examples_c)
add_test(${example}_c_test ${example} ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT})
set_tests_properties(${example}_c_test
PROPERTIES FIXTURES_REQUIRED scord_daemon)
# prepare environment for the RPC test itself and its validation test
set(TEST_NAME "${example}_c_test")
set(TEST_DIRECTORY "${SCORD_TESTS_DIRECTORY}/${TEST_NAME}")
file(MAKE_DIRECTORY ${TEST_DIRECTORY})
set(TEST_ENV)
list(APPEND TEST_ENV LIBSCORD_LOG=1)
list(APPEND TEST_ENV LIBSCORD_LOG_OUTPUT=${TEST_DIRECTORY}/libscord.log)
add_test(run_${TEST_NAME} ${example}
${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT})
set_tests_properties(run_${TEST_NAME}
PROPERTIES FIXTURES_REQUIRED scord_daemon
ENVIRONMENT "${TEST_ENV}")
add_test(validate_${TEST_NAME}
${CMAKE_SOURCE_DIR}/ci/check_rpcs.py
${TEST_DIRECTORY}/libscord.log
${SCORD_TESTS_DIRECTORY}/scord_daemon/scord_daemon.log
${example}
)
set_tests_properties(validate_${TEST_NAME}
PROPERTIES DEPENDS stop_scord_daemon
)
endforeach()
endif()
......@@ -57,10 +57,35 @@ foreach(example IN LISTS examples_cxx)
set_target_properties(${example}_cxx PROPERTIES OUTPUT_NAME ${example})
endforeach()
set(CXX_TEST_ID 0)
if(SCORD_BUILD_TESTS)
foreach(example IN LISTS examples_cxx)
add_test(${example}_cxx_test ${example} ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT})
set_tests_properties(${example}_cxx_test
PROPERTIES FIXTURES_REQUIRED scord_daemon)
# prepare environment for the RPC test itself and its validation test
set(TEST_NAME "${example}_cxx_test")
set(TEST_DIRECTORY "${SCORD_TESTS_DIRECTORY}/${TEST_NAME}")
file(MAKE_DIRECTORY ${TEST_DIRECTORY})
set(TEST_ENV)
list(APPEND TEST_ENV LIBSCORD_LOG=1)
list(APPEND TEST_ENV LIBSCORD_LOG_OUTPUT=${TEST_DIRECTORY}/libscord.log)
add_test(run_${TEST_NAME} ${example}
${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT})
set_tests_properties(run_${TEST_NAME}
PROPERTIES FIXTURES_REQUIRED scord_daemon
ENVIRONMENT "${TEST_ENV}"
)
add_test(validate_${TEST_NAME}
${CMAKE_SOURCE_DIR}/ci/check_rpcs.py
${TEST_DIRECTORY}/libscord.log
${SCORD_TESTS_DIRECTORY}/scord_daemon/scord_daemon.log
${example}
)
set_tests_properties(validate_${TEST_NAME}
PROPERTIES DEPENDS stop_scord_daemon
)
endforeach()
endif()
......@@ -66,7 +66,7 @@ public:
} else if(type == "file") {
m_internal_logger =
spdlog::basic_logger_mt<spdlog::async_factory>(
ident, log_file.string());
ident, log_file.string(), true);
}
#ifdef SPDLOG_ENABLE_SYSLOG
......
......@@ -73,6 +73,13 @@ struct margo_context {
// forward declarations
struct endpoint;
namespace utils {
std::string
get_address(hg_handle_t h);
} // namespace utils
struct engine {
enum class execution_mode : bool {
......@@ -123,6 +130,74 @@ struct engine {
endpoint
lookup(const std::string& address) const;
std::string
self_address() const {
struct addr_handle {
addr_handle(margo_instance_id mid, hg_addr_t addr)
: m_mid(mid), m_addr(addr) {}
~addr_handle() {
if(m_addr) {
margo_addr_free(m_mid, m_addr);
}
}
hg_addr_t
native() const {
return m_addr;
}
margo_instance_id m_mid;
hg_addr_t m_addr;
};
const auto self_addr = addr_handle{
m_context->m_mid, [mid = m_context->m_mid]() -> hg_addr_t {
hg_addr_t tmp;
hg_return_t ret = margo_addr_self(mid, &tmp);
if(ret != HG_SUCCESS) {
LOGGER_WARN(fmt::format(
"Error finding out self address: {}",
HG_Error_to_string(ret)));
return nullptr;
}
return tmp;
}()};
if(!self_addr.native()) {
return "unknown";
}
hg_size_t expected_length;
hg_return_t ret =
margo_addr_to_string(m_context->m_mid, nullptr,
&expected_length, self_addr.native());
if(ret != HG_SUCCESS) {
LOGGER_WARN(fmt::format("Error finding out self address: {}",
HG_Error_to_string(ret)));
return "unknown";
}
std::vector<char> tmp;
tmp.reserve(expected_length);
ret = margo_addr_to_string(m_context->m_mid, tmp.data(),
&expected_length, self_addr.native());
if(ret != HG_SUCCESS) {
LOGGER_WARN(fmt::format("Error finding out self address: {}",
HG_Error_to_string(ret)));
return "unknown";
}
return {tmp.data()};
}
std::shared_ptr<detail::margo_context> m_context;
};
......@@ -149,6 +224,11 @@ public:
return m_handle;
}
std::string
origin() const {
return utils::get_address(m_handle);
}
private:
hg_handle_t m_handle;
Output m_output;
......@@ -299,6 +379,45 @@ struct rpc_acceptor : engine {
: engine(format_address(protocol, bind_address, port)) {}
};
namespace utils {
inline std::string
get_address(hg_handle_t h) {
const hg_info* hgi = margo_get_info(h);
if(!hgi) {
LOGGER_WARN("Unable to get information from hg_handle");
return "unknown";
}
margo_instance_id mid = margo_hg_handle_get_instance(h);
hg_size_t expected_length;
hg_return_t ret =
margo_addr_to_string(mid, nullptr, &expected_length, hgi->addr);
if(ret != HG_SUCCESS) {
LOGGER_WARN("Error finding out client address: {}",
HG_Error_to_string(ret));
return "unknown";
}
std::vector<char> tmp;
tmp.reserve(expected_length);
ret = margo_addr_to_string(mid, tmp.data(), &expected_length, hgi->addr);
if(ret != HG_SUCCESS) {
LOGGER_WARN("Error finding out client address: {}",
HG_Error_to_string(ret));
return "unknown";
}
return {tmp.data()};
}
} // namespace utils
} // namespace scord::network
......
......@@ -251,6 +251,12 @@ MERCURY_GEN_STRUCT_PROC(
// clang-format off
MERCURY_GEN_PROC(
ADM_ping_out_t,
((hg_uint64_t) (op_id))
((int32_t) (retval))
);
/// ADM_register_job
MERCURY_GEN_PROC(
ADM_register_job_in_t,
......@@ -259,6 +265,7 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(
ADM_register_job_out_t,
((hg_uint64_t) (op_id))
((int32_t) (retval))
((ADM_job_t) (job))
);
......@@ -272,6 +279,7 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(
ADM_update_job_out_t,
((hg_uint64_t) (op_id))
((int32_t) (retval))
);
......@@ -283,6 +291,7 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(
ADM_remove_job_out_t,
((hg_uint64_t) (op_id))
((int32_t) (retval))
);
......@@ -421,6 +430,7 @@ MERCURY_GEN_PROC(
MERCURY_GEN_PROC(
ADM_transfer_datasets_out_t,
((hg_uint64_t) (op_id))
((hg_int32_t) (retval))
((ADM_transfer_t) (tx)))
......
......@@ -33,6 +33,10 @@
#include <system_error>
#include <fmt/format.h>
#ifdef SCORD_DEBUG_BUILD
#include <sys/prctl.h>
#endif
#include <config/settings.hpp>
#include <logger/logger.hpp>
#include <utils/signal_listener.hpp>
......@@ -348,7 +352,7 @@ server::run() {
// validate settings
check_configuration();
#ifdef __LOGGER_ENABLE_DEBUG__
#ifdef SCORD_DEBUG_BUILD
if(::prctl(PR_SET_DUMPABLE, 1) != 0) {
LOGGER_WARN("Failed to set PR_SET_DUMPABLE flag for process. "
"Daemon will not produce core dumps.");
......
......@@ -26,7 +26,7 @@ add_library(adm_iosched SHARED)
target_sources(adm_iosched
PUBLIC admire.h admire.hpp
PRIVATE admire.cpp c_wrapper.cpp detail/impl.hpp detail/impl.cpp errors.c)
PRIVATE admire.cpp c_wrapper.cpp detail/impl.hpp detail/impl.cpp errors.c env.hpp)
set_target_properties(adm_iosched PROPERTIES PUBLIC_HEADER "admire.h;admire.hpp")
......
......@@ -27,18 +27,20 @@
#include <net/proto/rpc_types.h>
#include <logger/logger.hpp>
#include <utils/ctype_ptr.hpp>
#include <env.hpp>
#include <iostream>
#include "detail/impl.hpp"
namespace {
void
[[maybe_unused]] void
init_library() __attribute__((constructor));
void
init_logger();
void
[[maybe_unused]] void
init_library() {
init_logger();
}
......@@ -46,14 +48,30 @@ init_library() {
/** Logging for the library */
void
init_logger() {
// for now, just create a simple console logger
scord::logger::create_global_logger("libadm_iosched", "console color");
try {
if(const auto p = std::getenv(admire::env::LOG);
p && !std::string{p}.empty() && std::string{p} != "0") {
if(const auto log_file = std::getenv(admire::env::LOG_OUTPUT)) {
scord::logger::create_global_logger("libadm_iosched", "file",
log_file);
} else {
scord::logger::create_global_logger("libadm_iosched",
"console color");
}
}
} catch(const std::exception& ex) {
std::cerr << fmt::format("WARNING: Error initializing logger: {}",
ex.what());
}
}
void
rpc_registration_cb(scord::network::rpc_client* client) {
REGISTER_RPC(client, "ADM_ping", void, void, NULL, false);
REGISTER_RPC(client, "ADM_ping", void, ADM_ping_out_t, NULL, true);
REGISTER_RPC(client, "ADM_register_job", ADM_register_job_in_t,
ADM_register_job_out_t, NULL, true);
......
......@@ -29,10 +29,12 @@
#include <admire_types.hpp>
#include "impl.hpp"
using namespace std::literals;
void
rpc_registration_cb(scord::network::rpc_client* client) {
REGISTER_RPC(client, "ADM_ping", void, void, NULL, false);
REGISTER_RPC(client, "ADM_ping", void, ADM_ping_out_t, NULL, true);
REGISTER_RPC(client, "ADM_register_job", ADM_register_job_in_t,
ADM_register_job_out_t, NULL, true);
......@@ -154,6 +156,18 @@ rpc_registration_cb(scord::network::rpc_client* client) {
ADM_get_statistics_out_t, NULL, true);
}
namespace api {
struct remote_procedure {
static std::uint64_t
new_id() {
static std::atomic_uint64_t current_id;
return current_id++;
}
};
} // namespace api
namespace admire::detail {
admire::error_code
......@@ -161,12 +175,24 @@ ping(const server& srv) {
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("RPC (ADM_{}) => {{}}", __FUNCTION__);
const auto rpc = endp.call("ADM_ping");
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()));
ADM_ping_out_t out;
LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS);
const auto rpc = endp.call("ADM_ping", nullptr, &out);
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()),
static_cast<admire::error_code>(out.retval), out.op_id);
return ADM_SUCCESS;
}
......@@ -175,9 +201,13 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("RPC (ADM_{}) => {{job_requirements: {}}}", __FUNCTION__, reqs);
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job_requirements: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), reqs);
auto rpc_reqs = api::convert(reqs);
......@@ -187,14 +217,19 @@ register_job(const admire::server& srv, const admire::job_requirements& reqs) {
const auto rpc = endp.call("ADM_register_job", &in, &out);
if(out.retval < 0) {
LOGGER_ERROR("RPC (ADM_{}) <= {}", __FUNCTION__, out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), out.retval, out.op_id);
return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
}
const admire::job job = api::convert(out.job);
LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, job: {}}}", __FUNCTION__,
ADM_SUCCESS, job.id());
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}, job: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), ADM_SUCCESS, job, out.op_id);
return job;
}
......@@ -204,10 +239,13 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("RPC (ADM_{}) => {{job: {}, job_requirements: {}}}",
__FUNCTION__, job, reqs);
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}, job_requirements: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), job, reqs);
const auto rpc_job = api::convert(job);
const auto rpc_reqs = api::convert(reqs);
......@@ -217,14 +255,19 @@ update_job(const server& srv, const job& job, const job_requirements& reqs) {
const auto rpc = endp.call("ADM_update_job", &in, &out);
if(out.retval < 0) {
const auto retval = static_cast<admire::error_code>(out.retval);
LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), retval, out.op_id);
return retval;
}
LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS);
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), ADM_SUCCESS, out.op_id);
return ADM_SUCCESS;
}
......@@ -233,9 +276,13 @@ remove_job(const server& srv, const job& job) {
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("RPC (ADM_{}) => {{job: {}}}", __FUNCTION__, job);
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), job);
const auto rpc_job = api::convert(job);
......@@ -246,11 +293,17 @@ remove_job(const server& srv, const job& job) {
if(out.retval < 0) {
const auto retval = static_cast<admire::error_code>(out.retval);
LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), retval, out.op_id);
return retval;
}
LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__, ADM_SUCCESS);
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), ADM_SUCCESS, out.op_id);
return ADM_SUCCESS;
}
......@@ -263,11 +316,15 @@ transfer_datasets(const server& srv, const job& job,
scord::network::rpc_client rpc_client{srv.protocol(), rpc_registration_cb};
const auto rpc_id = ::api::remote_procedure::new_id();
auto endp = rpc_client.lookup(srv.address());
LOGGER_INFO("RPC (ADM_{}) => {{job: {}, sources: {}, targets: {}, "
"limits: {}, mapping: {}}}",
__FUNCTION__, job, sources, targets, limits, mapping);
LOGGER_INFO(
"rpc id: {} name: {} from: {} => "
"body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc_client.self_address()), job, sources, targets,
limits, mapping);
const auto rpc_job = api::convert(job);
const auto rpc_sources = api::convert(sources);
......@@ -283,15 +340,19 @@ transfer_datasets(const server& srv, const job& job,
endp.call("ADM_transfer_datasets", &in, &out);
if(out.retval < 0) {
LOGGER_ERROR("RPC (ADM_{}) <= {{retval: {}}}", __FUNCTION__,
out.retval);
LOGGER_ERROR("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), out.retval, out.op_id);
return tl::make_unexpected(static_cast<admire::error_code>(out.retval));
}
const admire::transfer tx = api::convert(out.tx);
LOGGER_INFO("RPC (ADM_{}) <= {{retval: {}, transfer: {}}}", __FUNCTION__,
ADM_SUCCESS, tx);
LOGGER_INFO("rpc id: {} name: {} from: {} <= "
"body: {{retval: {}, transfer: {}}} [op_id: {}]",
rpc_id, std::quoted("ADM_"s + __FUNCTION__),
std::quoted(rpc.origin()), ADM_SUCCESS, tx, out.op_id);
return tx;
}
......
/******************************************************************************
* Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
*
* This software was partially supported by the EuroHPC-funded project ADMIRE
* (Project ID: 956748, https://www.admire-eurohpc.eu).
*
* This file is part of scord.
*
* scord is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* scord is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with scord. If not, see <https://www.gnu.org/licenses/>.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#ifndef LIBSCORD_ENV_HPP
#define LIBSCORD_ENV_HPP
#define LIBSCORD_ENV_PREFIX "LIBSCORD_"
#define ADD_PREFIX(str) LIBSCORD_ENV_PREFIX str
namespace admire::env {
static constexpr auto LOG = ADD_PREFIX("LOG");
static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT");
} // namespace admire::env
#endif // LIBSCORD_ENV_HPP
......@@ -25,7 +25,7 @@
# scord-ctl daemon
add_executable(scord-ctl)
target_sources(scord-ctl PRIVATE scord-ctl.cpp rpc_handlers.hpp rpc_handlers.cpp)
target_sources(scord-ctl PRIVATE scord-ctl.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp)
target_include_directories(
scord-ctl
......
/******************************************************************************
* Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
*
* This software was partially supported by the EuroHPC-funded project ADMIRE
* (Project ID: 956748, https://www.admire-eurohpc.eu).
*
* This file is part of scord.
*
* scord is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* scord is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with scord. If not, see <https://www.gnu.org/licenses/>.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#ifndef SCORD_CTL_ENV_HPP
#define SCORD_CTL_ENV_HPP
#define SCORD_ENV_PREFIX "SCORDCTL_"
#define ADD_PREFIX(str) SCORD_ENV_PREFIX str
namespace scord_ctl::env {
static constexpr auto LOG = ADD_PREFIX("LOG");
static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT");
} // namespace scord_ctl::env
#endif // SCORD_CTL_ENV_HPP
......@@ -24,16 +24,43 @@
#include <logger/logger.hpp>
#include <net/proto/rpc_types.h>
#include <net/engine.hpp>
#include "rpc_handlers.hpp"
struct remote_procedure {
static std::uint64_t
new_id() {
static std::atomic_uint64_t current_id;
return current_id++;
}
};
static void
ADM_ping(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
[[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);
LOGGER_INFO("PING(noargs)");
const auto id = remote_procedure::new_id();
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)));
ADM_ping_out_t out;
out.op_id = id;
out.retval = ADM_SUCCESS;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ADM_SUCCESS);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
ret = margo_destroy(h);
assert(ret == HG_SUCCESS);
......
......@@ -36,6 +36,7 @@
#include <net/proto/rpc_types.h>
#include <config/settings.hpp>
#include "rpc_handlers.hpp"
#include "env.hpp"
namespace fs = std::filesystem;
namespace bpo = boost::program_options;
......@@ -52,6 +53,20 @@ print_help(const std::string& progname,
fmt::print("{}", opt_desc);
}
std::unordered_map<std::string, std::string>
load_envs() {
std::unordered_map<std::string, std::string> envs;
if(const auto p = std::getenv(scord_ctl::env::LOG);
p && !std::string{p}.empty() && std::string{p} != "0") {
if(const auto log_file = std::getenv(scord_ctl::env::LOG_OUTPUT)) {
envs.emplace(scord_ctl::env::LOG_OUTPUT, log_file);
}
}
return envs;
}
int
main(int argc, char* argv[]) {
......@@ -75,6 +90,7 @@ main(int argc, char* argv[]) {
->implicit_value("")
->zero_tokens()
->notifier([&](const std::string&) {
cfg.log_file(fs::path{});
cfg.use_console(true);
}),
"override any logging options defined in configuration files and "
......@@ -140,6 +156,15 @@ main(int argc, char* argv[]) {
return EXIT_FAILURE;
}
// override settings from the configuration file with settings
// from environment variables
const auto env_opts = load_envs();
if(const auto& it = env_opts.find(scord_ctl::env::LOG_OUTPUT);
it != env_opts.end()) {
cfg.log_file(it->second);
}
// calling notify() here basically invokes all define notifiers, thus
// overriding any configuration loaded from the global configuration
// file with its command-line counterparts if provided (for those
......@@ -155,7 +180,7 @@ main(int argc, char* argv[]) {
const auto rpc_registration_cb = [](auto&& ctx) {
LOGGER_INFO(" * Registering RPCs handlers...");
REGISTER_RPC(ctx, "ADM_ping", void, void, ADM_ping, false);
REGISTER_RPC(ctx, "ADM_ping", void, ADM_ping_out_t, ADM_ping, true);
// TODO: add internal RPCs for communication with scord
};
......
......@@ -25,7 +25,7 @@
# scord daemon
add_executable(scord)
target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp)
target_sources(scord PRIVATE scord.cpp rpc_handlers.hpp rpc_handlers.cpp env.hpp)
target_include_directories(
scord
......
/******************************************************************************
* Copyright 2021-2022, Barcelona Supercomputing Center (BSC), Spain
*
* This software was partially supported by the EuroHPC-funded project ADMIRE
* (Project ID: 956748, https://www.admire-eurohpc.eu).
*
* This file is part of scord.
*
* scord is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* scord is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with scord. If not, see <https://www.gnu.org/licenses/>.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*****************************************************************************/
#ifndef SCORD_ENV_HPP
#define SCORD_ENV_HPP
#define SCORD_ENV_PREFIX "SCORD_"
#define ADD_PREFIX(str) SCORD_ENV_PREFIX str
namespace scord::env {
static constexpr auto LOG = ADD_PREFIX("LOG");
static constexpr auto LOG_OUTPUT = ADD_PREFIX("LOG_OUTPUT");
} // namespace scord::env
#endif // SCORD_ENV_HPP
......@@ -23,6 +23,7 @@
*****************************************************************************/
#include <logger/logger.hpp>
#include <net/engine.hpp>
#include <net/proto/rpc_types.h>
#include <admire.hpp>
#include <api/convert.hpp>
......@@ -39,16 +40,30 @@ struct remote_procedure {
static void
ADM_ping(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
[[maybe_unused]] margo_instance_id mid = margo_hg_handle_get_instance(h);
const auto id = remote_procedure::new_id();
LOGGER_INFO("RPC ID {} ({}) => {{}}", id, __FUNCTION__);
LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__,
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)));
ADM_ping_out_t out;
out.op_id = id;
out.retval = ADM_SUCCESS;
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
ADM_SUCCESS);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
ret = margo_destroy(h);
assert(ret == HG_SUCCESS);
}
......@@ -58,6 +73,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_ping);
static void
ADM_register_job(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_register_job_in_t in;
......@@ -71,18 +88,23 @@ ADM_register_job(hg_handle_t h) {
const admire::job_requirements reqs(&in.reqs);
const auto id = remote_procedure::new_id();
LOGGER_INFO("RPC ID {} ({}) => {{job_requirements: {}}}", id, __FUNCTION__,
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job_requirements: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
reqs);
const auto job = admire::job{42};
admire::error_code rv = ADM_SUCCESS;
out.op_id = id;
out.retval = rv;
out.job = admire::api::convert(job).release();
LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, job: {}}}", id, __FUNCTION__,
rv, job);
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}, job: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
job);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......@@ -100,6 +122,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_job);
static void
ADM_update_job(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_update_job_in_t in;
......@@ -114,13 +138,18 @@ ADM_update_job(hg_handle_t h) {
const admire::job_requirements reqs(&in.reqs);
const auto id = remote_procedure::new_id();
LOGGER_INFO("RPC ID {} ({}) => {{job: {}, job_requirements: {}}}", id,
__FUNCTION__, job, reqs);
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}, job_requirements: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job,
reqs);
admire::error_code rv = ADM_SUCCESS;
out.op_id = id;
out.retval = rv;
LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv);
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......@@ -138,6 +167,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_job);
static void
ADM_remove_job(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_remove_job_in_t in;
......@@ -151,12 +182,18 @@ ADM_remove_job(hg_handle_t h) {
const admire::job job(in.job);
const auto id = remote_procedure::new_id();
LOGGER_INFO("RPC ID {} ({}) => {{job: {}}}", id, __FUNCTION__, job);
LOGGER_INFO("rpc id: {} name: {} from: {} => "
"body: {{job: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)),
job);
admire::error_code rv = ADM_SUCCESS;
out.op_id = id;
out.retval = rv;
LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}}}", id, __FUNCTION__, rv);
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......@@ -173,6 +210,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_remove_job);
static void
ADM_register_adhoc_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_register_adhoc_storage_in_t in;
......@@ -204,6 +243,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_adhoc_storage);
static void
ADM_update_adhoc_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_update_adhoc_storage_in_t in;
......@@ -235,6 +276,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_adhoc_storage);
static void
ADM_remove_adhoc_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_remove_adhoc_storage_in_t in;
......@@ -266,6 +309,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_remove_adhoc_storage);
static void
ADM_deploy_adhoc_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_deploy_adhoc_storage_in_t in;
......@@ -297,6 +342,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_deploy_adhoc_storage);
static void
ADM_register_pfs_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_register_pfs_storage_in_t in;
......@@ -328,6 +375,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_register_pfs_storage);
static void
ADM_update_pfs_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_update_pfs_storage_in_t in;
......@@ -359,6 +408,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_update_pfs_storage);
static void
ADM_remove_pfs_storage(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_remove_pfs_storage_in_t in;
......@@ -974,6 +1025,8 @@ DEFINE_MARGO_RPC_HANDLER(ADM_in_transit_ops)
static void
ADM_transfer_datasets(hg_handle_t h) {
using scord::network::utils::get_address;
[[maybe_unused]] hg_return_t ret;
ADM_transfer_datasets_in_t in;
......@@ -994,19 +1047,24 @@ ADM_transfer_datasets(hg_handle_t h) {
const auto mapping = static_cast<admire::transfer::mapping>(in.mapping);
const auto id = remote_procedure::new_id();
LOGGER_INFO("RPC ID {} ({}) => {{job: {}, sources: {}, targets: {}, "
"limits: {}, mapping: {}}}",
id, __FUNCTION__, job, sources, targets, limits, mapping);
LOGGER_INFO(
"rpc id: {} name: {} from: {} => "
"body: {{job: {}, sources: {}, targets: {}, limits: {}, mapping: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), job,
sources, targets, limits, mapping);
admire::error_code rv = ADM_SUCCESS;
const auto transfer = admire::transfer{42};
out.op_id = id;
out.retval = rv;
out.tx = admire::api::convert(transfer).release();
LOGGER_INFO("RPC ID {} ({}) <= {{retval: {}, transfer: {}}}", id,
__FUNCTION__, rv, transfer);
LOGGER_INFO("rpc id: {} name: {} to: {} <= "
"body: {{retval: {}, transfer: {}}}",
id, std::quoted(__FUNCTION__), std::quoted(get_address(h)), rv,
transfer);
ret = margo_respond(h, &out);
assert(ret == HG_SUCCESS);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment