Commit bde4b1fd authored by Jean Bez's avatar Jean Bez
Browse files

Include forwarding test support in the new test harness

parent 00f2275b
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -232,6 +232,15 @@ if(GKFS_BUILD_TESTS)
    message(STATUS "[gekkofs] Preparing tests...")
    set(GKFS_TESTS_INTERFACE "lo" CACHE STRING "Network interface to use when running tests (default: lo)")
    message(STATUS "[gekkofs] Network interface for tests: ${GKFS_TESTS_INTERFACE}")

    message(STATUS "[gekkofs] Check for forwarding tests...")
    if(ENABLE_FORWARDING)
        set(GKFS_TESTS_FORWARDING "ON" CACHE STRING "Enable I/O forwarding tests (default: OFF)")
    else()
        set(GKFS_TESTS_FORWARDING "OFF" CACHE STRING "Enable I/O forwarding tests (default: OFF)")
    endif()
    message(STATUS "[gekkofs] Forwarding tests: ${GKFS_TESTS_FORWARDING}")

    add_subdirectory(tests)
else()
    unset(GKFS_TESTS_INTERFACE CACHE)
+5 −1
Original line number Diff line number Diff line
@@ -137,11 +137,15 @@ void *forwarding_mapper(void *p) {
            LOG(INFO, "{}() Forward to {}", __func__, CTX->fwd_host_id());
        } catch (std::exception& e) {
            exit_error_msg(EXIT_FAILURE, fmt::format("Unable set the forwarding host '{}'", e.what()));

            return;
        }

        // Sleeps for 10 seconds
        sleep(10);
    }

    return;
}
#endif

+18 −4
Original line number Diff line number Diff line
@@ -19,7 +19,7 @@ from pathlib import Path
from harness.logger import logger, initialize_logging, finalize_logging
from harness.cli import add_cli_options, set_default_log_formatter
from harness.workspace import Workspace, FileCreator
from harness.gkfs import Daemon, Client, ShellClient
from harness.gkfs import Daemon, Client, ShellClient, FwdDaemon, FwdClient, ShellFwdClient
from harness.reporter import report_test_status, report_test_headline, report_assertion_pass

def pytest_configure(config):
@@ -87,27 +87,41 @@ def gkfs_daemon(test_workspace, request):

    interface = request.config.getoption('--interface')

    if request.config.getoption('--forwarding') == 'ON':
        daemon = FwdDaemon(interface, test_workspace)
    else:
        daemon = Daemon(interface, test_workspace)
    
    yield daemon.run()
    daemon.shutdown()

@pytest.fixture
def gkfs_client(test_workspace):
def gkfs_client(test_workspace, request):
    """
    Sets up a gekkofs client environment so that
    operations (system calls, library calls, ...) can
    be requested from a co-running daemon.

    When the `--forwarding` command line option equals 'ON' a `FwdClient`
    is returned; otherwise a regular `Client` is returned.
    """

    # the client does not take a network interface, so only the
    # forwarding switch needs to be read from the command line
    if request.config.getoption('--forwarding') == 'ON':
        return FwdClient(test_workspace)

    return Client(test_workspace)

@pytest.fixture
def gkfs_shell(test_workspace):
def gkfs_shell(test_workspace, request):
    """
    Sets up a gekkofs environment so that shell commands
    (stat, ls, mkdir, etc.) can be issued to a co-running daemon.

    When the `--forwarding` command line option equals 'ON' a
    `ShellFwdClient` is returned; otherwise a regular `ShellClient`
    is returned.
    """

    # the shell client does not take a network interface, so only the
    # forwarding switch needs to be read from the command line
    if request.config.getoption('--forwarding') == 'ON':
        return ShellFwdClient(test_workspace)

    return ShellClient(test_workspace)

@pytest.fixture
+8 −0
Original line number Diff line number Diff line
@@ -33,6 +33,14 @@ def add_cli_options(parser):
            help="network interface used for communications (default: 'lo')."
        )

        # Forwarding mode is opt-in: the test fixtures compare this value
        # against the literal string 'ON', so the default must be 'OFF'
        # to agree with the help text below.
        parser.addoption(
            '--forwarding',
            action='store',
            type=str,
            default='OFF',
            help="enable the forwarding mode (default: 'OFF')."
        )

        parser.addoption(
            "--bin-dir",
            action='append',
+435 −0
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ gkfs_daemon_cmd = 'gkfs_daemon'
gkfs_client_cmd = 'gkfs.io'
gkfs_client_lib_file = 'libgkfs_intercept.so'
gkfs_hosts_file = 'gkfs_hosts.txt'
gkfs_forwarding_map_file = 'gkfs_forwarding.map'
gkfs_daemon_log_file = 'gkfs_daemon.log'
gkfs_daemon_log_level = '100'
gkfs_client_log_file = 'gkfs_client.log'
@@ -582,3 +583,437 @@ class ShellClient:
    @property
    def cwd(self):
        return self._workspace.twd

class FwdDaemon:
    """
    A class to represent a GekkoFS daemon process started for a test
    workspace. The daemon is spawned in the background by `run()` and
    stopped by `shutdown()`.
    """

    def __init__(self, interface, workspace):
        """
        Parameters
        ----------
        interface: `str`
            The network interface used to compute the daemon's
            listening address.

        workspace:
            The test workspace providing the binary/library directories
            and the working, root, mount and log directories.
        """

        # keep the interface around: the `interface` property exposes it
        self._interface = interface
        self._address = get_ephemeral_address(interface)
        self._workspace = workspace

        self._cmd = sh.Command(gkfs_daemon_cmd, self._workspace.bindirs)
        self._env = os.environ.copy()

        # append the workspace's library directories to any pre-existing
        # LD_LIBRARY_PATH (empty entries are dropped by filter(None, ...))
        libdirs = ':'.join(
                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
                             [str(p) for p in self._workspace.libdirs]))

        self._patched_env = {
            'LD_LIBRARY_PATH'      : libdirs,
            'GKFS_HOSTS_FILE'      : self.cwd / gkfs_hosts_file,
            'GKFS_DAEMON_LOG_PATH' : self.logdir / gkfs_daemon_log_file,
            'GKFS_LOG_LEVEL'       : gkfs_daemon_log_level
        }
        self._env.update(self._patched_env)

    def run(self):
        """
        Spawn the daemon in the background and wait (up to 10 seconds)
        until its log reports that it is ready.

        Returns
        -------
        This `FwdDaemon` instance.

        Raises
        ------
        Any exception raised by `wait_until_active()` is logged and
        re-raised after attempting to shut the daemon down.
        """

        args = [ '--mountdir', self.mountdir,
                 '--rootdir', self.rootdir,
                 '-l', self._address ]

        logger.debug("spawning daemon")
        logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, args)))
        logger.debug(f"patched env:\n{pformat(self._patched_env)}")

        self._proc = self._cmd(
                args,
                _env=self._env,
#                _out=sys.stdout,
#                _err=sys.stderr,
                _bg=True,
            )

        logger.debug(f"daemon process spawned (PID={self._proc.pid})")
        logger.debug("waiting for daemon to be ready")

        try:
            self.wait_until_active(self._proc.pid, 10.0)
        except Exception as ex:
            logger.error(f"daemon initialization failed: {ex}")

            # if the daemon initialized correctly but took longer than
            # `timeout`, we may be leaving running processes behind
            if _process_exists(self._proc.pid):
                self.shutdown()

            logger.critical(
                f"daemon was shut down after failed initialization: {ex!r}")

            # bare `raise` keeps the original traceback intact
            raise

        logger.debug("daemon is ready")

        return self

    def wait_until_active(self, pid, timeout, max_lines=50):
        """
        Waits until a GKFS daemon is active or until a certain timeout
        has expired. Checks if the daemon is running by searching its
        log for a pre-defined readiness message.

        Parameters
        ----------
        pid: `int`
            The PID of the daemon process to wait for.

        timeout: `number`
            The number of seconds to wait for

        max_lines: `int`
            The maximum number of log lines to check for a match.

        Raises
        ------
        RuntimeError
            If the log file is missing and process `pid` is not running,
            or if `timeout` expires before the readiness message is found.
        """

        gkfs_daemon_active_log_pattern = r'Startup successful. Daemon is ready.'

        init_time = perf_counter()

        # NOTE: the log is re-read from the start on every pass and there
        # is no pause between passes
        while perf_counter() - init_time < timeout:
            try:
                logger.debug("checking log file")
                with open(self.logdir / gkfs_daemon_log_file) as log:
                    # only the first `max_lines` lines are inspected
                    for line in islice(log, max_lines):
                        if re.search(gkfs_daemon_active_log_pattern, line) is not None:
                            return
            except FileNotFoundError:
                # Log is missing, the daemon might have crashed...
                logger.debug("daemon log file missing, checking if daemon is alive...")

                if not _process_exists(pid):
                    raise RuntimeError(f"process {pid} is not running")

                # ... or it might just be lazy. let's give it some more time
                logger.debug(f"daemon {pid} found, retrying...")

        raise RuntimeError("initialization timeout exceeded")

    def shutdown(self):
        """
        Terminate the daemon process and wait for it to exit.
        """

        logger.debug("terminating daemon")

        try:
            self._proc.terminate()
            self._proc.wait()
        except sh.SignalException_SIGTERM:
            # expected: we have just asked the process to terminate
            pass

    @property
    def cwd(self):
        """The workspace's `twd` directory."""
        return self._workspace.twd

    @property
    def rootdir(self):
        """The workspace's `rootdir`, passed to the daemon as --rootdir."""
        return self._workspace.rootdir

    @property
    def mountdir(self):
        """The workspace's `mountdir`, passed to the daemon as --mountdir."""
        return self._workspace.mountdir

    @property
    def logdir(self):
        """The workspace's `logdir`, where the daemon log is written."""
        return self._workspace.logdir

    @property
    def interface(self):
        """The network interface this daemon was created with."""
        return self._interface

