Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
HLTCFConfig.py 22.61 KiB
# Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration

"""
    ------ Documentation on HLT Tree creation -----

++ Filter creation strategy

++ Connections between InputMaker/HypoAlg/Filter

++ Seeds

++ Combined chain strategy

- The combined chains use duplicates of the single-object-HypoAlg, called HypoAlgName_for_stepName.
  These duplicates are connected to a dedicated ComboHypoAlg (added by the framework), able to count object multiplicity
     -- This is needed for two reasons:
           -- the HypoAlg is designed to have only one input TC (that is already for the single object)
           -- otherwise the HypoAlg would be equipped with differnt HypoTools with the same name (see for example e3_e8)
     -- If the combined chain is symmetric (with multiplicity >1), the Hypo is duplicated only once,
        equipped with a HypoTool configured as single object and followed by one ComboHypoAlg





"""

from builtins import zip
from builtins import str
from builtins import map
from builtins import range
from collections import OrderedDict
# Classes to configure the CF graph, via Nodes
from AthenaCommon.CFElements import parOR, seqAND
from AthenaCommon.AlgSequence import dumpSequence
from TriggerMenuMT.HLTMenuConfig.Menu.HLTCFDot import  stepCF_DataFlow_to_dot, stepCF_ControlFlow_to_dot, all_DataFlow_to_dot, create_dot
from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponentsNaming import CFNaming
from AthenaCommon.Configurable import Configurable

from AthenaCommon.CFElements import getSequenceChildren, isSequence, compName
import re



from AthenaCommon.Logging import logging
log = logging.getLogger( __name__ )

#### Here functions to create the CF tree from CF configuration objects
def makeSummary(name, flatDecisions):
    """ Returns a TriggerSummaryAlg connected to given decisions"""
    from DecisionHandling.DecisionHandlingConfig import TriggerSummaryAlg    
    summary = TriggerSummaryAlg( CFNaming.stepSummaryName(name) )
    summary.InputDecision = "L1DecoderSummary"
    summary.FinalDecisions = list(OrderedDict.fromkeys(flatDecisions))
    return summary


def createStepRecoNode(name, seq_list, dump=False):
    """ elementary HLT reco step, contianing all sequences of the step """

    log.debug("Create reco step %s with %d sequences", name, len(seq_list))
    stepCF = parOR(name + CFNaming.RECO_POSTFIX)
    for seq in seq_list:
        stepCF += createCFTree(seq)

    if dump:
        dumpSequence (stepCF, indent=0)
    return stepCF


def createStepFilterNode(name, seq_list, dump=False):
    """ elementary HLT filter step: OR node containing all Filters of the sequences. The node gates execution of next reco step """

    log.debug("Create filter step %s with %d filters", name, len(seq_list))
    filter_list=[]
    for seq in seq_list:
        filterAlg = seq.filter.Alg
        log.info("createStepFilterNode: Add  %s to filter node %s", filterAlg.name(), name)
        if filterAlg not in filter_list:
            filter_list.append(filterAlg)


    stepCF = parOR(name + CFNaming.FILTER_POSTFIX, subs=filter_list)

    if dump:
        dumpSequence (stepCF, indent=0)
    return stepCF


def createCFTree(CFseq):
    """ Creates AthSequencer nodes with sequences attached """

    log.debug(" *** Create CF Tree for CFSequence %s", CFseq.step.name)
    filterAlg = CFseq.filter.Alg

    #empty step:
    if len(CFseq.step.sequences)==0:
        seqAndWithFilter = seqAND(CFseq.step.name, [filterAlg])
        return seqAndWithFilter

    stepReco = parOR(CFseq.step.name + CFNaming.RECO_POSTFIX)  # all reco algorithms from all the sequences in a parallel sequence
    seqAndView = seqAND(CFseq.step.name + CFNaming.VIEW_POSTFIX, [stepReco])  # include in seq:And to run in views: add here the Hypo
    seqAndWithFilter = seqAND(CFseq.step.name, [filterAlg, seqAndView])  # add to the main step+filter

    recoSeqSet=set()
    hypoSet=set()
    for menuseq in CFseq.step.sequences:
        menuseq.addToSequencer(recoSeqSet,hypoSet)
  
    stepReco += sorted(list(recoSeqSet), key=lambda t: t.name())
    seqAndView += sorted(list(hypoSet), key=lambda t: t.name())
       
    if CFseq.step.isCombo:
        seqAndView += CFseq.step.combo.Alg

    return seqAndWithFilter


#######################################
## CORE of Decision Handling
#######################################

def makeHLTTree(newJO=False, triggerConfigHLT = None):
    """ creates the full HLT tree"""

    # Check if triggerConfigHLT exits, if yes, derive information from this
    # this will be in use once TrigUpgrade test has migrated to TriggerMenuMT completely

    # get topSequnece
    from AthenaCommon.AlgSequence import AlgSequence
    topSequence = AlgSequence()


    # find main HLT top sequence (already set up in runHLT_standalone)
    from AthenaCommon.CFElements import findSubSequence,findAlgorithm
    l1decoder = findAlgorithm(topSequence, "L1Decoder")

    # add the HLT steps Node
    steps = seqAND("HLTAllSteps")
    hltTop = findSubSequence(topSequence, "HLTTop")
    hltTop +=  steps

    hltEndSeq = parOR("HLTEndSeq")
    hltTop += hltEndSeq

    hltFinalizeSeq = seqAND("HLTFinalizeSeq")

    # make DF and CF tree from chains
    finalDecisions = decisionTreeFromChains(steps, triggerConfigHLT.configsList(), triggerConfigHLT.dictsList(), newJO)

    flatDecisions=[]
    for step in finalDecisions:
        flatDecisions.extend (step)

    summary = makeSummary("Final", flatDecisions)
    hltEndSeq += summary

    # TODO - check we are not running things twice. Once here and once in TriggerConfig.py

    from TriggerJobOpts.TriggerConfig import collectHypos, collectFilters, collectViewMakers, collectDecisionObjects,\
        triggerMonitoringCfg, triggerSummaryCfg, triggerMergeViewsAndAddMissingEDMCfg, collectHypoDecisionObjects
    from AthenaConfiguration.AllConfigFlags import ConfigFlags

    from AthenaConfiguration.ComponentAccumulator import conf2toConfigurable, appendCAtoAthena

    # Collections required to configure the algs below
    hypos = collectHypos(steps)
    filters = collectFilters(steps)
    viewMakers = collectViewMakers(steps)

    Configurable.configurableRun3Behavior=1
    summaryAcc, summaryAlg = triggerSummaryCfg( ConfigFlags, hypos )
    Configurable.configurableRun3Behavior=0
    # A) First we check if any chain accepted the event
    hltFinalizeSeq += conf2toConfigurable( summaryAlg )
    appendCAtoAthena( summaryAcc )

    # B) Then (if true), we run the accepted event algorithms.
    # Add any required algs to hltFinalizeSeq here

    # More collections required to configure the algs below
    decObj = collectDecisionObjects( hypos, filters, l1decoder, summaryAlg )
    decObjHypoOut = collectHypoDecisionObjects(hypos, inputs=False, outputs=True)

    Configurable.configurableRun3Behavior=1
    monAcc, monAlg = triggerMonitoringCfg( ConfigFlags, hypos, filters, l1decoder )
    Configurable.configurableRun3Behavior=0
    hltEndSeq += conf2toConfigurable( monAlg )
    appendCAtoAthena( monAcc )

    Configurable.configurableRun3Behavior=1
    edmAlg = triggerMergeViewsAndAddMissingEDMCfg(ConfigFlags, ['AOD', 'ESD'], hypos, viewMakers, decObj, decObjHypoOut)
    Configurable.configurableRun3Behavior=0
    # C) Finally, we create the EDM output
    hltFinalizeSeq += conf2toConfigurable(edmAlg)

    hltEndSeq += hltFinalizeSeq

    # Test the configuration
    from TriggerMenuMT.HLTMenuConfig.Menu.CFValidation import testHLTTree
    testHLTTree( hltTop )


def matrixDisplayOld( allCFSeq ):
    from collections import defaultdict
    longestName = 5
    mx = defaultdict(lambda: dict())
    for stepNumber,step in enumerate(allCFSeq, 1):
        for seq in step:
            mx[stepNumber][seq.step.name] = seq # what if ther eare more sequences in one step?

            longestName = max(longestName, len(seq.step.name) )

    longestName = longestName + 1

    def __getHyposOfStep( s ):
        if len(s.step.sequences):
            if len(s.step.sequences)==1:
                ## if type(s.step.sequences[0].hypo) is list:
                ##     return s.step.sequences[0].hypo[0].tools
                ## else:
                return s.step.sequences[0].hypo.tools
            else:
                return list(s.step.combo.getChains())
        return []
   



    def __nextSteps( index, stepName ):
        nextStepName = "Step%s_"%index + "_".join(stepName.split("_")[1:])
        for sname, seq in mx[index].items():
            if sname == nextStepName:
                return sname.ljust( longestName ) + __nextSteps( index + 1, nextStepName )
        return ""

    log.debug("" )
    log.debug("chains^ vs steps ->")
    log.debug( "="*90 )
    for sname, seq in mx[1].items():
        guessChainName = '_'.join( sname.split( "_" )[1:] )
        log.debug( " Reco chain: %s: %s", guessChainName.rjust(longestName),  __nextSteps( 1, sname ) )
        log.debug( " %s", " ".join( __getHyposOfStep( seq ) ) )
        log.debug( "" )

    log.debug( "%s", "="*90 )
    log.debug( "" )


    
def matrixDisplay( allCFSeq ):

    def __getHyposOfStep( step ):
        if len(step.sequences):
            step.getChainNames()           
        return []
   
    # fill dictionary to cumulate chains on same sequences, in steps (dict with composite keys)
    from collections import defaultdict
    mx = defaultdict(list)

    for stepNumber,cfseq_list in enumerate(allCFSeq, 1):
        for cfseq in cfseq_list:
            chains = __getHyposOfStep(cfseq.step)
            for seq in cfseq.step.sequences:
                if seq.name == "Empty":
                    mx[stepNumber, "Empty"].extend(chains)
                else:
                    mx[stepNumber, seq.sequence.Alg.name()].extend(chains)

    # sort dictionary by fist key=step
    from collections import  OrderedDict
    sorted_mx = OrderedDict(sorted( list(mx.items()), key= lambda k: k[0]))

    log.info( "" )
    log.info( "="*90 )
    log.info( "Cumulative Summary of steps")
    log.info( "="*90 )
    for (step, seq), chains in list(sorted_mx.items()):
        log.info( "(step, sequence)  ==> (%d, %s) is in chains: ",  step, seq)
        for chain in chains:
            log.info( "              %s",chain)

    log.info( "="*90 )


def sequenceScanner( HLTNode ):
    """ Checks the alignement of sequences and steps in the tree"""
    # +-- AthSequencer/HLTAllSteps
    #   +-- AthSequencer/Step1_filter
    #   +-- AthSequencer/Step1_reco

    from collections import defaultdict
    _seqMapInStep = defaultdict(set)
    _status = True

    def _mapSequencesInSteps(seq, stepIndex):
        """ Recursively finds the steps in which sequences are used"""
        if not isSequence(seq):
            return stepIndex
        name = compName(seq)                
        match=re.search('^Step([0-9])_filter',name)
        if match:
            stepIndex = match.group(1)
            log.debug("sequenceScanner: This is another step: %s %s", name, stepIndex)            
        for c in getSequenceChildren( seq ):
            if isSequence(c):
                stepIndex = _mapSequencesInSteps(c, stepIndex)
                _seqMapInStep[compName(c)].add(stepIndex)
        return stepIndex

    # do the job:
    final_step=_mapSequencesInSteps(HLTNode, 0)

    for alg, steps in _seqMapInStep.items():
        if len(steps)> 1:
            log.error("sequenceScanner: Sequence %s is expected in more than one step: %s", alg, steps)
            match=re.search('Step([0-9])',alg)
            if match:
                candidateStep=match.group(1)
                log.error("sequenceScanner:         ---> candidate good step is %s", candidateStep)
            _status=False     

    log.debug("sequenceScanner: scanned %s steps with status %d", final_step, _status)
    return _status
   

def decisionTreeFromChains(HLTNode, chains, allDicts, newJO):
    """ creates the decision tree, given the starting node and the chains containing the sequences  """

    log.debug("Run decisionTreeFromChains on %s", HLTNode.name())
    HLTNodeName= HLTNode.name()
    if len(chains) == 0:
        log.info("Configuring empty decisionTree")
        return []

    (finalDecisions, CFseq_list) = createDataFlow(chains, allDicts)
    if not newJO:
        createControlFlow(HLTNode, CFseq_list)
    else:
        from TriggerMenuMT.HLTMenuConfig.Menu.HLTCFConfig_newJO import createControlFlowNewJO
        createControlFlowNewJO(HLTNode, CFseq_list)

    sequenceScanner( HLTNode )
    
    # decode and attach HypoTools:
    for chain in chains:
        chain.createHypoTools()

    # create dot graphs
    log.debug("finalDecisions: %s", finalDecisions)
    if create_dot():
        all_DataFlow_to_dot(HLTNodeName, CFseq_list)

    # matrix display
    matrixDisplay( CFseq_list )

    return finalDecisions


def createDataFlow(chains, allDicts):
    """ Creates the filters and connect them to the menu sequences"""

    # find tot nsteps
    chainWithMaxSteps = max(chains, key=lambda chain: len(chain.steps))
    NSTEPS = len(chainWithMaxSteps.steps)

    log.debug("createDataFlow for %d chains and total %d steps", len(chains), NSTEPS)

    from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponents import CFSequence
    # initialize arrays for monitor
    finalDecisions = [ [] for n in range(NSTEPS) ]
    CFseqList = [ [] for n in range(NSTEPS) ]

    # loop over chains
    for chain in chains:
        log.debug("\n Configuring chain %s with %d steps: \n   - %s ", chain.name,len(chain.steps),'\n   - '.join(map(str, [{step.name:step.multiplicity} for step in chain.steps])))

        lastCFseq = None
        for nstep, chainStep in enumerate( chain.steps ):
            log.debug("\n************* Start connecting step %d %s for chain %s", nstep+1, chainStep.name, chain.name)

            filterInput = chain.L1decisions if nstep == 0 else lastCFseq.decisions
            log.debug("Seeds added; having in the filter now: %s", filterInput)


            if len(filterInput) == 0 :
                log.error("ERROR: Filter for step %s has %d inputs! At least one is expected", chainStep.name, len(filterInput))


            # make one filter per step:
            sequenceFilter= None
            filterName = CFNaming.filterName(chainStep.name)
            filterOutput =[ CFNaming.filterOutName(filterName, inputName) for inputName in filterInput ]
            log.debug("Filter outputps: %s", filterOutput)

            (foundFilter, foundCFSeq) = findCFSequences(filterName, CFseqList[nstep])
            log.debug("Found %d CF sequences with filter name %s", foundFilter, filterName)
             # add error if more than one
            if not foundFilter:
                sequenceFilter = buildFilter(filterName, filterInput)
                CFseq = CFSequence( ChainStep=chainStep, FilterAlg=sequenceFilter)
                CFseq.connect(filterOutput)
                CFseqList[nstep].append(CFseq)
                lastCFseq=CFseq
            else:
                if len(foundCFSeq) > 1:
                    log.error("Found more than one seuqences containig this filter %s", filterName)
                lastCFseq=foundCFSeq[0]
                sequenceFilter=lastCFseq.filter
                lastCFseq.connect(filterOutput)
                [ sequenceFilter.addInput(inputName) for inputName in filterInput ]
                [ sequenceFilter.addOutput(outputName) for outputName in  filterOutput ]


            # add chains to the filter:
            chainLegs = chain.getChainLegs()
            for leg in chainLegs:
                sequenceFilter.addChain(leg)
                log.debug("Adding chain %s to %s", leg, sequenceFilter.Alg.name())
            log.debug("Now Filter has chains: %s", sequenceFilter.getChains())

            if chainStep.isCombo:
                if chainStep.combo is not None:
                    chainStep.combo.addChain( [d for d in allDicts if d['chainName'] == chain.name ][0])
                    log.debug("Added chains to ComboHypo: %s",chainStep.combo.getChains())

            if len(chain.steps) == nstep+1:
                log.debug("Adding finalDecisions for chain %s at step %d:", chain.name, nstep+1)
                for dec in lastCFseq.decisions:
                    finalDecisions[nstep].append(dec)
                    log.debug(dec)
                    
        #end of loop over steps
        log.info("\n Built CF for chain %s with %d steps: \n   - %s ", chain.name,len(chain.steps),'\n   - '.join(map(str, [{step.name:step.multiplicity} for step in chain.steps])))
    #end of loop over chains


    log.debug("End of createDataFlow for %d chains and total %d steps", len(chains), NSTEPS)
    return (finalDecisions, CFseqList)


def createControlFlow(HLTNode, CFseqList):
    """ Creates Control Flow Tree starting from the CFSequences"""

    HLTNodeName= HLTNode.name()
    log.debug("createControlFlow on node %s",HLTNodeName)

    for nstep in range(len(CFseqList)):
        stepSequenceName =  CFNaming.stepName(nstep)
        log.debug("\n******** Create CF Tree %s with AthSequencers", stepSequenceName)

        #first make the filter step
        stepFilterNode = createStepFilterNode(stepSequenceName, CFseqList[nstep], dump=False)
        HLTNode += stepFilterNode

        # then the reco step
        stepRecoNode = createStepRecoNode(stepSequenceName, CFseqList[nstep], dump=False)
        HLTNode += stepRecoNode


        # then the monitor summary
        stepDecisions = []
        for CFseq in CFseqList[nstep]:
            stepDecisions.extend(CFseq.decisions)

        summary=makeSummary( stepSequenceName, stepDecisions )

        HLTNode += summary

        if create_dot():
            log.debug("Now Draw...")
            stepCF_DataFlow_to_dot(stepRecoNode.name(), CFseqList[nstep])
            stepCF_ControlFlow_to_dot(stepRecoNode)

        log.info("************* End of step %d, %s", nstep+1, stepSequenceName)

    return




"""
Not used, kept for reference and testing purposes
To be removed in future
"""
def generateDecisionTreeOld(HLTNode, chains, allChainDicts):
    log.debug("Run generateDecisionTreeOld on %s", HLTNode.name())
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    acc = ComponentAccumulator()
    from collections import defaultdict
    from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponents import CFSequence

    chainStepsMatrix = defaultdict(lambda: defaultdict(lambda: list()))

    ## Fill chain steps matrix
    for chain in chains:
        chain.createHypoTools()#allChainDicts)
        for stepNumber, chainStep in enumerate(chain.steps):
            chainName = chainStep.name.split('_')[0]
            chainStepsMatrix[stepNumber][chainName].append(chain)

    allSequences = []

    ## Matrix with steps lists generated. Creating filters for each cell
    for nstep in chainStepsMatrix:
        CFsequences = []
        stepDecisions = []
        stepAccs = []
        stepHypos = []

        for chainName in chainStepsMatrix[nstep]:
            chainsInCell = chainStepsMatrix[nstep][chainName]

            if not chainsInCell:
                continue


            stepCategoryAcc = ComponentAccumulator()

            stepHypo = None

            for chain in chainsInCell:
                for seq in chain.steps[nstep].sequences:
                    if seq.ca:
                        stepCategoryAcc.merge( seq.ca )

                    alg = seq.hypo.Alg
                    if stepHypo is None:
                        stepHypo = alg
                        stepHypos.append( alg )
                    stepCategoryAcc.addEventAlgo( alg )

            stepAccs.append( stepCategoryAcc )

            stepCategoryAcc.printConfig( True, True )
            firstChain = chainsInCell[0]

            if nstep == 0:
                filter_input = firstChain.L1decisions
            else:
                filter_input = []
                for sequence in firstChain.steps[nstep - 1].sequences:
                    filter_input += sequence.outputs

            # One aggregated filter per chain (one per column in matrix)
            filterName = 'Filter_{}'.format( firstChain.steps[nstep].name )
            filter_output =[]
            for i in filter_input:
                filter_output.append( CFNaming.filterOutName(filterName, i))
            sfilter = buildFilter(filterName,  filter_input)

            chainStep = firstChain.steps[nstep]

            CFseq = CFSequence( ChainStep=chainStep, FilterAlg=sfilter, connections=filter_output )
            CFsequences.append( CFseq )


            for sequence in chainStep.sequences:
                stepDecisions += sequence.outputs

            for chain in chainsInCell:
                sfilter.addChain(chain.name)

        allSequences.append(CFsequences)

        stepName = 'Step{}'.format(nstep)
        stepFilter = createStepFilterNode(stepName, CFsequences, dump=False)
        stepCF = createStepRecoNode('{}_{}'.format(HLTNode.name(), stepName), CFsequences, dump=False)

        from AthenaCommon.CFElements import findOwningSequence
        for oneAcc, cfseq, hypo in zip( stepAccs, CFsequences, stepHypos):
            owning = findOwningSequence( stepCF, hypo.getName() )
            acc.addSequence( owning )
            acc.merge( oneAcc, sequenceName = owning.getName() )
        summary = makeSummary('TriggerSummary{}'.format(stepName), stepDecisions)

        HLTNode += stepFilter
        HLTNode += stepCF
        HLTNode += summary
        if create_dot():
            stepCF_DataFlow_to_dot('{}_{}'.format(HLTNode.name(), stepName), CFsequences)
            stepCF_ControlFlow_to_dot(stepCF)
            all_DataFlow_to_dot(HLTNode.name(), allSequences)

        matrixDisplay( allSequences )
    return acc



def findCFSequences(filter_name, cfseqList):
      """
      searches for a filter, with given name, in the CF sequence list of this step
      """
      log.debug( "findCFSequences: filter base name %s", filter_name )
      foundFilters = [cfseq for cfseq in cfseqList if filter_name == cfseq.filter.Alg.name()]
      log.debug("found %d filters with base name %s", len( foundFilters ), filter_name)

      found=len(foundFilters)
      if found:
          return (found, foundFilters)
      return (found, None)


def buildFilter(filter_name,  filter_input):
    """
     Build the FILTER
     one filter per previous sequence at the start of the sequence: always create a new one
     if the previous hypo has more than one output, try to get all of them
     one filter per previous sequence: 1 input/previous seq, 1 output/next seq
    """
    from TriggerMenuMT.HLTMenuConfig.Menu.MenuComponents import  RoRSequenceFilterNode
    sfilter = RoRSequenceFilterNode(name=filter_name)
    for i in filter_input:
        sfilter.addInput(i)
    for i in filter_input:
        sfilter.addOutput(CFNaming.filterOutName(filter_name, i))

    log.debug("Added inputs to filter: %s", sfilter.getInputList())
    log.debug("Added outputs to filter: %s", sfilter.getOutputList())
    log.debug("Filter Done: %s", sfilter.Alg.name())

    return (sfilter)