Commit 886463d6 authored by Alberto Miranda ♨️
Browse files

Merge branch...

Merge branch 'amiranda/115-adhoc-storage-deployment-needs-to-be-executed-by-scord-ctl-instead-of-scord' into 'main'

Resolve "Adhoc storage deployment needs to be executed by `scord-ctl` instead of `scord`"

Closes #115

See merge request !92
parents a027efbe c3ef654b
Loading
Loading
Loading
Loading
Loading
+237 −81
Original line number Diff line number Diff line
#!/usr/bin/env python3
from loguru import logger
import argparse
import itertools
import re
import sys
from pathlib import Path
from typing import Dict
from typing import Dict, Iterable, Any, Optional

from lark import Lark, Transformer
from loguru import logger

RPC_NAMES = {
    'ADM_ping',
@@ -40,7 +42,7 @@ class Meta:

    @property
    def line(self):
        return self._line
        return self._line.replace('{', '{{').replace('}', '}}')

    @property
    def lineno(self):
@@ -75,22 +77,21 @@ class RemoteProcedure:
    EXPR = re.compile(r"""
                ^(?P<preamble>
                    \[(?P<timestamp>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d+)]\s
                    \[(?P<progname>\w+)]\s
                    \[(?P<progname>\w+(?:-\w+)*?)]\s
                    \[(?P<pid>\d+)]\s
                    \[(?P<log_level>\w+)]\s
                    rpc\s
                    (?P<direction><=|=>)\s
                    (?:pid:\s(?P<rpc_pid>\d+)\s)?
                    id:\s(?P<rpc_id>\d+)\s
                    name:\s"(?P<rpc_name>\w+)"\s
                    (?:from|to):\s"(?P<address>.*?)"\s
                    (?P<direction><=|=>)\s
                )
                body:\s(?P<body>.*)$
            """, re.VERBOSE)

    def __init__(self, is_client: bool, meta: Dict, body: Dict,
                 opts: Dict):
    def __init__(self, meta: Dict, body: Dict, opts: Dict):

        self._is_client = is_client
        self._meta = Meta(
            meta['line'],
            meta['lineno'],
@@ -99,30 +100,26 @@ class RemoteProcedure:
            meta['pid'],
            meta['log_level'])

        self._pid = int(meta['rpc_pid']) if meta['rpc_pid'] else None
        self._id = int(meta['rpc_id'])
        self._name = meta['rpc_name']
        self._is_request = meta['direction'] == '=>'
        self._is_inbound = meta['direction'] == '<='
        self._address = meta['address']

        if opts:
            assert self.is_client and self.is_reply
            self._op_id = opts['op_id']
        else:
            self._op_id = self.id
            _, self._op_id = self.id

        self._body = body

    @property
    def is_client(self):
        return self._is_client

    @property
    def meta(self):
        return self._meta

    @property
    def id(self):
        return self._id
        return self._pid, self._id

    @property
    def op_id(self):
@@ -141,16 +138,21 @@ class RemoteProcedure:
        return self._address

    @property
    def is_request(self):
        return self._is_request
    def is_inbound(self):
        return self._is_inbound

    @property
    def is_reply(self):
        return not self._is_request
    def is_outbound(self):
        return not self._is_inbound

    def __eq__(self, other):

        assert self.name == other.name
        if self.name != other.name:
            logger.critical("Attempting to compare RPCs with different names:\n"
                            f"    self: {self}\n"
                            f"    other: {other}\n")
            sys.exit(1)

        # first, check that there are no extra keys in the body of the RPCs
        self_keys = set(self._body.keys())
@@ -165,8 +167,8 @@ class RemoteProcedure:
                logger.error(
                    "\nExtra fields were found when comparing an rpc "
                    "to its counterpart\n"
                    f"    extra fields: {extra_keys}"
                    f"    line number: {rpc.meta.lineno}"
                    f"    extra fields: {extra_keys}\n"
                    f"    line number: {rpc.meta.lineno}\n"
                    f"    line contents: {rpc.meta.line}", file=sys.stderr)
                return False

@@ -185,18 +187,18 @@ class RemoteProcedure:

    def __repr__(self):
        return f'RemoteProcedure(' \
               f'is_client={self.is_client}, ' \
               f'meta={self.meta}, ' \
               f'op_id={self.op_id}, ' \
               f'id={self.id}, ' \
               f'name={self.name}, ' \
               f'is_request={self.is_request}, ' \
               f'is_inbound={self.is_inbound}, ' \
               f'address="{self.address}", ' \
               f'body="{self._body}"' \
               f')'


class Operation:
    # noinspection PyShadowingBuiltins
    def __init__(self, id, request, reply):
        self._id = id
        self._request = request
@@ -257,14 +259,16 @@ class BodyTransformer(Transformer):
    pair = tuple
    opts = dict
    dict = dict
    true = lambda self, _: True
    false = lambda self, _: False
    true = lambda self, _: True  # noqa
    false = lambda self, _: False  # noqa

    # noinspection PyMethodMayBeStatic
    def start(self, items):
        """Build the ``(body, opts)`` result from the children of ``start``.

        The first child holds the key/value pairs of the RPC body. The
        options are taken from the second child only when there are exactly
        two children; otherwise an empty dict is returned for them.
        """
        parsed_body = dict(items[0])
        if len(items) == 2:
            parsed_opts = dict(items[1])
        else:
            parsed_opts = {}
        return parsed_body, parsed_opts

    # noinspection PyMethodMayBeStatic
    def number(self, n):
        (n,) = n
        try:
@@ -272,14 +276,17 @@ class BodyTransformer(Transformer):
        except ValueError:
            return float(n)

    # noinspection PyMethodMayBeStatic
    def escaped_string(self, s):
        """Return the single child as a ``str`` without its first and last
        character (the surrounding quotes)."""
        [quoted] = s
        inner = quoted[1:-1]
        return str(inner)

    # noinspection PyMethodMayBeStatic
    def string(self, s):
        """Return the single child of a ``string`` node as a plain ``str``."""
        [token] = s
        return str(token)

    # noinspection PyMethodMayBeStatic
    def ident(self, ident):
        """Return the single child of an ``ident`` node as a plain ``str``."""
        [identifier] = ident
        return str(identifier)
@@ -291,85 +298,234 @@ def process_body(d):
    return BodyTransformer().transform(tree)


def find_rpcs(filename, is_client, rpc_name):
def sanitize(address):
    """Strip the protocol prefix (everything up to the first ``://``) from
    an address so that addresses can be compared regardless of transport.

    :param address: the address to sanitize; may be ``None`` or empty
    :return: the part of ``address`` after the first ``://``, or ``address``
    unchanged when it is falsy or contains no ``://``
    """
    if not address:
        return address

    # partition() splits on the FIRST separator only, so anything after it
    # (even another "://") is preserved verbatim
    _, separator, remainder = address.partition("://")
    return remainder if separator else address


def find_rpcs(filename, rpc_name):
    """Scan a logfile and yield every RPC whose name is ``rpc_name``.

    Each line is matched against ``RemoteProcedure.EXPR``. Lines that do not
    match, and lines whose RPC name is not listed in ``RPC_NAMES``, are
    reported with a warning and skipped.

    :param filename: the path to the logfile to scan
    :param rpc_name: the name of the RPC to search for
    :return: a generator of ``(RemoteProcedure, lineno)`` tuples, where
    ``lineno`` is the 1-based number of the line the RPC was parsed from
    """
    # NOTE: no extra keyword arguments are passed to the logger calls below.
    # loguru uses them as str.format() arguments for the message, which
    # would re-interpret any braces in the already formatted text (e.g. a
    # filename containing '{').
    with open(filename, 'r') as f:
        for ln, line in enumerate(f, start=1):

            logger.trace(f"Processing line {ln}:")
            logger.trace(f"  {repr(line)}")

            m = RemoteProcedure.EXPR.match(line)

            if m is None:
                logger.warning(f"Failed to parse line {ln} in {filename}")
                continue

            tmp = m.groupdict()

            # We found a line with a valid RPC format, but its name is not
            # known to us. This can happen if the user has defined a new
            # RPC and forgot to add it to RPC_NAMES, or if the user has
            # made a typo. In any case, we should warn the user about this.
            if (n := tmp['rpc_name']) not in RPC_NAMES:
                logger.warning(f"Found RPC with unknown name '{n}', "
                               f"line ignored")
                continue

            if tmp['rpc_name'] != rpc_name:
                logger.trace(
                    f"Searching rpc name '{tmp['rpc_name']}' in "
                    f"line {ln} -- not found")
                continue

            # We found a line with a valid RPC format and its name is the
            # one we are looking for. We can now parse the body of the RPC
            # and yield it.
            logger.info(
                f"Searching rpc name '{tmp['rpc_name']}' in "
                f"line {ln} -- found")
            tmp['lineno'] = ln
            tmp['line'] = line
            body, opts = process_body(tmp['body'])
            del tmp['body']
            yield RemoteProcedure(tmp, body, opts), ln


if __name__ == "__main__":
def process_file(file: Path, rpc_name: str, self_address: Optional[str],
                 targets: Optional[Iterable[str]] = None) -> Dict[
    int, Operation]:
    """Extract information about RPCs from a logfile and create the necessary
    Operation descriptors. Within one logfile, RPCs belonging to an operation
    can be identified by their id (i.e. an RPC request and an RPC response
    will share the same id).

    Across logfiles, RPCs belonging to an operation can be identified by their
    operation id, which corresponds to the rpc id in the server side and is
    sent back to the client to allow matching.

    Example:
        * client logfile:
           [...] rpc <= id: 10 name: "ADM_ping" [args...]
           [...] rpc => id: 10 name: "ADM_ping" [retval...] [op_id: 42]

        * server logfile:
           [...] rpc => id: 42 name: "ADM_ping" [args...]
           [...] rpc <= id: 42 name: "ADM_ping" [retval...]

    The process is terminated with exit status 1 if an RPC is found whose
    address equals ``self_address`` (ignoring the protocol prefix).

    :param file: The path to logfile
    :param rpc_name: The name of the RPC to search for
    :param self_address: The address of the server that generated the logfile
    (or None if unknown)
    :param targets: Accepted for interface compatibility; currently unused
    :return: A dict of Operations, keyed by operation id
    """

    ops = {}
    # RPCs that have been seen once and still wait for the second RPC
    # sharing their id
    pending_rpcs = {}

    logger.info(f"Searching for RPC \"{rpc_name}\" in {file}\n"
                f"  self address: {self_address}")

    for rpc, lineno in find_rpcs(file, rpc_name):

        prefix, direction = ("in", "from") if rpc.is_outbound else ("out", "to")

        logger.debug(f"Found {prefix}bound RPC to '{rpc.address}' with "
                     f"id '{rpc.id}' at line {lineno}")

        if sanitize(rpc.address) == sanitize(self_address):
            # NOTE(review): `file=sys.stderr` is kept on purpose. The
            # visible `Meta.line` property doubles the braces of the raw
            # line, which presumably relies on loguru running str.format()
            # on the message because a keyword argument is passed --
            # confirm before removing it.
            logger.error(f"Found {prefix}bound RPC {direction} own address"
                         f" at line {rpc.meta.lineno}\n"
                         f"    raw: '{rpc.meta.line}'",
                         file=sys.stderr)
            sys.exit(1)

        if rpc.id not in pending_rpcs:
            # first RPC seen with this id: remember it until its
            # counterpart shows up
            pending_rpcs[rpc.id] = rpc
        else:
            # second RPC with this id: pair it with the stored one
            req_rpc = pending_rpcs.pop(rpc.id)
            logger.debug(f"Creating new operation with id '{rpc.op_id}'")
            ops[rpc.op_id] = Operation(rpc.op_id, req_rpc, rpc)

    return ops


def match_ops(origin_ops, target_ops, strict=True):
    """Check the operations of an origin against those of a target.

    Every operation id found in ``origin_ops`` is looked up in
    ``target_ops``. A missing id is reported with a warning and only counts
    as a mismatch when ``strict`` is set. Operations present on both sides
    are compared with ``!=``.

    :param origin_ops: mapping of operation id to the origin's operation
    :param target_ops: mapping of operation id to the target's operation
    :param strict: whether an operation missing from ``target_ops`` is a
    mismatch
    :return: True if no mismatch was found, False otherwise
    """
    all_match = True

    for op_id, origin_op in origin_ops.items():

        if op_id in target_ops:
            if origin_op != target_ops[op_id]:
                all_match = False
            continue

        logger.warning(
            f"An operation with id '{op_id}' was found in origin's "
            f"operations but is missing from target's operations")
        if strict:
            all_match = False

    return all_match

    for lf, n in zip([client_logfile, server_logfile], ['CLIENT_LOGFILE',
                                                        'SERVER_LOGFILE']):
        if not lf.is_file():
            logger.error(f"{n} '{lf}' is not a file", file=sys.stderr)
            sys.exit(1)

    rpc_name = sys.argv[3]
def configure_logging(verbosity):
    """Reset the logger and send its output to stderr at a level chosen by
    ``verbosity``.

    :param verbosity: 0 selects "SUCCESS", 1 selects "INFO", 2 selects
    "DEBUG"; any other value selects "TRACE"
    """
    logger.remove()

    # the position of each name is the verbosity value that selects it
    named_levels = ("SUCCESS", "INFO", "DEBUG")
    log_level = next(
        (name for count, name in enumerate(named_levels)
         if verbosity == count),
        "TRACE")

    logger.add(sys.stderr, level=log_level)


def group_by_pairs(it: Iterable[Any]):
    """Yield consecutive, overlapping pairs from an iterable:
    ``a, b, c, d`` -> ``(a, b), (b, c), (c, d)``.

    Works for any iterable, including one-shot iterators and generators.
    Nothing is yielded when the iterable has fewer than two elements.

    :param it: the iterable to take the pairs from
    :return: a generator of 2-tuples
    """
    # tee() gives two independent views over the same underlying iterator,
    # so advancing one does not consume elements from the other
    leading, trailing = itertools.tee(it)
    next(trailing, None)
    yield from zip(leading, trailing)


def main():
    """Check that the logs of the involved programs agree on a given RPC.

    The command line provides the RPC name, the libscord (client) logfile,
    the scord logfile and address and, optionally, the scord-ctl logfile and
    address. Each logfile is processed into a set of operations, and the
    operations of each pair of consecutive logfiles (libscord/scord and,
    when provided, scord/scord-ctl) are then matched against each other.

    The process exits with status 0 when all operations match and with
    status 1 on invalid input or when a mismatch is found.
    """
    parser = argparse.ArgumentParser(
        description="Check that client and server logs match for a given RPC "
                    "name")

    parser.add_argument("RPC_NAME",
                        help="the name of the RPC to check")
    parser.add_argument("LIBSCORD_LOGFILE",
                        type=Path,
                        help="the path to the scord client's logfile")
    parser.add_argument("SCORD_LOGFILE",
                        type=Path,
                        help="the path to the scord server's logfile")
    parser.add_argument("SCORD_ADDRESS",
                        type=str,
                        help="the address of the scord server")
    parser.add_argument("SCORD_CTL_LOGFILE",
                        type=Path,
                        nargs='?',
                        help="the path to the scord-ctl server's logfile")
    parser.add_argument("SCORD_CTL_ADDRESS",
                        type=str,
                        nargs='?',
                        help="the address of the scord-ctl server")
    parser.add_argument(
        "-v",
        "--verbose",
        help="enable verbose output (additional flags increase verbosity)",
        action="count",
        dest='verbosity')

    parser.set_defaults(verbosity=0)
    args = parser.parse_args()
    configure_logging(args.verbosity)

    # logfiles[i] was generated by the program listening on targets[i]
    # (None for the client library, whose address is not known)
    logfiles = [args.LIBSCORD_LOGFILE, args.SCORD_LOGFILE]
    argnames = ["LIBSCORD_LOGFILE", "SCORD_LOGFILE"]
    targets = [None, args.SCORD_ADDRESS]

    if args.SCORD_CTL_LOGFILE:
        logfiles.append(args.SCORD_CTL_LOGFILE)
        argnames.append("SCORD_CTL_LOGFILE")

        # both optional positionals must be given together
        if not args.SCORD_CTL_ADDRESS:
            parser.error(
                "the following arguments are required: SCORD_CTL_ADDRESS")
        targets.append(args.SCORD_CTL_ADDRESS)

    rpc_name = args.RPC_NAME

    # NOTE: no extra keyword arguments are passed to the logger calls below.
    # loguru uses them as str.format() arguments for the message, which
    # would re-interpret any braces in user-provided paths or names.
    for file, name in zip(logfiles, argnames):
        if not file.is_file():
            logger.critical(f"{name} '{file}' is not a file")
            sys.exit(1)

    if rpc_name not in RPC_NAMES:
        logger.critical(f"'{rpc_name}' is not a valid rpc name")
        logger.critical(f"  Valid names: {', '.join(sorted(RPC_NAMES))}")
        sys.exit(1)

    file_ops = []

    # extract information about RPCs from logfiles and create
    # the necessary Operation
    for file, self_address in zip(logfiles, targets):
        ops = process_file(file, rpc_name, self_address)
        file_ops.append((file, ops))

    # match each logfile against the next one in the chain
    for (origin_file, origin_ops), (target_file, target_ops) in \
            group_by_pairs(file_ops):

        logger.info(f"Matching operations in '{origin_file.name}' and "
                    f"'{target_file.name}'")
        if not match_ops(origin_ops, target_ops, False):
            logger.critical("Not all operations match")
            sys.exit(1)

    logger.success("All operations match")
    sys.exit(0)

if __name__ == "__main__":
    main()
+24 −0
Original line number Diff line number Diff line
@@ -33,6 +33,9 @@ if(SCORD_BUILD_TESTS)
  set(TEST_ENV)
  list(APPEND TEST_ENV SCORD_LOG_OUTPUT=${TEST_DIRECTORY}/scord_daemon.log)

  set(SCORD_ADDRESS_STRING
    ${SCORD_TRANSPORT_PROTOCOL}://${SCORD_BIND_ADDRESS}:${SCORD_BIND_PORT})

  add_test(start_scord_daemon
    ${CMAKE_SOURCE_DIR}/scripts/runner.sh start scord.pid
           ${CMAKE_BINARY_DIR}/src/scord/scord -f
@@ -48,6 +51,27 @@ if(SCORD_BUILD_TESTS)

  set_tests_properties(stop_scord_daemon PROPERTIES FIXTURES_CLEANUP
    scord_daemon)

  set(SCORD_CTL_TRANSPORT_PROTOCOL ${SCORD_TRANSPORT_PROTOCOL})
  set(SCORD_CTL_BIND_ADDRESS ${SCORD_BIND_ADDRESS})
  math(EXPR SCORD_CTL_BIND_PORT "${SCORD_BIND_PORT} + 1")
  set(SCORD_CTL_ADDRESS_STRING
    ${SCORD_CTL_TRANSPORT_PROTOCOL}://${SCORD_CTL_BIND_ADDRESS}:${SCORD_CTL_BIND_PORT})

  add_test(start_scord_ctl
    ${CMAKE_SOURCE_DIR}/scripts/runner.sh start scord-ctl.pid
    ${CMAKE_BINARY_DIR}/src/scord-ctl/scord-ctl -l ${SCORD_CTL_ADDRESS_STRING} -o ${TEST_DIRECTORY}/scord_ctl.log
    )

  set_tests_properties(start_scord_ctl
    PROPERTIES FIXTURES_SETUP scord_ctl)

  add_test(stop_scord_ctl
    ${CMAKE_SOURCE_DIR}/scripts/runner.sh stop TERM scord-ctl.pid
    )

  set_tests_properties(stop_scord_ctl PROPERTIES FIXTURES_CLEANUP scord_ctl)

endif()

add_subdirectory(c)
+11 −5
Original line number Diff line number Diff line
@@ -36,14 +36,19 @@
int
main(int argc, char* argv[]) {

    if(argc != 2) {
        fprintf(stderr, "ERROR: no location provided\n");
        fprintf(stderr, "Usage: ADM_cancel_transfer <SERVER_ADDRESS>\n");
    test_info_t test_info = {
            .name = TESTNAME,
            .requires_server = true,
            .requires_controller = true,
    };

    cli_args_t cli_args;
    if(process_args(argc, argv, test_info, &cli_args)) {
        exit(EXIT_FAILURE);
    }

    int exit_status = EXIT_SUCCESS;
    ADM_server_t server = ADM_server_create("tcp", argv[1]);
    ADM_server_t server = ADM_server_create("tcp", cli_args.server_address);

    ADM_job_t job = NULL;
    ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES);
@@ -64,7 +69,8 @@ main(int argc, char* argv[]) {
    assert(adhoc_resources);

    ADM_adhoc_context_t ctx = ADM_adhoc_context_create(
            ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false);
            cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW,
            ADM_ADHOC_ACCESS_RDWR, 100, false);
    assert(ctx);

    const char* name = "adhoc_storage_42";
+11 −5
Original line number Diff line number Diff line
@@ -36,14 +36,19 @@
int
main(int argc, char* argv[]) {

    if(argc != 2) {
        fprintf(stderr, "ERROR: no server address provided\n");
        fprintf(stderr, "Usage: ADM_connect_data_operation <SERVER_ADDRESS>\n");
    test_info_t test_info = {
            .name = TESTNAME,
            .requires_server = true,
            .requires_controller = true,
    };

    cli_args_t cli_args;
    if(process_args(argc, argv, test_info, &cli_args)) {
        exit(EXIT_FAILURE);
    }

    int exit_status = EXIT_SUCCESS;
    ADM_server_t server = ADM_server_create("tcp", argv[1]);
    ADM_server_t server = ADM_server_create("tcp", cli_args.server_address);

    ADM_job_t job;
    ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES);
@@ -60,7 +65,8 @@ main(int argc, char* argv[]) {
    assert(adhoc_resources);

    ADM_adhoc_context_t ctx = ADM_adhoc_context_create(
            ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false);
            cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW,
            ADM_ADHOC_ACCESS_RDWR, 100, false);
    assert(ctx);

    const char* name = "adhoc_storage_42";
+11 −5
Original line number Diff line number Diff line
@@ -36,15 +36,20 @@
int
main(int argc, char* argv[]) {

    if(argc != 2) {
        fprintf(stderr, "ERROR: no location provided\n");
        fprintf(stderr, "Usage: ADM_define_data_operation <SERVER_ADDRESS>\n");
    test_info_t test_info = {
            .name = TESTNAME,
            .requires_server = true,
            .requires_controller = true,
    };

    cli_args_t cli_args;
    if(process_args(argc, argv, test_info, &cli_args)) {
        exit(EXIT_FAILURE);
    }

    int exit_status = EXIT_SUCCESS;

    ADM_server_t server = ADM_server_create("tcp", argv[1]);
    ADM_server_t server = ADM_server_create("tcp", cli_args.server_address);

    ADM_job_t job = NULL;
    ADM_node_t* job_nodes = prepare_nodes(NJOB_NODES);
@@ -65,7 +70,8 @@ main(int argc, char* argv[]) {
    assert(adhoc_resources);

    ADM_adhoc_context_t ctx = ADM_adhoc_context_create(
            ADM_ADHOC_MODE_SEPARATE_NEW, ADM_ADHOC_ACCESS_RDWR, 100, false);
            cli_args.controller_address, ADM_ADHOC_MODE_SEPARATE_NEW,
            ADM_ADHOC_ACCESS_RDWR, 100, false);
    assert(ctx);

    const char* name = "adhoc_storage_42";
Loading