gkfs.py 8.6 KiB
Newer Older
Alberto Miranda's avatar
Alberto Miranda committed
################################################################################
#  Copyright 2018-2020, Barcelona Supercomputing Center (BSC), Spain           #
#  Copyright 2015-2020, Johannes Gutenberg Universitaet Mainz, Germany         #
#                                                                              #
#  This software was partially supported by the                                #
#  EC H2020 funded project NEXTGenIO (Project ID: 671951, www.nextgenio.eu).   #
#                                                                              #
#  This software was partially supported by the                                #
#  ADA-FS project under the SPPEXA project funded by the DFG.                  #
#                                                                              #
#  SPDX-License-Identifier: MIT                                                #
################################################################################

import errno
import os
import random
import re
import socket
import sys
import time
from itertools import islice
from pathlib import Path

import netifaces
import sh
from loguru import logger

from harness.io import IOParser

# Executables / libraries resolved from the workspace's bin and lib dirs.
gkfs_daemon_cmd = "gkfs_daemon"
gkfs_client_cmd = "gkfs.io"
gkfs_client_lib_file = "libgkfs_intercept.so"

# Hosts file shared by daemon and client (placed in the workspace's twd).
gkfs_hosts_file = "gkfs_hosts.txt"

# Daemon logging configuration (passed through the environment).
gkfs_daemon_log_file = "gkfs_daemon.log"
gkfs_daemon_log_level = "100"

# Client (interception library) logging configuration.
gkfs_client_log_file = "gkfs_client.log"
gkfs_client_log_level = "all"

def get_ip_addr(iface):
    """
    Return the first IPv4 address configured on the network interface `iface`.

    Any lookup error raised by netifaces, or a missing IPv4 entry for the
    interface, propagates to the caller.
    """
    inet_entries = netifaces.ifaddresses(iface)[netifaces.AF_INET]
    primary = inet_entries[0]
    return primary['addr']

def get_ephemeral_host():
    """
    Return a random loopback IPv4 address from the 127.0.0.0/8 block.

    Spreading test daemons over many loopback addresses, instead of always
    using 127.0.0.1, makes it much less likely that concurrently running tests
    race for the same (address, port) pair. There are 254 * 254 * 253
    candidate addresses.

    The second and third octets are drawn from [1, 254]. The last octet is
    drawn from [2, 254]. So octets 0 and 255 are never produced, and neither
    is a trailing ``.1``.

    NOTE(review): this relies on the whole 127.0.0.0/8 block being bound to
    the loopback interface (true on Linux). Other platforms may only configure
    127.0.0.1 — confirm if the harness must run elsewhere.
    """
    # Draw the octets in the same order as before so seeded runs reproduce.
    second = random.randrange(1, 255)
    third = random.randrange(1, 255)
    fourth = random.randrange(2, 255)

    return f'127.{second}.{third}.{fourth}'

def get_ephemeral_port(port=0, host=None):
    """
    Find a TCP port that can currently be bound on `host`.

    Parameters
    ----------
    port: `int`
        Preferred port. If it can be bound, it is returned as-is. If it is
        busy, or if 0 is given, random ports in [1024, 32768) are tried until
        one can be bound.
    host: `str`
        Address to probe. If None, a random loopback address from
        `get_ephemeral_host()` is used.

    Returns
    -------
    int
        A port that was free at the time of the check. The probing socket is
        closed before returning, so another process may still take the port
        before the caller binds it.

    Raises
    ------
    OSError
        If binding fails for a reason other than the port being in use or not
        permitted, for example when `host` is not a local address. A different
        port cannot fix such errors, so they are not retried; retrying would
        loop forever.
    """
    # Only these failures depend on the port number; anything else would make
    # every attempt fail the same way.
    retryable = (errno.EADDRINUSE, errno.EACCES)

    if host is None:
        host = get_ephemeral_host()

    # Dynamic port-range:
    # * cat /proc/sys/net/ipv4/ip_local_port_range
    # 32768   61000
    # Random candidates are drawn below that range, presumably to avoid ports
    # the kernel hands out for outgoing connections.
    if port == 0:
        port = random.randrange(1024, 32768)

    while True:
        # `with` guarantees the probe socket is closed even when bind fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            try:
                s.bind((host, port))
            except OSError as err:
                if err.errno not in retryable:
                    raise
                port = random.randrange(1024, 32768)
                continue
            return s.getsockname()[1]

def get_ephemeral_address(iface):
    """
    Build a listen address of the form ``"<iface>:<port>"``.

    The port comes from `get_ephemeral_port` and was bindable on the first
    IPv4 address of `iface` at the time of the check.

    Parameters
    ----------
    iface: `str`
        Name of the network interface. Its IPv4 address is used to probe for
        a free port.

    Returns
    -------
    str
        The interface *name* (not its IPv4 address) joined with the chosen
        port by a colon. NOTE(review): presumably the consumer (the daemon's
        ``-l`` option) resolves interface names itself — confirm before
        switching this to the IP address.
    """
    host = get_ip_addr(iface)
    port = get_ephemeral_port(host=host)
    return f"{iface}:{port}"

class Daemon:
    """
    Launch and manage a single ``gkfs_daemon`` process for a test workspace.

    The daemon is started with an ephemeral ``<interface>:<port>`` listen
    address. Its environment points ``GKFS_HOSTS_FILE`` at the hosts file in
    the workspace's ``twd`` and ``GKFS_DAEMON_LOG_PATH`` at a log file in the
    workspace's ``logdir``.
    """

    def __init__(self, interface, workspace):
        """
        Parameters
        ----------
        interface: `str`
            Network interface the daemon should listen on.
        workspace:
            Harness workspace object. It must expose ``bindirs``, ``libdirs``,
            ``twd``, ``rootdir``, ``mountdir`` and ``logdir``.
        """
        # Stored so the `interface` property has something to return.
        self._interface = interface
        self._address = get_ephemeral_address(interface)
        self._workspace = workspace
        # Set by run(); None means no daemon has been spawned yet.
        self._proc = None

        self._cmd = sh.Command(gkfs_daemon_cmd, self._workspace.bindirs)
        self._env = os.environ.copy()

        # Drop empty components: an empty LD_LIBRARY_PATH entry (e.g. the
        # leading ':' produced when the variable is unset) makes the dynamic
        # loader search the current working directory.
        lib_dirs = [os.environ.get('LD_LIBRARY_PATH', '')]
        lib_dirs.extend(str(p) for p in self._workspace.libdirs)

        self._patched_env = {
            'LD_LIBRARY_PATH'      : ':'.join(d for d in lib_dirs if d),
            'GKFS_HOSTS_FILE'      : self.cwd / gkfs_hosts_file,
            'GKFS_DAEMON_LOG_PATH' : self.logdir / gkfs_daemon_log_file,
            'GKFS_LOG_LEVEL'       : gkfs_daemon_log_level,
        }
        self._env.update(self._patched_env)

    def run(self):
        """
        Spawn the daemon in the background and block until it reports
        readiness in its log.

        Returns
        -------
        self, to allow chaining.

        Raises
        ------
        RuntimeError
            If the readiness message is not found before retries run out.
        """
        args = ['--mountdir', self.mountdir,
                '--rootdir', self.rootdir,
                '-l', self._address]

        logger.info("spawning daemon")
        logger.info(f"cmdline: {self._cmd} " + " ".join(map(str, args)))
        logger.info(f"env: {self._patched_env}")

        # For debugging, pass _out=sys.stdout and _err=sys.stderr here to
        # mirror the daemon's output.
        self._proc = self._cmd(
                args,
                _env=self._env,
                _bg=True,
            )

        logger.info(f"daemon process spawned (PID={self._proc.pid})")

        # make sure daemon is ready to accept requests
        if not self.wait_until_active(self._proc.pid):
            raise RuntimeError("GKFS daemon doesn't seem to be active")

        return self

    def wait_until_active(self, pid, retries=500, max_lines=50):
        """
        Wait until a GKFS daemon is active, or until a number of retries has
        been exhausted.

        Readiness is detected by searching the first `max_lines` lines of the
        daemon log for a pre-defined message. The method sleeps briefly
        between attempts.

        Parameters
        ----------
        pid: `int`
            The PID of the daemon process to wait for. It is used to check
            whether the process is still alive while its log file is missing.
        retries: `int`
            The number of checks before giving up. Checks are roughly 0.05 s
            apart.
        max_lines: `int`
            The maximum number of log lines to check for a match.

        Returns
        -------
        True if the message is found, False if it is not found within
        `retries` attempts.

        Raises
        ------
        Whatever ``sh.ps`` raises when `pid` is not running, which is checked
        only while the log file is missing. This is presumably an
        ``sh.ErrorReturnCode``.
        """
        ready = re.compile(r'Startup successful. Daemon is ready.')
        log_path = self.logdir / gkfs_daemon_log_file

        # wait a bit for the daemon to initialize
        time.sleep(0.1)

        for _ in range(retries):
            try:
                with open(log_path) as log:
                    if any(ready.search(line) for line in islice(log, max_lines)):
                        return True
            except FileNotFoundError:
                # Log is missing, the daemon might have crashed...
                logger.info("daemon log file missing, checking if daemon is alive")
                try:
                    sh.ps(['-h', '-p', pid])
                except Exception:
                    logger.error(f"daemon process {pid} is not running")
                    raise

            # ... or it might just be lazy. Sleep before every retry, not only
            # when the log is missing, so the retry budget spans real time
            # instead of being used up in a tight loop re-reading the file.
            time.sleep(0.05)

        return False

    def shutdown(self):
        """Terminate the daemon (if one was spawned) and wait for it to exit."""
        if self._proc is None:
            return

        logger.info("terminating daemon")

        try:
            self._proc.terminate()
            self._proc.wait()
        except sh.SignalException_SIGTERM:
            # Dying from the SIGTERM we just sent is the expected outcome.
            pass

    @property
    def cwd(self):
        return self._workspace.twd

    @property
    def rootdir(self):
        return self._workspace.rootdir

    @property
    def mountdir(self):
        return self._workspace.mountdir

    @property
    def logdir(self):
        return self._workspace.logdir

    @property
    def interface(self):
        return self._interface

