Skip to content
Snippets Groups Projects
Commit 8888f1b9 authored by Frank Winklmeier's avatar Frank Winklmeier
Browse files

RecJobTransforms: enable flake8 and package cleanup

Enable flake8 and delete some clearly obsolete files.
parent 3740e0a7
No related branches found
No related tags found
No related merge requests found
Showing
with 301 additions and 421 deletions
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
atlas_subdir( RecJobTransforms ) atlas_subdir( RecJobTransforms )
# Install python modules # Install python modules
atlas_install_python_modules( python/*.py ) atlas_install_python_modules( python/*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
# Install RDOtoRDOtrigger job opts with flake8 check # Install RDOtoRDOtrigger job opts with flake8 check
atlas_install_joboptions( share/skeleton.RDOtoRDOtrigger*.py POST_BUILD_CMD ${ATLAS_FLAKE8} ) atlas_install_joboptions( share/skeleton.RDOtoRDOtrigger*.py POST_BUILD_CMD ${ATLAS_FLAKE8} )
# Install other job opts without flake8 check # Install other job opts without flake8 check
......
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
import os
__all__ = [] __all__ = []
from PyJobTransformsCore.TransformConfig import * from PyJobTransformsCore.TransformConfig import TransformConfig
class BsConfig(TransformConfig): class BsConfig(TransformConfig):
# prevent any mistypings by not allowing dynamic members # prevent any mistypings by not allowing dynamic members
......
This diff is collapsed.
from __future__ import print_function # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
from __future__ import division
from builtins import object
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
##################################### #####################################
# Utility class # Utility class
...@@ -11,86 +7,85 @@ from builtins import object ...@@ -11,86 +7,85 @@ from builtins import object
from math import sqrt from math import sqrt
class MixingSelector(object):
    """This class configures one EventSelector with the MC dataset ID and the average events per job.
    Later, the user can add lists of physical file names, or equivalent DSIDs (e.g. 5010 == 5030).
    This class only works properly for fixed-sized input files. """

    def __init__(self, numJobs, datasetName, eventsRequired, eventsPerFile=50):
        # numJobs:        number of batch jobs the sample is divided over
        # datasetName:    MC dataset ID (an integer DSID; used in the selector name)
        # eventsRequired: average number of events each job should draw
        # eventsPerFile:  fixed event count per input file (class assumes fixed-size files)
        self.__np = numJobs
        self.__dsid = datasetName
        self.__evReq = eventsRequired
        self.__evPerFile = eventsPerFile
        self.__aliases = [ datasetName ]
        self.__physical = []   # physical file names (PFNs) added via addNewCatalog
        print("*** MixingPartitioner: INFO Created new selector Selector%i requesting an average of %f events per job ***" % (datasetName, eventsRequired))

    def name(self):
        """Return the EventSelector instance name, e.g. 'Selector5010'."""
        return "Selector"+str(self.__dsid)

    def __str__(self):
        """ These are for printing """
        return self.name()+"[ev/job="+str(self.__evReq)+",nFiles="+str(len(self.__physical))+"]"

    def __repr__(self):
        return self.__str__()

    def DSID(self):
        """Return the MC dataset ID this selector was created with."""
        return self.__dsid

    def numFiles(self):
        """Return the number of physical files currently attached."""
        return len(self.__physical)

    def numEvents(self):
        """Return the total events available (files * fixed events-per-file)."""
        return self.__evPerFile * len(self.__physical)

    ### functions to add new files to the selector ###
    def addAlias(self,newAlias):
        """If some downstream module is weighting prescales based on MC run number, inform it that run number newAlias is equivalent to this __dsid."""
        if newAlias not in self.__aliases:
            print("*** MixingPartitioner: INFO \tEventInfo run number %i interpreted like %i. ***" % (newAlias, self.__dsid))
            self.__aliases += [ newAlias ]

    def addNewCatalog(self, pfnlist):
        """Append a list of physical file names to this selector (no-op with a warning for an empty list)."""
        if len(pfnlist) == 0:
            print("*** MixingPartitioner: WARNING Adding empty list to %s?" % self.name())
            return
        if self.numFiles():
            print("*** MixingPartitioner: INFO Files (%s ...) will be appended to %s. ***" % (pfnlist[0], self.name()))
        else:
            print("*** MixingPartitioner: INFO Using files (%s ...) to initialize %s. ***" % (pfnlist[0], self.name()))
        self.__physical += pfnlist

    ### functions to calculate staging and mixer configuration ###
    def evAvailablePerJob(self):
        #This allows a three-sigma "buffer" between jobs.
        evraw = (1.0*self.numEvents())/self.__np
        return (2 + evraw + 9 - 3 * sqrt(4 * evraw + 9))

    def isSufficient(self):
        """Return True if the attached files can supply the requested events for all jobs."""
        if True: #For safety, you could redefine 'sufficient' to include in-file buffers.
            return self.__evReq*self.__np <= (self.numEvents())
        else:
            return self.evAvailablePerJob() >= self.__evReq

    def weight(self):
        """Return 1.0 when sufficient, otherwise the fraction of the request that is available."""
        #This works regardless of how isSufficient() is defined.
        if self.isSufficient():
            return 1.0
        else:
            return self.evAvailablePerJob()/self.__evReq

    def firstFileIndex(self, fractionDone):
        """Index of the first input file for a job that starts at fractionDone of the sample."""
        #Always use maximal job spacing.
        return int ((self.numEvents()*fractionDone)/self.__evPerFile)

    def firstEventInFile(self, fractionDone) :
        """Event offset within the first file for a job starting at fractionDone."""
        return int (self.numEvents()*fractionDone) % self.__evPerFile

    def lastFileIndex(self, fractionDone, lumiFraction):
        """Index one past the last file a job needs, given its start fraction and luminosity share."""
        if not self.isSufficient():
            # maximum of 1 file overlap!
            return int( (self.numEvents()*(fractionDone + lumiFraction))/self.__evPerFile) + 1
        lastProbableFile = self.firstFileIndex(fractionDone) + 1
        eventsRequired = self.__evReq * (lumiFraction * self.__np)
        lastProbableFile += int ((eventsRequired + 3 * sqrt(eventsRequired))/self.__evPerFile) #allow 3 sigma fluctuation.
        if lastProbableFile > self.numFiles():
            lastProbableFile = self.numFiles()
        return lastProbableFile

    def totalEventsThisJob(self,lumiFraction):
        """Events this job should process for its luminosity share."""
        if self.isSufficient():
            return self.__evReq * (lumiFraction * self.__np)
        else:
            return self.evAvailablePerJob() * (lumiFraction * self.__np) #luminosities should not vary too much per job.

    def trigger(self,fractionDone,lumiFraction):
        """Return the EventMixer trigger string 'EventSelectorAthenaPool/<name>:<first>:<last>'."""
        firstEvt = self.firstEventInFile(fractionDone)
        return "EventSelectorAthenaPool/" + self.name() + ":" + str(firstEvt) + ":" + str(int(firstEvt + self.totalEventsThisJob(lumiFraction)))

    def Equivalents(self):
        """Return the list of DSIDs treated as equivalent to this one (includes its own)."""
        return self.__aliases

    def FilesToStage(self,fractionDone,lumiFraction):
        """Return the slice of physical files a job needs, from first to last file index."""
        return self.__physical[self.firstFileIndex(fractionDone):self.lastFileIndex(fractionDone,lumiFraction)]
from past.builtins import basestring from past.builtins import basestring
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
def RDOFilePeeker(runArgs, skeletonLog): def RDOFilePeeker(runArgs, skeletonLog):
from PyUtils.MetaReader import read_metadata from PyUtils.MetaReader import read_metadata
...@@ -30,14 +30,14 @@ def RDOFilePeeker(runArgs, skeletonLog): ...@@ -30,14 +30,14 @@ def RDOFilePeeker(runArgs, skeletonLog):
if '/Digitization/Parameters' in metadata: if '/Digitization/Parameters' in metadata:
metadatadict = metadata['/Digitization/Parameters'] metadatadict = metadata['/Digitization/Parameters']
if isinstance(metadatadict, list): if isinstance(metadatadict, list):
skeletonLog.warning("%s inputfile: %s contained %s sets of Dititization Metadata. Using the final set in the list.",inputtype,inputfile,len(metadatadict)) skeletonLog.warning("inputfile: %s contained %s sets of Dititization Metadata. Using the final set in the list.",input_file,len(metadatadict))
metadatadict = metadatadict[-1] metadatadict = metadatadict[-1]
##Get IOVDbGlobalTag ##Get IOVDbGlobalTag
if 'IOVDbGlobalTag' not in metadatadict: if 'IOVDbGlobalTag' not in metadatadict:
try: try:
if metadata['/TagInfo']['IOVDbGlobalTag'] is not None: if metadata['/TagInfo']['IOVDbGlobalTag'] is not None:
metadatadict['IOVDbGlobalTag'] = metadata['/TagInfo']['IOVDbGlobalTag'] metadatadict['IOVDbGlobalTag'] = metadata['/TagInfo']['IOVDbGlobalTag']
except: except Exception:
skeletonLog.warning("Failed to find IOVDbGlobalTag.") skeletonLog.warning("Failed to find IOVDbGlobalTag.")
else: else:
##Patch for older hit files ##Patch for older hit files
...@@ -89,7 +89,7 @@ def RDOFilePeeker(runArgs, skeletonLog): ...@@ -89,7 +89,7 @@ def RDOFilePeeker(runArgs, skeletonLog):
skeletonLog.debug(cmd) skeletonLog.debug(cmd)
try: try:
exec(cmd) exec(cmd)
except: except Exception:
skeletonLog.warning('Failed to switch on subdetector %s',subdet) skeletonLog.warning('Failed to switch on subdetector %s',subdet)
#hacks to reproduce the sub-set of DetFlags left on by RecExCond/AllDet_detDescr.py #hacks to reproduce the sub-set of DetFlags left on by RecExCond/AllDet_detDescr.py
DetFlags.simulate.all_setOff() DetFlags.simulate.all_setOff()
......
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
import os
__all__ = [] __all__ = []
from PyJobTransformsCore.TransformConfig import * from PyJobTransformsCore.TransformConfig import TransformConfig, Boolean, String, ListOfStrings
class RecConfig(TransformConfig): class RecConfig(TransformConfig):
# prevent any mistypings by not allowing dynamic members # prevent any mistypings by not allowing dynamic members
......
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
## @brief Module with standard reconstruction transform options and substeps ## @brief Module with standard reconstruction transform options and substeps
...@@ -10,7 +10,7 @@ msg = logging.getLogger(__name__) ...@@ -10,7 +10,7 @@ msg = logging.getLogger(__name__)
import PyJobTransforms.trfArgClasses as trfArgClasses import PyJobTransforms.trfArgClasses as trfArgClasses
from PyJobTransforms.trfExe import athenaExecutor, dummyExecutor, DQMergeExecutor, reductionFrameworkExecutor, reductionFrameworkExecutorNTUP from PyJobTransforms.trfExe import athenaExecutor, dummyExecutor, DQMergeExecutor, reductionFrameworkExecutor, reductionFrameworkExecutorNTUP
from PyJobTransforms.trfArgs import addD3PDArguments, addPrimaryDPDArguments, addExtraDPDTypes, addReductionArguments from PyJobTransforms.trfArgs import addPrimaryDPDArguments, addExtraDPDTypes, addReductionArguments
def addCommonRecTrfArgs(parser): def addCommonRecTrfArgs(parser):
......
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # Copyright (C) 2002-2020 CERN for the benefit of the ATLAS collaboration
## @brief Specialist reconstruction and bytestream transforms ## @brief Specialist reconstruction and bytestream transforms
# @author atlas-comp-jt-dev@cern.ch # @author atlas-comp-jt-dev@cern.ch
# @version $Id: recoTransforms.py 611371 2014-08-12 09:21:15Z seuster $
import os import os
import re
import subprocess import subprocess
import logging import logging
...@@ -39,21 +37,21 @@ class skimRawExecutor(scriptExecutor): ...@@ -39,21 +37,21 @@ class skimRawExecutor(scriptExecutor):
evtprefix, evtstr = splitStrings[2].split("=") evtprefix, evtstr = splitStrings[2].split("=")
# Check sanity # Check sanity
if runprefix != "Run" or evtprefix != "Event": if runprefix != "Run" or evtprefix != "Event":
msg.warning("Failed to understand this line from AtlListBSEvents: {0}".format(line)) msg.warning("Failed to understand this line from AtlListBSEvents: %s", line)
else: else:
runnumber = int(runstr) runnumber = int(runstr) # noqa: F841
evtnumber = int(evtstr) evtnumber = int(evtstr) # noqa: F841
# We build up a string key as "RUN-EVENT", so that we can take advantage of # We build up a string key as "RUN-EVENT", so that we can take advantage of
# the fast hash search against a dictionary # the fast hash search against a dictionary
rawEventList[runstr + "-" + evtstr] = True rawEventList[runstr + "-" + evtstr] = True
msg.debug("Identified run {0}, event {1} in input RAW files".format(runstr, evtstr)) msg.debug("Identified run %s, event %s in input RAW files", runstr, evtstr)
except ValueError as e: except ValueError:
msg.warning("Failed to understand this line from AtlListBSEvents: {0}".format(line)) msg.warning("Failed to understand this line from AtlListBSEvents: %s", line)
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
errMsg = "Call to AtlListBSEvents failed: {0}".format(e) errMsg = "Call to AtlListBSEvents failed: {0}".format(e)
msg.error(erMsg) msg.error(errMsg)
raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_EXEC_SETUP_FAIL"), errMsg) raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_EXEC_SETUP_FAIL"), errMsg)
msg.info("Found {0} events as skim candidates in RAW inputs".format(len(rawEventList))) msg.info("Found %d events as skim candidates in RAW inputs", len(rawEventList))
# Now open the list of filter events, and check through them # Now open the list of filter events, and check through them
slimmedFilterFile = "slimmedFilterFile.{0}".format(os.getpid()) slimmedFilterFile = "slimmedFilterFile.{0}".format(os.getpid())
...@@ -63,17 +61,17 @@ class skimRawExecutor(scriptExecutor): ...@@ -63,17 +61,17 @@ class skimRawExecutor(scriptExecutor):
try: try:
runstr, evtstr = line.split() runstr, evtstr = line.split()
if runstr + "-" + evtstr in rawEventList: if runstr + "-" + evtstr in rawEventList:
msg.debug("Found run {0}, event {1} in master filter list".format(runstr, evtstr)) msg.debug("Found run %s, event %s in master filter list", runstr, evtstr)
os.write(slimFF.fileno(), line) os.write(slimFF.fileno(), line)
count += 1 count += 1
except ValueError as e: except ValueError as e:
msg.warning("Failed to understand this line from master filter file: {0} {1}".format(line, e)) msg.warning("Failed to understand this line from master filter file: %s %s", line, e)
if count == 0: if count == 0:
# If there are no matched events, create a bogus request for run and event 0 to keep # If there are no matched events, create a bogus request for run and event 0 to keep
# AtlCopyBSEvent.exe CLI # AtlCopyBSEvent.exe CLI
msg.info("No events matched in this input file - empty RAW file output will be made") msg.info("No events matched in this input file - empty RAW file output will be made")
os.write(slimFF.fileno(), "0 0\n") os.write(slimFF.fileno(), "0 0\n")
msg.info("Matched {0} lines from the master filter file against input events; wrote these to {1}".format(count, slimmedFilterFile)) msg.info("Matched %d lines from the master filter file against input events; wrote these to %s", count, slimmedFilterFile)
# Build up the right command line for acmd.py # Build up the right command line for acmd.py
self._cmd = ['acmd.py', 'filter-files'] self._cmd = ['acmd.py', 'filter-files']
......
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
__author__ = "Ayana.Holloway@cern.ch"
__doc__ = """Arguments for streaming batch test."""
from copy import copy
from PyJobTransformsCore.basic_trfarg import *
from RecJobTransforms.MixStreamConfig import *
class PartitionArg(IntegerArg):
    """Batch job ('partition') number of this job. The trf will calculate the run/lumi/sfo number, and the offset into the input file lists, from this."""

    def __init__(self,help='default',name='default'):
        IntegerArg.__init__(self,help,name)
        # Partitioner that maps this job's partition number onto input
        # datasets, staging lists and luminosity fractions.
        self.__mixPartitioner = MixingPartitioner()

    def metaData(self):
        """Return pseudo run/SFO/lumiblock metadata derived from the partition number.

        Returns an empty dict when no partition value has been set.
        """
        val = self.value()
        if val is None:
            return {}
        # Reuse the cached value instead of re-calling self.value() for
        # every entry (the original called it five more times here).
        return { self.name() : val,
                 "pseudoRun" : mixStreamConfig.WhichRunNumber(val),
                 "pseudoSFO" : mixStreamConfig.WhichSFO(val),
                 "pseudoLumiBlock" : mixStreamConfig.WhichLumiBlock(val),
                 "pseudoLumiFraction" : mixStreamConfig.ThisLuminosityFraction(val) }

    def GetPartitioner(self):
        """Access the underlying MixingPartitioner instance."""
        return self.__mixPartitioner

    def isFullArgument(self):
        return True

    def jobOrTask(self):
        # The partition number is a per-job (not per-task) property.
        return 'job'

    def preRunAction(self):
        """Configure the partitioner for this partition and pre-stage its input files."""
        self.__mixPartitioner.SetPartition(self.value())
        self.__mixPartitioner.ConfigureSelectors()
        self.__mixPartitioner.preStageInputFiles()

    def WritePartitionJobOptions(self):
        """Build and return the job-options text for this partition.

        The returned string holds the EventMixer trigger list plus the sample
        weighting configuration.  NOTE(review): it deliberately emits a
        Python 2 'print' statement into the generated job options — presumably
        the target job options are Python 2; confirm before modernizing.
        """
        EventMixerList, outstring = self.__mixPartitioner.ConfigureServices()
        el = "',\n'"
        sp = ", "
        outstring += '\nprint ":::::::::: STREAMING JOB CONFIGURATION: LHC INSTANTANEOUS LUMINOSITY= %f x 10^%i cm^-2 s^-1"' % (self.__mixPartitioner.ScaleFactor(), 31)
        outstring += "\nEventMixer.TriggerList += [ '%s' ]\n" % el.join(EventMixerList)
        weighted = [str(i) for i in self.__mixPartitioner.ListOfSamplesToWeight()]
        weights = [str(f) for f in self.__mixPartitioner.ListOfSampleFractions()]
        if len(self.__mixPartitioner.ListOfSampleColonWeights()):
            outstring += "\nWeightConfiguration = [ '%s' ]\n" % el.join(self.__mixPartitioner.ListOfSampleColonWeights())
        else:
            outstring += "\nWeightConfiguration = [ ]\n"
        outstring += "\nListOfSamplesToWeight = [%s]\n" % sp.join(weighted)
        outstring += "\nListOfSampleFractions = [%s]\n" % (sp).join(weights)
        return outstring
#####################################
class FakeTableArg(BoolArg):
    """Toggle between simple trigger table and full table"""

    def __init__(self,help='Toggle between simple trigger table and full table',name='default'):
        # No state of its own — delegate straight to the BoolArg base class.
        BoolArg.__init__(self,help,name)

    def isFullArgument(self):
        # This is a complete, standalone transform argument.
        return True

    def jobOrTask(self):
        # The table choice applies to the whole task rather than per job.
        return 'task'
# This sets up the trigger config for a BStoESD job
# to use the HLT output XML file generated by a previous BStoBS step
# and a fixed LVL1 file from the release which is broadly compatible
# with the 2009 cosmic runs.
# It is intended for special trigger reprocessing only.
# Contact: Sylvie Brunet, Clemencia Mora or other trigger configuration experts
##preInclude for all steps but enable only for RAWtoESD
from RecExConfig.RecFlags import rec
# Guard: only act in the RAWtoESD step (reading RDO/BS and producing ESD).
if rec.readRDO and rec.doESD:
    from TriggerJobOpts.TriggerFlags import TriggerFlags as tf
    # Point the HLT configuration at the XML written by the preceding BStoBS step.
    tf.inputHLTconfigFile.set_Value_and_Lock("outputHLTconfig.xml")
    # Fixed LVL1 menu from the release (2009-cosmics-compatible, per header note).
    tf.inputLVL1configFile.set_Value_and_Lock("TriggerMenuXML/LVL1config_Cosmic2009_v1_7-bit_trigger_types.xml")
    tf.configForStartup.set_Value_and_Lock("HLToffline")
    # Read trigger configuration from the XML files configured above.
    tf.configurationSourceList.set_Value_and_Lock(['xml'])
    #don't set this in ESDtoAOD, it works with HLTonline since DS folders are stored in ESD metadata.
###############################################################
#
# Skeleton top job options for RAW->DESD
# Put here outputs that require rec.doDPD=True
#
# TODO: Review of options supported here...
#
#==============================================================
# NOTE(review): 'rec' and 'include' are not defined in this file —
# presumably provided by the transform environment that includes this
# skeleton; confirm against the calling transform.
rec.doDPD = True
# Register the primary DPD maker so the DESD outputs are produced.
rec.DPDMakerScripts.append("PrimaryDPDMaker/PrimaryDPDMaker.py")
# Hand over to the standard reconstruction top-level job options.
include( "RecExCommon/RecExCommon_topOptions.py" )
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment