Skip to content
Snippets Groups Projects
Forked from atlas / athena
97256 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
CheckSteps.py 26.79 KiB
#
# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
#

'''
Definitions of post-exec check steps in Trigger ART tests
'''

import sys
import os
import re
import subprocess
import json
import six

from TrigValTools.TrigValSteering.Step import Step
from TrigValTools.TrigValSteering.Common import art_input_eos, art_input_cvmfs


class RefComparisonStep(Step):
    '''
    Base class for steps comparing a file to a reference.

    Attributes:
    reference - path to the reference file. If left as None, configure()
        looks for the reference in the ART input areas (EOS, then CVMFS)
    input_file - name of the file to be compared to the reference
    '''

    def __init__(self, name):
        super(RefComparisonStep, self).__init__(name)
        self.reference = None
        self.input_file = None

    def _find_in_datapath(self, file_name):
        '''
        Ask find_data.py for the full path of file_name.
        Returns the path as text, or None if the helper exits with an error.
        '''
        try:
            output = subprocess.check_output(
                'find_data.py {}'.format(file_name), shell=True)
        except subprocess.CalledProcessError as e:
            self.log.debug('%s find_data.py failed for %s with code %s',
                           self.name, file_name, e.returncode)
            return None
        # check_output returns bytes in python3; the path is later formatted
        # into the command line, so it has to be converted to text
        if not isinstance(output, str):
            output = output.decode('utf-8')
        return output.strip()

    def configure(self, test):
        '''
        Resolve the reference file.

        If self.reference was set by the user, it is used as is when it
        exists, otherwise it is searched for with find_data.py. In both cases
        the method returns without calling the base class configure.

        If self.reference is None, the reference is searched for in
        <package>/ref/<branch>/test_<name>/<input_file> under the ART input
        areas on EOS and CVMFS. self.reference stays None if nothing is
        found. A missing input_file is a configuration error which is
        reported and terminates the process with exit code 1.
        '''
        if self.reference is not None:
            # Do nothing if the reference exists
            if os.path.isfile(self.reference):
                return
            # Try to find the file in DATAPATH
            full_path = self._find_in_datapath(self.reference)
            if full_path and os.path.isfile(full_path):
                self.log.debug('%s using reference %s', self.name, full_path)
                self.reference = full_path
            else:
                self.log.warning(
                    '%s failed to find reference %s - wrong path?',
                    self.name, self.reference)
            return

        if self.input_file is None:
            self.log.error('Cannot configure %s because input_file not specified',
                           self.name)
            self.report_result(1, 'TestConfig')
            sys.exit(1)

        branch = os.environ.get('AtlasBuildBranch')  # Available after asetup
        if branch is None:
            branch = os.environ.get('gitlabTargetBranch')  # Available in CI
        if branch is None:
            self.log.warning('Cannot determine the branch name, both variables '
                             'AtlasBuildBranch and gitlabTargetBranch are empty')
            branch = 'UNKNOWN_BRANCH'

        sub_path = '{}/ref/{}/test_{}/'.format(
            test.package_name, branch, test.name)
        ref_eos = art_input_eos + sub_path + self.input_file
        ref_cvmfs = art_input_cvmfs + sub_path + self.input_file
        # EOS takes precedence over CVMFS
        if os.path.isfile(ref_eos):
            self.log.debug('%s using reference from EOS: %s',
                           self.name, ref_eos)
            self.reference = ref_eos
        elif os.path.isfile(ref_cvmfs):
            self.log.debug('%s using reference from CVMFS: %s',
                           self.name, ref_cvmfs)
            self.reference = ref_cvmfs
        else:
            self.log.warning('%s failed to find reference %s in %s or %s',
                             self.name, sub_path + self.input_file,
                             art_input_eos, art_input_cvmfs)
            self.reference = None

        super(RefComparisonStep, self).configure(test)