class _proxy_exec():
    def __init__(self, client, name):
        self._client = client
        self._name = name

    def __call__(self, *args):
        return self._client.run(self._name, *args)

class Client:
    """
    Wrapper around the ``gkfs.io`` test binary, run with the GekkoFS
    interception library in ``LD_PRELOAD``.

    Accessing an unknown public attribute (e.g. ``client.open``) yields a
    callable. Calling it runs ``gkfs.io <name> <args...>`` and returns the
    stdout as parsed by ``IOParser``.
    """

    def __init__(self, workspace):
        """
        Parameters
        ----------
        workspace:
            Harness workspace object. It must expose ``bindirs``, ``libdirs``,
            ``twd`` and ``logdir``.
        """
        self._parser = IOParser()
        self._workspace = workspace
        self._cmd = sh.Command(gkfs_client_cmd, self._workspace.bindirs)
        self._env = os.environ.copy()

        # Drop empty components: an empty LD_LIBRARY_PATH entry (e.g. the
        # leading ':' produced when the variable is unset) makes the dynamic
        # loader search the current working directory.
        lib_dirs = [os.environ.get('LD_LIBRARY_PATH', '')]
        lib_dirs.extend(str(p) for p in self._workspace.libdirs)

        self._patched_env = {
            'LD_LIBRARY_PATH'      : ':'.join(d for d in lib_dirs if d),
            'LD_PRELOAD'           : Path(gkfs_client_lib_file),
            'LIBGKFS_HOSTS_FILE'   : self.cwd / gkfs_hosts_file,
            'LIBGKFS_LOG'          : gkfs_client_log_level,
            'LIBGKFS_LOG_OUTPUT'   : self._workspace.logdir / gkfs_client_log_file
        }

        self._env.update(self._patched_env)

    def run(self, cmd, *args):
        """
        Run ``gkfs.io <cmd> <args...>`` and return its parsed stdout.

        Parameters
        ----------
        cmd: `str`
            gkfs.io subcommand. It also selects the parser used for the output.
        args:
            Extra command-line arguments (converted to strings by ``sh``).
        """
        logger.info("running client")
        # Log the full command line, including the subcommand itself.
        logger.info(f"cmdline: {self._cmd} " + " ".join(map(str, [cmd, *args])))
        logger.info(f"env: {self._patched_env}")

        # For debugging, pass _out=sys.stdout and _err=sys.stderr here to
        # mirror the client's output.
        out = self._cmd(
            [cmd] + list(args),
            _env=self._env,
            )

        logger.debug(f"command output: {out.stdout}")
        return self._parser.parse(cmd, out.stdout)

    def __getattr__(self, name):
        # Only public names are treated as gkfs.io subcommands. Proxying
        # private/dunder lookups (e.g. `__setstate__` probed by copy/pickle,
        # or `_cmd` before it exists) breaks hasattr() and can recurse
        # indefinitely.
        if name.startswith('_'):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}")
        return _proxy_exec(self, name)

    @property
    def cwd(self):
        return self._workspace.twd