Verified commit 9ec12e3b authored by Tadej Novak

Initial setup of new workflow tests for Run 2

parent e199ac64
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
atlas_subdir( WorkflowTestRunner )
# Install files from the package:
atlas_install_python_modules( python/*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
atlas_install_scripts( scripts/*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from pathlib import Path
import subprocess
from .Helpers import warnings_count
from .Inputs import references_CVMFS_path, references_EOS_path
from .References import references_map
from .Test import TestSetup, WorkflowCheck, WorkflowTest
class FailedOrPassedCheck(WorkflowCheck):
"""Was the q test successful? To check simply count the number of lines containing the string "successful run"."""
def run(self, test: WorkflowTest) -> bool:
self.logger.info("-----------------------------------------------------" )
result = True
for step in test.steps:
log = test.validation_path / f"log.{step}"
counter = 0
with log.open() as file:
for line in file:
if '"successful run"' in line:
counter += 1
if counter:
self.logger.info(f"{step} Validation test successful")
else:
self.logger.error(f"{step} Validation test failed")
result = False
if self.setup.validation_only:
continue # Skip checking reference test because in this mode the clean tests have not been run
log = test.reference_path / f"log.{step}"
counter = 0
with log.open() as file:
for line in file:
if '"successful run"' in line:
counter += 1
if counter:
self.logger.info(f"{step} Reference test successful")
else:
self.logger.error(f"{step} Reference test failed")
result = False
if result:
self.logger.info(f"All {test.ID} Athena steps completed successfully\n")
else:
self.logger.error(f"One or more {test.ID} Athena steps failed. Please investigate the cause.\n")
return result
class FrozenTier0PolicyCheck(WorkflowCheck):
"""Run Frozen Tier0 Policy Test."""
def __init__(self, setup: TestSetup, input_format: str, max_events: int) -> None:
super().__init__(setup)
self.format = input_format
self.max_events = str(max_events)
def run(self, test: WorkflowTest) -> bool:
self.logger.info("---------------------------------------------------------------------------------------" )
self.logger.info(f"Running {test.ID} Frozen Tier0 Policy Test on {self.format} for {self.max_events} events" )
reference_path: Path = test.reference_path
diff_rules_file: Path = self.setup.diff_rules_path
# Read references from EOS/CVMFS
if self.setup.validation_only:
# Resolve the subfolder first. Results are stored like: main_folder/q-test/branch/version/.
# This should work both in standalone and CI
# Use EOS if mounted, otherwise CVMFS
reference_revision = references_map[f"{test.ID}-{self.setup.release_ID}"]
eos_path = Path(references_EOS_path)
reference_path = eos_path / test.ID / self.setup.release_ID / reference_revision
diff_rules_file = eos_path / test.ID / self.setup.release_ID
if reference_path.exists():
self.logger.info("EOS is mounted, going to read the reference files from there instead of CVMFS")
else:
self.logger.info("EOS is not mounted, going to read the reference files from CVMFS")
cvmfs_path = Path(references_CVMFS_path)
reference_path = cvmfs_path / test.ID / self.setup.release_ID / reference_revision
diff_rules_file = cvmfs_path / test.ID / self.setup.release_ID
diff_rules_file /= f"{test.ID}_{self.format}_diff-exclusion-list.txt"
self.logger.info(f"Reading the reference file from location {reference_path}")
if diff_rules_file.exists():
self.logger.info(f"Reading the diff rules file from location {diff_rules_file}")
exclusion_list = []
with diff_rules_file.open() as f:
for line in f:
exclusion_list.append(r"'{}'".format(line.rstrip()))
else:
self.logger.info("No diff rules file exists, using the default list")
exclusion_list = [r"'index_ref'", r"'(.*)_timings\.(.*)'", r"'(.*)_mems\.(.*)'", r"'(.*)TrigCostContainer(.*)'"]
file_name = f"my{self.format}.pool.root"
reference_file = reference_path / file_name
validation_file = test.validation_path / file_name
log_file = test.validation_path / f"diff-root-{test.ID}.{self.format}.log"
exclusion_list = ' '.join(exclusion_list)
comparison_command = f"acmd.py diff-root {reference_file} {validation_file} --nan-equal --error-mode resilient --ignore-leaves {exclusion_list} --entries {self.max_events} > {log_file} 2>&1"
output, error = subprocess.Popen(['/bin/bash', '-c', comparison_command], stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
output, error = output.decode('utf-8'), error.decode('utf-8')
# We want to catch/print both container additions/subtractions as well as
# changes in these containers. `all_good` is meant to catch
# other issues found in the diff (not expected, but just to be safe)
passed_frozen_tier0_test = True
all_good = False
with log_file.open() as file:
for line in file:
if "WARNING" in line: # Catches container addition/subtractions
self.logger.error(line)
passed_frozen_tier0_test = False
if "leaves differ" in line: # Catches changes in branches
self.logger.error(line)
passed_frozen_tier0_test = False
if "INFO all good." in line:
all_good = True
result = passed_frozen_tier0_test and all_good
if result:
self.logger.info("Passed!\n")
else:
self.logger.error(f"Your tag breaks the frozen tier0 policy in test {test.ID}. See {log_file} file for more information.\n")
return result
class SimpleCheck(WorkflowCheck):
"""Run A Very Simple Test."""
def __init__(self, setup: TestSetup, name: str, quantity: str, unit: str, field: int, threshold: float):
super().__init__(setup)
self.name = name
self.quantity = quantity
self.unit = unit
self.field = field
self.threshold = threshold
def run(self, test: WorkflowTest) -> bool:
self.logger.info("-----------------------------------------------------")
self.logger.info(f"Running {test.ID} {self.name} Test" )
result = True
for step in test.steps:
log_name = f"log.{step}"
reference_log = test.reference_path / log_name
validation_log = test.validation_path / log_name
reference_value = 0
with reference_log.open() as file:
found = False
for line in file:
if self.quantity in line:
reference_value = float(line.split()[self.field])
found = True
break
if not found:
self.logger.error(f"No data available in {reference_log}. Job failed.")
return False
validation_value = 0
with validation_log.open() as file:
found = False
for line in file:
if self.quantity in line:
validation_value = float(line.split()[self.field])
found = True
break
if not found:
self.logger.error(f"No data available in {validation_log}. Job failed.")
return False
if reference_value != 0:
factor = validation_value / reference_value
# Error if the quantity increases by more than the threshold (a potentially serious regression)
# Warning if it decreases by more than the threshold (should be an understood improvement)
if factor > 1. + self.threshold:
self.logger.error(f"{self.quantity} in the {step} step with(out) your change is {validation_value} ({reference_value}) {self.unit}")
self.logger.error(f"Your change changes {self.quantity} by a factor {factor}")
self.logger.error("Is this an expected outcome of your change(s)?")
result = False
self.logger.error(f"{step}: {self.name}")
self.logger.error(f"ref {reference_value} {self.unit}")
self.logger.error(f"val {validation_value} {self.unit}")
if factor < 1. - self.threshold:
self.logger.warning(f"{self.quantity} in the {step} step with(out) your change is {validation_value} ({reference_value}) {self.unit}")
self.logger.warning(f"Your change changes {self.quantity} by a factor {factor}")
self.logger.warning("Is this an expected outcome of your change(s)?")
self.logger.warning(f"{step}: {self.name}")
self.logger.warning(f"ref {reference_value} {self.unit}")
self.logger.warning(f"val {validation_value} {self.unit}")
if result:
self.logger.info("Passed!\n")
else:
self.logger.error("Failed!\n")
return result
class WarningsCheck(WorkflowCheck):
"""Run WARNINGS test."""
def run(self, test: WorkflowTest) -> bool:
self.logger.info("-----------------------------------------------------")
self.logger.info(f"Running {test.ID} WARNINGS Test\n")
result = True
for step in test.steps:
log_name = f"log.{step}"
reference_log = test.reference_path / log_name
validation_log = test.validation_path / log_name
warnings_reference = warnings_count(reference_log)
warnings_validation = warnings_count(validation_log)
wr = [w[9:] for w in warnings_reference]
wv = [w[9:] for w in warnings_validation]
wn = list(set(wv)-set(wr))
wo = list(set(wr)-set(wv))
if len(warnings_validation) > len(warnings_reference):
self.logger.error(f"Validation log file {validation_log} has {len(warnings_validation) - len(warnings_reference)} more warning(s) than the reference log file {reference_log}")
self.logger.error("Please remove the new warning message(s):")
for w in wn:
self.logger.error(w)
result = False
elif len(warnings_validation) < len(warnings_reference):
self.logger.info(f"Validation log file {validation_log} has {len(warnings_reference) - len(warnings_validation)} less warnings than the reference log file {reference_log}")
self.logger.info("The reduction of unnecessary WARNINGs is much appreciated. Is it expected?")
self.logger.info("The following warning messages have been removed:")
for w in wo:
self.logger.info(w)
else:
self.logger.info(f"Validation log file {validation_log} has the same number of warnings as the reference log file {reference_log}")
if result:
self.logger.info("Passed!\n")
else:
self.logger.error("Failed!\n")
return result
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from glob import glob
from logging import Logger
from os import environ, path
from pathlib import Path
from typing import List
def get_pwd() -> Path:
return Path.cwd()
def get_release_setup(logger: Logger, no_setup=False) -> str:
"""Get release setup."""
if no_setup:
logger.info("No release information is available when a release is not set-up.\n")
return ""
current_nightly = environ["AtlasBuildStamp"]
release_base = environ["AtlasBuildBranch"]
release_head = environ["AtlasVersion"]
platform = environ["LCG_PLATFORM"]
project = environ["AtlasProject"]
builds_dir_search_str = f"/cvmfs/atlas-nightlies.cern.ch/repo/sw/{release_base}_{project}_{platform}/[!latest_]*/{project}/{release_head}"
# Find all directories matching the search pattern above and sort them by modification time.
# Prefer the latest opt build over dbg.
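# For illustration (hypothetical values): with release_base="22.0", project="Athena",
# platform="x86_64-centos7-gcc8-opt" and release_head="22.0.40" the pattern expands to
#   /cvmfs/atlas-nightlies.cern.ch/repo/sw/22.0_Athena_x86_64-centos7-gcc8-opt/[!latest_]*/Athena/22.0.40
# where the wildcard slot matches the individual nightly build stamps compared against AtlasBuildStamp.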
sorted_list = sorted(glob(builds_dir_search_str), key=path.getmtime)
latest_nightly = ""
for folder in reversed(sorted_list):
if not glob(f"{folder}/../../{release_base}__{project}*-opt*.log"):
continue
latest_nightly = folder.split("/")[-3]
break
if current_nightly != latest_nightly:
logger.info(f"Please be aware that you are not testing your tags in the latest available nightly, which is {latest_nightly}")
setup = "%s,%s,%s,Athena" % (release_base, platform.replace("-", ","), current_nightly)
logger.info(f"Your tags will be tested in environment {setup}")
return setup
def list_changed_packages(logger: Logger, no_setup=False) -> None:
"""List packages that have changed."""
if no_setup:
logger.info("The list of changed packages is not available when the relase is not set-up.\n")
return
if "WorkDir_DIR" in environ:
logger.info("Changed packages in your build to be tested:\n")
file_path = Path(environ["WorkDir_DIR"])
fname = file_path / "packages.txt"
with fname.open() as fp:
lines = fp.readlines()
last_line = lines[-1].strip() if lines else None
for line in lines:
line = line.strip()
if "#" not in line:
if line == last_line:
logger.info(f"{line}\n")
else:
logger.info(line)
else:
logger.warning("A release area with locally installed packages has not been setup.")
logger.warning("quit by executing <CONTROL-C> if this is not your intention, and")
logger.warning("source <YOUR_CMake_BUILD_AREA>/setup.sh")
logger.warning("to pickup locally built packages in your environment.\n")
pass
def warnings_count(file_name: Path) -> List[str]:
"""Run a WARNING helper function."""
warnings = []
with file_name.open() as file:
for line in file:
if "WARNING" in line:
if "| WARNING |" not in line:
warnings.append(line)
return warnings
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
references_CVMFS_path = "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/WorkflowReferences"
references_EOS_path = "/eos/atlas/atlascerngroupdisk/data-art/grid-input/WorkflowReferences"
#####
# CI special input files
#####
from .Test import WorkflowRun
# common
input_HITS = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.410470.PhPy8EG_A14_ttbar_hdamp258p75_nonallhad.simul.HITS.e6337_s3681/HITS.25836812._004813.pool.root.1",
}
input_HITS_minbias_low = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.900311.Epos_minbias_inelastic_lowjetphoton.simul.HITS_FILT.e8341_s3687_s3704/*",
}
input_HITS_minbias_high = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.800831.Py8EG_minbias_inelastic_highjetphotonlepton.simul.HITS_FILT.e8341_s3687_s3704/*",
}
input_HITS_neutrino = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayTests/mc16_13TeV.900149.PG_single_nu_Pt50.simul.HITS.e8307_s3482/HITS.24078104._234467.pool.root.1",
}
# simulation
input_EVNT = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/SimCoreTests/valid1.410000.PowhegPythiaEvtGen_P2012_ttbar_hdamp172p5_nonallhad.evgen.EVNT.e4993.EVNT.08166201._000012.pool.root.1",
WorkflowRun.Run3: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/SimCoreTests/valid1.410000.PowhegPythiaEvtGen_P2012_ttbar_hdamp172p5_nonallhad.evgen.EVNT.e4993.EVNT.08166201._000012.pool.root.1",
WorkflowRun.Run4: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/PhaseIIUpgrade/EVNT/mc15_14TeV.422036.ParticleGun_single_mu_Pt100.evgen.EVNT.e5286/EVNT.09244578._000001.pool.root.1",
}
# overlay
input_HITS_MC_overlay = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayMonitoringRTT/mc16_13TeV.424000.ParticleGun_single_mu_Pt100.simul.HITS.e3580_s3126/HITS.11330296._000376.pool.root.1",
}
input_RDO_BKG = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayTests/PresampledPileUp/22.0/Run2/large/mc20_13TeV.900149.PG_single_nu_Pt50.digit.RDO.e8307_s3482_s3136_d1715/RDO.26811908._031801.pool.root.1",
}
input_HITS_data_overlay = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayMonitoringRTT/mc16_13TeV.361107.PowhegPythia8EvtGen_AZNLOCTEQ6L1_Zmumu.OverlaySim/22.0/v1/HITS.pool.root",
}
input_BS_SKIM = {
WorkflowRun.Run2: "/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayMonitoringRTT/mc15_valid.00200010.overlay_streamsAll_2016_pp_1.skim.DRAW.r8381/DRAW.09331084._000146.pool.root.1",
}
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
#####
# CI Reference Files Map
#####
# The top-level directory for the files is /eos/atlas/atlascerngroupdisk/data-art/grid-input/WorkflowReferences/
# The subfolders then follow the format test/branch/version, e.g. for q221 in 21.0 the version v1 reference files are under
# /eos/atlas/atlascerngroupdisk/data-art/grid-input/WorkflowReferences/q221/21.0/v1
# Format is "test-branch" : "version"
references_map = {
# qTestsTier0_required-test
'q221-21.0': 'v4',
'q431-21.0': 'v2',
'q221-21.3': 'v1',
'q431-21.3': 'v1',
'q221-22.0': 'v1',
'q431-22.0': 'v1',
# SimulationTier0Test_required-test
's3126-21.0': 'v1',
's3126-21.3': 'v1',
's3126-21.9': 'v1',
's3126-22.0': 'v8',
's3505-21.0': 'v2',
's3505-21.3': 'v1',
's3505-21.9': 'v1',
's3505-22.0': 'v13',
# OverlayTier0Test_required-test
'overlay-d1498-21.0': 'v2',
'overlay-d1498-22.0': 'v38',
'overlay-d1592-22.0': 'v14',
'overlay-bkg-21.0': 'v1',
'overlay-bkg-22.0': 'v4',
'dataoverlay-d1590-22.0': 'v14',
'dataoverlay-hits-22.0': 'v1',
}
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
import logging
import threading
from argparse import ArgumentParser, Namespace
from os import environ
from pathlib import Path
from sys import exit
from typing import List
from .Checks import FailedOrPassedCheck, SimpleCheck, WarningsCheck
from .Inputs import references_CVMFS_path
from .Test import TestSetup, WorkflowCheck, WorkflowTest
def setup_logger(name: str) -> logging.Logger:
# Setup global logging
logging.basicConfig(level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%m-%d %H:%M",
filename=f"./{name}.log",
filemode="w")
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter("%(levelname)-8s %(message)s")
console.setFormatter(formatter)
logger = logging.getLogger("")
logger.addHandler(console)
return logger
def setup_parser() -> ArgumentParser:
parser = ArgumentParser()
common = parser.add_argument_group("common")
common.add_argument("-e", "--extra", type=str, dest="extra_args", default="",
help="Define additional args to pass e.g. --preExec 'r2e':'...' ")
common.add_argument("-f", "--fast", action="store_true", dest="fast_mode", default=False,
help="""Fast option will run all q tests simultaneously,
such that it will run faster if you have 4 cpu core slots on which to run. Be
warned! Only recommended when running on a high performance machine, not
lxplus!""")
common.add_argument("-v", "--validation", action="store_true", dest="validation_only", default=False,
help=f"""Run validation only.
File output comparisons will only be performed against pre-defined
reference files stored in the directory
{references_CVMFS_path}
and performance comparison tests will not be run.""")
common.add_argument("-c", "--check-only", type=str, dest="unique_ID", default=None,
help="Re-run only the checks.")
advanced = parser.add_argument_group("advanced")
advanced.add_argument("--CI", action="store_true", dest="ci_mode", default=False,
help="Will not setup Athena - only for CI tests!")
advanced.add_argument("--ref", type=str, dest="reference_release", default=None,
help="Define a particular reference release.")
advanced.add_argument("--val", type=str, dest="validation_release", default=None,
help="Define a particular validation release")
advanced.add_argument("--reference-path", type=str, dest="reference_run_path", default="",
help="Specify the head directory for running the reference tests. The default is /tmp/${USER}")
advanced.add_argument("-z", "--exclusion-lists", type=str, dest="diff_rules_path", default=".",
help="""Specify the directory that contains the lists of variables that will be omitted
while comparing the outputs. The default is ./ and the format of the files is
${q-test}_${format}_diff-exclusion-list.txt, e.g. q431_AOD_diff-exclusion-list.txt.""")
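# For illustration, a minimal exclusion-list file (hypothetical q431_AOD_diff-exclusion-list.txt)
# contains one leaf-name pattern per line, mirroring the defaults in Checks.py:
#   index_ref
#   (.*)_timings\.(.*)
#   (.*)_mems\.(.*)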
tests = parser.add_argument_group("tests")
tests.add_argument("-t", "--test", type=str, dest="test", default=None,
help="Specify a test to run. Supported options are: ")
tests.add_argument("-a", "--tag", type=str, dest="ami_tag", default=None,
help="Override the AMI tag of the test.")
# shortcuts
tests.add_argument("-s", "--sim", action="store_true", dest="simulation", default=False,
help="Run simulation test using Sim_tf.py")
tests.add_argument("-o", "--overlay", action="store_true", dest="overlay", default=False,
help="Run overlay test using Overlay_tf.py")
tests.add_argument("-p", "--pileup", action="store_true", dest="pileup", default=False,
help="Run MC reconstruction chain with pile-up")
return parser
def get_test_setup(name: str, options: Namespace, log: logging.Logger) -> TestSetup:
# define test setup
setup = TestSetup(log)
setup.reference_run_path = Path(options.reference_run_path) if options.reference_run_path else Path(f"/tmp/{environ['USER']}")
setup.diff_rules_path = Path(options.diff_rules_path)
setup.disable_release_setup = options.ci_mode
setup.validation_only = options.validation_only
if options.unique_ID:
setup.checks_only = True
setup.unique_ID = options.unique_ID
setup.parallel_execution = options.fast_mode
# not in global setup:
# options.extra_args
if options.ami_tag:
log.error("Custom AMI tags not supported yet!")
exit(1)
# Are we running in CI
if setup.disable_release_setup:
log.info("You're running in CI mode.")
log.info("This mode assumes athena is setup w/ necessary changes and only runs validation tests.")
log.info("Then results are checked against reference files and no performance test is run.")
log.info("If you don't know what this mode does, you shouldn't be using it.\n")
setup.validation_only = True
# Does the clean run head directory exist?
if setup.validation_only:
log.info("You are running in validation-only mode whereby only tests against your build are being run.")
log.info("In this mode ESD and AOD outputs are compared with pre-defined reference files found in the directory")
log.info(f"{references_CVMFS_path}\n")
if not Path(references_CVMFS_path).exists():
log.error(f"Exit. Validation-only mode can only be run on nodes with access to {references_CVMFS_path}")
exit(2)
elif setup.reference_run_path.exists():
log.info(f"The job unique ID is '{setup.unique_ID}' (can be used to re-run the checks)\n")
else:
log.error("Exit. Please specify a directory that exists for the argument of the --reference-path option\n")
log.error(f"{name}.py --reference-path <ExistingDirectory>")
exit(1)
# Is an ATLAS release setup?
if 'AtlasPatchVersion' not in environ and 'AtlasArea' not in environ and 'AtlasBaseDir' not in environ and 'AtlasVersion' not in environ:
log.error("Exit. Please setup the an ATLAS release")
exit(3)
if 'AtlasPatchVersion' not in environ and 'AtlasArea' not in environ and 'AtlasBaseDir' in environ and 'AtlasVersion' not in environ:
log.warning("Please be aware that you are running a release which seems to not be a Tier0 release, where in general q-tests are not guaranteed to work.")
# setup reference path
setup.reference_run_path /= f"reference_test_{setup.unique_ID}"
# Release setup & list the packages in the local InstallArea
setup.setup_release(options.reference_release, options.validation_release)
# Parse test string if needed
parse_test_string(setup, options)
return setup
def parse_test_string(setup: TestSetup, options: Namespace) -> None:
if not options.test:
return
test_string = options.test.lower()
# simulation
if test_string in ['s', 'sim', 'simulation', 'sim_tf', 'sim_tf.py']:
options.simulation = True
return
# overlay
if test_string in ['o', 'overlay', 'overlay_tf', 'overlay_tf.py']:
options.overlay = True
return
# pile-up
if test_string in ['p', 'pileup', 'pile-up']:
options.pileup = True
return
# reco
if test_string in ['r', 'reco', 'reconstruction', 'reco_tf', 'reco_tf.py']:
return
def get_standard_performance_checks(setup: TestSetup) -> List[WorkflowCheck]:
return [
SimpleCheck(setup, "CPU Time" , "evtloop_time", "msec/event", 4, 0.4),
SimpleCheck(setup, "Physical Memory", "VmRSS", "kBytes", 4, 0.2),
SimpleCheck(setup, "Virtual Memory" , "VmSize", "kBytes", 4, 0.2),
SimpleCheck(setup, "Memory Leak" , "leakperevt_evt11", "kBytes/event", 7, 0.05),
WarningsCheck(setup),
]
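# As an interpretation aid (no additional checks are added here): with the thresholds above,
# the "CPU Time" check fails only if evtloop_time in the validation log exceeds the reference
# value by more than 40%, and only warns if it drops by more than 40% (see SimpleCheck.run).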
def run_tests(setup: TestSetup, tests: List[WorkflowTest]) -> None:
if not setup.checks_only:
threads = {}
setup.logger.info("------------------ Run Athena workflow test jobs---------------")
if setup.parallel_execution:
for test in tests:
threads[f"{test.ID}_reference"] = threading.Thread(target=lambda test=test: test.run_reference())
threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
threads[f"{test.ID}_reference"].start()
threads[f"{test.ID}_validation"].start()
for thread in threads:
threads[thread].join()
elif setup.validation_only:
for test in tests:
threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
threads[f"{test.ID}_validation"].start()
for thread in threads:
threads[thread].join()
else:
for test in tests:
threads[f"{test.ID}_reference"] = threading.Thread(target=lambda test=test: test.run_reference())
threads[f"{test.ID}_validation"] = threading.Thread(target=lambda test=test: test.run_validation())
threads[f"{test.ID}_reference"].start()
threads[f"{test.ID}_validation"].start()
threads[f"{test.ID}_reference"].join()
threads[f"{test.ID}_validation"].join()
def run_checks(setup: TestSetup, tests: List[WorkflowTest], performance_checks: List[WorkflowCheck]) -> bool:
all_passed = True
# define common checks
main_check = FailedOrPassedCheck(setup)
# run checks
for test in tests:
all_passed = test.run_checks(main_check, performance_checks) and all_passed
return all_passed
def run_summary(setup: TestSetup, tests: List[WorkflowTest], status: bool) -> None:
setup.logger.info("-----------------------------------------------------")
setup.logger.info("---------------------- Summary ----------------------")
if status:
setup.logger.info("ALL TESTS: PASSED (0)")
else:
setup.logger.error("ALL TESTS: FAILED (10)")
exit(10)
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from typing import List
from .Checks import FrozenTier0PolicyCheck
from .Inputs import input_EVNT, input_HITS, \
input_HITS_MC_overlay, input_RDO_BKG, \
input_HITS_data_overlay, input_BS_SKIM, \
input_HITS_minbias_low, input_HITS_minbias_high, input_HITS_neutrino
from .Test import TestSetup, WorkflowRun, WorkflowTest, WorkflowType
class QTest(WorkflowTest):
"""General workflow q-test."""
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = '') -> None:
if "maxEvents" not in extra_args:
if type == WorkflowType.MCPileUpReco:
extra_args += " --maxEvents 5"
else:
extra_args += " --maxEvents 20"
if "input" not in extra_args and type == WorkflowType.MCPileUpReco:
extra_args += f" --inputHITSFile {input_HITS[run]} --inputRDO_BKGFile ../run_d*/myRDO.pool.root"
self.command = \
(f"ATHENA_CORE_NUMBER=1 Reco_tf.py --multithreaded --AMIConfig {ID}"
f" --imf False {extra_args}")
self.output_checks = []
# TODO: disable RDO comparison for now
# if type == WorkflowType.MCReco:
# self.output_checks.append(FrozenTier0PolicyCheck(setup, "RDO", 10))
self.output_checks.append(FrozenTier0PolicyCheck(setup, "ESD", 10))
self.output_checks.append(FrozenTier0PolicyCheck(setup, "AOD", 20))
super().__init__(ID, run, type, steps, setup)
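# For illustration (assuming no extra_args), QTest("q443", ...) as used in the Run 2 runner
# would execute:
#   ATHENA_CORE_NUMBER=1 Reco_tf.py --multithreaded --AMIConfig q443 --imf False --maxEvents 20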
class SimulationTest(WorkflowTest):
"""Simulation workflow test."""
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = '') -> None:
if "maxEvents" not in extra_args:
extra_args += " --maxEvents 20"
self.command = \
(f"Sim_tf.py --AMIConfig {ID}"
f" --inputEVNTFile {input_EVNT[run]} --outputHITSFile myHITS.pool.root"
f" --imf False {extra_args}")
self.output_checks = [
FrozenTier0PolicyCheck(setup, "HITS", 10)
]
super().__init__(ID, run, type, steps, setup)
class OverlayTest(WorkflowTest):
"""MC overlay workflow test."""
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = '') -> None:
if "maxEvents" not in extra_args:
extra_args += " --maxEvents 10"
self.command = \
(f"Overlay_tf.py --AMIConfig {ID}"
f" --inputHITSFile {input_HITS_MC_overlay[run]} --inputRDO_BKGFile {input_RDO_BKG[run]} --outputRDOFile myRDO.pool.root"
f" --imf False --athenaopts=\"--pmon=sdmonfp\" {extra_args}")
# skip performance checks for now due to CA
self.skip_performance_checks = True
self.output_checks = [
FrozenTier0PolicyCheck(setup, "RDO", 10)
]
super().__init__(ID, run, type, steps, setup)
class DataOverlayTest(WorkflowTest):
"""Data overlay workflow test."""
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = '') -> None:
if "maxEvents" not in extra_args:
extra_args += " --maxEvents 10"
self.command = \
(f"Overlay_tf.py --AMIConfig {ID}"
f" --inputHITSFile {input_HITS_data_overlay[run]} --inputBS_SKIMFile {input_BS_SKIM[run]} --outputRDOFile myRDO.pool.root"
f" --imf False --athenaopts=\"--pmon=sdmonfp\" {extra_args}")
self.output_checks = [
FrozenTier0PolicyCheck(setup, "RDO", 10)
]
super().__init__(ID, run, type, steps, setup)
class PileUpTest(WorkflowTest):
"""Digitization with pile-up workflow test."""
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup, extra_args: str = '') -> None:
if "maxEvents" not in extra_args:
extra_args += " --maxEvents 5"
self.command = \
(f"Digi_tf.py --AMIConfig {ID} --jobNumber 1 --digiSeedOffset1 1 --digiSeedOffset2 1"
f" --inputHITSFile {input_HITS_neutrino[run]} --inputHighPtMinbiasHitsFile {input_HITS_minbias_high[run]} --inputLowPtMinbiasHitsFile {input_HITS_minbias_low[run]} --outputRDOFile myRDO.pool.root"
f" --imf False --athenaopts=\"--pmon=sdmonfp\" {extra_args}")
self.output_checks = [
FrozenTier0PolicyCheck(setup, "RDO", 5)
]
super().__init__(ID, run, type, steps, setup)
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from enum import Enum
from logging import Logger
from os import environ
from pathlib import Path
from typing import List
from uuid import uuid4
import subprocess
from .Helpers import get_pwd, get_release_setup, list_changed_packages
class TestSetup:
"""Test setup."""
def __init__(self, logger: Logger) -> None:
self.logger = logger
self.pwd = get_pwd()
self.reference_run_path = Path("/tmp")
self.diff_rules_path = Path()
self.unique_ID = str(uuid4())
self.disable_release_setup = False
self.validation_only = False
self.checks_only = False
self.release_reference = ""
self.release_validation = ""
self.release_ID = environ['AtlasVersion'][0:4]
self.parallel_execution = False
def setup_release(self, reference=None, validation=None) -> None:
if reference and validation:
self.release_reference = reference
self.release_validation = validation
self.logger.info(f"WARNING: You have specified a dedicated release as reference {reference} and as validation {validation} release.")
self.logger.info("Your local setup area will not be considered!!!")
self.logger.info("this option is mainly designed for comparing release versions!!")
else:
self.release_reference = get_release_setup(self.logger, self.disable_release_setup)
self.release_validation = self.release_reference
try:
list_changed_packages(self.logger, self.disable_release_setup)
except Exception:
self.logger.warning("Cannot list changed packages...\n")
class WorkflowRun(Enum):
Run2 = 'Run2'
Run3 = 'Run3'
Run4 = 'Run4'
class WorkflowType(Enum):
FullSim = 'FullSim'
AF3 = 'AF3'
Overlay = 'Overlay'
MCReco = 'MCReco'
MCPileUpReco = 'MCPileUpReco'
DataReco = 'DataReco'
PileUpPresampling = 'PileUpPresampling'
class WorkflowCheck:
"""Workflow check base class."""
def __init__(self, setup: TestSetup) -> None:
self.setup = setup
self.logger = setup.logger
class WorkflowTest:
"""Workflow test base class."""
def __init__(self, ID: str, run: WorkflowRun, type: WorkflowType, steps: List[str], setup: TestSetup) -> None:
if not hasattr(self, "ID"):
self.ID = ID
if not hasattr(self, "tag"):
self.tag = ID
if not hasattr(self, "steps"):
self.steps = steps
if not hasattr(self, "command"):
raise NotImplementedError("Command needs to be defined")
if not hasattr(self, "output_checks"):
self.output_checks = []
if not hasattr(self, "skip_performance_checks"):
self.skip_performance_checks = False
self.run = run
self.type = type
self.setup = setup
self.logger = setup.logger
self.validation_path: Path = Path(f"run_{self.ID}")
self.reference_path: Path = self.setup.reference_run_path / self.validation_path
def run_reference(self) -> None:
self.logger.info(f"Running reference in rel {self.setup.release_reference}")
self.logger.info(f"\"{self.command}\"")
self.reference_path.mkdir(parents=True, exist_ok=True)
cmd = (f"cd {self.reference_path};"
f"source $AtlasSetup/scripts/asetup.sh {self.setup.release_reference} >& /dev/null;")
cmd += f"{self.command} > {self.ID}.log 2>&1"
subprocess.call(cmd, shell=True)
self.logger.info(f"Finished clean in rel {self.setup.release_reference}")
self.logger.info(f"\"{self.command}\"")
def run_validation(self) -> None:
self.logger.info(f"Running validation in rel {self.setup.release_validation}")
self.logger.info(f"\"{self.command}\"")
self.validation_path.mkdir(parents=True, exist_ok=True)
cmd = f"cd {self.setup.pwd};"
if self.setup.disable_release_setup:
pass
elif "WorkDir_DIR" in environ:
cmake_build_dir = environ["WorkDir_DIR"]
cmd += (f"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
f"source {cmake_build_dir}/setup.sh;")
else:
cmd += f"source $AtlasSetup/scripts/asetup.sh {self.setup.release_validation} >& /dev/null;"
cmd += f"cd run_{self.ID};"
cmd += f"{self.command} > {self.ID}.log 2>&1"
subprocess.call(cmd, shell=True)
self.logger.info(f"Finished validation in rel {self.setup.release_validation}")
self.logger.info(f"\"{self.command}\"")
def run_checks(self, main_check: WorkflowCheck, performance_checks: List[WorkflowCheck]) -> bool:
self.logger.info("-----------------------------------------------------")
self.logger.info(f"----------- Post-processing of {self.ID} Test -----------")
result = True
# HAZ: Open question -- is there a cleaner way to do this?
# HAZ: adding a decorator to `logging` would be nicest (require 0 errors)...
if not main_check.run(self):
return False
# output checks
for check in self.output_checks:
result = check.run(self) and result
if self.setup.validation_only or self.skip_performance_checks:
return result # Performance checks against static references not possible
# performance checks
for check in performance_checks:
result = check.run(self) and result
return result
#!/usr/bin/env python
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from sys import exit
from WorkflowTestRunner.ScriptUtils import setup_logger, setup_parser, get_test_setup, get_standard_performance_checks, \
run_tests, run_checks, run_summary
from WorkflowTestRunner.StandardTests import QTest, SimulationTest, OverlayTest, DataOverlayTest, PileUpTest
from WorkflowTestRunner.Test import WorkflowRun, WorkflowType
def main():
name = "Run2Tests"
run = WorkflowRun.Run2
# Setup the environment
log = setup_logger(name)
parser = setup_parser()
options = parser.parse_args()
setup = get_test_setup(name, options, log)
# Define which tests to run
tests_to_run = []
if options.simulation:
tests_to_run.append(SimulationTest("s3759", run, WorkflowType.FullSim, ["EVNTtoHITS"], setup, options.extra_args))
elif options.overlay:
tests_to_run.append(OverlayTest("d1726", run, WorkflowType.Overlay, ["Overlay"], setup, options.extra_args))
tests_to_run.append(DataOverlayTest("d1590", run, WorkflowType.Overlay, ["Overlay"], setup, options.extra_args))
elif options.pileup:
if setup.parallel_execution:
log.error("Parallel execution not supported for pile-up workflow")
exit(1)
tests_to_run.append(PileUpTest("d1715", run, WorkflowType.PileUpPresampling, ["HITtoRDO"], setup, options.extra_args))
tests_to_run.append(QTest("q444", run, WorkflowType.MCPileUpReco, ["RAWtoESD", "ESDtoAOD"], setup, options.extra_args))
else:
tests_to_run.append(QTest("q443", run, WorkflowType.MCReco, ["HITtoRDO", "RAWtoESD", "ESDtoAOD"], setup, options.extra_args))
tests_to_run.append(QTest("q442", run, WorkflowType.DataReco, ["RAWtoALL"], setup, options.extra_args))
# Define which performance checks to run
performance_checks = get_standard_performance_checks(setup)
# Define and run jobs
run_tests(setup, tests_to_run)
# Run post-processing checks
all_passed = run_checks(setup, tests_to_run, performance_checks)
# final report
run_summary(setup, tests_to_run, all_passed)
if __name__ == "__main__":
main()