#####################################################################################
# (c) Copyright 1998-2023 CERN for the benefit of the LHCb and ATLAS collaborations #
#                                                                                   #
# This software is distributed under the terms of the Apache version 2 licence,     #
# copied verbatim in the file "LICENSE".                                            #
#                                                                                   #
# In applying this licence, CERN does not waive the privileges and immunities       #
# granted to it by virtue of its status as an Intergovernmental Organization        #
# or submit itself to any jurisdiction.                                             #
#####################################################################################
import json
import logging
import os
import platform
import re
import signal
import sys
import tempfile
import threading
import time
from subprocess import PIPE, STDOUT, Popen

try:
    from html import escape as escape_for_html
except ImportError:  # Python2
    from cgi import escape as escape_for_html

import six

if sys.version_info < (3, 5):
    # backport of 'backslashreplace' handling of UnicodeDecodeError
    # to Python < 3.5
    from codecs import backslashreplace_errors, register_error

    def _new_backslashreplace_errors(exc):
        if isinstance(exc, UnicodeDecodeError):
            code = hex(ord(exc.object[exc.start]))
            return ("\\" + code[1:], exc.start + 1)
        else:
            return backslashreplace_errors(exc)

    register_error("backslashreplace", _new_backslashreplace_errors)
    del register_error
    del backslashreplace_errors
    del _new_backslashreplace_errors


# Exit code used to mark a test as skipped
# (assumed value: 77, the conventional automake/CTest "skip" return code)
SKIP_RETURN_CODE = 77


def sanitize_for_xml(data):
    """
    Take a string with invalid ASCII/UTF characters and quote them so that the
    string can be used in an XML text.

    >>> sanitize_for_xml('this is \x1b')
    'this is [NON-XML-CHAR-0x1B]'
    """
    bad_chars = re.compile("[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]")
    def quote(match):
        "helper function"
        return "".join("[NON-XML-CHAR-0x%2X]" % ord(c) for c in match.group())

    return bad_chars.sub(quote, data)

def dumpProcs(name):
    """helper to debug GAUDI-1084, dump the list of processes"""
    from getpass import getuser

    if "WORKSPACE" in os.environ:
        p = Popen(["ps", "-fH", "-U", getuser()], stdout=PIPE)
        with open(os.path.join(os.environ["WORKSPACE"], name), "wb") as f:
            f.write(p.communicate()[0])


def kill_tree(ppid, sig=signal.SIGTERM):
    """
    Send a signal to a process and all its child processes (starting from the
    leaves).
    """
    log = logging.getLogger("kill_tree")
    ps_cmd = ["ps", "--no-headers", "-o", "pid", "--ppid", str(ppid)]
    # Note: start in a clean env to avoid a freeze with libasan.so
    # See https://sourceware.org/bugzilla/show_bug.cgi?id=27653
    get_children = Popen(ps_cmd, stdout=PIPE, stderr=PIPE, env={})
    children = map(int, get_children.communicate()[0].split())
    for child in children:
        kill_tree(child, sig)
    try:
        log.debug("killing process %d", ppid)
        os.kill(ppid, sig)
    except OSError as err:
        if err.errno != 3:  # No such process
            raise
        log.debug("no such process %d", ppid)

# -------------------------------------------------------------------------#
class BaseTest(object):
    _common_tmpdir = None

    def __init__(self):
        self.program = ""
        self.args = []
        self.reference = ""
        self.error_reference = ""
        self.options = ""
        self.stderr = ""
        self.timeout = 600
        self.exit_code = None
        self.environment = dict(os.environ)
        self.unsupported_platforms = []
        self.signal = None
        self.workdir = os.curdir
        self.use_temp_dir = False
        # Variables not for users
        self.name = ""
        self.validator = ""
        self.status = None
        self.proc = None
        self.validate_time = None
        self.causes = []
        self.result = Result(self)
        self.returnedCode = 0
        self.out = ""
        self.err = ""
        self.stack_trace = None
        self.basedir = os.getcwd()

    def run(self):
        logging.debug("running test %s", self.name)

        self.result = Result(
            {
                "CAUSE": None,
                "EXCEPTION": None,
                "RESOURCE": None,
                "TARGET": None,
                "TRACEBACK": None,
                "START_TIME": None,
                "END_TIME": None,
                "TIMEOUT_DETAIL": None,
            }
        )
        if self.options:
            if re.search(
                r"from\s+Gaudi.Configuration\s+import\s+\*|"
                r"from\s+Configurables\s+import",
                self.options,
            ):
                suffix, lang = ".py", "python"
                suffix, lang = ".opts", "c++"
            self.result["Options"] = '<code lang="{}"><pre>{}</pre></code>'.format(
                lang, escape_for_html(self.options)
            )
            optionFile = tempfile.NamedTemporaryFile(suffix=suffix)
            optionFile.file.write(self.options.encode("utf-8"))
            optionFile.seek(0)
            self.args.append(RationalizePath(optionFile.name))

        platform_id = (
            self.environment.get("BINARY_TAG")
            or self.environment.get("CMTCONFIG")
            or platform.platform()
        )
        # If at least one regex matches we skip the test.
        skip_test = bool(
            [
                None
                for prex in self.unsupported_platforms
                if re.search(prex, platform_id)
            ]
        )

        if not skip_test:
            # handle working/temporary directory options
            workdir = self.workdir
            if self.use_temp_dir:
                if self._common_tmpdir:
                    workdir = self._common_tmpdir
                else:
                    workdir = tempfile.mkdtemp()

            # prepare the command to execute
            prog = ""
            if self.program != "":
                prog = self.program
            elif "GAUDIEXE" in self.environment:
                prog = self.environment["GAUDIEXE"]
            else:
                prog = "Gaudi.exe"

            prog_ext = os.path.splitext(prog)[1]
            if prog_ext not in [".exe", ".py", ".bat"]:
                prog += ".exe"
                prog_ext = ".exe"

            prog = which(prog) or prog

            args = list(map(RationalizePath, self.args))
            if prog_ext == ".py":
                params = ["python3", RationalizePath(prog)] + args
            else:
                params = [RationalizePath(prog)] + args

            # we need to switch directory because the validator expects to run
            # in the same dir as the program
            os.chdir(workdir)

            # launching test in a different thread to handle timeout exception
            def target():
                logging.debug("executing %r in %s", params, workdir)
                self.proc = Popen(
                    params, stdout=PIPE, stderr=PIPE, env=self.environment
                )
                logging.debug("(pid: %d)", self.proc.pid)
                out, err = self.proc.communicate()
                self.out = out.decode("utf-8", errors="backslashreplace")
                self.err = err.decode("utf-8", errors="backslashreplace")

            thread = threading.Thread(target=target)
            thread.start()
            thread.join(self.timeout)

            if thread.is_alive():
                logging.debug("time out in test %s (pid %d)", self.name, self.proc.pid)
                # get the stack trace of the stuck process
                cmd = [
                    "gdb",
                    "--pid",
                    str(self.proc.pid),
                    "--batch",
                    "--eval-command=thread apply all backtrace",
                ]
                gdb = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
                self.stack_trace = gdb.communicate()[0].decode(
                    "utf-8", errors="backslashreplace"
                )
                kill_tree(self.proc.pid, signal.SIGTERM)
                thread.join(60)
                if thread.is_alive():
                    kill_tree(self.proc.pid, signal.SIGKILL)
                self.causes.append("timeout")
                self.returnedCode = self.proc.returncode
                if self.returnedCode != SKIP_RETURN_CODE:
                    logging.debug(
                        f"completed test {self.name} with returncode = {self.returnedCode}"
                    logging.debug("validating test...")
                    val_start_time = time.perf_counter()
                    self.result, self.causes = self.ValidateOutput(
                        stdout=self.out, stderr=self.err, result=self.result
                    )
                    self.validate_time = round(time.perf_counter() - val_start_time, 2)
                else:
                    self.status = "skipped"
                    logging.debug(f"skipped test {self.name}")
            # remove the temporary directory if we created it
            if self.use_temp_dir and not self._common_tmpdir:
                shutil.rmtree(workdir, True)

            os.chdir(self.basedir)

            if self.status != "skipped":
                # handle application exit code
                if self.signal is not None:
                    if int(self.returnedCode) != -int(self.signal):
                        self.causes.append("exit code")
                elif self.exit_code is not None:
                    if int(self.returnedCode) != int(self.exit_code):
                        self.causes.append("exit code")
                elif self.returnedCode != 0:
                    self.causes.append("exit code")
                if self.causes:
                    self.status = "failed"
                else:
                    self.status = "passed"

        else:
            self.status = "skipped"

        logging.debug("%s: %s", self.name, self.status)
        field_mapping = {
            "Exit Code": "returnedCode",
            "stderr": "err",
            "Arguments": "args",
            "Runtime Environment": "environment",
            "Status": "status",
            "stdout": "out",
            "Program Name": "program",
            "Name": "name",
            "Validator": "validator",
            "Validation execution time": "validate_time",
            "Output Reference File": "reference",
            "Error Reference File": "error_reference",
            "Causes": "causes",
            # 'Validator Result': 'result.annotations',
            "Unsupported Platforms": "unsupported_platforms",
            "Stack Trace": "stack_trace",
        }
        resultDict = [
            (key, getattr(self, attr))
            for key, attr in field_mapping.items()
            if getattr(self, attr)
        ]
        resultDict.append(
            (
                "Working Directory",
                RationalizePath(os.path.join(os.getcwd(), self.workdir)),
            )
        )
        # print(dict(resultDict).keys())
        resultDict.extend(self.result.annotations.items())
        # print(self.result.annotations.keys())
        resultDict = dict(resultDict)

        # Special cases
        if "Validator" in resultDict:
            resultDict["Validator"] = '<code lang="{}"><pre>{}</pre></code>'.format(
                "python", escape_for_html(resultDict["Validator"])
            )
        return resultDict

    # -------------------------------------------------#
    # ----------------Validating tool------------------#
    # -------------------------------------------------#

    def ValidateOutput(self, stdout, stderr, result):
        if not self.stderr:
            self.validateWithReference(stdout, stderr, result, self.causes)
        elif stderr.strip() != self.stderr.strip():
            self.causes.append("standard error")
        return result, self.causes

    def findReferenceBlock(
        self,
        reference=None,
        stdout=None,
        result=None,
        causes=None,
        signature_offset=0,
        signature=None,
        id=None,
    ):
        """
        Given a block of text, tries to find it in the output. The block has to
        be identified by a signature line. By default, the first line is used
        as the signature, or the line pointed to by signature_offset. If
        signature_offset points outside the block, a signature line can be
        passed as the signature argument. Note: if 'signature' is None (the
        default), a negative signature_offset is interpreted as an index in a
        list (e.g. -1 means the last line); otherwise it is interpreted as the
        number of lines before the first one of the block where the signature
        must appear. The parameter 'id' allows distinguishing between different
        calls to this function in the same validation code.
        """
        if reference is None:
            reference = self.reference
        if stdout is None:
            stdout = self.out
        if result is None:
            result = self.result
        if causes is None:
            causes = self.causes

        reflines = list(filter(None, map(lambda s: s.rstrip(), reference.splitlines())))
        if not reflines:
            raise RuntimeError("Empty (or null) reference")
        # the same on standard output
        outlines = list(filter(None, map(lambda s: s.rstrip(), stdout.splitlines())))

        res_field = "GaudiTest.RefBlock"
        if id:
            res_field += "_%s" % id

        if signature is None:
            if signature_offset < 0:
                signature_offset = len(reference) + signature_offset
            signature = reflines[signature_offset]
        # find the reference block in the output file
        try:
            pos = outlines.index(signature)
            outlines = outlines[
                pos - signature_offset : pos + len(reflines) - signature_offset
            ]
            if reflines != outlines:
                msg = "standard output"
                # I do not want 2 messages in causes if the function is called
                # twice
                if msg not in causes:
                    causes.append(msg)
                result[res_field + ".observed"] = result.Quote("\n".join(outlines))
        except ValueError:
            causes.append("missing signature")
        result[res_field + ".signature"] = result.Quote(signature)
        if len(reflines) > 1 or signature != reflines[0]:
            result[res_field + ".expected"] = result.Quote("\n".join(reflines))
        return causes
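
    # Illustrative usage sketch (the reference text and id below are made up):
    #
    #   self.findReferenceBlock(
    #       reference="ApplicationMgr    INFO Application Manager Finalized successfully",
    #       id="finalization",
    #   )
    #
    # With the defaults, the first line of 'reference' is the signature searched
    # for in stdout, and the following lines must match the output exactly.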

    def countErrorLines(
        self, expected={"ERROR": 0, "FATAL": 0}, stdout=None, result=None, causes=None
    ):
        """
        Count the number of messages with required severity (by default ERROR and FATAL)
        and check if their numbers match the expected ones (0 by default).
        The dictionary "expected" can be used to tune the number of errors and fatals
        allowed, or to limit the number of expected warnings etc.
        """
        if stdout is None:
            stdout = self.out
        if result is None:
            result = self.result
        if causes is None:
            causes = self.causes

        # prepare the dictionary to record the extracted lines
        errors = {}
        for sev in expected:
            errors[sev] = []

        outlines = stdout.splitlines()
        from math import log10
        fmt = "%%%dd - %%s" % (int(log10(len(outlines) + 1)))

        linecount = 0
        for l in outlines:
            linecount += 1
            words = l.split()
            if len(words) >= 2 and words[1] in errors:
                errors[words[1]].append(fmt % (linecount, l.rstrip()))

        for e in errors:
            if len(errors[e]) != expected[e]:
                causes.append("%s(%d)" % (e, len(errors[e])))
                result["GaudiTest.lines.%s" % e] = result.Quote("\n".join(errors[e]))
                result["GaudiTest.lines.%s.expected#" % e] = result.Quote(
                    str(expected[e])
                )

        return causes
    def CheckTTreesSummaries(
        self,
        stdout=None,
        result=None,
        causes=None,
        trees_dict=None,
        ignore=r"Basket|.*size|Compression",
    ):
        """
        Compare the TTree summaries in stdout with the ones in trees_dict or in
        the reference file. By default ignore the size, compression and basket
        fields.
        The presence of TTree summaries when none is expected is not a failure.
        """
        if stdout is None:
            stdout = self.out
        if result is None:
            result = self.result
        if causes is None:
            causes = self.causes
        if trees_dict is None:
            lreference = self._expandReferenceFileName(self.reference)
            # call the validator if the file exists
            if lreference and os.path.isfile(lreference):
                trees_dict = findTTreeSummaries(open(lreference).read())
            else:
                trees_dict = {}

        from pprint import PrettyPrinter
        pp = PrettyPrinter()
        if trees_dict:
            result["GaudiTest.TTrees.expected"] = result.Quote(pp.pformat(trees_dict))
            if ignore:
                result["GaudiTest.TTrees.ignore"] = result.Quote(ignore)

        trees = findTTreeSummaries(stdout)
        failed = cmpTreesDicts(trees_dict, trees, ignore)
        if failed:
            causes.append("trees summaries")
            msg = "%s: %s != %s" % getCmpFailingValues(trees_dict, trees, failed)
            result["GaudiTest.TTrees.failure_on"] = result.Quote(msg)
            result["GaudiTest.TTrees.found"] = result.Quote(pp.pformat(trees))

        return causes
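
    # Usage sketch: validateWithReference() below chains this check on the
    # test's own captured output, e.g.
    #
    #   causes = self.CheckTTreesSummaries(stdout, result, causes)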

    def CheckHistosSummaries(
        self, stdout=None, result=None, causes=None, dict=None, ignore=None
    ):
        """
        Compare the histogram summaries in stdout with the ones in dict or in
        the reference file.
        The presence of histogram summaries when none is expected is not a
        failure.
        """
            stdout = self.out
        if result is None:
            result = self.result
        if causes is None:
            causes = self.causes
        if dict is None:
            lreference = self._expandReferenceFileName(self.reference)
            # call the validator if the file exists
            if lreference and os.path.isfile(lreference):
                dict = findHistosSummaries(open(lreference).read())
            else:
                dict = {}

        from pprint import PrettyPrinter
        pp = PrettyPrinter()
        if dict:
            result["GaudiTest.Histos.expected"] = result.Quote(pp.pformat(dict))
            if ignore:
                result["GaudiTest.Histos.ignore"] = result.Quote(ignore)

        histos = findHistosSummaries(stdout)
        failed = cmpTreesDicts(dict, histos, ignore)
        if failed:
            causes.append("histos summaries")
            msg = "%s: %s != %s" % getCmpFailingValues(dict, histos, failed)
            result["GaudiTest.Histos.failure_on"] = result.Quote(msg)
            result["GaudiTest.Histos.found"] = result.Quote(pp.pformat(histos))

        return causes

    def validateWithReference(
        self, stdout=None, stderr=None, result=None, causes=None, preproc=None
    ):
        """
        Default validation action: compare standard output and error to the
        reference files.
        """
        if stdout is None:
            stdout = self.out
        if stderr is None:
            stderr = self.err
        if result is None:
            result = self.result
        if causes is None:
            causes = self.causes

        # set the default output preprocessor
        if preproc is None:
            preproc = normalizeExamples
        # check standard output
        lreference = self._expandReferenceFileName(self.reference)
        # call the validator if the file exists
        if lreference and os.path.isfile(lreference):
            causes += ReferenceFileValidator(
                lreference, "standard output", "Output Diff", preproc=preproc
            )(stdout, result)
        elif lreference:
            causes += ["missing reference file"]
        # Compare TTree summaries
        causes = self.CheckTTreesSummaries(stdout, result, causes)
        causes = self.CheckHistosSummaries(stdout, result, causes)
        if causes and lreference:  # Write a new reference file for stdout
            try:
                cnt = 0
                newrefname = ".".join([lreference, "new"])
                while os.path.exists(newrefname):
                    cnt += 1
                    newrefname = ".".join([lreference, "~%d~" % cnt, "new"])
                newref = open(newrefname, "w")
                # sanitize newlines
                for l in stdout.splitlines():
                    newref.write(l.rstrip() + "\n")
                del newref  # flush and close
                result["New Output Reference File"] = os.path.relpath(
                    newrefname, self.basedir
                )
            except IOError:
                # Ignore IO errors when trying to update reference files
                # because we may be in a read-only filesystem
                pass

        # check standard error
        lreference = self._expandReferenceFileName(self.error_reference)
        # call the validator if we have a file to use
        if lreference:
            if os.path.isfile(lreference):
                newcauses = ReferenceFileValidator(
                    lreference, "standard error", "Error Diff", preproc=preproc
                )(stderr, result)
                newcauses = ["missing error reference file"]
            causes += newcauses
            if newcauses and lreference:  # Write a new reference file for stderr
                cnt = 0
                newrefname = ".".join([lreference, "new"])
                while os.path.exists(newrefname):
                    cnt += 1
                    newrefname = ".".join([lreference, "~%d~" % cnt, "new"])
                newref = open(newrefname, "w")
                # sanitize newlines
                for l in stderr.splitlines():
                    newref.write(l.rstrip() + "\n")
                del newref  # flush and close
                result["New Error Reference File"] = os.path.relpath(
                    newrefname, self.basedir
                )
        else:
            causes += BasicOutputValidator(
                lreference, "standard error", "ExecTest.expected_stderr"
            )(stderr, result)

    def validateJSONWithReference(
        self,
        output_file,
        reference_file,
        result=None,
        causes=None,
        detailed=True,
    ):
        """
        JSON validation action: compare json file to reference file
        """

        if result is None:
            result = self.result
        if causes is None:
            causes = self.causes

        if not os.path.isfile(output_file):
            causes.append(f"output file {output_file} does not exist")
            return causes

        try:
            with open(output_file) as f:
                output = json.load(f)
        except json.JSONDecodeError as err:
            causes.append("json parser error")
            result["output_parse_error"] = f"json parser error in {output_file}: {err}"
            return causes

        lreference = self._expandReferenceFileName(reference_file)
        if not lreference:
            causes.append("reference file not set")
        elif not os.path.isfile(lreference):
            causes.append("reference file does not exist")
        else:
            causes += JSONOutputValidator()(lreference, output, result, detailed)
        if causes and lreference:  # Write a new reference file for output
            try:
                cnt = 0
                newrefname = ".".join([lreference, "new"])
                while os.path.exists(newrefname):
                    cnt += 1
                    newrefname = ".".join([lreference, "~%d~" % cnt, "new"])
                with open(newrefname, "w") as newref:
                    json.dump(output, newref, indent=4)
                result["New JSON Output Reference File"] = os.path.relpath(
                    newrefname, self.basedir
                )
            except IOError:
                # Ignore IO errors when trying to update reference files
                # because we may be in a read-only filesystem
                pass
        return causes
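
    # Illustrative sketch (file names are made up): a concrete test validator
    # could compare a produced JSON file against a committed reference with
    #
    #   self.validateJSONWithReference("histograms.json", "refs/histograms.json")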

    def _expandReferenceFileName(self, reffile):
        # if no file is passed, do nothing
        if not reffile:
            return ""

        # function to split an extension in constituents parts
        def platformSplit(p):
            return set(re.split(r"[-+]", p))
        reference = os.path.normpath(
            os.path.join(self.basedir, os.path.expandvars(reffile))
        )

        # old-style platform-specific reference name
        spec_ref = reference[:-3] + GetPlatform(self)[0:3] + reference[-3:]
        if os.path.isfile(spec_ref):
            reference = spec_ref
        else:  # look for new-style platform specific reference files:
            # get all the files whose name start with the reference filename
            dirname, basename = os.path.split(reference)
            if not dirname:
                dirname = "."
            head = basename + "."
            head_len = len(head)
            platform = platformSplit(GetPlatform(self))
            if "do0" in platform:
                platform.add("dbg")
            candidates = []
            for f in os.listdir(dirname):
                if f.startswith(head):
                    req_plat = platformSplit(f[head_len:])
                    if platform.issuperset(req_plat):
                        candidates.append((len(req_plat), f))
            if candidates:  # take the one with highest matching
                # FIXME: it is not possible to say if x86_64-slc5-gcc43-dbg
                #        has to use ref.x86_64-gcc43 or ref.slc5-dbg
                candidates.sort()
                reference = os.path.join(dirname, candidates[-1][1])
        return reference
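
    # Illustrative example of the new-style lookup above (platform string and
    # file names are made up): with GetPlatform(self) == "x86_64-centos7-gcc11-dbg",
    # both "stdout.ref.dbg" and "stdout.ref.centos7-dbg" are valid candidates
    # for the reference "stdout.ref"; the second wins because it matches more
    # platform tokens.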

# ======= GAUDI TOOLS =======
import shutil

try:
    from GaudiKernel import ROOT6WorkAroundEnabled
except ImportError:

    def ROOT6WorkAroundEnabled(id=None):
        # dummy implementation
        return False


# --------------------------------- TOOLS ---------------------------------#

def RationalizePath(p):
    """
    Function used to normalize the used path
    """
    newPath = os.path.normpath(os.path.expandvars(p))
    if os.path.exists(newPath):
        p = os.path.realpath(newPath)
    return p


def which(executable):
    """
    Locates an executable in the executables path ($PATH) and returns the full
    path to it.  An application is looked for with or without the '.exe' suffix.
    If the executable cannot be found, None is returned
    """
    if os.path.isabs(executable):
        if not os.path.isfile(executable):
            if executable.endswith(".exe"):
                if os.path.isfile(executable[:-4]):
                    return executable[:-4]
            else:
                executable = os.path.split(executable)[1]
        else:
            return executable
    for d in os.environ.get("PATH").split(os.pathsep):
        fullpath = os.path.join(d, executable)
        if os.path.isfile(fullpath):
            return fullpath
        elif executable.endswith(".exe") and os.path.isfile(fullpath[:-4]):
            return fullpath[:-4]
    return None


# -------------------------------------------------------------------------#
# ------------------------------ Result Class ------------------------------#
# -------------------------------------------------------------------------#
class Result:
    PASS = "PASS"
    FAIL = "FAIL"
    ERROR = "ERROR"
    UNTESTED = "UNTESTED"

    EXCEPTION = ""
    RESOURCE = ""
    TARGET = ""
    TRACEBACK = ""
    START_TIME = ""
    END_TIME = ""
    TIMEOUT_DETAIL = ""

    def __init__(self, kind=None, id=None, outcome=PASS, annotations={}):
        self.annotations = annotations.copy()

    def __getitem__(self, key):
        assert isinstance(key, six.string_types)
        return self.annotations[key]

    def __setitem__(self, key, value):
        assert isinstance(key, six.string_types)
        assert isinstance(value, six.string_types), "{!r} is not a string".format(value)
        self.annotations[key] = value
    def Quote(self, text):
        """
        Convert text to html by escaping special chars and adding <pre> tags.
        """
        return "<pre>{}</pre>".format(escape_for_html(text))
# -------------------------------------------------------------------------#
# --------------------------- Validator Classes ---------------------------#
# -------------------------------------------------------------------------#
# Basic implementation of an option validator for Gaudi test. This
# implementation is based on the standard (LCG) validation functions used
# in QMTest.


class BasicOutputValidator:
    def __init__(self, ref, cause, result_key):
        self.ref = ref
        self.cause = cause
        self.result_key = result_key
    def __call__(self, out, result):
        """Validate the output of the program.
        'stdout' -- A string containing the data written to the standard output
        stream.
        'stderr' -- A string containing the data written to the standard error
        stream.
        'result' -- A 'Result' object. It may be used to annotate
        the outcome according to the content of stderr.
        returns -- A list of strings giving causes of failure."""
        causes = []
        # Check the output
        if not self.__CompareText(out, self.ref):
            causes.append(self.cause)
            result[self.result_key] = result.Quote(self.ref)

        return causes

    def __CompareText(self, s1, s2):
        """Compare 's1' and 's2', ignoring line endings.
        's1' -- A string.
        's2' -- A string.
        returns -- True if 's1' and 's2' are the same, ignoring
        differences in line endings."""
        if ROOT6WorkAroundEnabled("ReadRootmapCheck"):
            # FIXME: (MCl) Hide warnings from new rootmap sanity check until we
            # can fix them
            to_ignore = re.compile(
                r"Warning in <TInterpreter::ReadRootmapFile>: .* is already in .*"
            )
            def keep_line(l):
                return not to_ignore.match(l)

            return list(filter(keep_line, s1.splitlines())) == list(
                filter(keep_line, s2.splitlines())
            )
        else:
            return s1.splitlines() == s2.splitlines()
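

# Usage sketch (mirrors validateWithReference above): build the validator with
# the reference text, a failure-cause label and a result key, then call it on
# the captured output:
#
#   causes += BasicOutputValidator(
#       lreference, "standard error", "ExecTest.expected_stderr"
#   )(stderr, result)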


# ------------------------ Preprocessor elements ------------------------#
class FilePreprocessor:
    """Base class for a callable that takes a file and returns a modified
    version of it."""
    def __processLine__(self, line):
        return line
    def __processFile__(self, lines):
        output = []
        for l in lines:
            l = self.__processLine__(l)
            if l:
                output.append(l)
        return output

    def __call__(self, input):
        if not isinstance(input, six.string_types):
            lines = input
            mergeback = False
        else:
            lines = input.splitlines()
            mergeback = True
        output = self.__processFile__(lines)
        if mergeback:
            output = "\n".join(output)
    def __add__(self, rhs):
        return FilePreprocessorSequence([self, rhs])


class FilePreprocessorSequence(FilePreprocessor):
    def __init__(self, members=[]):
        self.members = members
    def __add__(self, rhs):
        return FilePreprocessorSequence(self.members + [rhs])
    def __call__(self, input):
        output = input
        for pp in self.members:
            output = pp(output)
        return output

class LineSkipper(FilePreprocessor):
    def __init__(self, strings=[], regexps=[]):
        self.strings = strings
        self.regexps = list(map(re.compile, regexps))

    def __processLine__(self, line):
        for s in self.strings:
            if line.find(s) >= 0:
                return None
        for r in self.regexps:
            if r.search(line):
                return None
        return line


class BlockSkipper(FilePreprocessor):
    def __init__(self, start, end):
        self.start = start
        self.end = end
        self._skipping = False

    def __processLine__(self, line):
        if self.start in line:
            self._skipping = True
            return None
        elif self.end in line:
            self._skipping = False
        elif self._skipping:
            return None
        return line
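
# Illustrative sketch (the marker strings are made up): drop everything between
# two marker lines from the compared output, e.g.
#
#   skipTimingTable = BlockSkipper("TimingAuditor -- begin", "TimingAuditor -- end")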

class RegexpReplacer(FilePreprocessor):
    def __init__(self, orig, repl="", when=None):
        if when:
            when = re.compile(when)
        self._operations = [(when, re.compile(orig), repl)]

    def __add__(self, rhs):
        if isinstance(rhs, RegexpReplacer):
            res = RegexpReplacer("", "", None)
            res._operations = self._operations + rhs._operations
        else:
            res = FilePreprocessor.__add__(self, rhs)
        return res
    def __processLine__(self, line):
        for w, o, r in self._operations:
            if w is None or w.search(line):
                line = o.sub(r, line)
        return line

# Common preprocessors
maskPointers = RegexpReplacer("0x[0-9a-fA-F]{4,16}", "0x########")
normalizeDate = RegexpReplacer(
    "[0-2]?[0-9]:[0-5][0-9]:[0-5][0-9] [0-9]{4}[-/][01][0-9][-/][0-3][0-9][ A-Z]*",
    "00:00:00 1970-01-01",
)
normalizeEOL = FilePreprocessor()
normalizeEOL.__processLine__ = lambda line: str(line).rstrip() + "\n"

skipEmptyLines = FilePreprocessor()
# FIXME: that's ugly
skipEmptyLines.__processLine__ = lambda line: (line.strip() and line) or None
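
# Illustrative sketch: preprocessors compose with '+' (yielding a
# FilePreprocessorSequence) and can be applied to a whole string or to a list
# of lines, e.g.
#
#   normalizer = maskPointers + normalizeDate + skipEmptyLines
#   clean_output = normalizer(stdout)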

# Special preprocessor sorting the list of strings (whitespace separated)
#  that follow a signature on a single line
class LineSorter(FilePreprocessor):
    def __init__(self, signature):
        self.signature = signature
        self.siglen = len(signature)
    def __processLine__(self, line):
        pos = line.find(self.signature)
        if pos >= 0:
            # sort the whitespace-separated tokens that follow the signature
            lst = line[(pos + self.siglen) :].split()
            lst.sort()
            line = line[: (pos + self.siglen)] + " ".join(lst)
        return line

class SortGroupOfLines(FilePreprocessor):
    """
    Sort group of lines matching a regular expression