Loading tests/harness/gkfs.py +64 −36 Original line number Diff line number Diff line Loading @@ -15,6 +15,8 @@ import os, sh, sys, re, pytest, signal import random, socket, netifaces from pathlib import Path from itertools import islice from time import perf_counter from pprint import pformat from harness.logger import logger from harness.io import IOParser from harness.cmd import CommandParser Loading Loading @@ -103,6 +105,29 @@ def get_ephemeral_address(iface): return f"{iface}:{get_ephemeral_port(host=get_ip_addr(iface))}" def _process_exists(pid): """ Checks whether a given PID exists in the system Parameters ---------- pid: `int` The PID to check for Returns ------- True if a process with the provided `pid` exists in the system. False Otherwise """ try: sh.ps(['-h', '-p', pid]) except Exception: # sh.raises an Exception if the command doesn't return 0 return False return True class Daemon: def __init__(self, interface, workspace): Loading Loading @@ -132,7 +157,7 @@ class Daemon: logger.debug(f"spawning daemon") logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, args))) logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env:\n{pformat(self._patched_env)}") self._proc = self._cmd( args, Loading @@ -143,17 +168,29 @@ class Daemon: ) logger.debug(f"daemon process spawned (PID={self._proc.pid})") logger.debug("waiting for daemon to be ready") try: self.wait_until_active(self._proc.pid, 10.0) except Exception as ex: logger.error(f"daemon initialization failed: {ex}") # make sure daemon is ready to accept requests if not self.wait_until_active(self._proc.pid): raise RuntimeError("GKFS daemon doesn't seem to be active") # if the daemon initialized correctly but took longer than # `timeout`, we may be leaving running processes behind if _process_exists(self._proc.pid): self.shutdown() logger.critical(f"daemon was shut down, what is ex? {ex.__repr__}?") raise ex logger.debug("daemon is ready") return self def wait_until_active(self, pid, retries=500, max_lines=50): def wait_until_active(self, pid, timeout, max_lines=50): """ Waits until a GKFS daemon is active or until a certain number of retries has been exhausted. Checks if the daemon is running by searching its Waits until a GKFS daemon is active or until a certain timeout has expired. Checks if the daemon is running by searching its log for a pre-defined readiness message. Parameters Loading @@ -161,50 +198,37 @@ class Daemon: pid: `int` The PID of the daemon process to wait for. retries: `int` The number of retries before giving up. timeout: `number` The number of seconds to wait for max_lines: `int` The maximum number of log lines to check for a match. Returns ------- True if the message is found, False if the log file doesn't exist or the message can't be found. """ gkfs_daemon_active_log_pattern = r'Startup successful. Daemon is ready.' is_active = False # wait a bit for the daemon to initialize sh.sleep(0.1) init_time = perf_counter() r = 0 while not is_active and r < retries: r += 1 while perf_counter() - init_time < timeout: try: #logger.debug(f"checking log file") with open(self.logdir / gkfs_daemon_log_file) as log: for line in islice(log, max_lines): if re.search(gkfs_daemon_active_log_pattern, line) is not None: return True return except FileNotFoundError: # Log is missing, the daemon might have crashed... logger.debug(f"daemon log file missing, checking if daemon is alive") logger.debug(f"daemon log file missing, checking if daemon is alive...") pid=self._proc.pid try: sh.ps(['-h', '-p', pid]) except Exception: logger.error(f"daemon process {pid} is not running") raise if not _process_exists(pid): raise RuntimeError(f"process {pid} is not running") # ... or it might just be lazy. let's give it some more time sh.sleep(0.05) pass logger.debug(f"daemon log file found, retrying...") return False raise RuntimeError("initialization timeout exceeded") def shutdown(self): logger.debug(f"terminating daemon") Loading @@ -214,6 +238,9 @@ class Daemon: err = self._proc.wait() except sh.SignalException_SIGTERM: pass except Exception: raise @property def cwd(self): Loading Loading @@ -297,9 +324,10 @@ class Client: return self._preload_library def run(self, cmd, *args): logger.debug(f"running client") logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, list(args)))) logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env: {pformat(self._patched_env)}") out = self._cmd( [ cmd ] + list(args), Loading Loading @@ -463,7 +491,7 @@ class ShellClient: logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}") if intercept_shell: logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env: {self._patched_env}") # 'sh' raises an exception if the return code is not zero; # since we'd rather check for return codes explictly, we Loading @@ -476,7 +504,7 @@ class ShellClient: # _err=sys.stderr, _timeout=timeout, _timeout_signal=timeout_signal, _ok_code=list(range(0, 256)) # _ok_code=list(range(0, 256)) ) def run(self, cmd, *args, timeout=60, timeout_signal=signal.SIGKILL): Loading Loading @@ -526,7 +554,7 @@ class ShellClient: logger.debug(f"cmd: bash -c '{bash_c_args}'") logger.debug(f"timeout: {timeout} seconds") logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}") logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env:\n{pformat(self._patched_env)}") # 'sh' raises an exception if the return code is not zero; # since we'd rather check for return codes explictly, we Loading @@ -539,7 +567,7 @@ class ShellClient: # _err=sys.stderr, _timeout=timeout, _timeout_signal=timeout_signal, _ok_code=list(range(0, 256)) # _ok_code=list(range(0, 256)) ) return ShellCommand(cmd, proc) Loading Loading
tests/harness/gkfs.py +64 −36 Original line number Diff line number Diff line Loading @@ -15,6 +15,8 @@ import os, sh, sys, re, pytest, signal import random, socket, netifaces from pathlib import Path from itertools import islice from time import perf_counter from pprint import pformat from harness.logger import logger from harness.io import IOParser from harness.cmd import CommandParser Loading Loading @@ -103,6 +105,29 @@ def get_ephemeral_address(iface): return f"{iface}:{get_ephemeral_port(host=get_ip_addr(iface))}" def _process_exists(pid): """ Checks whether a given PID exists in the system Parameters ---------- pid: `int` The PID to check for Returns ------- True if a process with the provided `pid` exists in the system. False Otherwise """ try: sh.ps(['-h', '-p', pid]) except Exception: # sh.raises an Exception if the command doesn't return 0 return False return True class Daemon: def __init__(self, interface, workspace): Loading Loading @@ -132,7 +157,7 @@ class Daemon: logger.debug(f"spawning daemon") logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, args))) logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env:\n{pformat(self._patched_env)}") self._proc = self._cmd( args, Loading @@ -143,17 +168,29 @@ class Daemon: ) logger.debug(f"daemon process spawned (PID={self._proc.pid})") logger.debug("waiting for daemon to be ready") try: self.wait_until_active(self._proc.pid, 10.0) except Exception as ex: logger.error(f"daemon initialization failed: {ex}") # make sure daemon is ready to accept requests if not self.wait_until_active(self._proc.pid): raise RuntimeError("GKFS daemon doesn't seem to be active") # if the daemon initialized correctly but took longer than # `timeout`, we may be leaving running processes behind if _process_exists(self._proc.pid): self.shutdown() logger.critical(f"daemon was shut down, what is ex? {ex.__repr__}?") raise ex logger.debug("daemon is ready") return self def wait_until_active(self, pid, retries=500, max_lines=50): def wait_until_active(self, pid, timeout, max_lines=50): """ Waits until a GKFS daemon is active or until a certain number of retries has been exhausted. Checks if the daemon is running by searching its Waits until a GKFS daemon is active or until a certain timeout has expired. Checks if the daemon is running by searching its log for a pre-defined readiness message. Parameters Loading @@ -161,50 +198,37 @@ class Daemon: pid: `int` The PID of the daemon process to wait for. retries: `int` The number of retries before giving up. timeout: `number` The number of seconds to wait for max_lines: `int` The maximum number of log lines to check for a match. Returns ------- True if the message is found, False if the log file doesn't exist or the message can't be found. """ gkfs_daemon_active_log_pattern = r'Startup successful. Daemon is ready.' is_active = False # wait a bit for the daemon to initialize sh.sleep(0.1) init_time = perf_counter() r = 0 while not is_active and r < retries: r += 1 while perf_counter() - init_time < timeout: try: #logger.debug(f"checking log file") with open(self.logdir / gkfs_daemon_log_file) as log: for line in islice(log, max_lines): if re.search(gkfs_daemon_active_log_pattern, line) is not None: return True return except FileNotFoundError: # Log is missing, the daemon might have crashed... logger.debug(f"daemon log file missing, checking if daemon is alive") logger.debug(f"daemon log file missing, checking if daemon is alive...") pid=self._proc.pid try: sh.ps(['-h', '-p', pid]) except Exception: logger.error(f"daemon process {pid} is not running") raise if not _process_exists(pid): raise RuntimeError(f"process {pid} is not running") # ... or it might just be lazy. let's give it some more time sh.sleep(0.05) pass logger.debug(f"daemon log file found, retrying...") return False raise RuntimeError("initialization timeout exceeded") def shutdown(self): logger.debug(f"terminating daemon") Loading @@ -214,6 +238,9 @@ class Daemon: err = self._proc.wait() except sh.SignalException_SIGTERM: pass except Exception: raise @property def cwd(self): Loading Loading @@ -297,9 +324,10 @@ class Client: return self._preload_library def run(self, cmd, *args): logger.debug(f"running client") logger.debug(f"cmdline: {self._cmd} " + " ".join(map(str, list(args)))) logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env: {pformat(self._patched_env)}") out = self._cmd( [ cmd ] + list(args), Loading Loading @@ -463,7 +491,7 @@ class ShellClient: logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}") if intercept_shell: logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env: {self._patched_env}") # 'sh' raises an exception if the return code is not zero; # since we'd rather check for return codes explictly, we Loading @@ -476,7 +504,7 @@ class ShellClient: # _err=sys.stderr, _timeout=timeout, _timeout_signal=timeout_signal, _ok_code=list(range(0, 256)) # _ok_code=list(range(0, 256)) ) def run(self, cmd, *args, timeout=60, timeout_signal=signal.SIGKILL): Loading Loading @@ -526,7 +554,7 @@ class ShellClient: logger.debug(f"cmd: bash -c '{bash_c_args}'") logger.debug(f"timeout: {timeout} seconds") logger.debug(f"timeout_signal: {signal.Signals(timeout_signal).name}") logger.debug(f"env: {self._patched_env}") logger.debug(f"patched env:\n{pformat(self._patched_env)}") # 'sh' raises an exception if the return code is not zero; # since we'd rather check for return codes explictly, we Loading @@ -539,7 +567,7 @@ class ShellClient: # _err=sys.stderr, _timeout=timeout, _timeout_signal=timeout_signal, _ok_code=list(range(0, 256)) # _ok_code=list(range(0, 256)) ) return ShellCommand(cmd, proc) Loading