Commit 5e3d12fd authored by Alex Pearce's avatar Alex Pearce
Browse files

Support filtered DST and MDF writing in single-threaded jobs.

Towards #55.
parent afd88954
......@@ -21,6 +21,7 @@ from Configurables import (
LHCb__DetDesc__ReserveDetDescForEvent as reserveIOV,
LHCb__Tests__FakeEventTimeProducer as DummyEventTime,
)
from PyConf.Algorithms import LHCb__MDFWriter, InputCopyStream, OutputStream
from Gaudi.Configuration import ConfigurableUser, DEBUG
from GaudiConf import IOHelper
from PRConfig.TestFileDB import test_file_db
......@@ -42,6 +43,29 @@ def _is_node(arg):
return isinstance(arg, CompositeNode)
def _output_writer(writer_cls, filename, **kwargs):
    """Return a configured file writer instance.

    Parameters
    ----------
    writer_cls : PyConf.components.Algorithm
        File writer algorithm class to use. An instance of this is returned,
        configured to write a file named `filename`. Must be one of
        InputCopyStream, OutputStream, or LHCb__MDFWriter.
    filename : str
        Path of the file to be written.
    **kwargs
        Additional properties forwarded to the writer constructor.

    Raises
    ------
    ValueError
        If `writer_cls` is not a recognised writer type.
    """
    if writer_cls in (InputCopyStream, OutputStream):
        # ROOT-based writers take a Gaudi connection string
        output = "DATAFILE='{}' SVC='Gaudi::RootCnvSvc' OPT='RECREATE'".format(
            filename)
        kwargs['Output'] = output
    elif writer_cls == LHCb__MDFWriter:
        # The MDF writer expects a plain file URI instead
        connection = 'file://{}'.format(filename)
        kwargs['Connection'] = connection
    else:
        # Raise rather than assert: asserts are stripped under `python -O`,
        # which would let an unknown writer type through silently.
        raise ValueError('Do not know writer type {}'.format(writer_cls))
    return writer_cls(**kwargs)
class PythonLoggingConf(ConfigurableUser):
"""Takes care of configuring the python logging verbosity."""
# Make sure we're applied before anything else by listing
......@@ -123,6 +147,10 @@ class EverythingHandler(object):
if debug:
self._scheduler.OutputLevel = DEBUG
self._hiveDataBroker.OutputLevel = DEBUG
# IOHelper instance used to configure input files
self._input_iohelper = None
# Output file writer algorithm instance
self._output_writer = None
# Instantiate the configurable which will set the python
# logging verbosity at the right time.
PythonLoggingConf()
......@@ -157,22 +185,16 @@ class EverythingHandler(object):
Algorithm(
CallgrindProfile, StartFromEventN=start, StopAtEventN=stop))
def setupInput(self,
inputFiles,
dataType,
DDDBTag,
CONDDBTag,
Simulation,
inputFileType,
outputFileType=None):
def setupInput(self, inputFiles, dataType, DDDBTag, CONDDBTag, Simulation,
inputFileType):
if inputFileType != 'MDF' and self._whiteboard.EventSlots > 1:
raise ConfigurationError(
"only MDF files can run in multithreaded mode, please change number of eventslots to 1"
)
self._appMgr.EvtSel = "EventSelector"
iohelper = IOHelper(inputFileType, outputFileType)
iohelper.setupServices()
evtSel = iohelper.inputFiles(inputFiles)
self._input_iohelper = IOHelper(inputFileType, None)
self._input_iohelper.setupServices()
evtSel = self._input_iohelper.inputFiles(inputFiles)
inputs = []
for inp in evtSel.Input:
inputs.append(inp + " IgnoreChecksum='YES'")
......@@ -188,20 +210,54 @@ class EverythingHandler(object):
})
setup_component('IODataManager', DisablePFNWarning=True)
def setupInputFromTestFileDB(self,
                             testFileDBkey,
                             inputFiles=None,
                             fileType=None):
    """Run from files defined by a TestFileDB key.

    Parameters
    ----------
    testFileDBkey : str
        Key in the TestFileDB; its qualifiers (data type, CondDB/DDDB
        tags, simulation flag, file format) are used to configure the job.
    inputFiles : list of str, optional
        If given, read these files instead of those registered under
        `testFileDBkey` (the TestFileDB qualifiers are still used).
    fileType : str, optional
        Format of the input files. Required when `inputFiles` is given;
        otherwise derived from the TestFileDB qualifiers.

    Raises
    ------
    ValueError
        If `inputFiles` is specified without `fileType`.
    """
    if inputFiles is not None and fileType is None:
        raise ValueError(
            'Must specify fileType when inputFiles is specified')
    # if you want to have another file but use the testfileDB qualifiers, then set inputFiles
    qualifiers = test_file_db[testFileDBkey].qualifiers
    dataType = qualifiers['DataType']
    Simulation = qualifiers['Simulation']
    # Only fall back to the DB qualifiers when the caller gave no type.
    # NOTE(review): the previous conditional-expression form parsed as
    # `(fileType or 'ROOT') if ... else 'MDF'`, clobbering a
    # caller-supplied `fileType` whenever the DB format was MDF.
    if fileType is None:
        db_format = qualifiers['Format']
        fileType = db_format if db_format == 'MDF' else 'ROOT'
    CondDBTag = qualifiers['CondDB']
    DDDBTag = qualifiers['DDDB']
    if not inputFiles:
        inputFiles = test_file_db[testFileDBkey].filenames
    self.setupInput(inputFiles, dataType, DDDBTag, CondDBTag, Simulation,
                    fileType)
def setupOutput(self, filename, filetype):
    """Configure an output file writer for events passing the trigger.

    Must be called after `setupInput`, and only for single-threaded
    jobs (one event slot, one scheduler thread).

    Parameters
    ----------
    filename : str
        Path of the output file to write.
    filetype : str
        Output format; one of 'MDF' or 'ROOT'.
    """
    # The input IOHelper will set up services we need
    assert self._input_iohelper is not None, 'Must call setupInput before setupOutput'
    # File writers are not safe to run concurrently, so refuse any
    # multithreaded configuration up front.
    assert self._whiteboard.EventSlots == 1, 'Cannot write output multithreaded'
    assert self._scheduler.ThreadPoolSize == 1, 'Cannot write output multithreaded'
    # Map each supported format to (writer class, writer-specific config).
    writers = {
        'MDF': (LHCb__MDFWriter, dict(BankLocation='/Event/DAQ/RawEvent')),
        # FIXME(AP): InputCopyStream is broken because it uses incidents to
        # try to ensure all data it writes originated from the same input
        # file
        # 'ROOT': (InputCopyStream, dict()),
        'ROOT': (OutputStream, dict(ItemList=['/Event/DAQ/RawEvent#1'])),
    }
    # Bug fix: report the unsupported *filetype*, not the filename.
    assert filetype in writers, 'Output filetype not supported: {}'.format(
        filetype)
    writer_cls, configuration = writers[filetype]
    self._output_writer = _output_writer(writer_cls, filename,
                                         **configuration)
def _addAlg(self, alg):
if not is_algorithm(alg):
raise TypeError('{} is not of type Algorithm'.format(alg))
......@@ -220,7 +276,7 @@ class EverythingHandler(object):
elif is_algorithm(c):
self._addAlg(c)
def setup_default_controlflow(self, lines):
def setup_default_controlflow(self, lines, writer=None):
dec = CompositeNode(
'hlt_decision',
combineLogic=NodeLogic.NONLAZY_OR,
......@@ -229,7 +285,7 @@ class EverythingHandler(object):
hlt = CompositeNode(
'HLT',
combineLogic=NodeLogic.LAZY_AND,
children=[dec], # TODO maybe some writer here
children=[dec, writer] if writer is not None else [dec],
forceOrder=True)
self.addNode(hlt)
......@@ -263,7 +319,7 @@ class EverythingHandler(object):
the information it needs to schedule all algorithms.
"""
if self._lines: # if not, you probably set it up yoself?
self.setup_default_controlflow(self._lines)
self.setup_default_controlflow(self._lines, self._output_writer)
configuration = dataflow_config()
for alg in self._algs:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment