Commit 118fc9b0 authored by Rosen Matev's avatar Rosen Matev
Browse files

Merge branch 'apearce-hlt1-persistency-start' into 'master'

Support filtered writing of MDF and DST files

See merge request lhcb/Moore!221
parents a6b87c4d 8735fc69
......@@ -8,7 +8,7 @@
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
from PyConf.environment import EverythingHandler
from PyConf.environment import EverythingHandler, setupInputFromTestFileDB
from PyConf.Algorithms import FTRawBankDecoder
from RecoConf.hlt1_tracking import require_gec
from Hlt1Conf.lines.track_mva import (one_track_mva_line, two_track_mva_line,
......@@ -29,6 +29,6 @@ with FTRawBankDecoder.bind(DecodingVersion=ftdec_v), \
for name, builder in builders.items():
env.registerLine(name, builder())
env.setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
env.configure()
# env.plotDataFlow()
###############################################################################
# (c) Copyright 2019 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
"""Read a DST file created by the `hlt1_dst_output.py` options."""
from PyConf.environment import setupInputFromTestFileDB
# Read back the DST produced by the writer test, overriding the
# TestFileDB entry's file list and format.
setupInputFromTestFileDB(
    'MiniBrunel_2018_MinBias_FTv4_DIGI',
    inputFiles=['test_hlt1_persistence_dst_write.dst'],
    fileType='ROOT')
###############################################################################
# (c) Copyright 2019 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
"""Write an HLT1-filtered DST file."""
from PyConf.environment import setupOutput
# Persist the filtered events to a ROOT/DST file.
setupOutput(
    filename='test_hlt1_persistence_dst_write.dst', filetype='ROOT')
###############################################################################
# (c) Copyright 2019 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
"""Read an MDF file created by the `hlt1_mdf_output.py` options."""
from PyConf.environment import setupInputFromTestFileDB
# Read back the MDF produced by the writer test, overriding the
# TestFileDB entry's file list and format.
setupInputFromTestFileDB(
    'MiniBrunel_2018_MinBias_FTv4_DIGI',
    inputFiles=['test_hlt1_persistence_mdf_write.mdf'],
    fileType='MDF')
###############################################################################
# (c) Copyright 2019 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
"""Write an HLT1-filtered MDF file."""
from PyConf.environment import setupOutput
# Persist the filtered events to an MDF file.
setupOutput(
    filename='test_hlt1_persistence_mdf_write.mdf', filetype='MDF')
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<!--
(c) Copyright 2000-2018 CERN for the benefit of the LHCb Collaboration
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
In applying this licence, CERN does not waive the privileges and immunities
granted to it by virtue of its status as an Intergovernmental Organization
or submit itself to any jurisdiction.
-->
<!--
Run HLT1 on an HLT1-filtered DST file.
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="prerequisites"><set>
<tuple><text>persistency.dst_write</text><enumeral>PASS</enumeral></tuple>
</set></argument>
<argument name="program"><text>gaudirun.py</text></argument>
<argument name="args"><set>
<text>$HLT1CONFROOT/options/hlt1_example.py</text>
<text>$HLT1CONFROOT/tests/options/hlt1_dst_input.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="validator"><text>
countErrorLines({"FATAL": 0, "ERROR": 0, "WARNING": 0})
import re
pattern = re.compile(r'\s+NONLAZY_OR: hlt_decision\s+#=(\d+)\s+Sum=(\d+)')
# Check that:
# 1. We processed the same number of events as filtered by the previous job
# 2. We made the same number of positive decisions (which should be 100% of input events)
nread = nselected = -1
for line in stdout.split('\n'):
m = re.match(pattern, line)
if m:
nread, nselected = map(int, m.groups())
break
else:
causes.append('could not parse event statistics from reader stdout')
# We're running the same HLT1 configuration as the one that created the
# filtered data, so all input events should have a positive decision
if nread != nselected:
causes.append('expected all input events to pass')
nread_writing = nselected_writing = -1
with open('test_hlt1_persistence_dst_write.stdout') as f_ref:
for line in f_ref.readlines():
m = re.match(pattern, line)
if m:
nread_writing, nselected_writing = map(int, m.groups())
break
else:
causes.append('could not parse event statistics from writer stdout')
if nread != nselected_writing:
causes.append('did not read the same number of events as written out')
</text></argument>
</extension>
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<!--
(c) Copyright 2000-2018 CERN for the benefit of the LHCb Collaboration
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
In applying this licence, CERN does not waive the privileges and immunities
granted to it by virtue of its status as an Intergovernmental Organization
or submit itself to any jurisdiction.
-->
<!--
Run HLT1 and save a DST file.
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="program"><text>gaudirun.py</text></argument>
<argument name="args"><set>
<text>$HLT1CONFROOT/options/hlt1_example.py</text>
<text>$HLT1CONFROOT/tests/options/hlt1_dst_output.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="validator"><text>
# Expect a single WARNING:
# HiveDataBrokerSvc WARNING non-reentrant algorithm: OutputStream
countErrorLines({"FATAL": 0, "ERROR": 0, "WARNING": 1})
import re
pattern = re.compile(r'\s+NONLAZY_OR: hlt_decision\s+#=(\d+)\s+Sum=(\d+)')
# Check that:
# 1. We read at least two events
# 2. We make a positive decision on at least one event
# 3. We make a negative decision on at least one event
nread = nselected = -1
for line in stdout.split('\n'):
m = re.match(pattern, line)
if m:
nread, nselected = map(int, m.groups())
break
else:
causes.append('could not parse event statistics from stdout')
if nread &lt; 2:
causes.append('expected at least two events to be processed')
if nselected &lt; 1:
causes.append('expected at least one event to be selected')
if nselected == nread:
causes.append('expected at least one event to be filtered out')
# Write out the log file so that we can compare the number of
# selected events here with the number of events processed by
# a second HLT1 job that uses the output file as input
with open('test_hlt1_persistence_dst_write.stdout', 'w') as f:
f.write(stdout)
</text></argument>
</extension>
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<!--
(c) Copyright 2000-2018 CERN for the benefit of the LHCb Collaboration
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
In applying this licence, CERN does not waive the privileges and immunities
granted to it by virtue of its status as an Intergovernmental Organization
or submit itself to any jurisdiction.
-->
<!--
Run HLT1 on an HLT1-filtered MDF file.
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="prerequisites"><set>
<tuple><text>persistency.mdf_write</text><enumeral>PASS</enumeral></tuple>
</set></argument>
<argument name="program"><text>gaudirun.py</text></argument>
<argument name="args"><set>
<text>$HLT1CONFROOT/options/hlt1_example.py</text>
<text>$HLT1CONFROOT/tests/options/hlt1_mdf_input.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="validator"><text>
countErrorLines({"FATAL": 0, "ERROR": 0, "WARNING": 0})
import re
pattern = re.compile(r'\s+NONLAZY_OR: hlt_decision\s+#=(\d+)\s+Sum=(\d+)')
# Check that:
# 1. We processed the same number of events as filtered by the previous job
# 2. We made the same number of positive decisions (which should be 100% of input events)
nread = nselected = -1
for line in stdout.split('\n'):
m = re.match(pattern, line)
if m:
nread, nselected = map(int, m.groups())
break
else:
causes.append('could not parse event statistics from reader stdout')
# We're running the same HLT1 configuration as the one that created the
# filtered data, so all input events should have a positive decision
if nread != nselected:
causes.append('expected all input events to pass')
nread_writing = nselected_writing = -1
with open('test_hlt1_persistence_mdf_write.stdout') as f_ref:
for line in f_ref.readlines():
m = re.match(pattern, line)
if m:
nread_writing, nselected_writing = map(int, m.groups())
break
else:
causes.append('could not parse event statistics from writer stdout')
if nread != nselected_writing:
causes.append('did not read the same number of events as written out')
</text></argument>
</extension>
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE extension PUBLIC '-//QM/2.3/Extension//EN' 'http://www.codesourcery.com/qm/dtds/2.3/-//qm/2.3/extension//en.dtd'>
<!--
(c) Copyright 2000-2018 CERN for the benefit of the LHCb Collaboration
This software is distributed under the terms of the GNU General Public
Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".
In applying this licence, CERN does not waive the privileges and immunities
granted to it by virtue of its status as an Intergovernmental Organization
or submit itself to any jurisdiction.
-->
<!--
Run HLT1 and save an MDF file.
-->
<extension class="GaudiTest.GaudiExeTest" kind="test">
<argument name="program"><text>gaudirun.py</text></argument>
<argument name="args"><set>
<text>$HLT1CONFROOT/options/hlt1_example.py</text>
<text>$HLT1CONFROOT/tests/options/hlt1_mdf_output.py</text>
</set></argument>
<argument name="use_temp_dir"><enumeral>true</enumeral></argument>
<argument name="validator"><text>
# Expect a single WARNING:
# HiveDataBrokerSvc WARNING non-reentrant algorithm: LHCb::MDFWriter/LHCb__MDFWriter
countErrorLines({"FATAL": 0, "ERROR": 0, "WARNING": 1})
import re
pattern = re.compile(r'\s+NONLAZY_OR: hlt_decision\s+#=(\d+)\s+Sum=(\d+)')
# Check that:
# 1. We read at least two events
# 2. We make a positive decision on at least one event
# 3. We make a negative decision on at least one event
nread = nselected = -1
for line in stdout.split('\n'):
m = re.match(pattern, line)
if m:
nread, nselected = map(int, m.groups())
break
else:
causes.append('could not parse event statistics from stdout')
if nread &lt; 2:
causes.append('expected at least two events to be processed')
if nselected &lt; 1:
causes.append('expected at least one event to be selected')
if nselected == nread:
causes.append('expected at least one event to be filtered out')
# Write out the log file so that we can compare the number of
# selected events here with the number of events processed by
# a second HLT1 job that uses the output file as input
with open('test_hlt1_persistence_mdf_write.stdout', 'w') as f:
f.write(stdout)
</text></argument>
</extension>
......@@ -16,7 +16,7 @@ Run like any other options file:
"""
from __future__ import absolute_import, division, print_function
from PyConf.environment import EverythingHandler
from PyConf.environment import EverythingHandler, setupInput
from PyConf.Algorithms import FTRawBankDecoder
......@@ -47,7 +47,7 @@ raw_event_format = 4.3
env = EverythingHandler(
threadPoolSize=1, nEventSlots=1, evtMax=100, debug=False)
env.setupInput(
setupInput(
input_files,
dataType='Upgrade',
DDDBTag='dddb-20171126',
......
......@@ -14,7 +14,7 @@ from RecoConf.hlt1_tracking import (require_gec, require_pvs, make_pvs,
make_velokalman_fitted_tracks)
from PyConf.Algorithms import FTRawBankDecoder
from PyConf.environment import EverythingHandler
from PyConf.environment import EverythingHandler, setupInputFromTestFileDB
ftdec_v = 4
......@@ -31,5 +31,5 @@ def hlt1_full_track_reco_line():
env = EverythingHandler(
threadPoolSize=1, nEventSlots=1, evtMax=1000, debug=True)
env.registerLine('Hlt1_reco_baseline', algs=hlt1_full_track_reco_line())
env.setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
env.configure()
......@@ -9,7 +9,7 @@
# or submit itself to any jurisdiction. #
###############################################################################
from PyConf.environment import EverythingHandler
from PyConf.environment import EverythingHandler, setupInputFromTestFileDB
from PyConf.Algorithms import FTRawBankDecoder
from RecoConf.hlt1_tracking import (
......@@ -34,5 +34,5 @@ def hlt2_full_track_reco_line():
env = EverythingHandler(
threadPoolSize=1, nEventSlots=1, evtMax=100, debug=True)
env.registerLine("Hlt2_reco_baseline", algs=hlt2_full_track_reco_line())
env.setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
env.configure()
......@@ -9,7 +9,7 @@
# or submit itself to any jurisdiction. #
###############################################################################
from PyConf.environment import EverythingHandler
from PyConf.environment import EverythingHandler, setupInputFromTestFileDB
from RecoConf.hlt1_tracking import (
require_gec,
require_pvs,
......@@ -35,5 +35,5 @@ env = EverythingHandler(
threadPoolSize=1, nEventSlots=1, evtMax=100, debug=True)
env.registerLine(
"Hlt2_reco_full_geometry", algs=hlt2_full_geometry_track_reco_line())
env.setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
env.configure()
......@@ -17,7 +17,7 @@ from RecoConf.hlt1_tracking import (
)
from PyConf.Algorithms import (FTRawBankDecoder)
from RecoConf.mc_checking import (monitor_track_efficiency)
from PyConf.environment import (EverythingHandler)
from PyConf.environment import (EverythingHandler, setupInputFromTestFileDB)
ftdec_v = 4
......@@ -38,5 +38,5 @@ env = EverythingHandler(
debug=True,
HistoFile="MCMatching_Hlt1ForwardTracking.root")
env.registerLine('MCMatching_Hlt1ForwardTracking', algs=mc_matching_line())
env.setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
setupInputFromTestFileDB('MiniBrunel_2018_MinBias_FTv4_DIGI')
env.configure()
......@@ -21,6 +21,7 @@ from Configurables import (
LHCb__DetDesc__ReserveDetDescForEvent as reserveIOV,
LHCb__Tests__FakeEventTimeProducer as DummyEventTime,
)
from Configurables import LHCb__MDFWriter, InputCopyStream, OutputStream
from Gaudi.Configuration import ConfigurableUser, DEBUG
from GaudiConf import IOHelper
from PRConfig.TestFileDB import test_file_db
......@@ -36,12 +37,144 @@ __all__ = [
]
log = logging.getLogger(__name__)
# Writer algorithm and configuration used for a given file type
# Maps a file-type key to a (writer class, default keyword arguments)
# pair; consumed by setupOutput when instantiating the output writer.
KNOWN_WRITERS = {
    # MDF output: write the raw event banks with the dedicated MDF writer
    'MDF': (LHCb__MDFWriter, dict(BankLocation='/Event/DAQ/RawEvent')),
    # FIXME(AP): InputCopyStream is broken because it uses incidents to
    # try to ensure all data it writes originated from the same input
    # file
    # 'ROOT': (InputCopyStream, dict()),
    # ROOT output: persist only the raw event ('#1' = depth one)
    'ROOT': (OutputStream, dict(ItemList=['/Event/DAQ/RawEvent#1'])),
}
def _is_node(arg):
    """Return True if `arg` is a CompositeNode instance."""
    return isinstance(arg, CompositeNode)
def _output_writer(writer_cls, filename, **kwargs):
    """Return a configured file writer instance.

    Parameters
    ----------
    writer_cls : PyConf.components.Algorithm
        File writer algorithm class to use. An instance of this is returned,
        configured to write a file named `filename`. Must be one of
        `LHCb__MDFWriter`, `InputCopyStream` or `OutputStream`.
    filename : str
        Path of the file to be written.
    kwargs
        Extra properties forwarded to the writer constructor.

    Raises
    ------
    ValueError
        If `writer_cls` is not a supported writer class.
    """
    if writer_cls in (InputCopyStream, OutputStream):
        # Gaudi ROOT-based streams take a single connection-string property
        kwargs['Output'] = (
            "DATAFILE='{}' SVC='Gaudi::RootCnvSvc' OPT='RECREATE'".format(
                filename))
    elif writer_cls == LHCb__MDFWriter:
        kwargs['Connection'] = 'file://{}'.format(filename)
    else:
        # Raise rather than `assert False`: assertions are stripped when
        # Python runs with optimisations enabled (-O), which would let an
        # unknown writer type fall through silently
        raise ValueError('Do not know writer type {}'.format(writer_cls))
    return writer_cls(**kwargs)
def setupInput(inputFiles, dataType, DDDBTag, CONDDBTag, Simulation,
               inputFileType):
    """Configure the application to read events from `inputFiles`.

    Sets up the event selector, the IOHelper services and the detector
    description/conditions database components as global configuration.

    Parameters
    ----------
    inputFiles : list of str
        Paths or connection strings of the input files.
    dataType : str
        Data type passed to DDDBConf, e.g. 'Upgrade'.
    DDDBTag : str
        Detector description database tag.
    CONDDBTag : str
        Conditions database (SIMCOND) tag.
    Simulation : bool
        Whether the input is simulated data (forwarded to DDDBConf).
    inputFileType : str
        IOHelper input file type, e.g. 'ROOT' or 'MDF'.

    Raises
    ------
    ConfigurationError
        If a non-MDF input is configured with more than one event slot.
    """
    # FIXME(AP) we need to modify the ApplicationMgr and query the
    # HiveWhiteBoard held by the EverythingHandler; the former modifies global
    # state which is not ideal
    whiteboard = setup_component('HiveWhiteBoard', instanceName='EventDataSvc')
    # Only MDF input can be read with multiple event slots in flight
    if inputFileType != 'MDF' and whiteboard.EventSlots > 1:
        raise ConfigurationError(
            "only MDF files can run in multithreaded mode, please change number of eventslots to 1"
        )
    setup_component(
        'ApplicationMgr',
        packageName='Gaudi.Configuration',
        EvtSel="EventSelector")
    input_iohelper = IOHelper(inputFileType, None)
    input_iohelper.setupServices()
    # clear=True replaces any previously configured input files
    evtSel = input_iohelper.inputFiles(inputFiles, clear=True)
    inputs = []
    for inp in evtSel.Input:
        # Disable checksum verification on each input connection string
        inputs.append(inp + " IgnoreChecksum='YES'")
    evtSel.Input = inputs
    evtSel.PrintFreq = 10000
    setup_component('DDDBConf', Simulation=Simulation, DataType=dataType)
    setup_component(
        'CondDB', Upgrade=True, Tags={
            'DDDB': DDDBTag,
            'SIMCOND': CONDDBTag
        })
    setup_component('IODataManager', DisablePFNWarning=True)
def setupInputFromTestFileDB(testFileDBkey, inputFiles=None, fileType=None):
    """Run from files defined by a TestFileDB key.

    Parameters
    ----------
    testFileDBkey : str
        Key of the entry in the PRConfig TestFileDB.
    inputFiles : list of str
        Overrides the input files defined by the TestFileDB entry.
    fileType : str
        Overrides the file type implied by the TestFileDB entry.
        Required when `inputFiles` is specified.

    Raises
    ------
    ValueError
        If `inputFiles` is given without `fileType`.
    """
    if inputFiles is not None and fileType is None:
        raise ValueError('Must specify fileType when inputFiles is specified')
    qualifiers = test_file_db[testFileDBkey].qualifiers
    dataType = qualifiers['DataType']
    Simulation = qualifiers['Simulation']
    # An explicit fileType always wins; otherwise derive it from the DB
    # entry ('MDF' stays 'MDF', any other format is read as 'ROOT').
    # The original single-expression form `fileType or 'ROOT' if ... else
    # qualifiers['Format']` parsed as `(fileType or 'ROOT') if ... else
    # qualifiers['Format']` because the conditional expression binds more
    # loosely than `or`, discarding an explicit fileType whenever the DB
    # entry's format was MDF.
    if not fileType:
        fileType = ('ROOT' if qualifiers['Format'] != 'MDF'
                    else qualifiers['Format'])
    CondDBTag = qualifiers['CondDB']
    DDDBTag = qualifiers['DDDB']
    if not inputFiles:
        inputFiles = test_file_db[testFileDBkey].filenames
    setupInput(inputFiles, dataType, DDDBTag, CondDBTag, Simulation, fileType)
def setupOutput(filename, filetype):
    """Configure the application to write out a file.

    Only the raw event, under `/Event/DAQ/RawEvent`, is persisted.

    Must be called after `EverythingHandler.configure` has been called.

    Parameters
    ----------
    filename : str
        Path of the file to be written.
    filetype : str
        One of the keys of `KNOWN_WRITERS`.

    Raises
    ------
    ValueError
        If `filetype` is not a key of `KNOWN_WRITERS`.
    ConfigurationError
        If `EverythingHandler.configure` has not been called yet.
    """
    # FIXME(AP) we need to modify the HLTControlFlowMgr and query the
    # HiveWhiteBoard held by the EverythingHandler; the former modifies global
    # state which is not ideal
    whiteboard = setup_component('HiveWhiteBoard', instanceName='EventDataSvc')
    scheduler = setup_component('HLTControlFlowMgr')
    assert whiteboard.EventSlots == 1, 'Cannot write output multithreaded'
    assert scheduler.ThreadPoolSize == 1, 'Cannot write output multithreaded'
    # Validate caller input with an exception (asserts vanish under -O);
    # report the unsupported *type* -- the previous message interpolated
    # the filename by mistake
    if filetype not in KNOWN_WRITERS:
        raise ValueError(
            'Output filetype not supported: {}'.format(filetype))
    writer_cls, configuration = KNOWN_WRITERS[filetype]
    writer = _output_writer(writer_cls, filename, **configuration)
    # FIXME(AP) Hack to add the writer to the overall control flow
    # This causes the writer to only run when the HLT decision is positive
    for node_name, _, members, _ in scheduler.CompositeCFNodes:
        if node_name == 'HLT':
            assert len(members) == 1, 'setupOutput called twice'
            members.append(writer.getFullName())
            break
    else:
        raise ConfigurationError(
            'Must call EverythingHandler.configure before setupOutput')
    hiveDataBroker = setup_component('HiveDataBrokerSvc')
    hiveDataBroker.DataProducers.append(writer.getFullName())
    return writer
class PythonLoggingConf(ConfigurableUser):
"""Takes care of configuring the python logging verbosity."""
# Make sure we're applied before anything else by listing
......@@ -157,51 +290,6 @@ class EverythingHandler(object):
Algorithm(
CallgrindProfile, StartFromEventN=start, StopAtEventN=stop))
def setupInput(self,
inputFiles,
dataType,
DDDBTag,
CONDDBTag,
Simulation,
inputFileType,
outputFileType=None):
if inputFileType != 'MDF' and self._whiteboard.EventSlots > 1:
raise ConfigurationError(
"only MDF files can run in multithreaded mode, please change number of eventslots to 1"
)
self._appMgr.EvtSel = "EventSelector"
iohelper = IOHelper(inputFileType, outputFileType)
iohelper.setupServices()
evtSel = iohelper.inputFiles(inputFiles)
inputs = []
for inp in evtSel.Input:
inputs.append(inp + " IgnoreChecksum='YES'")
evtSel.Input = inputs
evtSel.PrintFreq = 10000
setup_component('DDDBConf', Simulation=Simulation, DataType=dataType)
setup_component(
'CondDB',
Upgrade=True,
Tags={
'DDDB': DDDBTag,
'SIMCOND': CONDDBTag
})
setup_component('IODataManager', DisablePFNWarning=True)
def setupInputFromTestFileDB(self, testFileDBkey, InputFiles=[]):