Skip to content
Snippets Groups Projects
Commit bde9733c authored by Walter Lampl's avatar Walter Lampl
Browse files

Merge branch 'InputSorting.postInclude' into 'master'

postInclude joboptions fragment for input event sorting

See merge request !45579
parents 057bdbb4 e5e49771
No related branches found
No related tags found
No related merge requests found
...@@ -13,6 +13,7 @@ class SortedCollectionCreator: ...@@ -13,6 +13,7 @@ class SortedCollectionCreator:
def __init__(self, name= "sortEvents"): def __init__(self, name= "sortEvents"):
from AthenaCommon import Logging from AthenaCommon import Logging
Logging.log.name = name Logging.log.name = name
self.name = name
self.info = Logging.log.info self.info = Logging.log.info
self.debug = Logging.log.debug self.debug = Logging.log.debug
self.verbose = Logging.log.verbose self.verbose = Logging.log.verbose
...@@ -76,12 +77,12 @@ class SortedCollectionCreator: ...@@ -76,12 +77,12 @@ class SortedCollectionCreator:
self.verbose('='*80) self.verbose('='*80)
self.info("Finished reading input collections, total events read: {}".format(len(self.allRows)) ) self.info("Finished reading input collections, total events read: {}".format(len(self.allRows)) )
def sortEvents(self, sortAttrName): def sortEvents(self, sortAttrName, sortReverse=False):
"""sort the events based on an attribute name""" """sort the events based on an attribute name"""
self.info("Sorting on attribute {}".format(sortAttrName)) self.info("Sorting on attribute {}, sort order {}".format(sortAttrName, ("Descending" if sortReverse else "Ascending") ))
# +1 because the Ref is first # add 1 to offsets because the Ref is first
attrPos = self.attrNames.index(sortAttrName)+1 attrPos = self.attrNames.index(sortAttrName)+1
self.allRows.sort( key=lambda t: t[attrPos] ) self.allRows.sort( key=lambda t: t[attrPos], reverse=sortReverse )
for t in self.allRows: for t in self.allRows:
self.verbose( t[1:], t[0] ) self.verbose( t[1:], t[0] )
...@@ -106,16 +107,20 @@ class SortedCollectionCreator: ...@@ -106,16 +107,20 @@ class SortedCollectionCreator:
dstColl.commit() dstColl.commit()
dstColl.close() dstColl.close()
def execute(self, inputCollections, outputCollection="PFN:collection.root", sortAttribute="LumiBlockN"): def execute(self, inputCollections, outputCollection="PFN:collection.root", sortAttribute="LumiBlockN", sortOrder="Ascending"):
self.info("Executing SortedCollectionCreator, inputs={}, output={}, sort on: {}" sort_opts = ("Ascending", "Descending")
.format(inputCollections, outputCollection, sortAttribute)) self.info("Executing SortedCollectionCreator, inputs={}, output='{}', sort by: {}, order: {}"
.format(inputCollections, outputCollection, sortAttribute, sortOrder))
if isinstance(inputCollections, str): if isinstance(inputCollections, str):
inputs = [inputCollections] inputs = [inputCollections]
else: else:
inputs = inputCollections inputs = inputCollections
if sortOrder.lower() not in [opt.lower() for opt in sort_opts]:
raise Exception(self.name + ": Accepted sortOrder values are: " + str(sort_opts))
sortReverse = ( sortOrder.lower()[0] == "d" )
self.loadRoot() self.loadRoot()
self.readInputCollections(inputs) self.readInputCollections(inputs)
self.sortEvents(sortAttribute) self.sortEvents(sortAttribute, sortReverse)
self.writeCollection(outputCollection) self.writeCollection(outputCollection)
# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
# Joboptions fragment that should be post-included after a typicaly configured Athena job
# It will read original input files, create a list of Events in memory, sort them and produce
# an intermediate Event Collection file that Athena will read instead of the original inputs
# Event information is read from EventInfoTags (stored by default in all Athena data files)
# The default sort key value (Lumi) can be oveerriden, as the sorting order
# The intermediate Collection file can be inspected using CollQuery cmdline utility
import os
from AthenaCommon.AppMgr import ServiceMgr
inputs = ServiceMgr.EventSelector.InputCollections
# set default sort parameters, read overrides from locals()
tmpCollFile = locals().get("AthenaInputSortCollName", "sortedEventRefs" + str(os.getpid()) )
sortTag = locals().get("AthenaInputSortTag", "LumiBlockN")
sortOrd = locals().get("AthenaInputSortOrder", "Ascending")
from CollectionUtilities.SortedCollectionCreator import *
sorter = SortedCollectionCreator(name="SortEvents")
# Sort Inputs based on one of the EventInfoTag attributes
# Store sorted event collection in a temporary file
# This should run as postInclude, so we assume EventSelector.InputCollections is set earlier
sorter.execute(inputs, outputCollection=tmpCollFile, sortAttribute=sortTag, sortOrder=sortOrd)
# Reading Events through References require a populated FileCatalog
for inpfile in inputs:
os.system('pool_insertFileToCatalog {}'.format(inpfile))
# Tell Athena to use the sorted collection instead of the original inputs
ServiceMgr.EventSelector.InputCollections = [tmpCollFile + ".root"]
ServiceMgr.EventSelector.CollectionType = "ExplicitROOT"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment