Skip to content
Snippets Groups Projects
Commit f1293188 authored by Sevda Esen's avatar Sevda Esen Committed by Christopher Rob Jones
Browse files

Allow extra_outputs to be any persistable type

parent e70bc85c
No related branches found
No related tags found
5 merge requests!4232Run5: rebase, restructure run5 code, add back examples,!3700Synchronize master branch with 2024-patches,!3694Draft: Sprucing line for Dark Higgs to dimuon with T-Tracks,!3689Draft: Enlarging mass windows for B2cc lines,!3323Allow extra_outputs to be any persistable type
......@@ -32,12 +32,13 @@ def prefilters():
def b2jpsik_with_flavourtagging(b2jpsik, name):
all_tags = run2_all_taggers(b2jpsik)
return Hlt2Line(
name=name,
algs=upfront_reconstruction() + prefilters() + [b2jpsik],
prescale=1,
tagging_particles=True,
extra_outputs=[("FlavourTags", run2_all_taggers(b2jpsik))])
extra_outputs=[("FlavourTags", all_tags.OutputFlavourTags)])
def default_b2jpsik_line_with_flavourtagging():
......
......@@ -68,7 +68,8 @@ def default_bs2jpsiphi_line_with_flavourtagging():
ssKaons = noUT_sameside_tagging_kaons()
# ssKaons = make_sameside_tagging_kaons()
ssKaonTagging = Run2SSKaonTagger(
BCandidates=bs2jpsiphi, TaggingKaons=ssKaons, PrimaryVertices=pvs)
BCandidates=bs2jpsiphi, TaggingKaons=ssKaons,
PrimaryVertices=pvs).OutputFlavourTags
return Hlt2Line(
name="Hlt2Bs2JpsiPhi",
......@@ -91,7 +92,8 @@ def modified_bs2jpsiphi_line_with_flavourtagging():
ssKaons = noUT_sameside_tagging_kaons()
# ssKaons = make_sameside_tagging_kaons()
ssKaonTagging = Run2SSKaonTagger(
BCandidates=bs2jpsiphi, TaggingKaons=ssKaons, PrimaryVertices=pvs)
BCandidates=bs2jpsiphi, TaggingKaons=ssKaons,
PrimaryVertices=pvs).OutputFlavourTags
return Hlt2Line(
name="Hlt2B2JpsiKModified",
......
......@@ -121,9 +121,11 @@ def dielectron_sp_prompt_line(name='Hlt2QEE_DiElectronPrompt_PersistPhotons',
name=name,
algs=upfront_reconstruction() + [require_pvs(pvs), dielectrons_prompt],
persistreco=persistreco,
extra_outputs=[('DiElectronPrompt_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations))],
extra_outputs=[
('DiElectronPrompt_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation)
],
prescale=prescale,
monitoring_variables=_default_monitoring_variables,
hlt1_filter_code=["Hlt1DiElectronLowMass_massSlice.*_promptDecision"])
......@@ -156,10 +158,12 @@ def dielectron_sp_prompt_line_full(
name=name,
algs=upfront_reconstruction() + [require_pvs(pvs), dielectrons_prompt],
persistreco=persistreco,
extra_outputs=[('DiElectronPrompt_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations)),
('ChargedPions', extra_pions)],
extra_outputs=[
('DiElectronPrompt_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation),
('ChargedPions', extra_pions)
],
prescale=prescale,
monitoring_variables=_default_monitoring_variables,
hlt1_filter_code=["Hlt1DiElectronLowMass_massSlice.*_promptDecision"])
......@@ -191,9 +195,11 @@ def dielectron_sp_displaced_line(
algs=upfront_reconstruction() +
[require_pvs(pvs), dielectrons_displaced],
persistreco=persistreco,
extra_outputs=[('DiElectronDisplaced_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations))],
extra_outputs=[
('DiElectronDisplaced_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation)
],
prescale=prescale,
monitoring_variables=_default_monitoring_variables,
hlt1_filter_code=[
......@@ -229,10 +235,12 @@ def dielectron_sp_displaced_line_full(
algs=upfront_reconstruction() +
[require_pvs(pvs), dielectrons_displaced],
persistreco=persistreco,
extra_outputs=[('DiElectronDisplaced_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations)),
('ChargedPions', extra_pions)],
extra_outputs=[
('DiElectronDisplaced_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation),
('ChargedPions', extra_pions)
],
prescale=prescale,
monitoring_variables=_default_monitoring_variables,
hlt1_filter_code=[
......@@ -264,9 +272,11 @@ def dielectron_sp_prompt_same_sign_line(
name=name,
algs=upfront_reconstruction() + [require_pvs(pvs), dielectrons_prompt],
persistreco=persistreco,
extra_outputs=[('DiElectronPrompt_ss_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations))],
extra_outputs=[
('DiElectronPrompt_ss_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation)
],
prescale=prescale,
monitoring_variables=_default_monitoring_variables,
hlt1_filter_code=[
......@@ -298,9 +308,11 @@ def dielectron_sp_displaced_same_sign_line(
algs=upfront_reconstruction() +
[require_pvs(pvs), dielectrons_displaced],
persistreco=persistreco,
extra_outputs=[('DiElectronDisplaced_ss_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations))],
extra_outputs=[
('DiElectronDisplaced_ss_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation)
],
prescale=prescale,
monitoring_variables=_default_monitoring_variables,
hlt1_filter_code=[
......
......@@ -841,10 +841,12 @@ def dielectron_sp_prompt_sprucing_line(
return SpruceLine(
name=name,
algs=upfront_reconstruction() + [require_pvs(pvs), dielectrons_prompt],
extra_outputs=[('DiElectronPrompt_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations)),
('ChargedPions', charged_pions)],
extra_outputs=[
('DiElectronPrompt_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation),
('ChargedPions', charged_pions)
],
prescale=prescale,
hlt2_filter_code=_hlt2_decision_regex(dielectron_sp_prompt_line_full))
......@@ -873,10 +875,12 @@ def dielectron_sp_displaced_sprucing_line(
name=name,
algs=upfront_reconstruction() +
[require_pvs(pvs), dielectrons_displaced],
extra_outputs=[('DiElectronDisplaced_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations)),
('ChargedPions', charged_pions)],
extra_outputs=[
('DiElectronDisplaced_Photons',
SelectionFromWeightedRelationTable(
InputRelations=photons_table.OutputRelations).OutputLocation),
('ChargedPions', charged_pions)
],
prescale=prescale,
hlt2_filter_code=_hlt2_decision_regex(
dielectron_sp_displaced_line_full))
......@@ -24,8 +24,9 @@ from GaudiKernel.SystemOfUnits import picosecond
from PyConf.Algorithms import ParticleUnbiasedPVAdder
from SelAlgorithms.monitoring import histogram_1d, monitor
from Moore.lines import Hlt2Line
from Moore.persistence.persistreco import persistreco_line_outputs
from Moore.config import register_line_builder
from RecoConf.reconstruction_objects import make_pvs, make_extended_pvs, upfront_reconstruction
from RecoConf.reconstruction_objects import make_pvs, make_extended_pvs, upfront_reconstruction, reconstruction
from RecoConf.event_filters import require_pvs, require_gec
from Hlt2Conf.algorithms_thor import ParticleFilter, ParticleCombiner
......@@ -664,3 +665,29 @@ def Bds_PhiPhi_line(name='Hlt2Test_BdsToPhiPhi', prescale=1):
name=name,
prescale=prescale,
algs=upfront_reconstruction() + [require_pvs(pvs), phi, BdsToPhiPhi])
#From Hlt2Conf/lines/bnoc/hlt2_bnoc.py
@register_line_builder(hlt2_test_lines)
def Bds_PhiPhi_line_with_SP_reco(name='Hlt2Test_BdsToPhiPhi_with_SP_reco',
prescale=1):
pvs = make_pvs()
phi = _make_phi()
BdsToPhiPhi = _make_BdsToPhiPhi(phi, pvs)
# Selective persistreco as line output
# first get the reco objects
reco = reconstruction()
# get the persistreco locations for these objects which is version dependent
# listed in LHCb/PyConf/python/PyConf/persistency_locations.py
pr_objs = persistreco_line_outputs(reco)
return Hlt2Line(
name=name,
prescale=prescale,
algs=upfront_reconstruction() + [require_pvs(pvs), phi, BdsToPhiPhi],
#When selecting persistreco location as extra_output, don't give a location component
#these will be saved in same location for all lines if ""
#if a location is provided, a copy will be made at the selected location
extra_outputs=[("", pr_objs["CaloElectrons"]),
("", pr_objs["LongProtos"])])
......@@ -39,7 +39,7 @@ LHCbApp(
IOExtension().inputFiles([args.input], clear=True)
stream = "/Event/HLT2"
flavour_tags = ['FlavourTags']
flavour_tags = ['FlavourTags/FlavourTags']
locations = [
'/Event/HLT2/HltDecReports', '/Event/HLT2/Hlt2B2JpsiK/Particles',
......
......@@ -22,6 +22,7 @@ from PyConf.application import make_odin
from PyConf.components import Algorithm
from PyConf.reading import get_decreports
from PyConf.control_flow import CompositeNode, NodeLogic
from PyConf.dataflow import DataHandle
from Moore.selreports import (
UnconvertableAlgorithmError,
......@@ -29,6 +30,8 @@ from Moore.selreports import (
)
from .monitoring import monitoring, run_default_monitoring
from Moore.persistence.persistreco import persistreco_line_outputs
from RecoConf.reconstruction_objects import reconstruction
from .optimization import default_controlflow
......@@ -370,10 +373,6 @@ class Hlt2Line(DecisionLine):
`node` attribute) will ensure that the reconstruction algorithms
that produce the reconstruction objects will run when the line
fires
locations_to_move (dict of strings): Dictionary of TES paths, which
the persistence code will need to move (keys) and the paths they
should be moved to (values). This is currently on filled in case
a FlavourTag object is found in extra_outputs.
hlt1_filter_code (list(str)): If not empty, the string is used to define a
HLT1 filter that is prepended to the control flow defined by `algs`.
If empty, a default filter according to 'get_default_hlt1_filter_code_for_hlt2' is applied.
......@@ -450,7 +449,7 @@ class Hlt2Line(DecisionLine):
# this line made a positive decision
self.objects_to_persist = []
if self.produces_output:
output_node, self.objects_to_persist, self.locations_to_move = self._output_node(
output_node, self.objects_to_persist = self._output_node(
self.node, self.output_producer, self.extra_outputs,
self.persistreco, self.tagging_particles)
# Wrap the decision and output nodes
......@@ -473,11 +472,29 @@ class Hlt2Line(DecisionLine):
persistreco, tagging_particles):
# Build 2-tuple of (output path components, output producer) for main candidate of the HLT2 line
main_output = (output_producer, (decision_node.name, ))
# Build list of 2-tuple of (output path components, output producer) for the extra_outputs
additional_outputs = []
reco_objs = reconstruction()
prdict = persistreco_line_outputs(reco_objs)
for prefix, output in extra_outputs:
additional_outputs.append((output, (decision_node.name, prefix)))
algs, objects_to_persist, locations_to_move = Hlt2Line._line_outputs(
if not isinstance(output, DataHandle):
raise ValueError(
"extra_outputs for lines have to be DataHandles. {!r} for line {!r} is not a DataHandle. "
.format(output, decision_node.name))
if prefix:
additional_outputs.append((output, (decision_node.name,
prefix)))
elif output in prdict.values():
additional_outputs.append((output, None))
else:
raise ValueError(
"extra_output {!r} for line {!r} has not specified any location prefix and it's not a persitreco location. You should specify a location prefix to decide where it should be cloned to. "
.format(output, decision_node.name))
algs, objects_to_persist = Hlt2Line._line_outputs(
main_output, additional_outputs, persistreco, tagging_particles)
# Algorithms already in the decision CF don't need to be in the
# output CF. Although one can imagine a scheduler that allows this
......@@ -496,10 +513,9 @@ class Hlt2Line(DecisionLine):
else:
output_node = None
return output_node, objects_to_persist, locations_to_move
return output_node, objects_to_persist
@staticmethod
#def _line_outputs(outputs_locations, persistreco):
def _line_outputs(main_output_location, additional_output_locations,
persistreco, tagging_particles):
"""Return output-producing algorithms and their outputs.
......@@ -543,67 +559,59 @@ class Hlt2Line(DecisionLine):
from RecoConf.reconstruction_objects import reconstruction
from .persistence.particle_moving import (
CopyParticles, CopyFlavourTags, is_particle_producer,
particle_output, is_flavourtag_producer, flavourtag_output)
copy_to_final_location, is_particle_producer, particle_output)
from .persistence.persistreco import persistreco_line_outputs
reco = reconstruction()
pr_objs = list(persistreco_line_outputs(reco).values())
pvs = reco["PVs"]
rec_summary = reco["RecSummary"]
algs = []
objects_to_persist = []
locations_to_move = {}
main_output, main_location_components = main_output_location
main_producer = _producer(main_output)
if main_output is not None:
# Branch on the C++ type of the outputs of the producer
# For legacy LHCb::Particle outputs, we must navigate back to the
# producer to determine whether we should run the LHCb::Particle
# moving algorithm
if main_output is not None:
# Copy the main output of the HLT2 line and add the moved container
# the list of objects to be persisted.
if is_particle_producer(main_producer):
main_particles = particle_output(main_producer)
main_mover = CopyParticles(main_particles,
main_location_components)
main_mover = copy_to_final_location(main_particles,
main_location_components)
algs.append(main_mover)
objects_to_persist += main_mover.outputs.values()
else:
log.warning(
"Unsupported type {} for line output {} (for location {}); this will not be persisted"
"Unsupported type {} for line output {} for line {}; this will not be persisted. Only Particles are persisted as line outputs."
.format(main_output.type, main_output,
main_location_components))
# Loop over the extra outputs and copy them, then add the moved containers
# the list of objects to be persisted.
for output, location_components in additional_output_locations:
producer = _producer(output)
if is_particle_producer(producer):
particles = particle_output(producer)
mover = CopyParticles(particles, location_components)
algs.append(mover)
objects_to_persist += mover.outputs.values()
elif is_flavourtag_producer(producer):
if not is_particle_producer(main_producer):
log.warning(
"Trying to persist FlavourTag but the last algorithm in the line is not a particle producer. Therefor cannot persist FlavourTags objects."
)
if location_components:
if output in pr_objs:
log.error(
"{} is a persistreco location and should not be copied. Remove {} for extra_outputs {} for line {}; otherwise this will not be persisted."
.format(output, output, location_components,
main_location_components))
else:
flavourtags = flavourtag_output(producer)
mover = CopyFlavourTags(flavourtags, location_components)
algs.append(mover)
objects_to_persist += mover.outputs.values()
mover = copy_to_final_location(output, location_components)
if mover:
algs.append(mover)
objects_to_persist += mover.outputs.values()
else:
log.warning(
"Unsupported type {} for extra_outputs {} for line {}; this will not be persisted."
.format(output.type, output,
main_location_components))
else:
log.warning(
"Unsupported type {} for line output {} (for location {}); this will not be persisted"
.format(output.type, output, location_components))
algs.append(output.producer)
objects_to_persist += [output]
if persistreco:
pr_objs = list(persistreco_line_outputs(reco).values())
algs += pr_objs
objects_to_persist += pr_objs
......@@ -621,7 +629,7 @@ class Hlt2Line(DecisionLine):
if rec_summary not in algs: algs.append(rec_summary)
objects_to_persist.append(rec_summary)
return algs, objects_to_persist, locations_to_move
return algs, objects_to_persist
class SpruceLine(Hlt2Line):
......
......@@ -16,7 +16,7 @@ only the information of HLT2 candidates needs to be extracted from the HLT2 DstD
This module provides the necessary functions.
"""
from .particle_moving import CopyParticles
from .particle_moving import copy_to_final_location
from PyConf.tonic import configurable
from PyConf.reading import get_particles
......@@ -37,7 +37,8 @@ def make_reduced_hlt2_candidates():
hlt2_particles = get_particles(
f"/Event/HLT2/{line}/Particles",
WriteEmptyContainerIfBufferNotKnown=True)
mover = CopyParticles(hlt2_particles, ["HLT2", "TISTOS", f"{line}"])
mover = copy_to_final_location(hlt2_particles,
["HLT2", "TISTOS", f"{line}"])
movers += [mover]
list_of_copied_info += mover.outputs.values()
......
......@@ -11,9 +11,8 @@
import logging
import os
from functools import lru_cache
from GaudiConf.reading import type_map
from PyConf.Algorithms import CopyParticles as CopyParticlesAlg
from PyConf.Algorithms import CopyFlavourTagsWithParticlePointers
from PyConf.components import force_location
#: Final component of a TES location holding Particle objects
......@@ -23,8 +22,6 @@ PARTICLE_CONTAINER_TYPES = {
"KeyedContainer<LHCb::Particle,Containers::KeyedObjectManager<Containers::hashmap> >",
"SharedObjectsContainer<LHCb::Particle>",
}
FLAVOURTAG_CONTAINER_TYPE = "KeyedContainer<LHCb::FlavourTag,Containers::KeyedObjectManager<Containers::hashmap> >"
# Set of locations we've forced to specific values; must keep track of these to
# ensure we never force the same location twice
_FORCED_LOCATIONS = set()
......@@ -32,55 +29,66 @@ _FORCED_LOCATIONS = set()
log = logging.getLogger(__name__)
def CopyParticles(particles, path_components):
"""Return an algorithm that copies Particle objects.
def __get_type(dh):
types = type_map()
if dh.type in types.keys():
return types[dh.type].replace("Selection", "")
Args:
particles (DataHandle): LHCb::Particle container to be copied.
path_components (list of str): Definition of TES path, relative to
'/Event', under which to store copied Particle objects.
"""
# Path components should not contain slashes
assert all('/' not in pc for pc in path_components)
# We will add the suffix in this method to ensure consistency
assert path_components[-1] != PARTICLES_LOCATION_SUFFIX
return None
base = os.path.join("/Event", *path_components)
output_loc_particles = os.path.join(base, PARTICLES_LOCATION_SUFFIX)
# Keep track of forced output locations to make sure we don't write to the
# same place twice
assert output_loc_particles not in _FORCED_LOCATIONS, "Duplicate CopyParticles instantiation"
_FORCED_LOCATIONS.add(output_loc_particles)
return CopyParticlesAlg(
InputParticles=particles,
outputs=dict(
# Force output locations to match the pattern of those created by
# DVCommonBase algorithms
OutputParticles=force_location(output_loc_particles)))
def cloning_map():
from PyConf.Algorithms import (
CopyParticles, CopyFlavourTags, CopyTracks, CopyCaloClusters,
CopyCaloHypos, CopyProtoParticles, CopyRecVertices, CopyVertices,
CopyP2InfoRelations, CopyP2IntRelations, CopyP2MCPRelations,
CopyP2VRelations, CopyPP2MCPRelations)
return {
"Particles": CopyParticles,
"FlavourTags": CopyFlavourTags,
"Tracks": CopyTracks,
"CaloHypos": CopyCaloHypos,
"CaloClusters": CopyCaloClusters,
"PVs": CopyRecVertices,
"ProtoParticles": CopyProtoParticles,
"Vertices": CopyVertices,
"P2VRelations": CopyP2VRelations,
"P2MCPRelations": CopyP2MCPRelations,
"P2IntRelations": CopyP2IntRelations,
"P2InfoRelations": CopyP2InfoRelations,
"PP2MCPRelations": CopyPP2MCPRelations,
}
def CopyFlavourTags(flavourtags, path_components):
def copy_to_final_location(input, path_components):
"""Return an algorithm that copies Particle objects.
Args:
flavourtags (DataHandle): LHCb::FlavourTags container to be copied.
input (DataHandle): input container to be copied.
path_components (list of str): Definition of TES path, relative to
'/Event', under which to store copied FlavourTags objects.
'/Event', under which to store copied objects.
"""
input_type = __get_type(input)
if not input_type:
log.warning(
"Unsupported type {} for line output {} (for location {}); this will not be persisted"
.format(input.type, input, path_components))
return None
# Path components should not contain slashes
assert all('/' not in pc for pc in path_components)
# We will add the suffix in this method to ensure consistency
base = os.path.join("/Event", *path_components)
output_loc = os.path.join(base, input_type)
output_loc_flavourtags = os.path.join("/Event", *path_components)
# Keep track of forced output locations to make sure we don't write to the
# same place twice
assert output_loc_flavourtags not in _FORCED_LOCATIONS, "Duplicate CopyFlavourTags instantiation"
_FORCED_LOCATIONS.add(output_loc_flavourtags)
assert output_loc not in _FORCED_LOCATIONS, "Duplicate Copy" + input_type + " instantiation"
_FORCED_LOCATIONS.add(output_loc)
return CopyFlavourTagsWithParticlePointers(
InputFlavourTags=flavourtags,
outputs=dict(OutputFlavourTags=force_location(output_loc_flavourtags)),
)
return cloning_map()[input_type](
Input=input, outputs=dict(Output=force_location(output_loc)))
@lru_cache(maxsize=None)
......@@ -145,40 +153,3 @@ def particle_output(producer):
# raise ValueError("Could not find unique LHCb::Particle outputs of {}".
# format(producer))
return particles.pop()
def is_flavourtag_producer(producer):
"""Return True if `producer` outputs an `LHCb::FlavourTag` container.
"""
# Check if the producer has outputs of the correct C++ type
if FLAVOURTAG_CONTAINER_TYPE in {
dh.type
for dh in producer.outputs.values()
}:
return True
try:
flavourtags = producer.FlavourTags
return flavourtags.type == "unknown_t"
except AttributeError:
return False
def flavourtag_output(producer):
"""Return the LHCb::FlavourTags output of the producer algorithm.
Raises:
ValueError: If `producer` does not output a LHCb::FlavourTags container.
"""
# First assume we have a functional algorithm and can introspect the
# output types, such that we just pick the output with the right type
flavourtags = [
dh for dh in producer.outputs.values()
if dh.type in FLAVOURTAG_CONTAINER_TYPE
]
if not flavourtags:
try:
flavourtags = [producer.FlavourTags]
except AttributeError:
flavourtags = []
return flavourtags.pop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment