Commit 9d661b79 authored by Alaettin Serhan Mete, committed by Julien Maurer

AthenaPoolCnvSvc: Simplify PoolWriteConfig

parent 874b0a80
2 merge requests: !62244 Daily merge of 23.0 into master, !62087 AthenaPoolCnvSvc: Simplify PoolWriteConfig
"""Configuration for POOL file writing
Copyright (C) 2002-2022 CERN for the benefit of the ATLAS collaboration
Copyright (C) 2002-2023 CERN for the benefit of the ATLAS collaboration
"""
from AthenaConfiguration.AccumulatorCache import AccumulatorCache
@@ -20,8 +20,25 @@ def _overrideTreeAutoFlush(logger, flags, stream, value):
return value
def _getStreamsFromFlags(flags):
"""
Helper to get all the streams from configuration flags
For each stream that's configured to be written out
we have two flags w/ the following convention:
+ Output.{STREAM}FileName
+ Output.doWrite{STREAM}
"""
result = []
for key, value in flags._flagdict.items():
if key.startswith("Output.") and key.endswith("FileName") and value.get():
stream = key.removeprefix("Output.").removesuffix("FileName")
if stream not in ["HIST"]: # AthenaPool is not responsible for HIST storage settings
result.append(stream)
return result
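# Illustrative example (hypothetical flag values): with Output.AODFileName="myAOD.pool.root",
# Output.DAOD_PHYSFileName="myDAOD.pool.root" and Output.HISTFileName="hist.root" all set,
# _getStreamsFromFlags(flags) returns a list containing "AOD" and "DAOD_PHYS";
# HIST is dropped because its storage settings are not handled by AthenaPool.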
@AccumulatorCache
def PoolWriteCfg(flags, forceTreeAutoFlush=-1):
def PoolWriteCfg(flags):
"""Return ComponentAccumulator configured to Write POOL files"""
# based on WriteAthenaPool._configureWriteAthenaPool
@@ -45,134 +62,98 @@ def PoolWriteCfg(flags, forceTreeAutoFlush=-1):
# Kept in sync with RecoUtils.py
from AthenaPoolCnvSvc import PoolAttributeHelper as pah
auto_flush = None
if flags.Output.EVNT_TRFileName:
# Default: Use LZMA w/ Level 1
# Temporary File: Use ZLIB w/ Level 1
comp_alg = 1 if flags.Output.EVNT_TRFileName.endswith('_000') or flags.Output.EVNT_TRFileName.startswith('tmp.') else 2
auto_flush = _overrideTreeAutoFlush(logger, flags, 'EVNT_TR', 1)
PoolAttributes += [ pah.setFileCompAlg( flags.Output.EVNT_TRFileName, comp_alg ) ]
PoolAttributes += [ pah.setFileCompLvl( flags.Output.EVNT_TRFileName, 1 ) ]
# Flush the CollectionTree, POOLContainer, and POOLContainerForm to disk at every event
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.EVNT_TRFileName, "CollectionTree", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.EVNT_TRFileName, "POOLContainer", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.EVNT_TRFileName, "POOLContainerForm", auto_flush ) ]
if flags.Output.HITSFileName:
# Default: Use LZMA w/ Level 1
# Temporary File: Use ZLIB w/ Level 1
comp_alg = 1 if flags.Output.HITSFileName.endswith('_000') or flags.Output.HITSFileName.startswith('tmp.') else 2
auto_flush = _overrideTreeAutoFlush(logger, flags, 'HITS', 10)
PoolAttributes += [ pah.setFileCompAlg( flags.Output.HITSFileName, comp_alg ) ]
PoolAttributes += [ pah.setFileCompLvl( flags.Output.HITSFileName, 1 ) ]
# Flush the CollectionTree, POOLContainer, and POOLContainerForm to disk at every 10 events
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.HITSFileName, "CollectionTree", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.HITSFileName, "POOLContainer", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.HITSFileName, "POOLContainerForm", auto_flush ) ]
if flags.Output.RDOFileName:
# Default: Use LZMA w/ Level 1
# Temporary File: Use ZLIB w/ Level 1
comp_alg = 1 if flags.Output.RDOFileName.endswith('_000') or flags.Output.RDOFileName.startswith('tmp.') else 2
auto_flush = _overrideTreeAutoFlush(logger, flags, 'RDO', 10)
PoolAttributes += [ pah.setFileCompAlg( flags.Output.RDOFileName, comp_alg ) ]
PoolAttributes += [ pah.setFileCompLvl( flags.Output.RDOFileName, 1 ) ]
# Flush the CollectionTree, POOLContainer, and POOLContainerForm to disk at every 10 events
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.RDOFileName, "CollectionTree", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.RDOFileName, "POOLContainer", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.RDOFileName, "POOLContainerForm", auto_flush ) ]
if flags.Output.ESDFileName:
# Default: Use LZMA w/ Level 1
# Temporary File: Use ZLIB w/ Level 1
comp_alg = 1 if flags.Output.ESDFileName.endswith('_000') or flags.Output.ESDFileName.startswith('tmp.') else 2
auto_flush = _overrideTreeAutoFlush(logger, flags, 'ESD', 10)
PoolAttributes += [ pah.setFileCompAlg( flags.Output.ESDFileName, comp_alg ) ]
PoolAttributes += [ pah.setFileCompLvl( flags.Output.ESDFileName, 1 ) ]
# Flush the CollectionTree, POOLContainer, and POOLContainerForm to disk at every 10 events
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.ESDFileName, "CollectionTree", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.ESDFileName, "POOLContainer", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.ESDFileName, "POOLContainerForm", auto_flush ) ]
if flags.Output.AODFileName:
# Default: Use LZMA w/ Level 1
# Temporary File: Use ZLIB w/ Level 1
comp_alg = 1 if flags.Output.AODFileName.endswith('_000') or flags.Output.AODFileName.startswith('tmp.') else 2
auto_flush = _overrideTreeAutoFlush(logger, flags, 'AOD', 100)
PoolAttributes += [ pah.setFileCompAlg( flags.Output.AODFileName, comp_alg ) ]
PoolAttributes += [ pah.setFileCompLvl( flags.Output.AODFileName, 1 ) ]
# By default use a maximum basket buffer size of 128k and minimum buffer entries of 10
PoolAttributes += [ pah.setMaxBufferSize( flags.Output.AODFileName, "131072" ) ]
PoolAttributes += [ pah.setMinBufferEntries( flags.Output.AODFileName, "10" ) ]
# Flush the CollectionTree, POOLContainer, and POOLContainerForm to disk at every 100 events
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.AODFileName, "CollectionTree", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.AODFileName, "POOLContainer", auto_flush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( flags.Output.AODFileName, "POOLContainerForm", auto_flush ) ]
# Derivation framework output settings
use_parallel_compression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
max_auto_flush = auto_flush if auto_flush else -1
for flag in [key for key in flags._flagdict.keys() if (("Output.DAOD_" in key or "Output.D2AOD_" in key) and "FileName" in key)]:
# Since there may be several outputs, this has to be done in a loop
file_name = flags._flagdict[flag]._value
# Figure out if this is an augmentation child stream
# If so we need to set things up a bit differently
# because for now augmentations do not 'own' an output file
# Therefore, some settings do not apply, such as file-level
# compression etc. This might change in the future
stream_name = flag[7:-8] # Here we rely on the convention of Output.STREAMFileName
is_augmentation_child = flags.hasFlag(f"Output.{stream_name}ParentStream")
if not is_augmentation_child:
# Use ZSTD w/ Level 5 for DAODs
PoolAttributes += [ pah.setFileCompAlg( file_name, "5" ) ]
PoolAttributes += [ pah.setFileCompLvl( file_name, "5" ) ]
# By default use a maximum basket buffer size of 128k and minimum buffer entries of 10
PoolAttributes += [ pah.setMaxBufferSize( file_name, "131072" ) ]
PoolAttributes += [ pah.setMinBufferEntries( file_name, "10" ) ]
# Defaults for common formats
# Stream : [compression algorithm, compression level, auto flush, split level, dyn split level]
defaults = {
"EVNT_TR" : [2, 1, 1, 0, 0],
"HITS" : [2, 1, 10, 0, 0],
"RDO" : [2, 1, 10, 0, 0],
"ESD" : [2, 1, 10, 0, 0],
"AOD" : [2, 1, 100, 0, 0],
"DAOD_PHYSVAL" : [5, 5, 100, 0, 1],
"DAOD_PHYS" : [5, 5, 500, 0, 1],
"DAOD_PHYSLITE" : [5, 5, 1000, 1, 1],
"D2AOD_PHYSLITE" : [5, 5, 1000, 1, 1],
}
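# Reading one entry as an example: "DAOD_PHYSLITE" : [5, 5, 1000, 1, 1] selects ROOT
# compression algorithm 5 (ZSTD) at level 5, an AutoFlush of 1000 events, split-level 1
# for the event containers, and split-level 1 for the dynamic (Dyn.) attributes;
# the loop below turns these numbers into the per-stream PoolAttributes.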
# Loop over all streams and set the appropriate attributes
maxAutoFlush = -1
for stream in _getStreamsFromFlags(flags):
# Get the file name - Guaranteed to exist at this point
fileName = getattr(flags.Output, f"{stream}FileName")
# Get the ROOT settings to be applied
compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 2, 1, 10, 0, 0 # Defaults: LZMA, Level 1, AutoFlush 10, No Splitting
if stream in defaults:
compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = defaults[stream]
elif "DAOD" in stream:
compAlg, compLvl, autoFlush, splitLvl, dynSplitLvl = 5, 5, 100, 0, 1 # Change the defaults for DAODs
# For temporary files we always use ZLIB for compression algorithm
compAlg = 1 if fileName.endswith('_000') or fileName.startswith('tmp.') else compAlg
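# e.g. a temporary output such as "tmp.ESD" or a chunked file name ending in "_000"
# (both hypothetical examples) is switched to ZLIB (1); any other name keeps the
# stream default chosen above.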
# See if the user asked for the AutoFlush to be overridden
autoFlush = _overrideTreeAutoFlush(logger, flags, stream, autoFlush)
# Print some debugging information
logger.debug(f"{fileName=} {stream=} {compAlg=} {compLvl=} {autoFlush=} {splitLvl=} {dynSplitLvl=}")
# Set the Collection/Container prefixes (make configurable?)
outputCollection = "POOLContainer"
poolContainerPrefix = "CollectionTree"
# Check to see if this stream is an augmentation
# Only set file-level attributes for the owning stream
isAugmentation = flags.hasFlag(f"Output.{stream}ParentStream")
if not isAugmentation:
# Set the Compression attributes
PoolAttributes += [ pah.setFileCompAlg( fileName, compAlg ) ]
PoolAttributes += [ pah.setFileCompLvl( fileName, compLvl ) ]
# By default use a maximum basket buffer size of 128k and minimum buffer entries of 10 for (D)AODs
if "AOD" in stream:
PoolAttributes += [ pah.setMaxBufferSize( fileName, "131072" ) ]
PoolAttributes += [ pah.setMinBufferEntries( fileName, "10" ) ]
else:
# Changes in this else block need to be coordinated w/ OutputStreamConfig!
# Set the index and friend tree information
PoolAttributes += [ f"DatabaseName = '{file_name}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]
PoolAttributes += [ f"DatabaseName = '{file_name}'; FRIEND_TREE = 'CollectionTree:CollectionTree_{stream_name}'" ]
# By default use 20 MB AutoFlush [or 100 (10) events for DAODs (everything else) for SharedWriter w/ parallel compression]
# for event data except for a number of select formats (see below)
auto_flush = -20000000
if use_parallel_compression:
auto_flush = 100 if "DAOD_" in stream_name else 10
# By default use split-level 0 except for DAOD_PHYSLITE which is maximally split
split_level = 0
if stream_name in ["DAOD_PHYS"]:
auto_flush = 500
if stream_name in ["DAOD_PHYSLITE", "D2AOD_PHYSLITE"]:
auto_flush = 1000
split_level = 1
if stream_name in ["DAOD_PHYSVAL"]:
auto_flush = 100
# override if needed
auto_flush = _overrideTreeAutoFlush(logger, flags, stream_name, auto_flush)
tree_name = "CollectionTree" if not is_augmentation_child else f"CollectionTree_{stream_name}"
PoolAttributes += [ pah.setTreeAutoFlush( file_name, tree_name, auto_flush ) ]
PoolAttributes += [ pah.setContainerSplitLevel( file_name, tree_name, split_level ) ]
PoolAttributes += [ pah.setContainerSplitLevel( file_name, "Aux.", split_level ) ]
PoolAttributes += [ pah.setContainerSplitLevel( file_name, "Dyn.", 1 ) ]
PoolAttributes += [ f"DatabaseName = '{fileName}'; INDEX_MASTER = 'POOLContainer(DataHeader)'" ]
PoolAttributes += [ f"DatabaseName = '{fileName}'; FRIEND_TREE = '{poolContainerPrefix}:{poolContainerPrefix}_{stream}'" ]
# Set the Collection/Container prefixes
outputCollection += f"_{stream}"
poolContainerPrefix += f"_{stream}"
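# Example (hypothetical augmentation stream "DAOD_PHYSLITE" with an Output.DAOD_PHYSLITEParentStream flag):
# the friend-tree attribute above becomes 'CollectionTree:CollectionTree_DAOD_PHYSLITE', and the
# AutoFlush/split-level attributes below are applied to CollectionTree_DAOD_PHYSLITE and
# POOLContainer_DAOD_PHYSLITE, so the augmentation gets its own trees alongside the parent's.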
# Set the AutoFlush attributes
PoolAttributes += [ pah.setTreeAutoFlush( fileName, poolContainerPrefix, autoFlush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( fileName, outputCollection, autoFlush ) ]
PoolAttributes += [ pah.setTreeAutoFlush( fileName, "POOLContainerForm", autoFlush ) ]
# Set the Split Level attributes
PoolAttributes += [ pah.setContainerSplitLevel( fileName, poolContainerPrefix, splitLvl ) ]
PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Aux.", splitLvl ) ]
PoolAttributes += [ pah.setContainerSplitLevel( fileName, "Dyn.", dynSplitLvl ) ]
# Find the maximum AutoFlush across all formats
if use_parallel_compression and auto_flush > max_auto_flush:
max_auto_flush = auto_flush
maxAutoFlush = max(maxAutoFlush, autoFlush)
# If we don't have "enough" events, disable parallelCompression if we're using SharedWriter
# In this context, "enough" means each worker gets a chance to make at least one flush to disk
if use_parallel_compression:
useParallelCompression = flags.MP.UseSharedWriter and flags.MP.UseParallelCompression
if useParallelCompression:
# Now compute the total number of events this job will process
requested_events = flags.Exec.MaxEvents
available_events = flags.Input.FileNentries - flags.Exec.SkipEvents
total_entries = available_events if requested_events == -1 else min( available_events, requested_events )
if ( total_entries > 0 ) and ( max_auto_flush > 0 ) and ( max_auto_flush * flags.Concurrency.NumProcs >= total_entries ):
requestedEvents = flags.Exec.MaxEvents
availableEvents = flags.Input.FileNentries - flags.Exec.SkipEvents
totalEntries = availableEvents if requestedEvents == -1 else min( availableEvents, requestedEvents )
if ( totalEntries > 0 ) and ( maxAutoFlush > 0 ) and ( maxAutoFlush * flags.Concurrency.NumProcs >= totalEntries ):
logger.info( "Not enough events to process, disabling parallel compression for SharedWriter!" )
logger.info( f"Processing {total_entries} events in {flags.Concurrency.NumProcs} workers "
f"and a maximum (across all outputs) AutoFlush of {max_auto_flush}")
use_parallel_compression = False
logger.info( f"Processing {totalEntries} events in {flags.Concurrency.NumProcs} workers "
f"and a maximum (across all outputs) AutoFlush of {maxAutoFlush}")
useParallelCompression = False
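# Worked example (hypothetical job parameters): with a maximum AutoFlush of 100 across the
# outputs and Concurrency.NumProcs=8, parallel compression stays enabled only when more than
# 100 * 8 = 800 events are processed; a 500-event job trips the check above and falls back
# to serial compression in the SharedWriter.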
from AthenaPoolCnvSvc.PoolCommonConfig import AthenaPoolCnvSvcCfg
return AthenaPoolCnvSvcCfg(flags,
PoolAttributes=PoolAttributes,
ParallelCompression=use_parallel_compression,
ParallelCompression=useParallelCompression,
StorageTechnology=flags.Output.StorageTechnology)