class InputDependentStep(Step):
    '''
    Base class for steps which are executed only when their input file
    is present. The file name is taken from the input_file attribute.
    '''

    def __init__(self, name=None):
        super(InputDependentStep, self).__init__(name)
        self.input_file = None

    def run(self, dry_run=False):
        '''
        Execute the step if input_file exists. A missing file makes the step
        a successful no-op (result 0), an unset input_file is a failure
        (result 1). Returns a tuple (result, command string).
        '''
        if self.input_file is not None:
            if os.path.isfile(self.input_file):
                return super(InputDependentStep, self).run(dry_run)
            # Nothing to process - not an error
            self.log.debug('Skipping %s because %s does not exist',
                           self.name, self.input_file)
            self.result = 0
            return self.result, '# (internal) {} -> skipped'.format(self.name)

        # The step was never told what to look at
        self.log.error('%s misconfiguration - no input file specified',
                       self.name)
        self.result = 1
        if self.auto_report_result:
            self.report_result()
        return self.result, '# (internal) {} -> failed'.format(self.name)


class LogMergeStep(Step):
    '''
    Merge several log files into one for post-processing. Parameters are:
    log_files - list of files to merge; if None, configure() fills it from
                the names of the exec steps of the test
    extra_log_regex - files in the current directory matching this regex
                      are added to log_files at run time
    merged_name - name of the output file
    warn_if_missing - if True, a missing input file produces a warning in the
                      step log and a marker line in the merged file
    '''

    def __init__(self, name='LogMerge'):
        super(LogMergeStep, self).__init__(name)
        self.log_files = None
        self.extra_log_regex = None
        self.merged_name = 'athena.merged.log'
        self.warn_if_missing = True

    def configure(self, test):
        if self.log_files is None:
            # NOTE(review): this uses the step names, whereas other code in
            # this file uses step.name+'.log' or get_log_file_name() -
            # confirm which one is intended here
            self.log_files = []
            for step in test.exec_steps:
                self.log_files.append(step.name)
        super(LogMergeStep, self).configure(test)

    def process_extra_regex(self):
        '''Add files from the current directory matching extra_log_regex'''
        if not self.extra_log_regex:
            return
        if self.log_files is None:
            self.log_files = []
        r = re.compile(self.extra_log_regex)
        for f in os.listdir('.'):
            # Do not add a file twice, e.g. when run() is called again
            if r.match(f) and f not in self.log_files:
                self.log_files.append(f)

    def merge_logs(self):
        '''
        Concatenate log_files into merged_name, each preceded by a header
        line. Returns 0 on success and 1 if a file operation fails.
        '''
        encargs = {} if six.PY2 else {'encoding' : 'utf-8'}
        merged_path = os.path.abspath(self.merged_name)
        try:
            with open(self.merged_name, 'w', **encargs) as merged_file:
                for log_name in self.log_files:
                    # Reading the file which is being written would never
                    # give a meaningful result, so leave it out
                    if os.path.abspath(log_name) == merged_path:
                        self.log.warning(
                            '%s skipping %s because it is the merge output',
                            self.name, log_name)
                        continue
                    if not os.path.isfile(log_name):
                        if self.warn_if_missing:
                            self.log.warning('Cannot open %s', log_name)
                            merged_file.write(
                                '### WARNING Missing {} ###\n'.format(log_name))
                        continue
                    with open(log_name, **encargs) as log_file:
                        merged_file.write('### {} ###\n'.format(log_name))
                        for line in log_file:
                            merged_file.write(line)
            return 0
        except (IOError, OSError) as e:
            # In python2 open() raises IOError which is not an OSError
            self.log.error('%s merging failed due to OSError: %s',
                           self.name, e.strerror)
            return 1

    def run(self, dry_run=False):
        '''
        Merge the logs unless dry_run is set.
        Returns a tuple (result, command string).
        '''
        self.process_extra_regex()
        self.log.info('Running %s merging logs %s into %s',
                      self.name, self.log_files, self.merged_name)
        if dry_run:
            self.result = 0
        else:
            self.result = self.merge_logs()
        return self.result, '# (internal) {} in={} out={}'.format(self.name, self.log_files, self.merged_name)


class RootMergeStep(Step):
    '''
    Combine ROOT files into one using hadd. Parameters are:
    input_file - the file(s) to combine, passed to hadd after the output name
    merged_file - name of the hadd output file
    rename_suffix - if set and merged_file already exists when the step runs,
                    the existing file is first moved to a name with this
                    suffix inserted before the extension
    '''

    def __init__(self, name='RootMerge'):
        super(RootMergeStep, self).__init__(name)
        self.input_file = None
        self.merged_file = None
        self.rename_suffix = None
        self.executable = 'hadd'

    def configure(self, test=None):
        # hadd expects the target first, followed by the sources
        self.args += ' '.join(('', self.merged_file, self.input_file))
        super(RootMergeStep, self).configure(test)

    def run(self, dry_run=False):
        if os.path.isfile(self.merged_file):
            if self.rename_suffix:
                # Move the existing file out of the way before merging
                stem, extension = os.path.splitext(self.merged_file)
                moved_name = stem + self.rename_suffix + extension
                self.executable = 'mv {} {}; {}'.format(
                    self.merged_file, moved_name, self.executable)
        return super(RootMergeStep, self).run(dry_run)


class ZipStep(Step):
    '''
    Pack a file into a gzipped tarball and delete the original. Parameters:
    zip_input - the file to compress
    zip_output - name of the archive to create
    '''

    def __init__(self, name='Zip'):
        super(ZipStep, self).__init__(name)
        self.zip_output = None
        self.zip_input = None
        self.executable = 'tar'
        self.args = '-czf'
        self.output_stream = Step.OutputStream.STDOUT_ONLY

    def configure(self, test=None):
        archive_args = ' '.join(('', self.zip_output, self.zip_input))
        # The input is removed only if the archiving command succeeds (&&)
        cleanup = ' && rm ' + self.zip_input
        self.args += archive_args + cleanup
        super(ZipStep, self).configure(test)


class CheckLogStep(Step):
    '''
    Run check_log.py on a log file to search for errors and/or warnings.
    Parameters are:
    log_file - the log to check; by default the log of the only exec step,
               or athena.log if the test does not have exactly one exec step
    check_errors - add the --errors option
    check_warnings - add the --warnings option
    config_file - the check_log.py config; by default chosen by package name
    '''

    def __init__(self, name):
        super(CheckLogStep, self).__init__(name)
        self.executable = 'check_log.py'
        self.log_file = None
        self.check_errors = True
        self.check_warnings = False
        self.config_file = None
        self.args = '--showexcludestats'

    def configure(self, test):
        if self.config_file is None:
            # These packages have a dedicated config, all others share one
            dedicated = ('TrigUpgradeTest', 'TrigP1Test', 'TrigValTools')
            package = test.package_name
            if package not in dedicated:
                package = 'TriggerTest'
            self.config_file = 'checklog{}.conf'.format(package)

        if self.log_file is None:
            single_step = len(test.exec_steps) == 1
            self.log_file = test.exec_steps[0].name+'.log' if single_step else 'athena.log'

        options = ((self.check_errors, ' --errors'),
                   (self.check_warnings, ' --warnings'))
        for enabled, option in options:
            if enabled:
                self.args += option

        # Only the errors-only check is a required step reporting its result
        errors_only = self.check_errors and not self.check_warnings
        if errors_only:
            self.output_stream = Step.OutputStream.FILE_AND_STDOUT
            self.auto_report_result = True
            self.required = True

        self.args += ' --config {} {}'.format(self.config_file, self.log_file)

        super(CheckLogStep, self).configure(test)


class RegTestStep(RefComparisonStep):
    '''
    Execute RegTest comparing a log file against a reference. Parameters are:
    regex - lines of the log are selected from the first match of this
            pattern to the end of the line
    input_base_name - the log <input_base_name>.log is read and the selected
                      lines are written to <input_base_name>.regtest
    '''

    def __init__(self, name='RegTest'):
        super(RegTestStep, self).__init__(name)
        self.regex = 'REGTEST'
        self.executable = 'regtest.pl'
        self.input_base_name = 'athena'
        self.args += ' --linematch ".*"'
        self.auto_report_result = True
        self.output_stream = Step.OutputStream.FILE_AND_STDOUT

    def configure(self, test):
        self.input_file = self.input_base_name+'.regtest'
        RefComparisonStep.configure(self, test)
        self.args += ' --inputfile {} --reffile {}'.format(self.input_file, self.reference)
        Step.configure(self, test)

    def prepare_inputs(self):
        '''
        Extract the lines matching self.regex from the log file into
        self.input_file. Returns False if the log file does not exist.
        '''
        log_file = self.input_base_name+'.log'
        if not os.path.isfile(log_file):
            self.log.error('%s input file %s is missing', self.name, log_file)
            return False
        encargs = {} if six.PY2 else {'encoding' : 'utf-8'}
        with open(log_file, **encargs) as f_in:
            log_content = f_in.read()
        pattern = re.compile('{}.*$'.format(self.regex), re.MULTILINE)
        with open(self.input_file, 'w', **encargs) as f_out:
            # Use the full match rather than re.findall, which would return
            # only the group contents if self.regex has capturing groups
            for match in pattern.finditer(log_content):
                f_out.write(match.group(0)+'\n')
        return True

    def rename_ref(self):
        '''
        Rename the input file to <name>.new in the current directory, where
        <name> is the base name of the reference if there is one and of the
        input file otherwise. A failure to rename only produces a warning.
        '''
        if self.reference:
            new_name = os.path.basename(self.reference) + '.new'
        else:
            new_name = os.path.basename(self.input_file) + '.new'
        try:
            os.rename(self.input_file, new_name)
            self.log.debug('Renamed %s to %s', self.input_file, new_name)
        except OSError:
            self.log.warning('Failed to rename %s to %s',
                             self.input_file, new_name)

    def run(self, dry_run=False):
        '''
        Prepare the input, run the comparison and rename the input file.
        The result is 1 if the input cannot be prepared and 999 if there is
        no reference. Returns a tuple (result, command string).
        '''
        if not dry_run and not self.prepare_inputs():
            self.log.error('%s failed in prepare_inputs()', self.name)
            self.result = 1
            if self.auto_report_result:
                self.report_result()
            return self.result, '# (internal) {} -> failed'.format(self.name)
        if self.reference is None:
            self.log.error('Missing reference for %s', self.name)
            if not dry_run:
                self.rename_ref()
            self.result = 999
            if self.auto_report_result:
                self.report_result()
            return self.result, '# (internal) {} -> failed'.format(self.name)
        retcode, cmd = super(RegTestStep, self).run(dry_run)
        if not dry_run:
            self.rename_ref()
        return retcode, cmd


class RootCompStep(RefComparisonStep):
    '''
    Execute RootComp comparing histograms against a reference.
    The histograms are read from input_file (expert-monitoring.root by
    default).
    '''

    def __init__(self, name='RootComp'):
        super(RootCompStep, self).__init__(name)
        self.input_file = 'expert-monitoring.root'
        self.executable = 'rootcomp.py'
        self.auto_report_result = True

    def configure(self, test):
        RefComparisonStep.configure(self, test)
        self.args += ' {} {}'.format(self.reference, self.input_file)
        Step.configure(self, test)

    def run(self, dry_run=False):
        '''
        Run the comparison. Without a reference the step is skipped with
        result 0 if the input is missing as well, and fails with result 999
        otherwise. Returns a tuple (result, command string).
        '''
        if self.reference is None:
            if not os.path.isfile(self.input_file):
                self.log.debug(
                    'Skipping %s because both reference and input are missing',
                    self.name)
                # Record the result like the other skip paths in this file
                # do, so that self.result agrees with the returned value
                self.result = 0
                return self.result, '# (internal) {} -> skipped'.format(self.name)
            else:  # input exists but reference not
                self.log.error('Missing reference for %s', self.name)
                self.result = 999
                if self.auto_report_result:
                    self.report_result()
                return self.result, '# (internal) {} -> failed'.format(self.name)
        retcode, cmd = super(RootCompStep, self).run(dry_run)
        return retcode, cmd


class PerfMonStep(InputDependentStep):
    '''
    Post-process the PerfMon ntuple with perfmon.py. Unless input_file is
    set, the ntuple location depends on whether the test has an athenaHLT
    exec step.
    '''

    def __init__(self, name='PerfMon'):
        # The base class constructor leaves input_file set to None
        super(PerfMonStep, self).__init__(name)
        self.executable = 'perfmon.py'
        self.args = '-f 0.90'

    def configure(self, test):
        if not self.input_file:
            exec_types = [step.type for step in test.exec_steps]
            if 'athenaHLT' in exec_types:
                # athenaHLT writes the ntuple in the worker directory
                self.input_file = 'athenaHLT_workers/athenaHLT-01/ntuple.pmon.gz'
            else:
                self.input_file = 'ntuple.pmon.gz'
        self.args = self.args + ' ' + self.input_file
        super(PerfMonStep, self).configure(test)


class TailStep(Step):
    '''
    Save the last num_lines lines of log_file in a separate file. Unless
    output_name is set, the output name is the log file name with ".tail"
    inserted before the extension.
    '''

    def __init__(self, name='Tail'):
        super(TailStep, self).__init__(name)
        self.log_file = 'athena.log'
        self.output_name = None
        self.executable = 'tail'
        self.num_lines = 5000
        self.output_stream = Step.OutputStream.STDOUT_ONLY

    def configure(self, test):
        if self.output_name is None:
            stem, extension = os.path.splitext(self.log_file)
            self.output_name = stem + '.tail' + extension
        self.args += ' -n {:d}'.format(self.num_lines)
        self.args += ' ' + self.log_file
        # The output goes to the file through a shell redirection
        self.args += ' >' + self.output_name
        super(TailStep, self).configure(test)


class HistCountStep(InputDependentStep):
    '''
    Count histograms in a ROOT file by running histSizes.py with the -t
    option on input_file (expert-monitoring.root by default)
    '''

    def __init__(self, name='HistCount'):
        super(HistCountStep, self).__init__(name)
        self.input_file = 'expert-monitoring.root'
        self.executable = 'histSizes.py'
        self.args = '-t'

    def configure(self, test):
        # The file to inspect is the last argument
        self.args = self.args + ' ' + self.input_file
        super(HistCountStep, self).configure(test)


class ChainDumpStep(InputDependentStep):
    '''
    Run chainDump.py with the --json option to print trigger counts from the
    histograms in input_file (expert-monitoring.root by default) to text files
    '''

    def __init__(self, name='ChainDump'):
        super(ChainDumpStep, self).__init__(name)
        self.input_file = 'expert-monitoring.root'
        self.executable = 'chainDump.py'
        self.args = '--json'

    def configure(self, test):
        # The histogram file is passed with the -f option
        self.args = self.args + ' -f ' + self.input_file
        super(ChainDumpStep, self).configure(test)


class TrigTestJsonStep(Step):
    '''
    Create the extra-results.json file by running trig-test-json.py.
    The script is executed without any additional arguments set here.
    '''

    def __init__(self, name='TrigTestJson'):
        super(TrigTestJsonStep, self).__init__(name)
        # Everything else is left at the defaults of the base class
        self.executable = 'trig-test-json.py'


class CheckFileStep(InputDependentStep):
    '''
    Execute checkFile and checkxAOD for POOL files.
    executable and input_file can have multiple comma-separated values.
    Every executable is run on every input file which exists. The step is
    skipped if all exec steps of the test are athenaHLT.
    '''

    def __init__(self, name='CheckFile'):
        super(CheckFileStep, self).__init__(name)
        self.input_file = 'AOD.pool.root,ESD.pool.root,RDO_TRIG.pool.root'
        self.executable = 'checkFile.py,checkxAOD.py'
        # Lists obtained in configure() by splitting the two strings above;
        # None means the step is to be skipped
        self.__executables__ = None
        self.__input_files__ = None

    def configure(self, test):
        # Skip the check if all test steps are athenaHLT (no POOL files)
        test_types = [step.type for step in test.exec_steps]
        if test_types and all(tt == 'athenaHLT' for tt in test_types):
            self.log.debug('%s will be skipped because all exec steps use athenaHLT',
                           self.name)
            self.__executables__ = None
            self.__input_files__ = None
            return
        self.__executables__ = self.executable.split(',')
        self.__input_files__ = self.input_file.split(',')
        super(CheckFileStep, self).configure(test)

    def run(self, dry_run=False):
        '''
        Run all executables on all input files. Returns a tuple of the
        highest return code and the merged command string.
        '''
        if self.__executables__ is None or self.__input_files__ is None:
            # configure() decided that there is nothing to check
            self.result = 0
            return self.result, '# (internal) {} -> skipped'.format(self.name)

        ret_codes = []
        commands = []
        for f in self.__input_files__:
            for ex in self.__executables__:
                self.executable = ex
                self.input_file = f
                self.args = f
                # Log name is the input name plus the executable name
                # without its extension, e.g. AOD.pool.root.checkFile
                ex_base = ex.split('.')[0:-1]
                self.log_file_name = f + '.' + ''.join(ex_base)
                ret, cmd = super(CheckFileStep, self).run(dry_run)
                ret_codes.append(ret)
                commands.append(cmd)

        # Merge executed commands for logging
        merged_cmd = ''
        if len(commands) == 1:
            merged_cmd = commands[0]
        else:
            for cmd in commands:
                if '(internal)' not in cmd:
                    merged_cmd += cmd+'; '
        if len(merged_cmd) == 0:
            # Nothing was executed, report the last internal message
            merged_cmd = commands[-1]

        # Keep self.result in sync with the returned value, otherwise it
        # would only reflect the last executed combination
        self.result = max(ret_codes)
        return self.result, merged_cmd


class ZeroCountsStep(Step):
    '''
    Check if all counts are zero.
    input_file can have multiple comma-separated values.
    The count is taken from the last whitespace-separated field of each line.
    The step fails if any of the files has only zero counts and is skipped
    if none of the files exists.
    '''

    def __init__(self, name='ZeroCounts'):
        super(ZeroCountsStep, self).__init__(name)
        self.input_file = 'HLTChain.txt,HLTTE.txt,L1AV.txt'
        self.auto_report_result = True
        # List obtained by splitting input_file
        self.__input_files__ = None

    def configure(self, test=None):
        # NOTE(review): the base class configure is not called here -
        # presumably on purpose as nothing is executed; confirm
        self.__input_files__ = self.input_file.split(',')

    def check_zero_counts(self, input_file):
        '''
        Returns -1 if input_file does not exist, 0 if at least one count is
        non-zero and 1 otherwise (including a file with no counts at all).
        '''
        if not os.path.isfile(input_file):
            self.log.debug(
                'Skipping %s for %s because the file does not exist',
                self.name, input_file)
            return -1
        lines_checked = 0
        encargs = {} if six.PY2 else {'encoding' : 'utf-8'}
        with open(input_file, **encargs) as f_in:
            for line in f_in:
                split_line = line.split()
                if not split_line:
                    # Blank lines carry no count
                    continue
                lines_checked += 1
                if int(split_line[-1]) != 0:
                    return 0  # at least one non-zero count
        if lines_checked == 0:
            self.log.error('Failed to read counts from %s', input_file)
        return 1  # all counts are zero

    def run(self, dry_run=False):
        '''Check all input files. Returns a tuple (result, command string).'''
        if self.__input_files__ is None:
            # configure() was not called, use the current input_file value
            self.__input_files__ = self.input_file.split(',')

        results = [self.check_zero_counts(input_file)
                   for input_file in self.__input_files__]

        self.result = max(results)
        cmd = '# (internal) {} for {}'.format(self.name, self.__input_files__)
        if self.result < 0:
            # None of the files exists
            cmd = '# (internal) {} -> skipped'.format(self.name)
            self.result = 0
            return self.result, cmd
        self.log.info('Running %s step', self.name)
        if self.auto_report_result:
            self.report_result()
        return self.result, cmd


class MessageCountStep(Step):
    '''
    Count messages printed inside event loop. Parameters are:
    log_regex - files in the current directory matching this regex are
                passed to messageCounter.py
    start_pattern, end_pattern - passed to messageCounter.py with the -s and
                                 -e options
    info_threshold, other_threshold - maximum number of INFO and "other"
        messages, by default max_events of the first exec step
    debug_threshold, verbose_threshold - maximum number of DEBUG and VERBOSE
        messages, by default 0
    The result is incremented by one for each exceeded threshold.
    '''

    def __init__(self, name='MessageCount'):
        super(MessageCountStep, self).__init__(name)
        self.executable = 'messageCounter.py'
        self.log_regex = r'(athena\..*log$|athenaHLT:.*\.out$|^log\..*to.*)'
        self.start_pattern = r'(HltEventLoopMgr|AthenaHiveEventLoopMgr).*INFO Starting loop on events'
        self.end_pattern = r'(HltEventLoopMgr.*INFO All events processed|AthenaHiveEventLoopMgr.*INFO.*Loop Finished)'
        self.info_threshold = None
        self.debug_threshold = None
        self.verbose_threshold = None
        self.other_threshold = None
        self.auto_report_result = True

    def configure(self, test):
        self.args += ' -s "{:s}"'.format(self.start_pattern)
        self.args += ' -e "{:s}"'.format(self.end_pattern)
        if self.info_threshold is None:
            self.info_threshold = test.exec_steps[0].max_events
        if self.debug_threshold is None:
            self.debug_threshold = 0
        if self.verbose_threshold is None:
            self.verbose_threshold = 0
        if self.other_threshold is None:
            self.other_threshold = test.exec_steps[0].max_events
        super(MessageCountStep, self).configure(test)

    def run(self, dry_run=False):
        '''
        Run messageCounter.py on the matching log files and compare the
        counts from the MessageCount.<log>.json files to the thresholds.
        Returns a tuple (result, command string).
        '''
        r = re.compile(self.log_regex)
        # This has to be a list because it is iterated twice below; the
        # iterator returned by filter() in python3 would be exhausted by join
        log_files = [f for f in os.listdir('.') if r.match(f)]
        self.args += ' ' + ' '.join(log_files)
        # Report only once, after the thresholds are checked
        auto_report = self.auto_report_result
        self.auto_report_result = False
        ret, cmd = super(MessageCountStep, self).run(dry_run)
        self.auto_report_result = auto_report
        if ret != 0:
            self.log.error('%s failed', self.name)
            self.result = 1
            if self.auto_report_result:
                self.report_result()
            return self.result, cmd
        (num_info, num_debug, num_verbose, num_other) = (0, 0, 0, 0)
        for log_file in log_files:
            json_file = 'MessageCount.{:s}.json'.format(log_file)
            if not os.path.isfile(json_file):
                self.log.warning('%s cannot open file %s', self.name, json_file)
                # Nothing to count for this log
                continue
            with open(json_file) as f:
                summary = json.load(f)
                num_info += summary['INFO']
                num_debug += summary['DEBUG']
                num_verbose += summary['VERBOSE']
                num_other += summary['other']
        if num_info > self.info_threshold:
            self.log.info(
                '%s Number of INFO messages %s is higher than threshold %s',
                self.name, num_info, self.info_threshold)
            self.result += 1
        if num_debug > self.debug_threshold:
            self.log.info(
                '%s Number of DEBUG messages %s is higher than threshold %s',
                self.name, num_debug, self.debug_threshold)
            self.result += 1
        if num_verbose > self.verbose_threshold:
            self.log.info(
                '%s Number of VERBOSE messages %s is higher than threshold %s',
                self.name, num_verbose, self.verbose_threshold)
            self.result += 1
        if num_other > self.other_threshold:
            self.log.info(
                '%s Number of "other" messages %s is higher than threshold %s',
                self.name, num_other, self.other_threshold)
            self.result += 1
        if self.auto_report_result:
            self.report_result()
        return self.result, cmd


def default_check_steps(test):
    '''
    Build the default sequence of check steps for a test. Which steps are
    included and how they are set up is driven by the exec steps of the test:
    how many there are and whether any of them is of type athenaHLT or
    Reco_tf. Returns the list of steps in the order of execution.
    '''

    exec_steps = test.exec_steps
    step_types = [step.type for step in exec_steps]
    uses_athenaHLT = 'athenaHLT' in step_types

    check_steps = []

    # Log merging - needed unless there is a single step which is not
    # athenaHLT and therefore writes just one log
    logmerge = None
    if len(exec_steps) != 1:
        logmerge = LogMergeStep()
        logmerge.merged_name = 'athena.log'
        logmerge.log_files = [step.get_log_file_name() for step in exec_steps]
        if uses_athenaHLT:
            logmerge.extra_log_regex = 'athenaHLT:.*(.out|.err)'
    elif uses_athenaHLT:
        logmerge = LogMergeStep()
        logmerge.merged_name = 'athena.log'
        num_forks = exec_steps[0].forks
        if num_forks is None:
            num_forks = 1
        # The mother process log followed by the logs of each worker
        logmerge.log_files = ['athenaHLT.log']
        for worker in range(1, num_forks+1):
            logmerge.log_files.extend([
                'athenaHLT:{:02d}.out'.format(worker),
                'athenaHLT:{:02d}.err'.format(worker)])

    log_to_check = None
    log_to_zip = None
    if logmerge is not None:
        check_steps.append(logmerge)
        log_to_check = logmerge.merged_name
        log_to_zip = logmerge.merged_name

    # Reco_tf log merging - the transform writes one log per substep
    if 'Reco_tf' in step_types:
        substeps = ['HITtoRDO', 'RDOtoRDOTrigger', 'RAWtoESD', 'ESDtoAOD',
                    'PhysicsValidation', 'RAWtoALL', 'BSFTKCreator']
        reco_tf_logmerge = LogMergeStep('LogMerge_Reco_tf')
        reco_tf_logmerge.warn_if_missing = False
        reco_tf_logmerge.log_files = ['log.'+substep for substep in substeps]
        reco_tf_logmerge.merged_name = 'athena.merged.log'
        if log_to_check is not None:
            # Include the log selected above in the new merged log
            reco_tf_logmerge.log_files.append(log_to_check)
        log_to_check = reco_tf_logmerge.merged_name
        log_to_zip = reco_tf_logmerge.merged_name
        check_steps.append(reco_tf_logmerge)

    # Histogram merging for athenaHLT forks
    if uses_athenaHLT:
        histmerge = RootMergeStep('HistMerge')
        histmerge.merged_file = 'expert-monitoring.root'
        histmerge.input_file = 'athenaHLT_workers/*/expert-monitoring.root expert-monitoring-mother.root'
        histmerge.rename_suffix = '-mother'
        check_steps.append(histmerge)

    have_log = log_to_check is not None

    # CheckLog for errors
    checklog = CheckLogStep('CheckLog')
    if have_log:
        checklog.log_file = log_to_check
    check_steps.append(checklog)

    # CheckLog for warnings
    checkwarn = CheckLogStep('Warnings')
    checkwarn.check_errors = False
    checkwarn.check_warnings = True
    if have_log:
        checkwarn.log_file = log_to_check
    check_steps.append(checkwarn)

    # MessageCount
    check_steps.append(MessageCountStep('MessageCount'))

    # RegTest
    regtest = RegTestStep()
    if have_log:
        regtest.input_base_name = os.path.splitext(log_to_check)[0]
    if uses_athenaHLT:
        regtest.regex = r'(?:HltEventLoopMgr(?!.*athenaHLT-)|REGTEST)'
    check_steps.append(regtest)

    # Tail (probably not so useful these days)
    tail = TailStep()
    if have_log:
        tail.log_file = log_to_check
    check_steps.append(tail)

    # PerfMon
    check_steps.append(PerfMonStep())

    # Histogram-based steps
    check_steps.append(RootCompStep())
    check_steps.append(ChainDumpStep())
    check_steps.append(HistCountStep())

    # ZeroCounts
    check_steps.append(ZeroCountsStep())

    # Extra JSON
    check_steps.append(TrigTestJsonStep())

    # CheckFile
    check_steps.append(CheckFileStep())

    # Zip the merged log (can be large and duplicates information)
    if log_to_zip is not None:
        zip_step = ZipStep()
        zip_step.zip_input = log_to_zip
        zip_step.zip_output = log_to_zip+'.tar.gz'
        check_steps.append(zip_step)

    return check_steps