diff --git a/Reconstruction/RecJobTransforms/CMakeLists.txt b/Reconstruction/RecJobTransforms/CMakeLists.txt
index 85424aa442489cd450c201684af59938a13c0f1a..ef2583c4b6f17feb8b6835ecc2bcfcf1db3d78f9 100644
--- a/Reconstruction/RecJobTransforms/CMakeLists.txt
+++ b/Reconstruction/RecJobTransforms/CMakeLists.txt
@@ -4,7 +4,7 @@ atlas_subdir( RecJobTransforms )
 
 # Install python modules
-atlas_install_python_modules( python/*.py )
+atlas_install_python_modules( python/*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
 # Install RDOtoRDOtrigger job opts with flake8 check
 atlas_install_joboptions( share/skeleton.RDOtoRDOtrigger*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
 # Install other job opts without flake8 check
diff --git a/Reconstruction/RecJobTransforms/python/BsConfig.py b/Reconstruction/RecJobTransforms/python/BsConfig.py
index 3010f746eedbdd47b873ab0bf0d347df8af8dbcf..56439aea92480a965378a7267da6bfebac21fa77 100644
--- a/Reconstruction/RecJobTransforms/python/BsConfig.py
+++ b/Reconstruction/RecJobTransforms/python/BsConfig.py
@@ -1,9 +1,8 @@
-# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 
-import os
 __all__ = []
 
-from PyJobTransformsCore.TransformConfig import *
+from PyJobTransformsCore.TransformConfig import TransformConfig
 
 class BsConfig(TransformConfig):
     # prevent any mistypings by not allowing dynamic members
diff --git a/Reconstruction/RecJobTransforms/python/MixStreamConfig.py b/Reconstruction/RecJobTransforms/python/MixStreamConfig.py
index ba6a0facc90e37b26bfac95cc03dee10725d40a0..7073427d77df4e6581995640e8fae805c721a1d4 100644
--- a/Reconstruction/RecJobTransforms/python/MixStreamConfig.py
+++ b/Reconstruction/RecJobTransforms/python/MixStreamConfig.py
@@ -1,10 +1,3 @@
-from __future__ import print_function
-from __future__ import division
-from builtins import object
-from future.utils import iteritems
-
-from builtins import int
-
 # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 
 #######################################################################################################
@@ -17,106 +10,103 @@ from builtins import int
 # which also has (that) MixStreamConfig.
 #######################################################################################################
-#__all__ = []
-
-from PyJobTransformsCore.TransformConfig import *
-from RecJobTransforms.MixingSelector import *
-from math import sqrt
-import re
+from PyJobTransformsCore.TransformConfig import Descriptor, UniqueList, JobConfigError, TransformConfig, Integer
+from RecJobTransforms.MixingSelector import MixingSelector
+import os
 
 class ListOfSelectors(UniqueList):
-    """List of class MixingSelector, for configuration of job."""
-    def __init__(self,doc,default=None,allowedValues=None):
-        UniqueList.__init__(self,doc,default,allowedValues)
-    def _checkType(self,variableName,value):
-        """Add check that all entries are MixingSelectors"""
-        # check that <value> is a list or tuple
-        value = UniqueList._checkType(self,variableName,value)
-        # check that all entries are strings
-        for v in value:
-            valType = type(v).__name__
-            if not isinstance(v, MixingSelector):
-                raise JobConfigError("Entry %r in %s is not a MixingSelector (but an %s)" % \
-                      (v,variableName,valType) )
-        return value
-
+    """List of class MixingSelector, for configuration of job."""
+    def __init__(self,doc,default=None,allowedValues=None):
+        UniqueList.__init__(self,doc,default,allowedValues)
+    def _checkType(self,variableName,value):
+        """Add check that all entries are MixingSelectors"""
+        # check that <value> is a list or tuple
+        value = UniqueList._checkType(self,variableName,value)
+        # check that all entries are strings
+        for v in value:
+            valType = type(v).__name__
+            if not isinstance(v, MixingSelector):
+                raise JobConfigError("Entry %r in %s is not a MixingSelector (but an %s)" %
+                                     (v,variableName,valType) )
+        return value
+
 class FloatList(Descriptor):
-    """List of floats, which can have repeated entries.
-    List of allowed value has unit length and is the minimum SIZE of the list."""
-    def __init__(self,doc,default=None,allowedValues=None):
-        if default is None: default = []
-        Descriptor.__init__(self,doc,default,allowedValues)
-    def _checkType(self,variableName,value):
-        """Check that <value> is a list or tuple, and make the tuple a list."""
-        valType = type(value).__name__
-        if not isinstance(value, list) and not isinstance(value, tuple):
-            raise JobConfigError('%s should be a list or tuple. Got %s instead.' & (variableName, valType))
-        if isinstance(value, tuple):
-            value = list(value)
-        for v in value:
-            valType = type(v).__name__
-            if not isinstance(v, float):
-                raise JobConfigError("Entry %r in %s is not a float (but an %s)" % (v,variableName,valType) )
-        return value
-
-    def _checkValue(self,variableName,value):
-        # check that size is allowed
-        if len(self.allowedValues()):
-            allowed = self.allowedValues()[0]
-            if len(value) < allowed: raise JobConfigError( 'The list %s has size %r, less than %s' % (variableName, len(value), allowed) )
-        return value
-
-    def _checkAllowedValues(self,variableName,allowedValues):
-        # just get the first value and ensure it is a positive integer
-        newAllowed=[]
-        if not allowedValues:
-            return newAllowed
-        if isinstance(allowedValues[0], int) and allowedValues[0] >= 0:
-            newAllowed += [ allowedValues[0] ]
-        return newAllowed
-
+    """List of floats, which can have repeated entries.
+    List of allowed value has unit length and is the minimum SIZE of the list."""
+    def __init__(self,doc,default=None,allowedValues=None):
+        if default is None: default = []
+        Descriptor.__init__(self,doc,default,allowedValues)
+    def _checkType(self,variableName,value):
+        """Check that <value> is a list or tuple, and make the tuple a list."""
+        valType = type(value).__name__
+        if not isinstance(value, list) and not isinstance(value, tuple):
+            raise JobConfigError('%s should be a list or tuple. Got %s instead.' % (variableName, valType))
+        if isinstance(value, tuple):
+            value = list(value)
+        for v in value:
+            valType = type(v).__name__
+            if not isinstance(v, float):
+                raise JobConfigError("Entry %r in %s is not a float (but an %s)" % (v,variableName,valType) )
+        return value
+
+    def _checkValue(self,variableName,value):
+        # check that size is allowed
+        if len(self.allowedValues()):
+            allowed = self.allowedValues()[0]
+            if len(value) < allowed: raise JobConfigError( 'The list %s has size %r, less than %s' % (variableName, len(value), allowed) )
+        return value
+
+    def _checkAllowedValues(self,variableName,allowedValues):
+        # just get the first value and ensure it is a positive integer
+        newAllowed=[]
+        if not allowedValues:
+            return newAllowed
+        if isinstance(allowedValues[0], int) and allowedValues[0] >= 0:
+            newAllowed += [ allowedValues[0] ]
+        return newAllowed
+
 class MixStreamConfig(TransformConfig):
-    # not allow dynamic members
-    __slots__ = ()
-    NumberOfBatchJobs = Integer("Number of batch jobs in total, for dividing up input files",5)
-    NumberOfSFO = Integer("Number of SFO simulated in test, for setting event numbers",5)
-    NumberOfLumiBlocksPerRun = Integer("Number of LB per run simulated in test, for setting run numbers",30)
-    RunNumberBase = Integer("Base for calculating run number", 1)
-    LuminosityPartition = FloatList("Division of the total task luminosity as a list of floats",[1.0,], [1])
-    JobSelectors = ListOfSelectors("List of unique MixingSelectors for input config. See Reconstruction.MixingSelector for details.",[])
-
-    def WhichSFO(self, WhichPartition):
-        if WhichPartition >= self.NumberOfBatchJobs: return -1
-        return WhichPartition % self.NumberOfSFO
-    def WhichLumiBlock(self, WhichPartition):
-        if WhichPartition >= self.NumberOfBatchJobs: return -1
-        return (WhichPartition/self.NumberOfSFO) % self.NumberOfLumiBlocksPerRun
-    def WhichRunNumber(self, WhichPartition):
-        if WhichPartition >= self.NumberOfBatchJobs: return -1
-        return self.RunNumberBase + (WhichPartition/self.NumberOfSFO)/self.NumberOfLumiBlocksPerRun
-    def FinishedLuminosityFraction(self, WhichPartition):
-        if WhichPartition >= self.NumberOfBatchJobs: return -1
-        lp = self.LuminosityPartition
-        theLB = (WhichPartition/self.NumberOfSFO)
-        totLB = (self.NumberOfBatchJobs/self.NumberOfSFO)
-        return sum(lp[0:theLB])/sum(lp[0:totLB])
-    def ThisLuminosityFraction(self, WhichPartition):
-        if WhichPartition >= self.NumberOfBatchJobs: return -1
-        lp = self.LuminosityPartition
-        theLB = (WhichPartition/self.NumberOfSFO)
-        totLB = (self.NumberOfBatchJobs/self.NumberOfSFO)
-        return ((lp[theLB])/sum(lp[0:totLB]))
-    def __init__(self,name='mixStreamConfig',metaData=[]):
-        TransformConfig.__init__(self,name,metaData)
+    # not allow dynamic members
+    __slots__ = ()
+    NumberOfBatchJobs = Integer("Number of batch jobs in total, for dividing up input files",5)
+    NumberOfSFO = Integer("Number of SFO simulated in test, for setting event numbers",5)
+    NumberOfLumiBlocksPerRun = Integer("Number of LB per run simulated in test, for setting run numbers",30)
+    RunNumberBase = Integer("Base for calculating run number", 1)
+    LuminosityPartition = FloatList("Division of the total task luminosity as a list of floats",[1.0,], [1])
+    JobSelectors = ListOfSelectors("List of unique MixingSelectors for input config. See Reconstruction.MixingSelector for details.",[])
+
+    def WhichSFO(self, WhichPartition):
+        if WhichPartition >= self.NumberOfBatchJobs: return -1
+        return WhichPartition % self.NumberOfSFO
+    def WhichLumiBlock(self, WhichPartition):
+        if WhichPartition >= self.NumberOfBatchJobs: return -1
+        return (WhichPartition/self.NumberOfSFO) % self.NumberOfLumiBlocksPerRun
+    def WhichRunNumber(self, WhichPartition):
+        if WhichPartition >= self.NumberOfBatchJobs: return -1
+        return self.RunNumberBase + (WhichPartition/self.NumberOfSFO)/self.NumberOfLumiBlocksPerRun
+    def FinishedLuminosityFraction(self, WhichPartition):
+        if WhichPartition >= self.NumberOfBatchJobs: return -1
+        lp = self.LuminosityPartition
+        theLB = (WhichPartition/self.NumberOfSFO)
+        totLB = (self.NumberOfBatchJobs/self.NumberOfSFO)
+        return sum(lp[0:theLB])/sum(lp[0:totLB])
+    def ThisLuminosityFraction(self, WhichPartition):
+        if WhichPartition >= self.NumberOfBatchJobs: return -1
+        lp = self.LuminosityPartition
+        theLB = (WhichPartition/self.NumberOfSFO)
+        totLB = (self.NumberOfBatchJobs/self.NumberOfSFO)
+        return ((lp[theLB])/sum(lp[0:totLB]))
+    def __init__(self,name='mixStreamConfig',metaData=[]):
+        TransformConfig.__init__(self,name,metaData)
 
 # make default configuration object -- once.
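
For orientation, the partition bookkeeping above lays jobs out SFO-fastest: consecutive partition numbers land on consecutive SFOs, and the lumi-block and run numbers advance once per full pass over the SFOs. A standalone sketch (not part of the patch; function and parameter names are illustrative, defaults taken from the Integer properties above, and the NumberOfBatchJobs guard that returns -1 is omitted; note the methods inherit true division from the original "from __future__ import division" code, so they can return floats):

# Sketch of MixStreamConfig's partition arithmetic (illustrative names only).
def which_sfo(partition, n_sfo=5):
    return partition % n_sfo                        # mirrors WhichSFO

def which_lumi_block(partition, n_sfo=5, n_lb=30):
    return (partition / n_sfo) % n_lb               # mirrors WhichLumiBlock

def which_run(partition, base=1, n_sfo=5, n_lb=30):
    return base + (partition / n_sfo) / n_lb        # mirrors WhichRunNumber

for p in (0, 4, 5, 12):
    print(p, which_sfo(p), which_lumi_block(p), which_run(p))
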
-if not 'mixStreamConfig' in dir():
-    mixStreamConfig = MixStreamConfig()
+if 'mixStreamConfig' not in dir():
+    mixStreamConfig = MixStreamConfig()
-
+
+############################################
+# MixingPartitioner
 ############################################
-# MixingPartitioner
-############################################
 # Usage:
 # m = MixingPartitioner() #Selects files
 # m.SetPartition(thisJob) #for batch job number \<thisJob\>
@@ -127,112 +117,111 @@ if not 'mixStreamConfig' in dir():
 # ...StreamAlg.SamplesToWeight = m.ListOfSamplesToWeight()
 # ...StreamAlg.SampleWeights = m.ListOfSampleWeights()
 ############################################
-
+
 class MixingPartitioner(object):
-    """ This collects input files to add to the stager, and generates python to configure EventMixer and your optional MCRunNumber-based event weighting service. """
-    def __init__(self):
-        self.__ThisPartition = -1
-        self.__Selectors = {} #dataset aliases (like 5009) : MixingSelector object
-        self.__DatasetsToWeight = {} #hash of dataset weights (indexed by MC run number)
-        self.__ThisScaleFactor = -1
-        self.__ScaleFactorSum = -1
-        self.__NumPartitions = -1
-
-    def SetPartition(self,thisPartition):
-        """Must be done when partitionArg and mixStreamConfig gets a value."""
-        if self.__ThisPartition < 0 : #first time
-            for s in mixStreamConfig.JobSelectors:
-                self.__Selectors[s.DSID()] = s
-            self.__NumPartitions = mixStreamConfig.NumberOfBatchJobs
-
-        self.__ThisPartition = thisPartition
-        if self.__ThisPartition < 0 :
-            print("*** MixingPartitioner: WARNING you requested an invalid partition number: setting to 0. *** ")
-            self.__ThisPartition = 0;
-        else:
-            if self.__ThisPartition >= (self.__NumPartitions - 1):
-                print("*** MixingPartitioner: WARNING you requested an invalid (big) partition number: setting to", self.__NumPartitions - 1, ". *** ")
-                self.__ThisPartition = self.__NumPartitions - 1;
-            else:
-                print("*** MixingPartitioner: INFO you requested partition number", self.__ThisPartition)
-        self.__ThisScaleFactor = mixStreamConfig.ThisLuminosityFraction(self.__ThisPartition) #the job config knows partitioning
-        self.__ScaleFactorSum = mixStreamConfig.FinishedLuminosityFraction(self.__ThisPartition) #the job config knows partitioning
-
-
-    def AddSample(self, fileList, ksample, nEvents, newAlias=-1, nEventsPerFile=50):
-        if not (ksample in self.__Selectors):
-            self.__Selectors[ksample] = MixingSelector(self.__NumPartitions,ksample,nEvents,nEventsPerFile)
-            self.__Selectors[ksample].addNewCatalog(fileList)
-        else:
-            self.__Selectors[ksample].addNewCatalog(fileList)
-        if newAlias > 0: self.__Selectors[ksample].addAlias(newAlias)
-
-    def ConfigureServices(self):
-        EventMixerList = []
-        serviceString = "#Input selectors "
-        soFar = self.SoFar()
-        perSFOnow = self.PerSFOnow()
-        for ksample in self.__Selectors:
-            EventMixerList += [ self.__Selectors[ksample].trigger(soFar,perSFOnow) ]
-            serviceString += "\n%s = EventSelectorAthenaPool( '%s' )" % (self.__Selectors[ksample].name(), self.__Selectors[ksample].name())
-            serviceString += "\nsvcMgr += %s" % self.__Selectors[ksample].name()
-            serviceString += "\n%s.InputCollections= ['%s']" % \
-                 (self.__Selectors[ksample].name(), ("', '").join(self.__Selectors[ksample].FilesToStage(soFar,perSFOnow)))
-        return EventMixerList, serviceString
-    def ScaleFactor(self):
-        totalLumi = 1
-        return self.__ThisScaleFactor * totalLumi
-    def PerSFOnow(self):
-        return self.__ThisScaleFactor/mixStreamConfig.NumberOfSFO
-    def SoFar(self):
-        return self.__ScaleFactorSum + self.PerSFOnow() * (self.__ThisPartition % mixStreamConfig.NumberOfSFO)
-    def ConfigureSelectors(self):
-        print("*** MixingPartitioner: INFO Partition ", self.__ThisPartition, " has a LB beginning after ", self.__ScaleFactorSum, "% of the input has been processed.")
-        print("*** MixingPartitioner: INFO Partition ", self.__ThisPartition, " has a LB reading ", self.__ThisScaleFactor * 100, "% of the input. ")
-        totalLumi = 1
-        print(":::::::::: STREAMING JOB CONFIGURATION: LHC INSTANTANEOUS LUMINOSITY= %f x 10^%i cm^-2 s^-1" % (self.ScaleFactor(), 31))
-        soFar = self.SoFar()
-        perSFOnow = self.PerSFOnow()
-        for ksample in self.__Selectors:
-            sel = self.__Selectors[ksample]
-            if not sel.isSufficient(): # prescale weighting is a TASK-GLOBAL! Read more, don't scale more.
-                print("*** MixingPartitioner: WARNING not enough events for %s -- (%s) will be weighted." % \
-                      (sel.name(), ','.join([str(ali) for ali in sel.Equivalents()]) ))
-                for aliasedID in sel.Equivalents():
-                    self.__DatasetsToWeight[ aliasedID ] = sel.weight()
-            print("*** MixingPartitioner: INFO \t%s FirstFile=%s EvOffset=%i NeV=%f" % \
-                  (sel.name(), sel.firstFileIndex(soFar), sel.firstEventInFile(soFar), sel.totalEventsThisJob(perSFOnow)))
-
-    def preStageInputFiles(self,CastorOrDCache = 'Castor'):
-        from PyJobTransformsCore.FilePreStager import theFileStagerRobot
-        if CastorOrDCache == 'Castor':
+    """ This collects input files to add to the stager, and generates python to configure EventMixer and your optional MCRunNumber-based event weighting service. """
+    def __init__(self):
+        self.__ThisPartition = -1
+        self.__Selectors = {} #dataset aliases (like 5009) : MixingSelector object
+        self.__DatasetsToWeight = {} #hash of dataset weights (indexed by MC run number)
+        self.__ThisScaleFactor = -1
+        self.__ScaleFactorSum = -1
+        self.__NumPartitions = -1
+
+    def SetPartition(self,thisPartition):
+        """Must be done when partitionArg and mixStreamConfig gets a value."""
+        if self.__ThisPartition < 0 : #first time
+            for s in mixStreamConfig.JobSelectors:
+                self.__Selectors[s.DSID()] = s
+            self.__NumPartitions = mixStreamConfig.NumberOfBatchJobs
+
+        self.__ThisPartition = thisPartition
+        if self.__ThisPartition < 0 :
+            print("*** MixingPartitioner: WARNING you requested an invalid partition number: setting to 0. *** ")
+            self.__ThisPartition = 0
+        else:
+            if self.__ThisPartition >= (self.__NumPartitions - 1):
+                print("*** MixingPartitioner: WARNING you requested an invalid (big) partition number: setting to", self.__NumPartitions - 1, ". *** ")
+                self.__ThisPartition = self.__NumPartitions - 1
+            else:
+                print("*** MixingPartitioner: INFO you requested partition number", self.__ThisPartition)
+        self.__ThisScaleFactor = mixStreamConfig.ThisLuminosityFraction(self.__ThisPartition) #the job config knows partitioning
+        self.__ScaleFactorSum = mixStreamConfig.FinishedLuminosityFraction(self.__ThisPartition) #the job config knows partitioning
+
+
+    def AddSample(self, fileList, ksample, nEvents, newAlias=-1, nEventsPerFile=50):
+        if not (ksample in self.__Selectors):
+            self.__Selectors[ksample] = MixingSelector(self.__NumPartitions,ksample,nEvents,nEventsPerFile)
+            self.__Selectors[ksample].addNewCatalog(fileList)
+        else:
+            self.__Selectors[ksample].addNewCatalog(fileList)
+        if newAlias > 0: self.__Selectors[ksample].addAlias(newAlias)
+
+    def ConfigureServices(self):
+        EventMixerList = []
+        serviceString = "#Input selectors "
+        soFar = self.SoFar()
+        perSFOnow = self.PerSFOnow()
+        for ksample in self.__Selectors:
+            EventMixerList += [ self.__Selectors[ksample].trigger(soFar,perSFOnow) ]
+            serviceString += "\n%s = EventSelectorAthenaPool( '%s' )" % (self.__Selectors[ksample].name(), self.__Selectors[ksample].name())
+            serviceString += "\nsvcMgr += %s" % self.__Selectors[ksample].name()
+            serviceString += "\n%s.InputCollections= ['%s']" % \
+                 (self.__Selectors[ksample].name(), ("', '").join(self.__Selectors[ksample].FilesToStage(soFar,perSFOnow)))
+        return EventMixerList, serviceString
+    def ScaleFactor(self):
+        totalLumi = 1
+        return self.__ThisScaleFactor * totalLumi
+    def PerSFOnow(self):
+        return self.__ThisScaleFactor/mixStreamConfig.NumberOfSFO
+    def SoFar(self):
+        return self.__ScaleFactorSum + self.PerSFOnow() * (self.__ThisPartition % mixStreamConfig.NumberOfSFO)
+    def ConfigureSelectors(self):
+        print("*** MixingPartitioner: INFO Partition ", self.__ThisPartition, " has a LB beginning after ", self.__ScaleFactorSum, "% of the input has been processed.")
+        print("*** MixingPartitioner: INFO Partition ", self.__ThisPartition, " has a LB reading ", self.__ThisScaleFactor * 100, "% of the input. ")
+        print(":::::::::: STREAMING JOB CONFIGURATION: LHC INSTANTANEOUS LUMINOSITY= %f x 10^%i cm^-2 s^-1" % (self.ScaleFactor(), 31))
+        soFar = self.SoFar()
+        perSFOnow = self.PerSFOnow()
+        for ksample in self.__Selectors:
+            sel = self.__Selectors[ksample]
+            if not sel.isSufficient(): # prescale weighting is a TASK-GLOBAL! Read more, don't scale more.
+                print("*** MixingPartitioner: WARNING not enough events for %s -- (%s) will be weighted." % \
+                      (sel.name(), ','.join([str(ali) for ali in sel.Equivalents()]) ))
+                for aliasedID in sel.Equivalents():
+                    self.__DatasetsToWeight[ aliasedID ] = sel.weight()
+            print("*** MixingPartitioner: INFO \t%s FirstFile=%s EvOffset=%i NeV=%f" % \
+                  (sel.name(), sel.firstFileIndex(soFar), sel.firstEventInFile(soFar), sel.totalEventsThisJob(perSFOnow)))
+
+    def preStageInputFiles(self,CastorOrDCache = 'Castor'):
+        from PyJobTransformsCore.FilePreStager import theFileStagerRobot
+        if CastorOrDCache == 'Castor':
             print("WARNING: Castor is not supported anymore")
-        else:
-            from StreamMix import DCachePreStager
-            fileList = self.ListOfFilesToStage()
-            # NOTE THAT FILES THAT DON'T START WITH PNFS OR DCACHE WILL NOT BE CHECKED.
-            filesNeedingAction = theFileStagerRobot.addFilesToStagerIfNeeded( fileList )
-            print("This job muust stage %i files" % len(filesNeedingAction))
-            for f in filesNeedingAction: print(f)
-            filesNotStaged = theFileStagerRobot.waitUntilAllFilesStaged()
-            if filesNotStaged:
-                problemFiles = ''
-                for filename,status in iteritems(filesNotStaged):
-                    problemFiles += os.linesep + "%s:%s" % (filename,status)
-                raise IOError("Could not stage following files from tape:%s" % problemFiles )
-
-    def ListOfSamplesToWeight(self):
-        return [ k for k in self.__DatasetsToWeight ]
-    def ListOfSampleFractions(self):
-        return [ self.__DatasetsToWeight[k] for k in self.__DatasetsToWeight ]
-    def ListOfSampleColonWeights(self):
-
-        return [ str(k)+":"+("%f" % self.__DatasetsToWeight[k]) for k in self.__DatasetsToWeight ]
-    def ListOfFilesToStage(self):
-        tlst = []
-        soFar = self.SoFar()
-        perSFOnow = self.PerSFOnow()
-        for ksample in self.__Selectors:
-            sel = self.__Selectors[ksample]
-            tlst += sel.FilesToStage(soFar,perSFOnow)
-        return tlst
+        else:
+            from StreamMix import DCachePreStager  # noqa: F401
+            fileList = self.ListOfFilesToStage()
+            # NOTE THAT FILES THAT DON'T START WITH PNFS OR DCACHE WILL NOT BE CHECKED.
+ filesNeedingAction = theFileStagerRobot.addFilesToStagerIfNeeded( fileList ) + print("This job muust stage %i files" % len(filesNeedingAction)) + for f in filesNeedingAction: print(f) + filesNotStaged = theFileStagerRobot.waitUntilAllFilesStaged() + if filesNotStaged: + problemFiles = '' + for filename,status in filesNotStaged.items(): + problemFiles += os.linesep + "%s:%s" % (filename,status) + raise IOError("Could not stage following files from tape:%s" % problemFiles ) + + def ListOfSamplesToWeight(self): + return [ k for k in self.__DatasetsToWeight ] + def ListOfSampleFractions(self): + return [ self.__DatasetsToWeight[k] for k in self.__DatasetsToWeight ] + def ListOfSampleColonWeights(self): + + return [ str(k)+":"+("%f" % self.__DatasetsToWeight[k]) for k in self.__DatasetsToWeight ] + def ListOfFilesToStage(self): + tlst = [] + soFar = self.SoFar() + perSFOnow = self.PerSFOnow() + for ksample in self.__Selectors: + sel = self.__Selectors[ksample] + tlst += sel.FilesToStage(soFar,perSFOnow) + return tlst diff --git a/Reconstruction/RecJobTransforms/python/MixingSelector.py b/Reconstruction/RecJobTransforms/python/MixingSelector.py index fc45efc6628337f98acbe98d5e8ccd8179b747d3..87ab0160dbbb8dbb71b185fe49591e31d3fd46b4 100644 --- a/Reconstruction/RecJobTransforms/python/MixingSelector.py +++ b/Reconstruction/RecJobTransforms/python/MixingSelector.py @@ -1,8 +1,4 @@ -from __future__ import print_function -from __future__ import division - -from builtins import object -# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration +# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration ##################################### # Utility class @@ -11,86 +7,85 @@ from builtins import object from math import sqrt class MixingSelector(object): - """This class configures one EventSelector with the MC dataset ID and the average events per job. - Later, the user can add lists of physical file names, or equivalent DSIDs (e.g. 5010 == 5030). - This class only works properly for fixed-sized input files. """ - def __init__(self, numJobs, datasetName, eventsRequired, eventsPerFile=50): - self.__np = numJobs - self.__dsid = datasetName - self.__evReq = eventsRequired - self.__evPerFile = eventsPerFile - self.__aliases = [ datasetName ] - self.__physical = [] - print("*** MixingPartitioner: INFO Created new selector Selector%i requesting an average of %f events per job ***" % (datasetName, eventsRequired)) - def name(self): - return "Selector"+str(self.__dsid) - def __str__(self): - """ These are for printing """ - return self.name()+"[ev/job="+str(self.__evReq)+",nFiles="+str(len(self.__physical))+"]" - def __repr__(self): - return self.__str__() - def DSID(self): - return self.__dsid - def numFiles(self): - return len(self.__physical) - def numEvents(self): - return self.__evPerFile * len(self.__physical) - ### functions to add new files to the selector ### - def addAlias(self,newAlias): - """If some downstream module is weighting prescales based on MC run number, inform it that run number newAlias is equivalent to this __dsid.""" - if not newAlias in self.__aliases: - print("*** MixingPartitioner: INFO \tEventInfo run number %i interpreted like %i. ***" % (newAlias, self.__dsid)) - self.__aliases += [ newAlias ] - def addNewCatalog(self, pfnlist): - if len(pfnlist) == 0: - print("*** MixingPartitioner: WARNING Adding empty list to %s?" % self.name()) - return - if self.numFiles(): - print("*** MixingPartitioner: INFO Files (%s ...) will be appended to %s. 
***" % (pfnlist[0], self.name())) - else: - print("*** MixingPartitioner: INFO Using files (%s ...) to initialize %s. ***" % (pfnlist[0], self.name())) - self.__physical += pfnlist - ### functions to calculate staging and mixer configuration ### ) - def evAvailablePerJob(self): - #This allows a three-sigma "buffer" between jobs. - evraw = (1.0*self.numEvents())/self.__np - return (2 + evraw + 9 - 3 * sqrt(4 * evraw + 9)) - def isSufficient(self): - if True: #For safety, you could redefine 'sufficient' to include in-file buffers. - return self.__evReq*self.__np <= (self.numEvents()) - else: - return self.evAvailablePerJob() >= self.__evReq - def weight(self): - #This works regardless of how isSufficient() is defined. - if self.isSufficient(): - return 1.0 - else: - return self.evAvailablePerJob()/self.__evReq - def firstFileIndex(self, fractionDone): - #Always use maximal job spacing. - return int ((self.numEvents()*fractionDone)/self.__evPerFile) - def firstEventInFile(self, fractionDone) : - return int (self.numEvents()*fractionDone) % self.__evPerFile - def lastFileIndex(self, fractionDone, lumiFraction): - if not self.isSufficient(): - # maximum of 1 file overlap! - return int( (self.numEvents()*(fractionDone + lumiFraction))/self.__evPerFile) + 1 - lastProbableFile = self.firstFileIndex(fractionDone) + 1 - eventsRequired = self.__evReq * (lumiFraction * self.__np) - lastProbableFile += int ((eventsRequired + 3 * sqrt(eventsRequired))/self.__evPerFile) #allow 3 sigma fluctuation. - if lastProbableFile > self.numFiles(): - lastProbableFile = self.numFiles() - return lastProbableFile - def totalEventsThisJob(self,lumiFraction): - if self.isSufficient(): - return self.__evReq * (lumiFraction * self.__np) - else: - return self.evAvailablePerJob() * (lumiFraction * self.__np) #luminosities should not vary too much per job. - def trigger(self,fractionDone,lumiFraction): - firstEvt = self.firstEventInFile(fractionDone) - return "EventSelectorAthenaPool/" + self.name() + ":" + str(firstEvt) + ":" + str(int(firstEvt + self.totalEventsThisJob(lumiFraction))) - def Equivalents(self): - return self.__aliases - def FilesToStage(self,fractionDone,lumiFraction): - return self.__physical[self.firstFileIndex(fractionDone):self.lastFileIndex(fractionDone,lumiFraction)] - + """This class configures one EventSelector with the MC dataset ID and the average events per job. + Later, the user can add lists of physical file names, or equivalent DSIDs (e.g. 5010 == 5030). + This class only works properly for fixed-sized input files. 
""" + def __init__(self, numJobs, datasetName, eventsRequired, eventsPerFile=50): + self.__np = numJobs + self.__dsid = datasetName + self.__evReq = eventsRequired + self.__evPerFile = eventsPerFile + self.__aliases = [ datasetName ] + self.__physical = [] + print("*** MixingPartitioner: INFO Created new selector Selector%i requesting an average of %f events per job ***" % (datasetName, eventsRequired)) + def name(self): + return "Selector"+str(self.__dsid) + def __str__(self): + """ These are for printing """ + return self.name()+"[ev/job="+str(self.__evReq)+",nFiles="+str(len(self.__physical))+"]" + def __repr__(self): + return self.__str__() + def DSID(self): + return self.__dsid + def numFiles(self): + return len(self.__physical) + def numEvents(self): + return self.__evPerFile * len(self.__physical) + ### functions to add new files to the selector ### + def addAlias(self,newAlias): + """If some downstream module is weighting prescales based on MC run number, inform it that run number newAlias is equivalent to this __dsid.""" + if newAlias not in self.__aliases: + print("*** MixingPartitioner: INFO \tEventInfo run number %i interpreted like %i. ***" % (newAlias, self.__dsid)) + self.__aliases += [ newAlias ] + def addNewCatalog(self, pfnlist): + if len(pfnlist) == 0: + print("*** MixingPartitioner: WARNING Adding empty list to %s?" % self.name()) + return + if self.numFiles(): + print("*** MixingPartitioner: INFO Files (%s ...) will be appended to %s. ***" % (pfnlist[0], self.name())) + else: + print("*** MixingPartitioner: INFO Using files (%s ...) to initialize %s. ***" % (pfnlist[0], self.name())) + self.__physical += pfnlist + ### functions to calculate staging and mixer configuration ### ) + def evAvailablePerJob(self): + #This allows a three-sigma "buffer" between jobs. + evraw = (1.0*self.numEvents())/self.__np + return (2 + evraw + 9 - 3 * sqrt(4 * evraw + 9)) + def isSufficient(self): + if True: #For safety, you could redefine 'sufficient' to include in-file buffers. + return self.__evReq*self.__np <= (self.numEvents()) + else: + return self.evAvailablePerJob() >= self.__evReq + def weight(self): + #This works regardless of how isSufficient() is defined. + if self.isSufficient(): + return 1.0 + else: + return self.evAvailablePerJob()/self.__evReq + def firstFileIndex(self, fractionDone): + #Always use maximal job spacing. + return int ((self.numEvents()*fractionDone)/self.__evPerFile) + def firstEventInFile(self, fractionDone) : + return int (self.numEvents()*fractionDone) % self.__evPerFile + def lastFileIndex(self, fractionDone, lumiFraction): + if not self.isSufficient(): + # maximum of 1 file overlap! + return int( (self.numEvents()*(fractionDone + lumiFraction))/self.__evPerFile) + 1 + lastProbableFile = self.firstFileIndex(fractionDone) + 1 + eventsRequired = self.__evReq * (lumiFraction * self.__np) + lastProbableFile += int ((eventsRequired + 3 * sqrt(eventsRequired))/self.__evPerFile) #allow 3 sigma fluctuation. + if lastProbableFile > self.numFiles(): + lastProbableFile = self.numFiles() + return lastProbableFile + def totalEventsThisJob(self,lumiFraction): + if self.isSufficient(): + return self.__evReq * (lumiFraction * self.__np) + else: + return self.evAvailablePerJob() * (lumiFraction * self.__np) #luminosities should not vary too much per job. 
+    def trigger(self,fractionDone,lumiFraction):
+        firstEvt = self.firstEventInFile(fractionDone)
+        return "EventSelectorAthenaPool/" + self.name() + ":" + str(firstEvt) + ":" + str(int(firstEvt + self.totalEventsThisJob(lumiFraction)))
+    def Equivalents(self):
+        return self.__aliases
+    def FilesToStage(self,fractionDone,lumiFraction):
+        return self.__physical[self.firstFileIndex(fractionDone):self.lastFileIndex(fractionDone,lumiFraction)]
diff --git a/Reconstruction/RecJobTransforms/python/RDOFilePeeker.py b/Reconstruction/RecJobTransforms/python/RDOFilePeeker.py
index 9c0b2dab049633de86546f80891b47193f4cccdc..190f79423c13f051f7dc48bcac419f61705eedfd 100644
--- a/Reconstruction/RecJobTransforms/python/RDOFilePeeker.py
+++ b/Reconstruction/RecJobTransforms/python/RDOFilePeeker.py
@@ -1,6 +1,6 @@
 from past.builtins import basestring
 
-# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 
 def RDOFilePeeker(runArgs, skeletonLog):
     from PyUtils.MetaReader import read_metadata
@@ -30,14 +30,14 @@ def RDOFilePeeker(runArgs, skeletonLog):
     if '/Digitization/Parameters' in metadata:
         metadatadict = metadata['/Digitization/Parameters']
         if isinstance(metadatadict, list):
-            skeletonLog.warning("%s inputfile: %s contained %s sets of Dititization Metadata. Using the final set in the list.",inputtype,inputfile,len(metadatadict))
+            skeletonLog.warning("inputfile: %s contained %s sets of Digitization Metadata. Using the final set in the list.",input_file,len(metadatadict))
             metadatadict = metadatadict[-1]
         ##Get IOVDbGlobalTag
         if 'IOVDbGlobalTag' not in metadatadict:
             try:
                 if metadata['/TagInfo']['IOVDbGlobalTag'] is not None:
                     metadatadict['IOVDbGlobalTag'] = metadata['/TagInfo']['IOVDbGlobalTag']
-            except:
+            except Exception:
                 skeletonLog.warning("Failed to find IOVDbGlobalTag.")
         else:
             ##Patch for older hit files
@@ -89,7 +89,7 @@ def RDOFilePeeker(runArgs, skeletonLog):
         skeletonLog.debug(cmd)
         try:
             exec(cmd)
-        except:
+        except Exception:
            skeletonLog.warning('Failed to switch on subdetector %s',subdet)
     #hacks to reproduce the sub-set of DetFlags left on by RecExCond/AllDet_detDescr.py
     DetFlags.simulate.all_setOff()
diff --git a/Reconstruction/RecJobTransforms/python/RecConfig.py b/Reconstruction/RecJobTransforms/python/RecConfig.py
index 9586f252b15843f02cc6250e6f2b689db5a12c6b..f5421eeec6799e4c42119e575b2a48165f526d12 100644
--- a/Reconstruction/RecJobTransforms/python/RecConfig.py
+++ b/Reconstruction/RecJobTransforms/python/RecConfig.py
@@ -1,9 +1,8 @@
-# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 
-import os
 __all__ = []
 
-from PyJobTransformsCore.TransformConfig import *
+from PyJobTransformsCore.TransformConfig import TransformConfig, Boolean, String, ListOfStrings
 
 class RecConfig(TransformConfig):
     # prevent any mistypings by not allowing dynamic members
diff --git a/Reconstruction/RecJobTransforms/python/recTransformUtils.py b/Reconstruction/RecJobTransforms/python/recTransformUtils.py
index 77a6bacc76b99a17a274b01d173c6043d3e006f5..28bee14f944b0afac1069c9e6e474fd0551400da 100644
--- a/Reconstruction/RecJobTransforms/python/recTransformUtils.py
+++ b/Reconstruction/RecJobTransforms/python/recTransformUtils.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 
 ## @brief Module with standard reconstruction transform options and substeps
 
@@ -10,7 +10,7 @@
 msg = logging.getLogger(__name__)
 
 import PyJobTransforms.trfArgClasses as trfArgClasses
 from PyJobTransforms.trfExe import athenaExecutor, dummyExecutor, DQMergeExecutor, reductionFrameworkExecutor, reductionFrameworkExecutorNTUP
-from PyJobTransforms.trfArgs import addD3PDArguments, addPrimaryDPDArguments, addExtraDPDTypes, addReductionArguments
+from PyJobTransforms.trfArgs import addPrimaryDPDArguments, addExtraDPDTypes, addReductionArguments
 
 def addCommonRecTrfArgs(parser):
diff --git a/Reconstruction/RecJobTransforms/python/recoTransforms.py b/Reconstruction/RecJobTransforms/python/recoTransforms.py
index ac7bddd6e1396924b9f75630cc86174a3bcee556..58e7282f5b7fc291fe4e2b806f512acedd23d049 100644
--- a/Reconstruction/RecJobTransforms/python/recoTransforms.py
+++ b/Reconstruction/RecJobTransforms/python/recoTransforms.py
@@ -1,11 +1,9 @@
-# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
 
 ## @brief Specialist reconstruction and bytestream transforms
 # @author atlas-comp-jt-dev@cern.ch
-# @version $Id: recoTransforms.py 611371 2014-08-12 09:21:15Z seuster $
 
 import os
-import re
 import subprocess
 
 import logging
@@ -39,21 +37,21 @@ class skimRawExecutor(scriptExecutor):
                     evtprefix, evtstr = splitStrings[2].split("=")
                     # Check sanity
                     if runprefix != "Run" or evtprefix != "Event":
-                        msg.warning("Failed to understand this line from AtlListBSEvents: {0}".format(line))
+                        msg.warning("Failed to understand this line from AtlListBSEvents: %s", line)
                     else:
-                        runnumber = int(runstr)
-                        evtnumber = int(evtstr)
+                        runnumber = int(runstr)  # noqa: F841
+                        evtnumber = int(evtstr)  # noqa: F841
                         # We build up a string key as "RUN-EVENT", so that we can take advantage of
                         # the fast hash search against a dictionary
                         rawEventList[runstr + "-" + evtstr] = True
-                        msg.debug("Identified run {0}, event {1} in input RAW files".format(runstr, evtstr))
-                except ValueError as e:
-                    msg.warning("Failed to understand this line from AtlListBSEvents: {0}".format(line))
+                        msg.debug("Identified run %s, event %s in input RAW files", runstr, evtstr)
+                except ValueError:
+                    msg.warning("Failed to understand this line from AtlListBSEvents: %s", line)
         except subprocess.CalledProcessError as e:
             errMsg = "Call to AtlListBSEvents failed: {0}".format(e)
-            msg.error(erMsg)
+            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_EXEC_SETUP_FAIL"), errMsg)
-        msg.info("Found {0} events as skim candidates in RAW inputs".format(len(rawEventList)))
+        msg.info("Found %d events as skim candidates in RAW inputs", len(rawEventList))
 
         # Now open the list of filter events, and check through them
         slimmedFilterFile = "slimmedFilterFile.{0}".format(os.getpid())
@@ -63,17 +61,17 @@ class skimRawExecutor(scriptExecutor):
            try:
                runstr, evtstr = line.split()
                if runstr + "-" + evtstr in rawEventList:
-                    msg.debug("Found run {0}, event {1} in master filter list".format(runstr, evtstr))
+                    msg.debug("Found run %s, event %s in master filter list", runstr, evtstr)
                    os.write(slimFF.fileno(), line)
                    count += 1
            except ValueError as e:
-                msg.warning("Failed to understand this line from master filter file: {0} {1}".format(line, e))
+                msg.warning("Failed to understand this line from master filter file: %s %s", line, e)
        if count == 0:
            # If there are no matched events, create a bogus request for run and event 0 to keep
            # AtlCopyBSEvent.exe CLI
            msg.info("No events matched in this input file - empty RAW file output will be made")
            os.write(slimFF.fileno(), "0 0\n")
-        msg.info("Matched {0} lines from the master filter file against input events; wrote these to {1}".format(count, slimmedFilterFile))
+        msg.info("Matched %d lines from the master filter file against input events; wrote these to %s", count, slimmedFilterFile)
 
        # Build up the right command line for acmd.py
        self._cmd = ['acmd.py', 'filter-files']
diff --git a/Reconstruction/RecJobTransforms/python/streaming_arg.py b/Reconstruction/RecJobTransforms/python/streaming_arg.py
deleted file mode 100644
index c19e18271a227e9c06112a2743ee8252fbc101cb..0000000000000000000000000000000000000000
--- a/Reconstruction/RecJobTransforms/python/streaming_arg.py
+++ /dev/null
@@ -1,72 +0,0 @@
-
-# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
-
-__author__ = "Ayana.Holloway@cern.ch"
-__doc__ = """Arguments for streaming batch test."""
-
-from copy import copy
-from PyJobTransformsCore.basic_trfarg import *
-from RecJobTransforms.MixStreamConfig import *
-
-class PartitionArg(IntegerArg):
-    """Batch job ('partition') number of this job. The trf will calculate the run/lumi/sfo number, and the offset into the input file lists, from this."""
-    def __init__(self,help='default',name='default'):
-        IntegerArg.__init__(self,help,name)
-        self.__mixPartitioner = MixingPartitioner()
-
-    def metaData(self):
-        val = self.value()
-        if val is None:
-            return {}
-        else:
-            return { self.name() : self.value(),
-                     "pseudoRun" : mixStreamConfig.WhichRunNumber(self.value()),
-                     "pseudoSFO" : mixStreamConfig.WhichSFO(self.value()),
-                     "pseudoLumiBlock" : mixStreamConfig.WhichLumiBlock(self.value()),
-                     "pseudoLumiFraction" : mixStreamConfig.ThisLuminosityFraction(self.value()) }
-
-    def GetPartitioner(self):
-        return self.__mixPartitioner
-
-    def isFullArgument(self):
-        return True
-
-    def jobOrTask(self):
-        return 'job'
-
-    def preRunAction(self):
-        self.__mixPartitioner.SetPartition(self.value())
-        self.__mixPartitioner.ConfigureSelectors()
-        self.__mixPartitioner.preStageInputFiles()
-
-    def WritePartitionJobOptions(self):
-        EventMixerList, outstring = self.__mixPartitioner.ConfigureServices()
-        el = "',\n'"
-        sp = ", "
-
-        outstring += '\nprint ":::::::::: STREAMING JOB CONFIGURATION: LHC INSTANTANEOUS LUMINOSITY= %f x 10^%i cm^-2 s^-1"' % (self.__mixPartitioner.ScaleFactor(), 31)
-        outstring += "\nEventMixer.TriggerList += [ '%s' ]\n" % el.join(EventMixerList)
-
-        weighted = [str(i) for i in self.__mixPartitioner.ListOfSamplesToWeight()]
-        weights = [str(f) for f in self.__mixPartitioner.ListOfSampleFractions()]
-        if len(self.__mixPartitioner.ListOfSampleColonWeights()):
-            outstring += "\nWeightConfiguration = [ '%s' ]\n" % el.join(self.__mixPartitioner.ListOfSampleColonWeights())
-        else:
-            outstring += "\nWeightConfiguration = [ ]\n"
-        outstring += "\nListOfSamplesToWeight = [%s]\n" % sp.join(weighted)
-        outstring += "\nListOfSampleFractions = [%s]\n" % (sp).join(weights)
-        return outstring
-
-
-#####################################
-
-class FakeTableArg(BoolArg):
-    """Toggle between simple trigger table and full table"""
-    def __init__(self,help='Toggle between simple trigger table and full table',name='default'):
-        BoolArg.__init__(self,help,name)
-
-    def isFullArgument(self):
-        return True
-
-    def jobOrTask(self):
-        return 'task'
diff --git a/Reconstruction/RecJobTransforms/share/TrigConfigForCosmic09RerunHLT.py b/Reconstruction/RecJobTransforms/share/TrigConfigForCosmic09RerunHLT.py
deleted file mode 100644
index c05889d25cb0c795c0896c09fa83a0725df554fd..0000000000000000000000000000000000000000
--- a/Reconstruction/RecJobTransforms/share/TrigConfigForCosmic09RerunHLT.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# This sets up the trigger config for a BStoESD job
-# to use the HLT output XML file generated by a previous BStoBS step
-# and a fixed LVL1 file from the release which is broadly compatible
-# with the 2009 cosmic runs.
-# It is intended for special trigger reprocessing only.
-# Contact: Sylvie Brunet, Clemencia Mora or other trigger configuration experts
-
-##preInclude for all steps but enable only for RAWtoESD
-from RecExConfig.RecFlags import rec
-if rec.readRDO and rec.doESD:
-    from TriggerJobOpts.TriggerFlags import TriggerFlags as tf
-    tf.inputHLTconfigFile.set_Value_and_Lock("outputHLTconfig.xml")
-    tf.inputLVL1configFile.set_Value_and_Lock("TriggerMenuXML/LVL1config_Cosmic2009_v1_7-bit_trigger_types.xml")
-    tf.configForStartup.set_Value_and_Lock("HLToffline")
-    tf.configurationSourceList.set_Value_and_Lock(['xml'])
-    #don't set this in ESDtoAOD, it works with HLTonline since DS folders are stored in ESD metadata.
diff --git a/Reconstruction/RecJobTransforms/share/rawtodpd.py b/Reconstruction/RecJobTransforms/share/rawtodpd.py
deleted file mode 100644
index ea118d46a1d03365736dbc8eadfdfb2d354e804c..0000000000000000000000000000000000000000
--- a/Reconstruction/RecJobTransforms/share/rawtodpd.py
+++ /dev/null
@@ -1,12 +0,0 @@
-###############################################################
-#
-# Skeleton top job options for RAW->DESD
-# Put here outputs that require rec.doDPD=True
-#
-# TODO: Review of options supported here...
-#
-#==============================================================
-
-rec.doDPD = True
-rec.DPDMakerScripts.append("PrimaryDPDMaker/PrimaryDPDMaker.py")
-include( "RecExCommon/RecExCommon_topOptions.py" )
\ No newline at end of file
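
For reference, the usage pattern retained in the MixStreamConfig header comments boils down to the following sketch (not part of the patch; it assumes an Athena job-options context where EventMixer and svcMgr exist, and the PFN and DSID values are placeholders):

# Sketch only: exercises the MixingPartitioner API kept by this patch.
from RecJobTransforms.MixStreamConfig import MixingPartitioner

m = MixingPartitioner()
m.SetPartition(3)                                             # batch job number
m.AddSample(["mix.aod._0001.pool.root"], 5009, nEvents=25.0)  # placeholder PFN/DSID
m.ConfigureSelectors()                                        # prints per-sample budgets and weights
mixerList, servicePy = m.ConfigureServices()
# EventMixer.TriggerList += mixerList, and exec(servicePy) declares the
# EventSelectorAthenaPool instances -- this is what the deleted
# streaming_arg.py's WritePartitionJobOptions() used to emit as job options.
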