Skip to content
Snippets Groups Projects
AutoConfigFlags.py 8.97 KiB
Newer Older
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
from PyUtils.MetaReader import read_metadata
from AthenaCommon.Logging import logging
from functools import lru_cache
Walter Lampl's avatar
Walter Lampl committed

msg = logging.getLogger('AutoConfigFlags')
# Module level cache of file-metadata:
_fileMetaData = dict()
class DynamicallyLoadMetadata:
    def __init__(self, filename):
        self.metadata = {}
        self.filename = filename
        self.metAccessLevel = 'lite'
        thisFileMD = read_metadata(filename, None, 'lite')
        self.metadata.update(thisFileMD[self.filename])
        msg.debug("Loaded using 'lite' %s", str(self.metadata))

    def _loadMore(self):
        thisFileMD = read_metadata(self.filename, None, 'peeker')
        self.metadata.update(thisFileMD[self.filename])

    def get(self, key, default):
        if key in self.metadata:
            return self.metadata[key]
        if self.metAccessLevel != 'peeker':
            msg.info("Looking into the file in 'peeker' mode as the configuration requires more details: %s ", key)
            self.metAccessLevel = 'peeker'
            self._loadMore()
            if key in self.metadata:
                return self.metadata[key]
        return default

    def __contains__(self, key):
        self.get(key, None)
        return key in self.metadata
        return self.get(key, None)

    def __repr__(self):
        return repr(self.metadata)
Walter Lampl's avatar
Walter Lampl committed
def GetFileMD(filenames):
    if isinstance(filenames, str):
        filenames = [filenames]
    if ['_ATHENA_GENERIC_INPUTFILE_NAME_'] == filenames:
        raise RuntimeError('Input file name not set, instead _ATHENA_GENERIC_INPUTFILE_NAME_ found. Cannot read metadata.')
    for filename in filenames:
        if filename not in _fileMetaData:
            msg.info("Obtaining metadata of auto-configuration by peeking into '%s'", filename)
            _fileMetaData[filename] = DynamicallyLoadMetadata(filename)
        if _fileMetaData[filename]['nentries'] not in [None, 0]: 
            return _fileMetaData[filename]
        else:
            msg.info("The file: %s has no entries, going to the next one for harvesting the metadata", filename)
    msg.info("No file with events found, returning anyways metadata associated to the first file %s", filenames[0])
    return _fileMetaData[filenames[0]]
def _initializeGeometryParameters(geoTag):
    """Read geometry database for all detectors"""

    from AtlasGeoModel import CommonGeoDB
    from PixelGeoModel import PixelGeoDB
    from LArGeoAlgsNV import LArGeoDB
    from MuonGeoModel import MuonGeoDB
    from AtlasGeoModel.AtlasGeoDBInterface import AtlasGeoDBInterface

    dbGeomCursor = AtlasGeoDBInterface(geoTag)
    dbGeomCursor.ConnectAndBrowseGeoDB()
    params = { 'Common' : CommonGeoDB.InitializeGeometryParameters(dbGeomCursor),
               'Pixel' : PixelGeoDB.InitializeGeometryParameters(dbGeomCursor),
               'LAr' : LArGeoDB.InitializeGeometryParameters(dbGeomCursor),
               'Muon' : MuonGeoDB.InitializeGeometryParameters(dbGeomCursor),
               'Luminosity' : CommonGeoDB.InitializeLuminosityDetectorParameters(dbGeomCursor),
             }

    return params


@lru_cache(maxsize=4)  # maxsize=1 should be enough for most jobs
def DetDescrInfo(geoTag):
    """Query geometry DB for detector description. Returns dictionary with
    detector description. Queries DB for each tag only once.
    geoTag: geometry tag (e.g. ATLAS-R2-2016-01-00-01)
    detDescrInfo = _initializeGeometryParameters(geoTag)
    detDescrInfo["geomTag"] = geoTag
    return detDescrInfo
@lru_cache(maxsize=4)  # maxsize=1 should be enough for most jobs
def getDefaultDetectors(geoTag):
    """Query geometry DB for detector description.
    Returns a set of detectors used in a geometry tag.

    geoTag: geometry tag (e.g. ATLAS-R2-2016-01-00-01)
    """
    detectors = set()
    detectors.add('Bpipe')

    if DetDescrInfo(geoTag)['Common']['Run'] == 'RUN4':
        detectors.add('ITkPixel')
        detectors.add('ITkStrip')
        if DetDescrInfo(geoTag)['Luminosity']['BCMPrime']:
            pass  # keep disabled for now
        if DetDescrInfo(geoTag)['Luminosity']['PLR']:
            detectors.add('PLR')
    else:
        detectors.add('Pixel')
        detectors.add('SCT')
        detectors.add('TRT')
        detectors.add('BCM')
    # TODO: wait for special table in the geo DB
    # if DetDescrInfo(geoTag)['Common']['Run'] == 'RUN4':
    #     detectors.add('BCMPrime')
    if DetDescrInfo(geoTag)['Common']['Run'] in ['RUN1', 'RUN2', 'RUN3'] and DetDescrInfo(geoTag)['Pixel']['DBM']:
        detectors.add('DBM')

    if DetDescrInfo(geoTag)['Common']['Run'] == 'RUN4':
        detectors.add('HGTD')

    detectors.add('LAr')
    detectors.add('Tile')
    if DetDescrInfo(geoTag)['Common']['Run'] in ['RUN1', 'RUN2', 'RUN3']:
        detectors.add('MBTS')

    detectors.add('MDT')
    detectors.add('RPC')
    detectors.add('TGC')
    if DetDescrInfo(geoTag)['Muon']['HasCSC']:
        detectors.add('CSC')
    if DetDescrInfo(geoTag)['Muon']['HasSTGC']:
        detectors.add('sTGC')
    if DetDescrInfo(geoTag)['Muon']['HasMM']:
        detectors.add('MM')

    return detectors


# Based on RunDMCFlags.py
def getRunToTimestampDict():
    # this wrapper is intended to avoid an initial import
    from .RunToTimestampData import RunToTimestampDict
    return RunToTimestampDict


def getInitialTimeStampsFromRunNumbers(runNumbers):
    """This is used to hold a dictionary of the form
    {152166:1269948352889940910, ...} to allow the
    timestamp to be determined from the run.
    """
    run2timestampDict =  getRunToTimestampDict()
    timeStamps = [run2timestampDict.get(runNumber,1) for runNumber in runNumbers] # Add protection here?
    return timeStamps


def getSpecialConfigurationMetadata(inputFiles):
    """Read in special simulation job option fragments based on metadata
    passed by the evgen stage
    """
    specialConfigDict = dict()
    legacyPreIncludeToCAPostInclude = { 'SimulationJobOptions/preInclude.AMSB.py' : 'Charginos.CharginosConfigNew.AMSB_Cfg',
                                        'SimulationJobOptions/preInclude.Monopole.py' :  'Monopole.MonopoleConfigNew.MonopoleCfg',
                                        'SimulationJobOptions/preInclude.Quirks.py' : 'Quirks.QuirksConfigNew.QuirksCfg',
                                        'SimulationJobOptions/preInclude.SleptonsLLP.py' : 'Sleptons.SleptonsConfigNew.SleptonsLLPCfg',
                                        'SimulationJobOptions/preInclude.GMSB.py' : 'Sleptons.SleptonsConfigNew.GMSB_Cfg',
                                        'SimulationJobOptions/preInclude.Qball.py' : 'Monopole.MonopoleConfigNew.QballCfg',
                                        'SimulationJobOptions/preInclude.RHadronsPythia8.py' : None, # FIXME
                                        'SimulationJobOptions/preInclude.fcp.py' : 'Monopole.MonopoleConfigNew.fcpCfg' }
    legacyPreIncludeToCAPreInclude = { 'SimulationJobOptions/preInclude.AMSB.py' : None,
                                       'SimulationJobOptions/preInclude.Monopole.py' :  'Monopole.MonopoleConfigNew.MonopolePreInclude',
                                       'SimulationJobOptions/preInclude.Quirks.py' : None,
                                       'SimulationJobOptions/preInclude.SleptonsLLP.py' : None,
                                       'SimulationJobOptions/preInclude.GMSB.py' : None,
                                       'SimulationJobOptions/preInclude.Qball.py' : 'Monopole.MonopoleConfigNew.QballPreInclude',
                                       'SimulationJobOptions/preInclude.RHadronsPythia8.py' : None, # FIXME
                                       'SimulationJobOptions/preInclude.fcp.py' : 'Monopole.MonopoleConfigNew.fcpPreInclude' }
    if len(inputFiles)>0:
        from AthenaConfiguration.AutoConfigFlags import GetFileMD
        specialConfigString = GetFileMD(inputFiles).get('specialConfiguration', '')
        ## Parse the specialConfiguration string
        ## Format is 'key1=value1;key2=value2;...'. or just '
        spcitems = specialConfigString.split(";")
        for spcitem in spcitems:
            #print spcitem
            ## Ignore empty or "NONE" substrings, e.g. from consecutive or trailing semicolons
            if not spcitem or spcitem.upper() == "NONE":
                continue
            ## If not in key=value format, treat as v, with k="preInclude"
            if "=" not in spcitem:
                spcitem = "preInclude=" + spcitem
            ## Handle k=v directives
            k, v = spcitem.split("=")
            if k == "preInclude" and v.endswith('.py'): # Translate old preIncludes into CA-based versions.
                v1 = legacyPreIncludeToCAPreInclude[v]
                if v1 is not None:
                    specialConfigDict[k] = v1
                v2 = legacyPreIncludeToCAPostInclude[v]
                if v2 is not None:
                    specialConfigDict['postInclude'] = v2
            else:
                specialConfigDict[k] = v
    return specialConfigDict