Skip to content
Snippets Groups Projects
Forked from atlas / athena
111931 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
HLTCFConfig.py 21.70 KiB
# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration

# Classes to configure the CF graph, via Nodes
from AthenaCommon.CFElements import parOR, seqAND, seqOR
from AthenaCommon.Logging import logging
from AthenaCommon.AlgSequence import dumpSequence
from TriggerMenuMT.HLTMenuConfig.Menu.HLTCFDot import  stepCF_DataFlow_to_dot, stepCF_ControlFlow_to_dot, all_DataFlow_to_dot
from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponentsNaming import CFNaming
from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator

import sys
import copy
from AthenaCommon.Constants import VERBOSE,INFO,DEBUG
log = logging.getLogger('HLTCFConfig')
log.setLevel( VERBOSE )


#### Here functions to create the CF tree from CF configuration objects
def makeSummary(name, flatDecisions):
    """Build a TriggerSummaryAlg called `name` and connect it to the given decisions.

    The algorithm reads "L1DecoderSummary" as its input decision and takes
    `flatDecisions` as its list of final decisions. The configured algorithm
    is returned; it is not attached to any sequence here.
    """
    from DecisionHandling.DecisionHandlingConf import TriggerSummaryAlg

    # NOTE(review): OutputLevel 2 is presumably the DEBUG level -- confirm
    summaryAlg = TriggerSummaryAlg( name, OutputLevel = 2 )
    properties = (
        ("InputDecision", "L1DecoderSummary"),
        ("FinalDecisions", flatDecisions),
    )
    for propName, propValue in properties:
        setattr(summaryAlg, propName, propValue)
    return summaryAlg


def createStepRecoNode(name, seq_list, dump=False):
    """Build the elementary HLT reco step node, containing all sequences of the step.

    A parOR node named `name` + CFNaming.RECO_POSTFIX is created and the CF
    tree of every entry of `seq_list` (as built by createCFTree) is added to
    it. When `dump` is true the resulting node is printed with dumpSequence.
    Returns the node.
    """
    nSequences = len(seq_list)
    log.debug("Create reco step %s with %d sequences", name, nSequences)

    recoNode = parOR(name + CFNaming.RECO_POSTFIX)
    for cfSequence in seq_list:
        recoNode += createCFTree(cfSequence)

    if dump:
        dumpSequence(recoNode, indent=0)
    return recoNode


def createStepFilterNode(name, seq_list, dump=False):
    """Build the elementary HLT filter step node.

    A parOR node named `name` + CFNaming.FILTER_POSTFIX is created and the
    filter algorithm of every entry of `seq_list` is added to it. The node is
    meant to gate the execution of the following reco step. When `dump` is
    true the resulting node is printed with dumpSequence. Returns the node.
    """
    nFilters = len(seq_list)
    log.debug("Create filter step %s with %d filters", name, nFilters)

    filterNode = parOR(name + CFNaming.FILTER_POSTFIX)
    for cfSequence in seq_list:
        stepFilterAlg = cfSequence.filter.Alg
        log.info("Add  %s to filter node %s", stepFilterAlg.name(), name)
        filterNode += stepFilterAlg

    if dump:
        dumpSequence(filterNode, indent=0)
    return filterNode


def createCFTree(CFseq):
    """Create the AthSequencer nodes for one CFSequence and attach its algorithms.

    The returned node is a seqAND named after the step, holding the filter
    algorithm followed by a "view" seqAND. The view seqAND holds a parOR with
    the reco sequences of the step, then the hypo algorithm of every menu
    sequence and, for combo steps, the combo algorithm.

    Arguments:
      CFseq -- object providing `step` (with name, sequences, isCombo, combo)
               and `filter` (with Alg)

    Returns the top seqAND node (filter + view).
    """
    stepName = CFseq.step.name
    log.debug(" *** Create CF Tree for CFSequence %s", stepName)

    filterAlg = CFseq.filter.Alg
    # all reco algorithms from all the sequences, in a parallel sequence
    stepReco = parOR(stepName + CFNaming.RECO_POSTFIX)
    # sequential AND around the reco: the hypos are added here
    seqAndView = seqAND(stepName + CFNaming.VIEW_POSTFIX, [stepReco])
    # main node of the step: filter first, then reco+hypo
    seqAndWithFilter = seqAND(stepName, [filterAlg, seqAndView])

    # names of the reco sequences already attached; the same sequence can be
    # shared by several menu sequences and must be added only once
    already_connected = set()
    for menuseq in CFseq.step.sequences:
        ath_sequence = menuseq.sequence.Alg
        name = ath_sequence.name()
        # diagnostic goes through the logger instead of a bare print to stdout
        log.debug("AthSequencer name: %s", name)
        if name in already_connected:
            log.debug("AthSequencer %s already in the Tree, not added again",name)
        else:
            already_connected.add(name)
            stepReco += ath_sequence
        # the hypo is added for every menu sequence, also when the reco is shared
        seqAndView += menuseq.hypo.Alg

    if CFseq.step.isCombo:
        seqAndView += CFseq.step.combo.Alg

    return seqAndWithFilter



#######################################
## CORE of Decision Handling
#######################################



def makeHLTTree(HLTChains, triggerConfigHLT = None):
    """Create the full HLT tree and attach it to the top sequence.

    Arguments:
      HLTChains        -- chain configurations; used for the chain-to-CTP
                          mapping and, when no triggerConfigHLT is given, as
                          the list of chains to build the tree from
      triggerConfigHLT -- optional object providing allChainConfigs and
                          allChainDicts; when given, the tree is built from
                          these instead of HLTChains

    Side effects: locks ConfigFlags, sets ChainToCTPMapping on the L1Decoder
    found in the top sequence and appends the "hltTop" sequence to the top
    sequence.

    Raises RuntimeError unless exactly one L1Decoder is found among the
    children of the top sequence.
    """

    # Check if triggerConfigHLT exists, if yes, derive information from this
    # this will be in use once TrigUpgrade test has migrated to TriggerMenuMT completely
    if triggerConfigHLT:
        log.info("Obtaining chain dictionaries and configuration from triggerConfigHLT")
        allChainConfigs = triggerConfigHLT.allChainConfigs
        allChainDicts = triggerConfigHLT.allChainDicts
    else:
        log.info("No triggerConfigHLT was passed, only relying on HLTChains now")
        log.info("Creating necessary chainDict info now")
        allChainConfigs = HLTChains
        # decode every chain name into its chain dictionary
        from TriggerMenuMT.HLTMenuConfig.Menu import DictFromChainName
        decodeChainName = DictFromChainName.DictFromChainName()
        allChainDicts = [decodeChainName.getChainDict(chain.name) for chain in allChainConfigs]

    # lock flags
    from AthenaConfiguration.AllConfigFlags import ConfigFlags
    ConfigFlags.lock()

    # get topSequence
    from AthenaCommon.AlgSequence import AlgSequence
    topSequence = AlgSequence()

    # connect to L1Decoder
    l1decoder = [ d for d in topSequence.getChildren() if d.getType() == "L1Decoder" ]
    if len(l1decoder) != 1:
        raise RuntimeError(" Can't find 1 instance of L1Decoder in topSequence, instead found this in topSequence "+str(topSequence.getChildren()) )

    # set CTP chains before creating the full tree (and the monitor)
    EnabledChainNamesToCTP = { c.name: c.seed for c in HLTChains }
    l1decoder[0].ChainToCTPMapping = EnabledChainNamesToCTP

    # main HLT top sequence
    hltTop = seqOR("hltTop")

    # add the HLT steps Node
    steps = seqAND("HLTAllSteps")
    hltTop += steps

    # make CF tree
    finalDecisions = decisionTree_From_Chains(steps, allChainConfigs, allChainDicts)

    # flatten the per-step lists of final decisions for the final summary
    flatDecisions = [decision for step in finalDecisions for decision in step]
    summary = makeSummary("TriggerSummaryFinal", flatDecisions)
    hltTop += summary

    # add signature monitor
    from TriggerJobOpts.TriggerConfig import collectHypos, triggerMonitoringCfg, triggerSummaryCfg
    hypos = collectHypos(steps)
    summaryAcc, summaryAlg = triggerSummaryCfg( ConfigFlags, hypos )
    hltTop += summaryAlg
    summaryAcc.appendToGlobals()

    monAcc, monAlg = triggerMonitoringCfg( ConfigFlags, hypos, l1decoder[0] )
    monAcc.appendToGlobals()
    hltTop += monAlg

    topSequence += hltTop


def matrixDisplay( allSeq ):
    """Log, at debug level, a chains-vs-steps overview of the CF sequences.

    Arguments:
      allSeq -- list with one entry per step; each entry is a list of objects
                providing `step.name` and `step.sequences[0].hypo.tools`

    For every sequence of the first step, the names of the matching sequences
    in the following steps are printed on one line, followed by the hypo tools
    of the first-step sequence. Steps are matched by name: the part after the
    first "_" of the step name is assumed to be common to all steps of a chain.
    """
    from collections import defaultdict
    longestName = 5
    # mx[step number (from 1)][step name] -> sequence
    mx = defaultdict(dict)
    for stepNumber, step in enumerate(allSeq, 1):
        for seq in step:
            mx[stepNumber][seq.step.name] = seq
            longestName = max(longestName, len(seq.step.name))

    # one extra column of padding between names
    longestName = longestName + 1

    def __getHyposOfStep( s ):
        return s.step.sequences[0].hypo.tools

    def __nextSteps( index, stepName ):
        # name this chain would have at step `index`
        nextStepName = "Step%s_"%index + "_".join(stepName.split("_")[1:])
        # .get avoids creating empty entries in mx for steps that do not exist
        if nextStepName in mx.get(index, {}):
            return nextStepName.ljust( longestName ) + __nextSteps( index + 1, nextStepName )
        return ""

    log.debug("" )
    log.debug("chains^ vs steps ->")
    log.debug( "="*90 )
    # items() instead of iteritems(): works with both Python 2 and Python 3
    for sname, seq in mx[1].items():
        guessChainName = '_'.join( sname.split( "_" )[1:] )
        log.debug( " Reco chain: %s: %s" % ( guessChainName.rjust(longestName),  __nextSteps( 1, sname ) ) )
        log.debug( " "+ " ".join( __getHyposOfStep( seq ) ) )
        log.debug( "" )

    log.debug( "="*90 )
    log.debug( "" )

        

def decisionTree_From_Chains(HLTNode, chains, allDicts):
    """Create the decision tree, given the starting node and the chains containing the sequences.

    For every step index, up to the number of steps of the longest chain, one
    filter is built per ChainStep name (shared by all chains using that step)
    and three nodes are appended to HLTNode: the filter node, the reco node
    and a summary algorithm of the step. The data-flow and control-flow graphs
    are also written out through the *_to_dot helpers.

    Arguments:
      HLTNode  -- sequence to which the step nodes are added
      chains   -- chain configurations; each provides name, steps, group_seed
                  and decodeHypoToolConfs
      allDicts -- chain dictionaries, passed to decodeHypoToolConfs of every chain

    Returns a list with one entry per step; each entry is the list of outputs
    of the sequences of the chains that end at that step.

    Terminates through sys.exit when the filter of a step has no input, or
    more than one input while the step is not a combo step. max() raises
    ValueError when `chains` is empty.
    """

    log.debug("Run decisionTree_From_Chains on %s", HLTNode.name())

    from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponents import CFSequence
    HLTNodeName = HLTNode.name()

    # the number of steps is given by the longest chain
    NSTEPS = max(len(chain.steps) for chain in chains)

    # loop over chains to configure hypotools
    # must be done after all chains are created, to avoid conflicts
    log.debug("Loop over chains to decode hypo tools")
    for chain in chains:
        chain.decodeHypoToolConfs(allDicts)

    finalDecisions = [] # needed for monitor
    allSeq_list = []

    for nstep in range(0, NSTEPS):
        finalDecisions.append([]) # list of final decisions per step
        stepCF_name = CFNaming.stepName(nstep)
        CFseq_list = []
        step_decisions = []

        for chain in chains:
            # skip this step if missing
            if len(chain.steps) <= nstep:
                continue

            chain_step = chain.steps[nstep]
            log.debug("\n************* Start step %d %s for chain %s", nstep+1, stepCF_name, chain.name)
            # one filter per ChainStep - same name indeed
            # one filter input per previous menusequence output (the L1Seed for first step)
            # one filter output per menuSeq

            filter_input = []
            if nstep == 0: # L1 seeding
                seeds = chain.group_seed
                filter_input.extend( seeds )
                log.debug("Found these seeds from the chain: %s", seeds)
                log.debug("Seeds added; having in the filter now: %s", filter_input)
            else:
                prev = chain.steps[nstep-1].sequences
                # previous filter name
                pre_filter_name = CFNaming.filterName(chain.steps[nstep-1].name)
                # searching the correct sequence output to connect to the filter
                for seq in prev:
                    for out in seq.outputs:
                        if pre_filter_name in out:
                            filter_input.append(out)
                            log.debug("Connect to previous sequence through these filter inputs: %s" %str( filter_input) )

            if len(filter_input) == 0 or (len(filter_input) != 1 and not chain_step.isCombo):
                log.error("ERROR: Filter for step %s has %d inputs! One is expected"%(chain_step.name, len(filter_input)))
                # report the step being configured: no sequence variable is
                # guaranteed to be bound here (none at all on the first step)
                sys.exit("ERROR, in configuration of step "+chain_step.name)

            # get the filter:
            filter_name = CFNaming.filterName(chain_step.name)
            (foundFilter, sfilter) = findFilter(filter_name, CFseq_list)
            if not foundFilter:
                sfilter = buildFilter(filter_name, filter_input)
                CF_seq = CFSequence( ChainStep=chain_step, FilterAlg=sfilter)
                CFseq_list.append(CF_seq)
                for sequence in chain_step.sequences:
                    step_decisions.extend(sequence.outputs)
            else:
                log.debug("foundFilter %s", filter_name)

            sfilter.setChains(chain.name)
            log.debug("Adding chain %s to %s", chain.name,sfilter.Alg.name())
            log.debug(sfilter.getChains())

            # the chain ends at this step: its outputs are final decisions
            if len(chain.steps) == nstep+1:
                log.debug("Adding finalDecisions for chain %s at step %d:"%(chain.name, nstep+1))
                for seq in chain_step.sequences:
                    finalDecisions[nstep].extend(seq.outputs)
                    log.debug(seq.outputs)

        # end of loop over chains for this step, now implement CF:

        log.debug("\n******** Create CF Tree %s with AthSequencers", stepCF_name)

        # first make the filter step
        stepFilter = createStepFilterNode(stepCF_name, CFseq_list, dump=False)
        HLTNode += stepFilter
        allSeq_list.append(CFseq_list)

        # then the reco step
        recoNodeName = CFNaming.stepRecoNodeName(HLTNodeName, stepCF_name)
        stepCF = createStepRecoNode(recoNodeName, CFseq_list, dump=False)
        HLTNode += stepCF

        # then the monitor summary
        summary = makeSummary(CFNaming.stepSummaryName(stepCF_name), step_decisions)
        HLTNode += summary

        log.debug("Now Draw...")
        stepCF_DataFlow_to_dot(recoNodeName, CFseq_list)
        stepCF_ControlFlow_to_dot(stepCF)

        log.info("************* End of step %d, %s", nstep+1, stepCF_name)
        # end of steps

    log.debug("finalDecisions: %s" %str( finalDecisions) )
    all_DataFlow_to_dot(HLTNodeName, allSeq_list)

    # matrix display
    matrixDisplay( allSeq_list )

    return finalDecisions


"""
Not used, kept for reference and testing purposes
To be removed in future
"""
def generateDecisionTreeOld(HLTNode, chains, allChainDicts):
    """Legacy construction of the decision tree (kept for reference and testing).

    The chains are arranged in a matrix indexed by step number and by the
    first "_"-separated token of the step name. For every non-empty cell one
    filter and one CFSequence are built from the first chain of the cell, and
    the hypo algorithms and accumulators of all chains of the cell are
    collected in a ComponentAccumulator. For every step the filter node, the
    reco node and a summary algorithm are appended to HLTNode.

    Arguments:
      HLTNode       -- sequence to which the step nodes are added
      chains        -- chain configurations; each provides name, steps,
                       group_seed and decodeHypoToolConfs
      allChainDicts -- chain dictionaries, passed to decodeHypoToolConfs

    Returns the ComponentAccumulator holding the merged per-cell accumulators.
    """
    log.debug("Run decisionTree_From_Chains on %s", HLTNode.name())
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    from collections import defaultdict
    from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponents import CFSequence

    acc = ComponentAccumulator()
    nodeName = HLTNode.name()

    # matrix[step number][first token of the step name] -> chains in that cell
    chainStepsMatrix = defaultdict(lambda: defaultdict(list))
    for chain in chains:
        chain.decodeHypoToolConfs(allChainDicts)
        for stepNumber, chainStep in enumerate(chain.steps):
            cellKey = chainStep.name.split('_')[0]
            chainStepsMatrix[stepNumber][cellKey].append(chain)

    allSequences = []

    # one filter and one CFSequence per non-empty cell of the matrix
    for nstep in chainStepsMatrix:
        cellsOfStep = chainStepsMatrix[nstep]
        cfSequencesOfStep = []
        decisionsOfStep = []
        accumulatorsOfStep = []
        hyposOfStep = []

        for cellKey in cellsOfStep:
            chainsInCell = cellsOfStep[cellKey]
            if not chainsInCell:
                continue

            cellAcc = ComponentAccumulator()
            firstHypo = None
            for chain in chainsInCell:
                for menuSeq in chain.steps[nstep].sequences:
                    if menuSeq.ca:
                        cellAcc.merge( menuSeq.ca )

                    hypoAlg = menuSeq.hypo.Alg
                    # only the first hypo of the cell is remembered; it is
                    # used below to locate the owning sequence
                    if firstHypo is None:
                        firstHypo = hypoAlg
                        hyposOfStep.append( hypoAlg )
                    cellAcc.addEventAlgo( hypoAlg )

            accumulatorsOfStep.append( cellAcc )
            cellAcc.printConfig( True, True )

            # the first chain of the cell defines filter inputs and the step
            firstChain = chainsInCell[0]
            chainStep = firstChain.steps[nstep]
            if nstep == 0:
                filter_input = firstChain.group_seed
            else:
                filter_input = [ output
                                 for prevSeq in firstChain.steps[nstep - 1].sequences
                                 for output in prevSeq.outputs ]

            # One aggregated filter per chain (one per column in matrix)
            cellFilter = buildFilter('Filter_{}'.format( chainStep.name ), filter_input)
            cfSequencesOfStep.append( CFSequence( ChainStep=chainStep, FilterAlg=cellFilter ) )

            for menuSeq in chainStep.sequences:
                decisionsOfStep.extend( menuSeq.outputs )

            for chain in chainsInCell:
                cellFilter.setChains(chain.name)

        allSequences.append(cfSequencesOfStep)

        stepName = 'Step{}'.format(nstep)
        recoNodeName = '{}_{}'.format(nodeName, stepName)
        stepFilter = createStepFilterNode(stepName, cfSequencesOfStep, dump=False)
        stepCF = createStepRecoNode(recoNodeName, cfSequencesOfStep, dump=False)

        from AthenaCommon.CFElements import findOwningSequence
        # zip stops at the shortest of the three lists
        for cellAcc, _cfSequence, hypoAlg in zip( accumulatorsOfStep, cfSequencesOfStep, hyposOfStep ):
            owningSequence = findOwningSequence( stepCF, hypoAlg.getName() )
            acc.addSequence( owningSequence )
            acc.merge( cellAcc, sequenceName = owningSequence.getName() )
        summary = makeSummary('TriggerSummary{}'.format(stepName), decisionsOfStep)

        HLTNode += stepFilter
        HLTNode += stepCF
        HLTNode += summary

        stepCF_DataFlow_to_dot(recoNodeName, cfSequencesOfStep)
        stepCF_ControlFlow_to_dot(stepCF)

    all_DataFlow_to_dot(nodeName, allSequences)
    matrixDisplay( allSequences )
    return acc


def connectStepToFilter(chainStep, filter):
    """Connect every menu sequence of `chainStep` to the matching output of `filter`.

    The i-th sequence of the step is connected (through its connectToFilter
    method) to the i-th entry of the filter output list.

    Raises ValueError when the filter has no outputs, or when the number of
    outputs differs from the number of sequences of the step.
    """
    outputs = filter.getOutputList()
    nOutputs = len(outputs)
    if nOutputs == 0:
        raise ValueError('ERROR: no filter outputs are set')

    menuSequences = chainStep.sequences
    nSequences = len(menuSequences)
    if nOutputs != nSequences:
        raise ValueError(
            'ERROR: found {} filter outputs and {} MenuSequences in step {}'.format(
                nOutputs, nSequences, chainStep.name))

    # the lengths are equal here, so pairing by position covers every sequence
    for sequence, output in zip(menuSequences, outputs):
        log.debug("Found input %s to sequence::%s from Filter::%s (from seed %s)", output,
                  sequence.name, filter.Alg.name(), sequence.seed)
        sequence.connectToFilter(output)


def generateDecisionTree(mainSequenceName, chains, allChainDicts):
    """Build the decision tree of the chains inside a ComponentAccumulator.

    The chains are arranged in a matrix indexed by step number and by the
    first "_"-separated token of the step name. For every step a filter node
    and a reco node are created; for every non-empty cell of the step one
    filter is built from the first chain of the cell, the reco/view/filter
    sequences are created, the hypo algorithms of all chains of the cell are
    added and the step is connected to the filter outputs.

    Arguments:
      mainSequenceName -- only used in the log message; the main sequence is
                          always called 'HLTAllSteps' (see note in the body)
      chains           -- chain configurations; each provides name, steps,
                          group_seed and decodeHypoToolConfs
      allChainDicts    -- chain dictionaries, passed to decodeHypoToolConfs

    Returns the ComponentAccumulator holding the whole tree.
    """
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    from collections import defaultdict

    log.debug("Run generateDecisionTree on %s", mainSequenceName)

    acc = ComponentAccumulator()
    # NOTE(review): the argument is overridden by a hard-coded name; kept as
    # is because callers may rely on 'HLTAllSteps' -- confirm before changing
    mainSequenceName = 'HLTAllSteps'
    acc.addSequence( seqAND(mainSequenceName) )

    # matrix[step number][first token of the step name] -> chains in that cell
    chainStepsMatrix = defaultdict(lambda: defaultdict(list))

    ## Fill chain steps matrix
    for chain in chains:
        chain.decodeHypoToolConfs(allChainDicts)
        for stepNumber, chainStep in enumerate(chain.steps):
            chainName = chainStep.name.split('_')[0]
            chainStepsMatrix[stepNumber][chainName].append(chain)

    ## Matrix with steps lists generated. Creating filters for each cell
    # sorted: steps must be appended to the main sequence in ascending order,
    # independently of the iteration order of the dictionary
    for nstep in sorted(chainStepsMatrix):
        stepDecisions = []

        stepName = 'Step{}'.format(nstep)

        stepFilterNodeName = '{}{}'.format(stepName, CFNaming.FILTER_POSTFIX)
        filterAcc = ComponentAccumulator()
        filterAcc.addSequence( parOR(stepFilterNodeName) )

        stepRecoNodeName = '{}_{}'.format(mainSequenceName, stepName)
        recoAcc = ComponentAccumulator()
        recoAcc.addSequence( parOR(stepRecoNodeName) )

        for chainName in chainStepsMatrix[nstep]:
            chainsInCell = chainStepsMatrix[nstep][chainName]

            if not chainsInCell:
                # If cell is empty, there is nothing to do
                continue

            # the first chain of the cell defines the filter inputs and the step
            firstChain = chainsInCell[0]
            if nstep == 0:
                filter_input = firstChain.group_seed
            else:
                filter_input = [output for sequence in firstChain.steps[nstep - 1].sequences for output in sequence.outputs]

            # One aggregated filter per chain (one per column in matrix)
            filterName = 'Filter_{}'.format( firstChain.steps[nstep].name )
            sfilter = buildFilter(filterName, filter_input)
            filterAcc.addEventAlgo(sfilter.Alg, sequenceName = stepFilterNodeName)

            chainStep = firstChain.steps[nstep]
            stepReco = parOR('{}{}'.format(chainStep.name, CFNaming.RECO_POSTFIX))
            stepView = seqAND('{}{}'.format(chainStep.name, CFNaming.VIEW_POSTFIX), [stepReco])
            viewWithFilter = seqAND(chainStep.name, [sfilter.Alg, stepView])
            recoAcc.addSequence(viewWithFilter, parentName = stepRecoNodeName)

            stepsAcc = ComponentAccumulator()

            for chain in chainsInCell:
                for seq in chain.steps[nstep].sequences:
                    if seq.ca:
                        stepsAcc.merge( seq.ca )
                    recoAcc.addEventAlgo( seq.hypo.Alg, sequenceName = stepView.getName() )
                sfilter.setChains(chain.name)

            recoAcc.merge(stepsAcc, sequenceName = stepReco.getName())

            connectStepToFilter(chainStep, sfilter)

            for sequence in chainStep.sequences:
                stepDecisions += sequence.outputs

        acc.merge(filterAcc, sequenceName = mainSequenceName)
        acc.merge(recoAcc, sequenceName = mainSequenceName)

        summary = makeSummary('TriggerSummary{}'.format(stepName), stepDecisions)
        # NOTE(review): summary is an algorithm, yet it is registered through
        # addSequence -- confirm whether addEventAlgo is what is intended
        acc.addSequence(summary, parentName = mainSequenceName)

    return acc


def findFilter(filter_name, cfseqList):
    """
    Look for a filter whose algorithm name contains `filter_name` among the
    CF sequences of this step.

    Returns (True, filter) when exactly one is found and (False, None) when
    none is found; terminates through sys.exit when more than one matches.
    """
    log.debug( "findFilter: filter name %s" % filter_name )

    matchingFilters = []
    for cfseq in cfseqList:
        # substring match on the algorithm name, not an exact comparison
        if filter_name in cfseq.filter.Alg.name():
            matchingFilters.append(cfseq.filter)

    nMatches = len(matchingFilters)
    if nMatches > 1:
        log.error("found %d filters  with name %s", nMatches, filter_name)
        sys.exit("ERROR, in filter configuration")

    if nMatches == 0:
        return (False, None)

    log.debug("Filter %s already exists", filter_name)
    return (True, matchingFilters[0])



def buildFilter(filter_name,  filter_input):
    """
    Build the filter node called `filter_name`.

    A RoRSequenceFilterNode is created; every entry of `filter_input` is
    added as an input and, for each of them, one output named by
    CFNaming.filterOutName(filter_name, input) is added, so that the filter
    has one output per input. Returns the filter node.
    """
    from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponents import  RoRSequenceFilterNode

    filterNode = RoRSequenceFilterNode(name=filter_name)

    # all inputs are registered first, then all the outputs
    for decision in filter_input:
        filterNode.addInput(decision)
    for decision in filter_input:
        outputName = CFNaming.filterOutName(filter_name, decision)
        filterNode.addOutput(outputName)

    log.debug("Added inputs to filter: %s", filterNode.getInputList())
    log.debug("Added outputs to filter: %s", filterNode.getOutputList())
    log.debug("Filter Done: %s", filterNode.Alg.name())

    return filterNode