Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
HLTTriggerResultGetter.py 19.67 KiB
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration

from TriggerJobOpts.TriggerFlags import TriggerFlags
from AthenaConfiguration.AllConfigFlags import ConfigFlags
from AthenaCommon.Logging import logging
from AthenaCommon.GlobalFlags import globalflags

from AthenaCommon.AppMgr import ServiceMgr
from RecExConfig.Configured import Configured

from RecExConfig.RecFlags import rec

from TrigRoiConversion.TrigRoiConversionConf import RoiWriter


class xAODConversionGetter(Configured):
    def configure(self):
        from AthenaCommon.AlgSequence import AlgSequence
        topSequence = AlgSequence()

        #schedule xAOD conversions here
        from TrigBSExtraction.TrigBSExtractionConf import TrigHLTtoxAODConversion
        xaodconverter = TrigHLTtoxAODConversion()
        
        from TrigNavigation.TrigNavigationConfig import HLTNavigationOffline
        xaodconverter.Navigation = HLTNavigationOffline()

        from TrigEDMConfig.TriggerEDM import getPreregistrationList
        from TrigEDMConfig.TriggerEDM import getEFRun1BSList,getEFRun2EquivalentList,getL2Run1BSList,getL2Run2EquivalentList
        xaodconverter.Navigation.ClassesToPreregister = getPreregistrationList(ConfigFlags.Trigger.EDMVersion)

        # we want only containers from Run 1 with the BS tag
        xaodconverter.BStoxAOD.ContainersToConvert = getL2Run1BSList() + getEFRun1BSList()
        xaodconverter.BStoxAOD.NewContainers = getL2Run2EquivalentList() + getEFRun2EquivalentList()

        xaodconverter.HLTResultKey="HLTResult_EF"
        topSequence += xaodconverter

        # define list of HLT xAOD containers to be written to the output root file
        # (previously this was defined in HLTTriggerResultGetter def configure)
        from TrigEDMConfig.TriggerEDM import getTriggerEDMList
        self.xaodlist = {}
        self.xaodlist.update( getTriggerEDMList(TriggerFlags.ESDEDMSet(), 2 ))

        return True
    
        

class ByteStreamUnpackGetter(Configured):
    def configure(self):
        log = logging.getLogger("ByteStreamUnpackGetter")

        log.info( "TriggerFlags.dataTakingConditions: %s", TriggerFlags.dataTakingConditions() )
        hasHLT = TriggerFlags.dataTakingConditions()=='HltOnly' or TriggerFlags.dataTakingConditions()=='FullTrigger'
        if not hasHLT:
            log.info("Will not configure HLT BS unpacking because dataTakingConditions flag indicates HLT was disabled")
            return True

        # Define the decoding sequence
        from TrigHLTResultByteStream.TrigHLTResultByteStreamConf import HLTResultMTByteStreamDecoderAlg
        from TrigOutputHandling.TrigOutputHandlingConf import TriggerEDMDeserialiserAlg
        from AthenaCommon.CFElements import seqAND
        decoder = HLTResultMTByteStreamDecoderAlg()
        deserialiser = TriggerEDMDeserialiserAlg("TrigDeserialiser")
        deserialiser.ExtraOutputs += [('xAOD::TrigCompositeContainer' , 'StoreGateSvc+HLTNav_Summary_OnlineSlimmed')]
        decodingSeq = seqAND("HLTDecodingSeq")
        decodingSeq += decoder  # BS -> HLTResultMT
        decodingSeq += deserialiser  # HLTResultMT -> xAOD

        # Append the decoding sequence to topSequence
        from AthenaCommon.AlgSequence import AlgSequence
        topSequence = AlgSequence()
        topSequence += decodingSeq

        log.debug("Configured HLT result BS decoding sequence")
        return True

class ByteStreamUnpackGetterRun1or2(Configured):
    def configure(self):

        log = logging.getLogger("ByteStreamUnpackGetterRun1or2")
        from AthenaCommon.AlgSequence import AlgSequence 
        topSequence = AlgSequence()
        
        #if TriggerFlags.readBS():
        log.info( "TriggerFlags.dataTakingConditions: %s", TriggerFlags.dataTakingConditions() )
        # in MC this is always FullTrigger
        hasHLT = TriggerFlags.dataTakingConditions() in ('HltOnly', 'FullTrigger')
        
        # BS unpacking
        from TrigBSExtraction.TrigBSExtractionConf import TrigBSExtraction
        extr = TrigBSExtraction()

        # Add fictional output to ensure data dependency in AthenaMT
        # Keeping the run 2 workflow, we run this after we have put the full serialised navigation into xAOD
        extr.ExtraInputs += [("xAOD::TrigNavigation", "StoreGateSvc+TrigNavigation")]
        extr.ExtraOutputs += [("TrigBSExtractionOutput", "StoreGateSvc+TrigBSExtractionOutput")]
        
        if hasHLT:
            from TrigNavigation.TrigNavigationConfig import HLTNavigationOffline
            extr.NavigationForL2 = HLTNavigationOffline("NavigationForL2")
            # Ignore the L2 TrigPassBits to avoid clash with EF (ATR-23411)
            extr.NavigationForL2.ClassesFromPayloadIgnore = ["TrigPassBits#passbits"]

            extr.Navigation = HLTNavigationOffline()

            from TrigEDMConfig.TriggerEDM import getEDMLibraries
            extr.Navigation.Dlls = getEDMLibraries()            

            from TrigEDMConfig.TriggerEDM import getPreregistrationList
            extr.Navigation.ClassesToPreregister = getPreregistrationList(ConfigFlags.Trigger.EDMVersion)

            from eformat import helper as efh
            robIDMap = {}   # map of result keys and their ROB ID
            if ConfigFlags.Trigger.EDMVersion == 1:  # Run-1 has L2 and EF result
                robIDMap["HLTResult_L2"] = efh.SourceIdentifier(efh.SubDetector.TDAQ_LVL2, 0).code()
                robIDMap["HLTResult_EF"] = efh.SourceIdentifier(efh.SubDetector.TDAQ_EVENT_FILTER, 0).code()
                extr.L2ResultKey = "HLTResult_L2"
                extr.HLTResultKey = "HLTResult_EF"
            else:
                robIDMap["HLTResult_HLT"] = efh.SourceIdentifier(efh.SubDetector.TDAQ_HLT, 0).code()
                extr.L2ResultKey = ""
                extr.HLTResultKey = "HLTResult_HLT"

            # Configure DataScouting
            from PyUtils.MetaReaderPeeker import metadata
            if 'stream' in metadata:
                stream_local = metadata['stream']   # e.g. calibration_DataScouting_05_Jets
                if stream_local.startswith('calibration_DataScouting_'):
                    ds_tag = '_'.join(stream_local.split('_')[1:3])   # e.g. DataScouting_05
                    ds_id = int(stream_local.split('_')[2])           # e.g. 05
                    robIDMap[ds_tag] = efh.SourceIdentifier(efh.SubDetector.TDAQ_HLT, ds_id).code()
                    extr.DSResultKeys += [ds_tag]

        else:
            # if data doesn't have HLT info set HLTResult keys as empty strings to avoid warnings
            # but the extraction algorithm must run
            extr.L2ResultKey = ""
            extr.HLTResultKey = ""
            extr.DSResultKeys = []

        topSequence += extr

        # Add all HLTResult keys to AddressProvider
        for k in robIDMap.keys():
            ServiceMgr.ByteStreamAddressProviderSvc.TypeNames += [ f"HLT::HLTResult/{k}" ]

        # Create necessary public tools
        from AthenaCommon.AppMgr import ToolSvc
        from TrigSerializeTP.TrigSerializeTPConf import TrigSerTPTool
        from TrigEDMConfig.TriggerEDM import getTPList
        ToolSvc += TrigSerTPTool(TPMap = getTPList((ConfigFlags.Trigger.EDMVersion)))
        
        from TrigSerializeCnvSvc.TrigSerializeCnvSvcConf import TrigSerializeConvHelper
        ToolSvc += TrigSerializeConvHelper(doTP = True)

        from TrigHLTResultByteStream.TrigHLTResultByteStreamConf import HLT__HLTResultByteStreamTool
        ToolSvc += HLT__HLTResultByteStreamTool(HLTResultRobIdMap = robIDMap)

        return True


class TrigDecisionGetter(Configured):
    def configure(self):
        log = logging.getLogger("TrigDecisionGetter")

        from AthenaCommon.AlgSequence import AlgSequence
        topSequence = AlgSequence()

        from TrigDecisionMaker.TrigDecisionMakerConfig import TrigDecisionMakerMT
        tdm = TrigDecisionMakerMT('TrigDecMakerMT')

        if not TriggerFlags.readBS():
            # Construct trigger bits from HLTNav_summary instead of reading from BS
            from TrigOutputHandling.TrigOutputHandlingConf import TriggerBitsMakerTool
            tdm.BitsMakerTool = TriggerBitsMakerTool()

        topSequence += tdm
        log.info('xTrigDecision writing enabled')

        return True

class TrigDecisionGetterRun1or2(Configured):
    #class to setup the writing or just making of TrigDecisionObject
    def configure(self):
        
        log = logging.getLogger("TrigDecisionGetterRun1or2")

        from AthenaCommon.AlgSequence import AlgSequence 
        topSequence = AlgSequence()
        
        #if hasOnlyLVL1:
        #from RecExConfig.ObjKeyStore import objKeyStore
        #objKeyStore.addStreamESD('TrigDec::TrigDecision','TrigDecision')
        #objKeyStore.addStreamAOD('TrigDec::TrigDecision','TrigDecision')
        
        from RecExConfig.RecFlags import rec
        if ( rec.doWriteESD() or rec.doWriteAOD() or rec.doESD() or rec.doAOD() ) and \
               ( not ( rec.readAOD() or rec.readESD() or rec.doWriteBS()) ):
            log.info("Will write TrigDecision object to storegate")
            
            from TrigDecisionMaker.TrigDecisionMakerConfig import WriteTrigDecision
            trigDecWriter = WriteTrigDecision()  # noqa: F841
            if (ConfigFlags.Trigger.EDMVersion == 1 or ConfigFlags.Trigger.EDMVersion == 2) and ConfigFlags.Trigger.doEDMVersionConversion:
                from TrigNavTools.NavConverterConfig import createNavConverterAlg
                navCnvAlg = createNavConverterAlg()
                navCnvAlg.TrigConfigSvc = "HLTConfigSvcRun3"
                navCnvAlg.ExtraInputs += [("TrigBSExtractionOutput", "StoreGateSvc+TrigBSExtractionOutput")]
                topSequence += navCnvAlg

#           WritexAODTrigDecision() is called within WriteTrigDecision()

            # inform TD maker that some parts may be missing
            if TriggerFlags.dataTakingConditions()=='Lvl1Only':
                topSequence.TrigDecMaker.doL2=False
                topSequence.TrigDecMaker.doEF=False
                topSequence.TrigDecMaker.doHLT=False
                topSequence.TrigNavigationCnvAlg.doL2 = False
                topSequence.TrigNavigationCnvAlg.doEF = False
                topSequence.TrigNavigationCnvAlg.doHLT = False
            elif TriggerFlags.dataTakingConditions()=='HltOnly':
                from AthenaCommon.AlgSequence import AlgSequence
                topSequence.TrigDecMaker.doL1=False

            if ConfigFlags.Trigger.EDMVersion == 1:  # Run-1 has L2 and EF result
                topSequence.TrigDecMaker.doHLT = False
                topSequence.TrigNavigationCnvAlg.doL2 = False
                topSequence.TrigNavigationCnvAlg.doHLT = False
            else:
                topSequence.TrigDecMaker.doL2 = False
                topSequence.TrigDecMaker.doEF = False
                topSequence.TrigNavigationCnvAlg.doL2 = False
                topSequence.TrigNavigationCnvAlg.doEF = False
                pass
                
        else:
            log.info("Will not write TrigDecision object to storegate")
    
        return True
    
    
class HLTTriggerResultGetter(Configured):

    log = logging.getLogger("HLTTriggerResultGetter.py")

    def _AddOPIToESD(self):

        log = logging.getLogger("HLTTriggerResultGetter.py")        
        
        if rec.doESD():
            from PyUtils.MetaReaderPeeker import metadata
            if 'stream' in metadata:
                stream = metadata['stream']
                log.debug("the stream found in 'metadata' is %s", stream)
                if "express" in stream:
                    from TrigEDMConfig.TriggerEDM import getTypeAndKey,EDMDetails
                    type,key=getTypeAndKey("TrigOperationalInfo#HLT_EXPRESS_OPI_HLT")
                    if 'collection'in EDMDetails[type]:
                        colltype = EDMDetails[type]['collection']
                        log.info("Adding HLT_EXPRESS_OPI_HLT to ESD for stream %s", stream)
                        from RecExConfig.ObjKeyStore import objKeyStore
                        objKeyStore.addStreamESD(colltype, key)
                    return True
            else:
                log.warning("Could not determine stream of bytestream file, not adding HLT_EXPRESS_OPI_HLT to ESD.")
        return False

    def configure(self):

        log = logging.getLogger("HLTTriggerResultGetter.py")
        from RecExConfig.ObjKeyStore import objKeyStore

        # Set AODFULL for data unless it was set explicitly already
        if TriggerFlags.AODEDMSet.isDefault() and globalflags.DataSource()=='data':
            TriggerFlags.AODEDMSet = 'AODFULL'
            
        from AthenaCommon.AlgSequence import AlgSequence
        topSequence = AlgSequence()
        log.info("BS unpacking (TF.readBS): %d", TriggerFlags.readBS() )
        if TriggerFlags.readBS():
            if ConfigFlags.Trigger.EDMVersion == 1 or \
               ConfigFlags.Trigger.EDMVersion == 2:
                bs = ByteStreamUnpackGetterRun1or2()  # noqa: F841
            elif ConfigFlags.Trigger.EDMVersion >=3:
                bs = ByteStreamUnpackGetter()  # noqa: F841
            else:
                raise RuntimeError("Invalid EDMVersion=%s " % ConfigFlags.Trigger.EDMVersion)

        xAODContainers = {}

        if ConfigFlags.Trigger.EDMVersion == 1:
            xaodcnvrt = xAODConversionGetter()
            xAODContainers = xaodcnvrt.xaodlist

        if ConfigFlags.Trigger.EDMVersion == 1 or \
           ConfigFlags.Trigger.EDMVersion == 2:
            if rec.doTrigger() or TriggerFlags.doTriggerConfigOnly():
                tdt = TrigDecisionGetterRun1or2()  # noqa: F841
        elif ConfigFlags.Trigger.EDMVersion >= 3:
            if TriggerFlags.readBS():
                tdt = TrigDecisionGetter()  # noqa: F841
        else:
            raise RuntimeError("Invalid EDMVersion=%s " % ConfigFlags.Trigger.EDMVersion)

        # Temporary hack to add Run-3 navigation to ESD and AOD
        if (rec.doESD() or rec.doAOD()) and ConfigFlags.Trigger.EDMVersion == 3:
            # The hack with wildcards is needed for BS->ESD because we don't know the exact keys
            # of HLT navigation containers before unpacking them from the BS event.
            objKeyStore._store['streamESD'].allowWildCard(True)
            objKeyStore._store['streamAOD'].allowWildCard(True)
            objKeyStore.addManyTypesStreamESD(['xAOD::TrigCompositeContainer#HLTNav*',
                                               'xAOD::TrigCompositeAuxContainer#HLTNav*'])
            objKeyStore.addManyTypesStreamAOD(['xAOD::TrigCompositeContainer#HLTNav*',
                                               'xAOD::TrigCompositeAuxContainer#HLTNav*'])

        # TrigJetRec additions
        if rec.doWriteESD():
            objKeyStore.addStreamESD("JetKeyDescriptor","JetKeyMap")
            objKeyStore.addStreamESD("JetMomentMap","TrigJetRecMomentMap")

        if rec.doWriteAOD():
            objKeyStore.addStreamAOD("JetKeyDescriptor","JetKeyMap")
            objKeyStore.addStreamAOD("JetMomentMap","TrigJetRecMomentMap")
                    
        if rec.doAOD() or rec.doWriteAOD():
            # schedule the RoiDescriptorStore conversion
            # log.warning( "HLTTriggerResultGetter - setting up RoiWriter" )
            roiWriter = RoiWriter()
            # Add fictional input to ensure data dependency in AthenaMT
            roiWriter.ExtraInputs += [("TrigBSExtractionOutput", "StoreGateSvc+TrigBSExtractionOutput")]
            topSequence += roiWriter
            # write out the RoiDescriptorStores
            from TrigEDMConfig.TriggerEDMRun2 import TriggerRoiList
            objKeyStore.addManyTypesStreamAOD( TriggerRoiList )

        #Are we adding operational info objects in ESD?
        added=self._AddOPIToESD()
        if added:
            log.debug("Operational Info object HLT_EXPRESS_OPI_HLT with extra information about express stream prescaling added to the data.")
        


        # ESD objects definitions
        _TriggerESDList = {}

        from TrigEDMConfig.TriggerEDM import getTriggerEDMList 
        # we have to store xAOD containers in the root file, NOT AOD,
        # if the xAOD container list is not empty
        if(xAODContainers):
            _TriggerESDList.update( xAODContainers )
        else:
            _TriggerESDList.update( getTriggerEDMList(TriggerFlags.ESDEDMSet(),  ConfigFlags.Trigger.EDMVersion) ) 
        
        log.info("ESD content set according to the ESDEDMSet flag: %s and EDM version %d", TriggerFlags.ESDEDMSet(), ConfigFlags.Trigger.EDMVersion)

        # AOD objects choice
        _TriggerAODList = {}
        
        #from TrigEDMConfig.TriggerEDM import getAODList    
        _TriggerAODList.update( getTriggerEDMList(TriggerFlags.AODEDMSet(),  ConfigFlags.Trigger.EDMVersion) ) 

        log.info("AOD content set according to the AODEDMSet flag: %s and EDM version %d", TriggerFlags.AODEDMSet(),ConfigFlags.Trigger.EDMVersion)

        log.debug("ESD EDM list: %s", _TriggerESDList)
        log.debug("AOD EDM list: %s", _TriggerAODList)
        
        # Highlight what is in AOD list but not in ESD list, as this can cause
        # the "different number of entries in branch" problem, when it is in the
        # AOD list but the empty container per event is not created
        # Just compares keys of dicts, which are the class names, not their string keys in StoreGate
        not_in = [ element for element in  _TriggerAODList if element not in _TriggerESDList ]
        if (len(not_in)>0):
            log.warning("In AOD list but not in ESD list: ")
            log.warning(not_in)
        else:
            log.info("AOD list is subset of ESD list - good.")


        def _addSlimmingRun2(stream, edm):
            from TrigNavTools.TrigNavToolsConfig import navigationThinningSvc

            edmlist = list(y.split('-')[0] for x in edm.values() for y in x) #flatten names
          
            svc = navigationThinningSvc ({'name':'HLTNav_%s'%stream, 'mode':'cleanup', 
                                          'result':'HLTResult_HLT',
                                          'features':edmlist})

            from OutputStreamAthenaPool.CreateOutputStreams import registerTrigNavThinningSvc
            registerTrigNavThinningSvc (stream, svc)

            log.info("Configured slimming of HLT for %s", stream)
            print(svc)  # noqa: ATL901
            del edmlist


        if ConfigFlags.Trigger.EDMVersion == 1 or ConfigFlags.Trigger.EDMVersion == 2:

            # Run 1, 2 slimming
            if TriggerFlags.doNavigationSlimming() and rec.readRDO() and rec.doWriteAOD():
                _addSlimmingRun2('StreamAOD', _TriggerESDList ) #Use ESD item list also for AOD!
                log.info("configured navigation slimming for AOD output")
                
            if TriggerFlags.doNavigationSlimming() and rec.readRDO() and rec.doWriteESD():
                _addSlimmingRun2('StreamESD', _TriggerESDList )                
                log.info("configured navigation slimming for ESD output")

        if ConfigFlags.Trigger.EDMVersion >= 3:
            # Change in the future to 'if EDMVersion >= 3 or doEDMVersionConversion:'

            # Run 3 slimming
            if ConfigFlags.Trigger.doNavigationSlimming: 
                from TrigNavSlimmingMT.TrigNavSlimmingMTConfig import getTrigNavSlimmingMTConfig
                from AthenaCommon.Configurable import Configurable
                Configurable.configurableRun3Behavior += 1
                from AthenaConfiguration.ComponentAccumulator import appendCAtoAthena
                appendCAtoAthena( getTrigNavSlimmingMTConfig(ConfigFlags) )
                Configurable.configurableRun3Behavior -= 1
            else:
                log.info("doNavigationSlimming is False, won't schedule run 3 navigation slimming")

        objKeyStore.addManyTypesStreamESD( _TriggerESDList )
        objKeyStore.addManyTypesStreamAOD( _TriggerAODList )        
            
        return True