Skip to content
Snippets Groups Projects
Commit da59555c authored by Alex Pearce's avatar Alex Pearce
Browse files

Support a single anonymous stream (AKA 'no streaming').

parent 7f3895c2
No related branches found
No related tags found
2 merge requests!85merge master to future,!66Configuration of Tesla with selective persistence
......@@ -714,9 +714,9 @@ class Tesla(LHCbConfigurableUser):
online = mode == 'Online'
raw_format = self.getProp('RawFormatVersion')
if not streams and not simulation:
log.warning('Not streaming and not running on MC, nothing to do!')
return
# Treat the no-streaming case as a single anonymous stream
if not streams:
streams = {'': {'lines': self.getProp('TriggerLines')}}
# Setting up online
decoders_seq = GaudiSequencer('TeslaDecoders')
......@@ -755,12 +755,12 @@ class Tesla(LHCbConfigurableUser):
)
unpackers_seq.Members.append(unpack_psandvs)
# TODO cover the no-streams case, when `streams == {}`
stream_sequences = []
for stream_name in streams:
stream_seq = self._configureOutputTurboSPStream(
stream_name,
streams[stream_name]['lines']
streams[stream_name]['lines'],
prpacking
)
stream_sequences.append(stream_seq)
......@@ -768,7 +768,7 @@ class Tesla(LHCbConfigurableUser):
address_killer = AddressKillerAlg()
streaming_seq = GaudiSequencer(
'TeslaStreamSequence',
'TeslaStreamsSequence',
Members=(
[decoders_seq, unpackers_seq] +
stream_sequences +
......@@ -778,13 +778,20 @@ class Tesla(LHCbConfigurableUser):
)
return streaming_seq
def _configureOutputTurboSPStream(name, lines):
"""Copy line outputs for this stream to a stream-specific location.
def _configureOutputTurboSPStream(self, name, lines, packing):
"""Return a sequence for streaming the lines into the named location.
Copy line outputs for this stream to a stream-specific location.
Keyword arguments:
name -- Name of the stream
lines -- List of HLT2 line names that belong in the stream
packing -- Instance of PersistRecoPacking
"""
output_prefix = name.title()
decisions = [l + 'Decision' for l in lines]
# Naming convention for stream-specific algorithms
namer = lambda x: '{0}ForStream{1}'.format(x, name)
namer = lambda x: '{0}ForStream{1}'.format(x, output_prefix)
pack = self.getProp('Pack')
write_fsr = self.getProp('WriteFSR')
......@@ -798,8 +805,12 @@ class Tesla(LHCbConfigurableUser):
namer('Hlt2_HLTFilter'),
Code=filter_code
)
stream_seq.Members.append(filter)
# Don't filter an anonymous stream by default
if output_prefix or self.getProp('HDRFilter'):
stream_seq.Members.append(filter)
# No need to clone if the output prefix is empty (everything stays
# under /Event/Turbo)
# XXX change this to the TCK svc once finished debugging
persistence_svc = HltLinePersistenceSvc()
container_cloner = CopyLinePersistenceLocations(
......@@ -822,41 +833,47 @@ class Tesla(LHCbConfigurableUser):
],
IgnoreFilterPassed=True
)
stream_seq.Members.append(copy_line_outputs_seq)
if output_prefix:
stream_seq.Members.append(copy_line_outputs_seq)
tes_root = '/Event'
# /Event/<stream name>
stream_base = os.path.join(tes_root, output_prefix).rstrip('/')
required_output_locations = [
'/Event/DAQ/ODIN#1',
'/Event/Rec/Summary#1'
os.path.join(tes_root, 'DAQ/ODIN#1'),
os.path.join(tes_root, 'Rec/Summary#1')
]
optional_output_locations = []
stream_base = os.path.join('/Event', output_prefix)
if pack:
packers = []
# Pack the PersistReco locations that we've cloned
# Pack the PersistReco locations for this stream
prpacking_inputs = {
k: v.replace('/Event', stream_base)
for k, v in prpacking.outputs.items()
k: v.replace(tes_root, stream_base)
for k, v in packing.outputs.items()
}
stream_base += '/Turbo'
# /Event/<stream name>/Turbo
stream_base = os.path.join(stream_base, 'Turbo')
prpacking_stream = PersistRecoPacking(
self.getProp('DataType'),
inputs=prpacking_inputs
)
packers += prpacking_stream.packers(identifier=name)
packers += prpacking_stream.packers(identifier=namer(''))
for packer in packers:
# Put the packed containers under the stream
packer.OutputName = packer.OutputName.replace(
'/Event', stream_base
tes_root, stream_base
)
# Pack everything else that we've cloned
psandvs_packer = PackParticlesAndVertices(
name=namer('TurboPacker'),
namer('TurboPacker'),
InputStream=stream_base,
VetoedContainers=prpacking_stream.inputs.values()
)
packers.append(psandvs_packer)
# Packed locations already exist at the anonymous stream location
if output_prefix:
packers.append(psandvs_packer)
for packer in packers:
packer.OutputLevel = min(self.getProp('OutputLevel'),
......@@ -877,16 +894,21 @@ class Tesla(LHCbConfigurableUser):
# os.path.join(stream_base, 'pRec#*')
]
else:
# /Event/<stream name>/Turbo
stream_base = os.path.join(stream_base, 'Turbo')
# Save everything under the stream prefix
optional_output_locations += [
os.path.join(stream_base, '#*')
]
fname = '{0}{1}{2}'.format(
self.getProp('outputPrefix'),
name,
self.getProp('outputSuffix')
)
if output_prefix:
fname = '{0}{1}{2}'.format(
self.getProp('outputPrefix'),
output_prefix,
self.getProp('outputSuffix')
)
else:
fname = self.getProp('outputFile')
writer = OutputStream(namer(self.writerName))
writer.ItemList = required_output_locations
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment