Newer
Older
#####################################################################################
# (c) Copyright 1998-2023 CERN for the benefit of the LHCb and ATLAS collaborations #
# #
# This software is distributed under the terms of the Apache version 2 licence, #
# copied verbatim in the file "LICENSE". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
#####################################################################################
import json
import os
import platform
import re
import signal
import sys
import tempfile
import threading
import time
from subprocess import PIPE, STDOUT, Popen
from unittest import TestCase
try:
from html import escape as escape_for_html
except ImportError: # Python2
from cgi import escape as escape_for_html
if sys.version_info < (3, 5):
    # Before Python 3.5 the stock 'backslashreplace' handler only deals with
    # encoding errors: install a wrapper that also covers UnicodeDecodeError.
    from codecs import backslashreplace_errors, register_error

    def _new_backslashreplace_errors(exc):
        """Error handler turning an undecodable byte into a backslash escape."""
        if not isinstance(exc, UnicodeDecodeError):
            # anything else is delegated to the original handler
            return backslashreplace_errors(exc)
        code = hex(ord(exc.object[exc.start]))
        # hex() yields '0x..': dropping the leading '0' leaves 'x..'
        return ("\\" + code[1:], exc.start + 1)

    register_error("backslashreplace", _new_backslashreplace_errors)
    # do not leave the helper names in the module namespace
    del register_error
    del backslashreplace_errors
    del _new_backslashreplace_errors
SKIP_RETURN_CODE = 77
Take a string with invalid ASCII/UTF characters and quote them so that the
string can be used in an XML text.
>>> sanitize_for_xml('this is \x1b')
'this is [NON-XML-CHAR-0x1B]'
"""
bad_chars = re.compile("[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]")
"helper function"
return "".join("[NON-XML-CHAR-0x%2X]" % ord(c) for c in match.group())
"""helper to debug GAUDI-1084, dump the list of processes"""
from getpass import getuser
if "WORKSPACE" in os.environ:
p = Popen(["ps", "-fH", "-U", getuser()], stdout=PIPE)
with open(os.path.join(os.environ["WORKSPACE"], name), "wb") as f:
f.write(p.communicate()[0])
def kill_tree(ppid, sig):
    """
    Send a signal to a process and all its child processes (starting from the
    leaves).

    'ppid' -- PID of the process at the root of the tree.
    'sig' -- signal number, passed unchanged to os.kill.

    A process that has already disappeared when it is signalled is only
    reported at debug level.
    """
    import logging

    log = logging.getLogger("kill_tree")
    ps_cmd = ["ps", "--no-headers", "-o", "pid", "--ppid", str(ppid)]
    # Note: start in a clean env to avoid a freeze with libasan.so
    # See https://sourceware.org/bugzilla/show_bug.cgi?id=27653
    get_children = Popen(ps_cmd, stdout=PIPE, stderr=PIPE, env={})
    # recurse before signalling so that the leaves are handled first
    for child in map(int, get_children.communicate()[0].split()):
        kill_tree(child, sig)
    try:
        log.debug("killing process %d", ppid)
        os.kill(ppid, sig)
    except ProcessLookupError:
        # the process went away between the 'ps' call and the signal
        log.debug("no such process %d", ppid)
# -------------------------------------------------------------------------#
class BaseTest(object):
_common_tmpdir = None
self.reference = ""
self.error_reference = ""
self.options = ""
self.stderr = ""
self.environment = dict(os.environ)
self.unsupported_platforms = []
self.signal = None
self.workdir = os.curdir
self.use_temp_dir = False
self.causes = []
self.result = Result(self)
self.out = ""
self.err = ""
self.basedir = os.getcwd()
self.validate_time = None

Marco Clemencic
committed
def run(self):
logging.debug("running test %s", self.name)
self.result = Result(
{
"CAUSE": None,
"EXCEPTION": None,
"RESOURCE": None,
"TARGET": None,
"TRACEBACK": None,
"START_TIME": None,
"END_TIME": None,
"TIMEOUT_DETAIL": None,
}
)
r"from\s+Gaudi.Configuration\s+import\s+\*|"
r"from\s+Configurables\s+import",
self.options,
):
suffix, lang = ".py", "python"
suffix, lang = ".opts", "c++"
self.result["Options"] = '<code lang="{}"><pre>{}</pre></code>'.format(
lang, escape_for_html(self.options)
)
optionFile = tempfile.NamedTemporaryFile(suffix=suffix)
optionFile.file.write(self.options.encode("utf-8"))
optionFile.seek(0)
self.args.append(RationalizePath(optionFile.name))
platform_id = (
self.environment.get("BINARY_TAG")
or self.environment.get("CMTCONFIG")
or platform.platform()
)
# If at least one regex matches we skip the test.
skip_test = bool(
[
None
for prex in self.unsupported_platforms
if re.search(prex, platform_id)
]
)

Marco Clemencic
committed
# handle working/temporary directory options
workdir = self.workdir
if self.use_temp_dir:
if self._common_tmpdir:
workdir = self._common_tmpdir
else:
workdir = tempfile.mkdtemp()
# prepare the command to execute
prog = ""
if self.program != "":
elif "GAUDIEXE" in self.environment:
prog = self.environment["GAUDIEXE"]
prog_ext = os.path.splitext(prog)[1]
prog += ".exe"
prog_ext = ".exe"
prog = which(prog) or prog
params = ["python3", RationalizePath(prog)] + args
params = [RationalizePath(prog)] + args
# we need to switch directory because the validator expects to run
# in the same dir as the program
os.chdir(workdir)
# launching test in a different thread to handle timeout exception
def target():
logging.debug("executing %r in %s", params, workdir)
params, stdout=PIPE, stderr=PIPE, env=self.environment
)
logging.debug("(pid: %d)", self.proc.pid)
self.out = out.decode("utf-8", errors="backslashreplace")
self.err = err.decode("utf-8", errors="backslashreplace")
thread = threading.Thread(target=target)
thread.start()
# catching timeout
thread.join(self.timeout)
if thread.is_alive():
logging.debug("time out in test %s (pid %d)", self.name, self.proc.pid)
# get the stack trace of the stuck process
"gdb",
"--pid",
str(self.proc.pid),
"--batch",
"--eval-command=thread apply all backtrace",
gdb = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
self.stack_trace = gdb.communicate()[0].decode(
"utf-8", errors="backslashreplace"
)
kill_tree(self.proc.pid, signal.SIGTERM)
thread.join(60)
if thread.is_alive():
kill_tree(self.proc.pid, signal.SIGKILL)
self.returnedCode = self.proc.returncode
if self.returnedCode != SKIP_RETURN_CODE:
logging.debug(
f"completed test {self.name} with returncode = {self.returnedCode}"
)
logging.debug("validating test...")
val_start_time = time.perf_counter()
self.result, self.causes = self.ValidateOutput(
stdout=self.out, stderr=self.err, result=self.result
)
self.validate_time = round(time.perf_counter() - val_start_time, 2)
else:
logging.debug(f"skipped test {self.name}")
self.status = "skipped"
# remove the temporary directory if we created it
if self.use_temp_dir and not self._common_tmpdir:
shutil.rmtree(workdir, True)
os.chdir(self.basedir)
if self.status != "skipped":
# handle application exit code
if self.signal is not None:
if int(self.returnedCode) != -int(self.signal):
self.causes.append("exit code")
elif self.exit_code is not None:
if int(self.returnedCode) != int(self.exit_code):
self.causes.append("exit code")
elif self.returnedCode != 0:
self.causes.append("exit code")
if self.causes:
self.status = "failed"
else:
self.status = "passed"
self.status = "skipped"
logging.debug("%s: %s", self.name, self.status)
"Exit Code": "returnedCode",
"stderr": "err",
"Arguments": "args",
"Runtime Environment": "environment",
"Status": "status",
"stdout": "out",
"Program Name": "program",
"Name": "name",
"Validator": "validator",
"Validation execution time": "validate_time",
"Output Reference File": "reference",
"Error Reference File": "error_reference",
"Causes": "causes",
"Unsupported Platforms": "unsupported_platforms",
"Stack Trace": "stack_trace",
resultDict = [
(key, getattr(self, attr))
for key, attr in field_mapping.items()
if getattr(self, attr)
]
resultDict.append(
(
"Working Directory",
RationalizePath(os.path.join(os.getcwd(), self.workdir)),
)
)
# print(dict(resultDict).keys())
resultDict.extend(self.result.annotations.items())
# print(self.result.annotations.keys())
resultDict = dict(resultDict)
# Special cases
if "Validator" in resultDict:
resultDict["Validator"] = '<code lang="{}"><pre>{}</pre></code>'.format(
"python", escape_for_html(resultDict["Validator"])
)
# -------------------------------------------------#
# ----------------Validating tool------------------#
# -------------------------------------------------#
def ValidateOutput(self, stdout, stderr, result):
    """
    Default validation entry point.

    When an expected standard error text is set in self.stderr, the observed
    'stderr' is compared to it (ignoring leading/trailing whitespace) and
    "standard error" is added to self.causes on mismatch.  Otherwise the
    check is delegated to validateWithReference.

    Returns the tuple (result, self.causes).
    """
    if self.stderr:
        if stderr.strip() != self.stderr.strip():
            self.causes.append("standard error")
    else:
        self.validateWithReference(stdout, stderr, result, self.causes)
    return result, self.causes
def findReferenceBlock(
    self,
    reference=None,
    stdout=None,
    result=None,
    causes=None,
    signature_offset=0,
    signature=None,
    id=None,
):
    """
    Given a block of text, tries to find it in the output.

    The block has to be identified by a signature line.  By default, the
    first line is used as signature, or the line pointed to by
    signature_offset.  If signature_offset points outside the block, a
    signature line can be passed as signature argument.

    Note: if 'signature' is None (the default), a negative signature_offset
    is interpreted as index in the list of reference lines (e.g. -1 means the
    last line), otherwise it is interpreted as the number of lines before the
    first one of the block the signature must appear.

    The parameter 'id' allows to distinguish between different calls to this
    function in the same validation code.

    Empty lines and trailing whitespace are ignored in both texts.
    Arguments left to None default to the corresponding attributes of the
    test (self.reference, self.out, self.result, self.causes).

    Returns the list of causes; raises RuntimeError if the reference block
    contains no non-empty line.
    """
    if reference is None:
        reference = self.reference
    if stdout is None:
        stdout = self.out
    if result is None:
        result = self.result
    if causes is None:
        causes = self.causes

    reflines = list(filter(None, map(lambda s: s.rstrip(), reference.splitlines())))
    if not reflines:
        raise RuntimeError("Empty (or null) reference")
    # the same on standard output
    outlines = list(filter(None, map(lambda s: s.rstrip(), stdout.splitlines())))

    res_field = "GaudiTest.RefBlock"
    if id:
        res_field += "_%s" % id

    if signature is None:
        if signature_offset < 0:
            # the offset counts reference *lines*, not characters
            signature_offset = len(reflines) + signature_offset
        signature = reflines[signature_offset]
    # find the reference block in the output file
    try:
        pos = outlines.index(signature)
        outlines = outlines[
            pos - signature_offset : pos + len(reflines) - signature_offset
        ]
        if reflines != outlines:
            msg = "standard output"
            # I do not want 2 messages in causes if the function is called
            # twice
            if msg not in causes:
                causes.append(msg)
            result[res_field + ".observed"] = result.Quote("\n".join(outlines))
    except ValueError:
        causes.append("missing signature")
    result[res_field + ".signature"] = result.Quote(signature)
    if len(reflines) > 1 or signature != reflines[0]:
        result[res_field + ".expected"] = result.Quote("\n".join(reflines))
    return causes
def countErrorLines(self, expected=None, stdout=None, result=None, causes=None):
    """
    Count the number of messages with required severity (by default ERROR and FATAL)
    and check if their numbers match the expected ones (0 by default).
    The dictionary "expected" can be used to tune the number of errors and fatals
    allowed, or to limit the number of expected warnings etc.

    A line counts for a severity when its second whitespace-separated word is
    that severity.  For every severity whose count differs from the expected
    one, "<SEVERITY>(<count>)" is added to the causes and the matching lines
    (prefixed with their line number) are stored in the result.

    Arguments left to None default to self.out, self.result and self.causes.
    Returns the list of causes.
    """
    if expected is None:
        expected = {"ERROR": 0, "FATAL": 0}
    if stdout is None:
        stdout = self.out
    if result is None:
        result = self.result
    if causes is None:
        causes = self.causes

    # prepare the dictionary to record the extracted lines
    errors = {sev: [] for sev in expected}

    outlines = stdout.splitlines()
    # pad the line numbers to the width of the largest one
    fmt = "%%%dd - %%s" % len(str(len(outlines)))

    for linecount, l in enumerate(outlines, 1):
        words = l.split()
        if len(words) >= 2 and words[1] in errors:
            errors[words[1]].append(fmt % (linecount, l.rstrip()))

    for e in errors:
        if len(errors[e]) != expected[e]:
            causes.append("%s(%d)" % (e, len(errors[e])))
            result["GaudiTest.lines.%s" % e] = result.Quote("\n".join(errors[e]))

    return causes
def CheckTTreesSummaries(
    self,
    stdout=None,
    result=None,
    causes=None,
    trees_dict=None,
    ignore=r"Basket|.*size|Compression",
):
    """
    Compare the TTree summaries in stdout with the ones in trees_dict or in
    the reference file. By default ignore the size, compression and basket
    fields.
    The presence of TTree summaries when none is expected is not a failure.

    Arguments left to None default to self.out, self.result and self.causes.
    When 'trees_dict' is None the expected summaries are extracted from the
    reference file of the test, if it exists, otherwise nothing is expected.

    Returns the list of causes.
    """
    if stdout is None:
        stdout = self.out
    if result is None:
        result = self.result
    if causes is None:
        causes = self.causes
    # only fall back to the reference file when the caller gave no summaries
    if trees_dict is None:
        lreference = self._expandReferenceFileName(self.reference)
        # call the validator if the file exists
        if lreference and os.path.isfile(lreference):
            with open(lreference) as ref_file:
                trees_dict = findTTreeSummaries(ref_file.read())
        else:
            trees_dict = {}

    from pprint import PrettyPrinter

    pp = PrettyPrinter()
    if trees_dict:
        result["GaudiTest.TTrees.expected"] = result.Quote(pp.pformat(trees_dict))
        if ignore:
            result["GaudiTest.TTrees.ignore"] = result.Quote(ignore)

    trees = findTTreeSummaries(stdout)
    failed = cmpTreesDicts(trees_dict, trees, ignore)
    if failed:
        causes.append("trees summaries")
        msg = "%s: %s != %s" % getCmpFailingValues(trees_dict, trees, failed)
        result["GaudiTest.TTrees.failure_on"] = result.Quote(msg)
        result["GaudiTest.TTrees.found"] = result.Quote(pp.pformat(trees))

    return causes
def CheckHistosSummaries(
    self, stdout=None, result=None, causes=None, dict=None, ignore=None
):
    """
    Compare the histogram summaries in stdout with the ones in dict or in
    the reference file.  Fields matching 'ignore' (none by default) are not
    compared.

    Arguments left to None default to self.out, self.result and self.causes.
    When 'dict' is None the expected summaries are extracted from the
    reference file of the test, if it exists, otherwise nothing is expected.

    Returns the list of causes.
    """
    if stdout is None:
        stdout = self.out
    if result is None:
        result = self.result
    if causes is None:
        causes = self.causes

    # 'dict' hides the builtin: use a clearer local name from here on
    expected = dict
    # only fall back to the reference file when the caller gave no summaries
    if expected is None:
        lreference = self._expandReferenceFileName(self.reference)
        # call the validator if the file exists
        if lreference and os.path.isfile(lreference):
            with open(lreference) as ref_file:
                expected = findHistosSummaries(ref_file.read())
        else:
            expected = {}

    from pprint import PrettyPrinter

    pp = PrettyPrinter()
    if expected:
        result["GaudiTest.Histos.expected"] = result.Quote(pp.pformat(expected))
        if ignore:
            result["GaudiTest.Histos.ignore"] = result.Quote(ignore)

    histos = findHistosSummaries(stdout)
    failed = cmpTreesDicts(expected, histos, ignore)
    if failed:
        causes.append("histos summaries")
        msg = "%s: %s != %s" % getCmpFailingValues(expected, histos, failed)
        result["GaudiTest.Histos.failure_on"] = result.Quote(msg)
        result["GaudiTest.Histos.found"] = result.Quote(pp.pformat(histos))

    return causes
def validateWithReference(
self, stdout=None, stderr=None, result=None, causes=None, preproc=None
):
"""
Default validation action: compare standard output and error to the
reference files.
if stdout is None:
stdout = self.out
if stderr is None:
stderr = self.err
if result is None:
result = self.result
if causes is None:
causes = self.causes
# set the default output preprocessor
if preproc is None:
preproc = normalizeExamples
# check standard output
lreference = self._expandReferenceFileName(self.reference)
# call the validator if the file exists
if lreference and os.path.isfile(lreference):
lreference, "standard output", "Output Diff", preproc=preproc
)(stdout, result)
elif lreference:
causes += ["missing reference file"]
# Compare TTree summaries
causes = self.CheckTTreesSummaries(stdout, result, causes)
causes = self.CheckHistosSummaries(stdout, result, causes)
if causes and lreference: # Write a new reference file for stdout
newrefname = ".".join([lreference, "new"])
while os.path.exists(newrefname):
cnt += 1
newrefname = ".".join([lreference, "~%d~" % cnt, "new"])
newref = open(newrefname, "w")
# sanitize newlines
for l in stdout.splitlines():
newref.write(l.rstrip() + "\n")
result["New Output Reference File"] = os.path.relpath(
newrefname, self.basedir
)
except IOError:
# Ignore IO errors when trying to update reference files
# because we may be in a read-only filesystem
pass
# check standard error
lreference = self._expandReferenceFileName(self.error_reference)
# call the validator if we have a file to use
if lreference:
if os.path.isfile(lreference):
lreference, "standard error", "Error Diff", preproc=preproc
)(stderr, result)
newcauses = ["missing error reference file"]
if newcauses and lreference: # Write a new reference file for stdedd
cnt = 0
newrefname = ".".join([lreference, "new"])
while os.path.exists(newrefname):
cnt += 1
newrefname = ".".join([lreference, "~%d~" % cnt, "new"])
newref = open(newrefname, "w")
# sanitize newlines
for l in stderr.splitlines():
newref.write(l.rstrip() + "\n")
result["New Error Reference File"] = os.path.relpath(
newrefname, self.basedir
)
causes += BasicOutputValidator(
lreference, "standard error", "ExecTest.expected_stderr"
)(stderr, result)
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
def validateJSONWithReference(
    self,
    output_file,
    reference_file,
    result=None,
    causes=None,
    detailed=True,
):
    """
    JSON validation action: compare json file to reference file

    'output_file' -- path of the JSON file produced by the test.
    'reference_file' -- reference file name, expanded with
        _expandReferenceFileName.
    'result', 'causes' -- default to self.result and self.causes.
    'detailed' -- forwarded to JSONOutputValidator.

    If there is any cause of failure and a reference name is known, the
    parsed output is saved next to the reference as '<reference>.new' (or
    '<reference>.~N~.new' when that exists already).

    Returns the list of causes.
    """
    result = self.result if result is None else result
    causes = self.causes if causes is None else causes

    # nothing to compare without an output file
    if not os.path.isfile(output_file):
        causes.append(f"output file {output_file} does not exist")
        return causes

    try:
        with open(output_file) as f:
            output = json.load(f)
    except json.JSONDecodeError as err:
        causes.append("json parser error")
        result["output_parse_error"] = f"json parser error in {output_file}: {err}"
        return causes

    lreference = self._expandReferenceFileName(reference_file)
    if lreference and os.path.isfile(lreference):
        causes += JSONOutputValidator()(lreference, output, result, detailed)
    elif lreference:
        causes.append("reference file does not exist")
    else:
        causes.append("reference file not set")

    if not (causes and lreference):
        return causes

    # Write a new reference file for output
    try:
        newrefname = ".".join([lreference, "new"])
        cnt = 0
        # look for a name that is not taken yet
        while os.path.exists(newrefname):
            cnt += 1
            newrefname = ".".join([lreference, "~%d~" % cnt, "new"])
        with open(newrefname, "w") as newref:
            json.dump(output, newref, indent=4)
        result["New JSON Output Reference File"] = os.path.relpath(
            newrefname, self.basedir
        )
    except IOError:
        # Ignore IO errors when trying to update reference files
        # because we may be in a read-only filesystem
        pass
    return causes
def _expandReferenceFileName(self, reffile):
    """
    Return the path of the reference file to use for 'reffile'.

    The name is expanded (environment variables) relative to self.basedir.
    A platform specific variant is preferred when it exists: either the
    old-style name (first three characters of the platform inserted before
    the last three characters of the file name) or, among the files called
    '<reference>.<tags>', the one requiring the largest set of tags that are
    all part of the current platform.

    Returns an empty string if no file name is given.
    """
    # if no file is passed, do nothing
    if not reffile:
        return ""

    # function to split an extension in constituents parts
    import re

    def platformSplit(p):
        return set(re.split(r"[-+]", p))

    reference = os.path.normpath(
        os.path.join(self.basedir, os.path.expandvars(reffile))
    )

    # old-style platform-specific reference name
    spec_ref = reference[:-3] + GetPlatform(self)[0:3] + reference[-3:]
    if os.path.isfile(spec_ref):
        reference = spec_ref
    else:  # look for new-style platform specific reference files:
        # get all the files whose name start with the reference filename
        dirname, basename = os.path.split(reference)
        head = basename + "."
        head_len = len(head)
        platform_tags = platformSplit(GetPlatform(self))
        if "do0" in platform_tags:
            platform_tags.add("dbg")
        candidates = []
        # a missing directory simply means there is no specific reference
        if os.path.isdir(dirname):
            for f in os.listdir(dirname):
                if f.startswith(head):
                    req_plat = platformSplit(f[head_len:])
                    if platform_tags.issuperset(req_plat):
                        candidates.append((len(req_plat), f))
        if candidates:  # take the one with highest matching
            # FIXME: it is not possible to say if x86_64-slc5-gcc43-dbg
            #        has to use ref.x86_64-gcc43 or ref.slc5-dbg
            reference = os.path.join(dirname, max(candidates)[1])
    return reference
import shutil
try:
from GaudiKernel import ROOT6WorkAroundEnabled
except ImportError:
def ROOT6WorkAroundEnabled(id=None):
# dummy implementation
return False
# --------------------------------- TOOLS ---------------------------------#
"""
Function used to normalize the used path
"""
newPath = os.path.normpath(os.path.expandvars(p))
p = os.path.realpath(newPath)
return p
def which(executable):
    """
    Locates an executable in the executables path ($PATH) and returns the full
    path to it.  An application is looked for with or without the '.exe' suffix.
    If the executable cannot be found, None is returned
    """
    if os.path.isabs(executable):
        if os.path.isfile(executable):
            return executable
        if executable.endswith(".exe"):
            if os.path.isfile(executable[:-4]):
                return executable[:-4]
        else:
            # not where it was said to be: look for the bare name in $PATH
            executable = os.path.split(executable)[1]
    search_path = os.environ.get("PATH")
    # an unset PATH means there is nowhere to look
    dirs = search_path.split(os.pathsep) if search_path is not None else []
    for d in dirs:
        fullpath = os.path.join(d, executable)
        if os.path.isfile(fullpath):
            return fullpath
        elif executable.endswith(".exe") and os.path.isfile(fullpath[:-4]):
            return fullpath[:-4]
    return None
# -------------------------------------------------------------------------#
# ----------------------------- Result Class ------------------------------#
# -------------------------------------------------------------------------#
PASS = "PASS"
FAIL = "FAIL"
ERROR = "ERROR"
UNTESTED = "UNTESTED"
EXCEPTION = ""
RESOURCE = ""
TARGET = ""
TRACEBACK = ""
START_TIME = ""
END_TIME = ""
TIMEOUT_DETAIL = ""
def __init__(self, kind=None, id=None, outcome=PASS, annotations={}):
self.annotations = annotations.copy()
return self.annotations[key]
assert isinstance(value, six.string_types), "{!r} is not a string".format(value)
def Quote(self, text):
    """
    Return 'text' as an HTML fragment: special characters are escaped and
    the outcome is wrapped in <pre> tags.
    """
    escaped = escape_for_html(text)
    return f"<pre>{escaped}</pre>"
# -------------------------------------------------------------------------#
# --------------------------- Validator Classes ---------------------------#
# -------------------------------------------------------------------------#
# Basic implementation of an option validator for Gaudi test. This
# implementation is based on the standard (LCG) validation functions used
# in QMTest.
class BasicOutputValidator:
    """Callable comparing a text with a reference text, line by line."""

    def __init__(self, ref, cause, result_key):
        """
        'ref' -- the reference text.
        'cause' -- the string reported as cause of failure on mismatch.
        'result_key' -- the key under which the reference is stored in the
                        result on mismatch.
        """
        self.ref = ref
        self.cause = cause
        self.result_key = result_key

    def __call__(self, out, result):
        """Validate the output of the program.

        'out' -- A string containing the data written to the stream being
        checked (standard output or standard error).

        'result' -- A 'Result' object. It may be used to annotate
        the outcome according to the content of 'out'.

        returns -- A list of strings giving causes of failure."""
        causes = []
        # Check the output
        if not self.__CompareText(out, self.ref):
            causes.append(self.cause)
            result[self.result_key] = result.Quote(self.ref)
        return causes

    def __CompareText(self, s1, s2):
        """Compare 's1' and 's2', ignoring line endings.

        's1' -- A string.

        's2' -- A string.

        returns -- True if 's1' and 's2' are the same, ignoring
        differences in line endings."""
        if ROOT6WorkAroundEnabled("ReadRootmapCheck"):
            # FIXME: (MCl) Hide warnings from new rootmap sanity check until we
            # can fix them
            to_ignore = re.compile(
                r"Warning in <TInterpreter::ReadRootmapFile>: .* is already in .*"
            )

            def keep_line(l):
                return not to_ignore.match(l)

            return list(filter(keep_line, s1.splitlines())) == list(
                filter(keep_line, s2.splitlines())
            )
        else:
            return s1.splitlines() == s2.splitlines()
# ------------------------ Preprocessor elements ------------------------#
"""Base class for a callable that takes a file and returns a modified
version of it."""
def __processLine__(self, line):
return line
def __processFile__(self, lines):
output = []
for l in lines:
l = self.__processLine__(l)
return output
lines = input
mergeback = False
else:
lines = input.splitlines()
mergeback = True
output = self.__processFile__(lines)
return FilePreprocessorSequence([self, rhs])
class FilePreprocessorSequence(FilePreprocessor):
    """Preprocessor applying a list of preprocessors one after the other."""

    def __init__(self, members=None):
        """'members' -- the preprocessors to apply, in order (default: none)."""
        # work on a private list so that the caller's one is never shared
        self.members = list(members) if members is not None else []

    def __add__(self, rhs):
        """Return a new sequence with 'rhs' added at the end."""
        return FilePreprocessorSequence(self.members + [rhs])

    def __call__(self, input):
        """Feed 'input' through all the members and return the outcome."""
        output = input
        for pp in self.members:
            output = pp(output)
        return output
class LineSkipper(FilePreprocessor):
def __processLine__(self, line):
for s in self.strings:
class BlockSkipper(FilePreprocessor):
    """
    Preprocessor dropping the lines from one containing 'start' up to, but
    not including, the next one containing 'end'.
    """

    def __init__(self, start, end):
        self.start, self.end = start, end
        # True while inside a block being dropped
        self._skipping = False

    def __processLine__(self, line):
        """Return 'line', or None if it has to be dropped."""
        if self.start in line:
            self._skipping = True
            return None
        if self.end in line:
            # the closing line itself is kept
            self._skipping = False
            return line
        return None if self._skipping else line
class RegexpReplacer(FilePreprocessor):
    """
    Preprocessor replacing what matches a regular expression.

    Each operation is a tuple (when, orig, repl): 'orig' is replaced by
    'repl' only in the lines matching 'when' (in all lines if it is None).
    """

    def __init__(self, orig, repl="", when=None):
        """
        'orig' -- regular expression to replace.
        'repl' -- replacement, as accepted by re.sub (default: empty string).
        'when' -- optional regular expression selecting the lines to modify.
        """
        if when:
            when = re.compile(when)
        self._operations = [(when, re.compile(orig), repl)]

    def __add__(self, rhs):
        """
        Combine with 'rhs': two RegexpReplacer are merged in a single one
        applying the operations of both, in order; anything else is handled
        by the base class.
        """
        if isinstance(rhs, RegexpReplacer):
            # placeholder instance: its operations are replaced right away
            res = RegexpReplacer("", "", None)
            res._operations = self._operations + rhs._operations
        else:
            res = FilePreprocessor.__add__(self, rhs)
        return res

    def __processLine__(self, line):
        """Apply all the operations to 'line', in order."""
        for w, o, r in self._operations:
            if w is None or w.search(line):
                line = o.sub(r, line)
        return line
maskPointers = RegexpReplacer("0x[0-9a-fA-F]{4,16}", "0x########")
normalizeDate = RegexpReplacer(
"[0-2]?[0-9]:[0-5][0-9]:[0-5][0-9] [0-9]{4}[-/][01][0-9][-/][0-3][0-9][ A-Z]*",
normalizeEOL = FilePreprocessor()
normalizeEOL.__processLine__ = lambda line: str(line).rstrip() + "\n"
skipEmptyLines = FilePreprocessor()
# FIXME: that's ugly
skipEmptyLines.__processLine__ = lambda line: (line.strip() and line) or None
# Special preprocessor sorting the list of strings (whitespace separated)
# that follow a signature on a single line
class LineSorter(FilePreprocessor):
    """
    Preprocessor sorting the whitespace separated words that follow a
    signature on a single line.  Lines without the signature are left alone.
    """

    def __init__(self, signature):
        self.signature = signature
        self.siglen = len(signature)

    def __processLine__(self, line):
        """Return 'line' with the words after the signature sorted."""
        pos = line.find(self.signature)
        if pos >= 0:
            cut = pos + self.siglen
            # extract the words before cutting the line, or there is
            # nothing left to sort
            words = sorted(line[cut:].split())
            line = line[:cut] + " ".join(words)
        return line
class SortGroupOfLines(FilePreprocessor):
Sort group of lines matching a regular expression