class FwdClient:
    """
    A class to represent a GekkoFS client process with a patched LD_PRELOAD.
    This class allows tests to interact with the file system using I/O-related
    function calls, be them system calls (e.g. read()) or glibc I/O functions
    (e.g. opendir()).

    On construction it also writes a forwarding map file into the
    workspace's working directory and exports its path through
    LIBGKFS_FORWARDING_MAP_FILE.
    """
    def __init__(self, workspace):
        self._parser = IOParser()
        self._workspace = workspace
        self._cmd = sh.Command(gkfs_client_cmd, self._workspace.bindirs)
        self._env = os.environ.copy()

        # create the forwarding map file: a single "<hostname> 0" line
        # (presumably mapping this host to forwarder 0 -- confirm against
        # the client's map parser). `with` guarantees the handle is
        # closed even if the write fails.
        with open(self.cwd / gkfs_forwarding_map_file, 'w') as fwd_map_file:
            fwd_map_file.write('{} {}\n'.format(socket.gethostname(), 0))

        # append the workspace's library directories to any pre-existing
        # LD_LIBRARY_PATH (empty entries are dropped by filter(None, ...))
        libdirs = ':'.join(
                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
                             [str(p) for p in self._workspace.libdirs]))

        # ensure the client interception library is available:
        # to avoid running code with potentially installed libraries,
        # it must be found in one (and only one) of the workspace's bindirs
        candidates = (Path(d) / gkfs_client_lib_file
                      for d in self._workspace.bindirs)
        preloads = [p for p in candidates if p.exists()]

        if not preloads:
            logger.error('No client libraries found in the test\'s binary directories:')
            pytest.exit("Aborted due to initialization error. Check test logs.")

        if len(preloads) > 1:
            logger.error('Multiple client libraries found in the test\'s binary directories:')
            for p in preloads:
                logger.error(f'  {p}')
            logger.error('Make sure that only one copy of the client library is available.')
            pytest.exit("Aborted due to initialization error. Check test logs.")

        self._preload_library = preloads[0]

        self._patched_env = {
            'LD_LIBRARY_PATH'               : libdirs,
            'LD_PRELOAD'                    : self._preload_library,
            'LIBGKFS_HOSTS_FILE'            : self.cwd / gkfs_hosts_file,
            'LIBGKFS_FORWARDING_MAP_FILE'   : self.cwd / gkfs_forwarding_map_file,
            'LIBGKFS_LOG'                   : gkfs_client_log_level,
            'LIBGKFS_LOG_OUTPUT'            : self._workspace.logdir / gkfs_client_log_file
        }

        self._env.update(self._patched_env)

    @property
    def preload_library(self):
        """
        Return the preload library detected for this client
        """

        return self._preload_library

    def run(self, cmd, *args):
        """
        Run the client binary with sub-command `cmd` and `args` under the
        patched environment.

        Parameters
        ----------
        cmd: `str`
            The client sub-command to execute.

        args: `list`
            The arguments for the sub-command.

        Returns
        -------
        The result of parsing the command's standard output with the
        client's `IOParser`.
        """

        logger.debug("running client")
        # include the sub-command itself so the logged command line
        # matches what is actually executed
        logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, (cmd, *args))))
        logger.debug(f"patched env: {pformat(self._patched_env)}")

        out = self._cmd(
            [ cmd ] + list(args),
            _env = self._env,
    #        _out=sys.stdout,
    #        _err=sys.stderr,
            )

        logger.debug(f"command output: {out.stdout}")
        return self._parser.parse(cmd, out.stdout)

    def __getattr__(self, name):
        # only invoked for attributes not found through normal lookup:
        # these are delegated to `_proxy_exec`
        return _proxy_exec(self, name)

    @property
    def cwd(self):
        """The workspace's `twd` directory."""
        return self._workspace.twd

class ShellFwdClient:
    """
    A class to represent a GekkoFS shell client process.
    This class allows tests to execute shell commands or scripts via bash -c
    on a GekkoFS instance.

    On construction it also writes a forwarding map file into the
    workspace's working directory and exports its path through
    LIBGKFS_FORWARDING_MAP_FILE.
    """

    def __init__(self, workspace):
        self._workspace = workspace
        self._cmd = sh.Command("bash")
        self._env = os.environ.copy()

        # create the forwarding map file: a single "<hostname> 0" line
        # (presumably mapping this host to forwarder 0 -- confirm against
        # the client's map parser). `with` guarantees the handle is
        # closed even if the write fails.
        with open(self.cwd / gkfs_forwarding_map_file, 'w') as fwd_map_file:
            fwd_map_file.write('{} {}\n'.format(socket.gethostname(), 0))

        # append the workspace's library directories to any pre-existing
        # LD_LIBRARY_PATH (empty entries are dropped by filter(None, ...))
        libdirs = ':'.join(
                filter(None, [os.environ.get('LD_LIBRARY_PATH', '')] +
                             [str(p) for p in self._workspace.libdirs]))

        # ensure the client interception library is available:
        # to avoid running code with potentially installed libraries,
        # it must be found in one (and only one) of the workspace's bindirs
        candidates = (Path(d) / gkfs_client_lib_file
                      for d in self._workspace.bindirs)
        preloads = [p for p in candidates if p.exists()]

        # report a missing library separately so the log does not claim
        # that "multiple" libraries were found when there are none
        if not preloads:
            logger.error('No client libraries found in the test\'s binary directories:')
            pytest.exit("Aborted due to initialization error")

        if len(preloads) > 1:
            logger.error('Multiple client libraries found in the test\'s binary directories:')
            for p in preloads:
                logger.error(f'  {p}')
            logger.error('Make sure that only one copy of the client library is available.')
            pytest.exit("Aborted due to initialization error")

        self._preload_library = preloads[0]

        self._patched_env = {
            'LD_LIBRARY_PATH'               : libdirs,
            'LD_PRELOAD'                    : self._preload_library,
            'LIBGKFS_HOSTS_FILE'            : self.cwd / gkfs_hosts_file,
            'LIBGKFS_FORWARDING_MAP_FILE'   : self.cwd / gkfs_forwarding_map_file,
            'LIBGKFS_LOG'                   : gkfs_client_log_level,
            'LIBGKFS_LOG_OUTPUT'            : self._workspace.logdir / gkfs_client_log_file
        }

        self._env.update(self._patched_env)

    @property
    def patched_environ(self):
        """
        Return the patched environment required to run a test as a string that
        can be prepended to a shell command.
        """

        return ' '.join(f'{k}="{v}"' for k,v in self._patched_env.items())

    def script(self, code, intercept_shell=True, timeout=60, timeout_signal=signal.SIGKILL):
        """
        Execute a shell script passed as an argument in bash.

        For instance, the following snippet:

            mountdir = pathlib.Path('/tmp')
            file01 = 'file01'

            ShellFwdClient(workspace).script(
                f'''
                    expected_pathname={mountdir / file01}
                    if [[ -e ${{expected_pathname}} ]];
                    then
                        exit 0
                    fi
                    exit 1
                ''')

        transforms into:

            bash -c '
                expected_pathname=/tmp/file01
                if [[ -e ${expected_pathname} ]];
                then
                    exit 0
                fi
                exit 1
            '

        Note that since we are using Python's f-strings, for variable
        expansions to work correctly, they need to be defined with double
        braces, e.g.  ${{expected_pathname}}.

        Parameters
        ----------
        code: `str`
            The script code to be passed to 'bash -c'.

        intercept_shell: `bool`
            Controls whether the shell executing the script should be
            executed with LD_PRELOAD=libgkfs_intercept.so (default: True).

        timeout: `int`
            How much time, in seconds, we should give the process to complete.
            If the process does not finish within the timeout, it will be sent
            the signal defined by `timeout_signal`.

            Default value: 60

        timeout_signal: `int`
            The signal to be sent to the process if `timeout` is not None.

            Default value: signal.SIGKILL

        Returns
        -------
        A sh.RunningCommand instance that allows interacting with
        the finished process.
        """

        logger.debug("running bash")
        logger.debug(f"cmd: bash -c '{code}'")
        logger.debug(f"timeout: {timeout} seconds")
        logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}")

        if intercept_shell:
            logger.debug(f"patched env: {self._patched_env}")

        # NOTE: 'sh' raises an exception if the return code is not zero.
        # Whitelisting all exit codes through the _ok_code argument is
        # currently disabled (see the commented-out argument below), so
        # a failing script surfaces as an exception to the caller.
        return self._cmd('-c',
            code,
            _env = (self._env if intercept_shell else os.environ),
    #        _out=sys.stdout,
    #        _err=sys.stderr,
            _timeout=timeout,
            _timeout_signal=timeout_signal,
    #        _ok_code=list(range(0, 256))
            )

    def run(self, cmd, *args, timeout=60, timeout_signal=signal.SIGKILL):
        """
        Execute a shell command with arguments.

        For example, the following snippet:

            mountdir = pathlib.Path('/tmp')
            file01 = 'file01'

            ShellFwdClient(workspace).stat('--terse', mountdir / file01)

        transforms into:

            bash -c 'stat --terse /tmp/file01'

        Parameters:
        -----------
        cmd: `str`
            The command to execute.

        args: `list`
            The list of arguments for the command.

        timeout: `number`
            How much time, in seconds, we should give the process to complete.
            If the process does not finish within the timeout, it will be sent
            the signal defined by `timeout_signal`.

            Default value: 60

        timeout_signal: `int`
            The signal to be sent to the process if `timeout` is not None.

            Default value: signal.SIGKILL

        Returns
        -------
        A ShellCommand instance that allows interacting with the finished
        process. Note that ShellCommand wraps sh.RunningCommand and adds
        extra properties to it.
        """

        bash_c_args = f"{cmd} {' '.join(str(a) for a in args)}"
        logger.debug("running bash")
        logger.debug(f"cmd: bash -c '{bash_c_args}'")
        logger.debug(f"timeout: {timeout} seconds")
        logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}")
        logger.debug(f"patched env:\n{pformat(self._patched_env)}")

        # NOTE: 'sh' raises an exception if the return code is not zero.
        # Whitelisting all exit codes through the _ok_code argument is
        # currently disabled (see the commented-out argument below), so
        # a failing command surfaces as an exception to the caller.
        proc = self._cmd('-c',
            bash_c_args,
            _env = self._env,
    #        _out=sys.stdout,
    #        _err=sys.stderr,
            _timeout=timeout,
            _timeout_signal=timeout_signal,
    #        _ok_code=list(range(0, 256))
            )

        return ShellCommand(cmd, proc)

    def __getattr__(self, name):
        # only invoked for attributes not found through normal lookup:
        # these are delegated to `_proxy_exec`
        return _proxy_exec(self, name)

    @property
    def cwd(self):
        """The workspace's `twd` directory."""
        return self._workspace.twd
Loading