Skip to content
Snippets Groups Projects

make packing configuration simpler

Merged Sevda Esen requested to merge sevda-simpler-packing into master
Compare and
5 files
+ 122
123
Compare changes
  • Side-by-side
  • Inline
Files
5
@@ -11,23 +11,22 @@
"""Configuration for persisting HLT2 objects in output ROOT/MDF files."""
from __future__ import absolute_import
import itertools
import logging, os
import logging, os, json
from pprint import pformat
from Configurables import HltLinePersistenceSvc
#from RecoConf.data_from_file import unpacked_mc_locations
from PyConf import configurable
from PyConf.control_flow import CompositeNode, NodeLogic
from PyConf.components import get_output
from PyConf.location_prefix import prefix, unpacked_prefix, packed_prefix
from PyConf.location_prefix import prefix
from PyConf.application import register_encoding_dictionary
from GaudiConf.reading import type_map
from GaudiConf.PersistRecoConf import PersistRecoPacking
from .cloning import clone_line_outputs
from .packing import pack_stream_objects, pack_stream_mc, pack_stream_mc_locations
from .persistreco import persistreco_line_outputs, persistreco_line_outputs_packed
from .serialisation import serialise_packed_containers
from .packing import pack_stream_objects, pack_stream_mc, pack_stream_mc_locations, packers_map
from .persistreco import persistreco_line_outputs
from .truth_matching import truth_match_lines, CHARGED_PP2MC_LOC, NEUTRAL_PP2MC_LOC
log = logging.getLogger(__name__)
@@ -103,6 +102,25 @@ def get_type(dh):
return None
def get_packed_locations(inputs, stream):
    """Collect the (location, type) pairs written to the output manifest.

    Args:
        inputs: Mapping from an object-kind key to an iterable of handles.
            Each handle must expose ``.type`` and ``.location``.
        stream: Prefix passed to ``prefix`` together with each handle's
            location.

    Returns:
        A dict with the single key ``'PackedLocations'`` holding a list of
        ``(prefixed_location, type)`` tuples. Duplicates are dropped and the
        first-seen order is kept.

    Raises:
        ValueError: If a handle reports the type ``"unknown_t"`` and its
            ``inputs`` key is not among the values of ``type_map()``
            (assumes ``type_map()`` returns a plain mapping).
    """
    known_types = type_map()
    type_names = list(known_types.keys())
    type_labels = list(known_types.values())

    def _resolve_type(handle, label):
        # Handles that do not know their own type fall back to a reverse
        # lookup: the type name whose mapped value equals the inputs key.
        declared = handle.type
        if handle.type == "unknown_t":
            return type_names[type_labels.index(label)]
        return declared

    entries = []
    for label, handles in inputs.items():
        for handle in handles:
            resolved = _resolve_type(handle, label)
            entries.append((prefix(handle.location, stream), resolved))

    # dict.fromkeys removes repeated pairs while keeping insertion order.
    unique_entries = list(dict.fromkeys(entries))
    return {'PackedLocations': unique_entries}
@configurable
def persist_line_outputs(
lines,
@@ -110,6 +128,7 @@ def persist_line_outputs(
dec_reports,
associate_mc,
source_id,
output_manifest_file,
stream=DEFAULT_OUTPUT_PREFIX, #this is where everything goes
reco_stream=DEFAULT_OUTPUT_PREFIX, #this is where reco objects come from
clone_mc=True):
@@ -153,8 +172,9 @@ def persist_line_outputs(
if log.isEnabledFor(logging.DEBUG):
log.debug('line_locations: ' + pformat(persistence_svc.Locations))
# Make a dictionary for all known object types with empty values
inputs = PersistRecoPacking().dictionary()
# Make a dictionary for all known object types with empty values
p_map = packers_map()
inputs = {t: [] for t in p_map.keys()}
#add line outputs to fill the dictionary
inputs = _referenced_inputs(lines, inputs)
@@ -172,6 +192,14 @@ def persist_line_outputs(
for p in protoparticle_relations:
inputs["PP2MCPRelations"] += [p]
if output_manifest_file:
with open(output_manifest_file, 'w') as f:
json.dump(
get_packed_locations(inputs, stream),
f,
indent=4,
sort_keys=True)
locify = lambda i: i.location if hasattr(i, 'location') else i
inputs = {t: [locify(i) for i in dhs] for t, dhs in inputs.items()}
@@ -198,30 +226,8 @@ def persist_line_outputs(
pformat(output_cloner_locations))
cf.append(output_cloner_cf)
#Make a dictionary for output packer locations
#For line outputs, "stream+/p" added to input locations
#For reco objects, there are pre-defined output locations
#This is to be able to find reco objects regardless of their producer
prdict_packed = persistreco_line_outputs_packed(stream, reco_stream)
outputs = {}
for key, value in inputs.items():
outputs[key] = []
for v in value:
if v in prdict_packed.keys():
outputs[key] += [prdict_packed[v]] #reco
else:
outputs[key] += [packed_prefix(v, stream)] #line
prpacking = PersistRecoPacking(
stream=stream,
unpacked=inputs,
packed=outputs,
data_type=data_type,
)
### TODO: reduce the set of encoding keys to the smallest possible one...
locations = set([ unpacked_prefix(i, stream) for i in prpacking.packedLocations() ]) | \
set([ i for i in prpacking.unpackedLocations()]) | \
locations = set([ i for ilist in inputs.values() for i in ilist]) | \
set([ i.location for i in itertools.chain( *_referenced_locations(lines).values()) ])
if clone_mc:
@@ -233,10 +239,6 @@ def persist_line_outputs(
register_encoding_dictionary("PackedObjectLocations",
sorted(locations)), 16)
packer_cf, packer_handles = pack_stream_objects(stream, prpacking,
encoding_key)
cf.append(packer_cf)
packer_mc_locations = []
if clone_mc:
@@ -244,15 +246,17 @@ def persist_line_outputs(
cf.append(mc_packer_cf)
if log.isEnabledFor(logging.DEBUG):
log.debug('packer_handles: ' + pformat(packer_handles))
log.debug('packer_locations: ' + pformat(inputs.values()))
log.debug('packer_mc_locations: ' + pformat(packer_mc_locations))
serialisation_cf, output_raw_data = serialise_packed_containers(
packer_handles, source_id)
packers_cf, serialisation_cf, output_raw_data = pack_stream_objects(
stream, inputs, encoding_key, source_id)
cf.append(packers_cf)
cf.append(serialisation_cf)
if log.isEnabledFor(logging.DEBUG):
log.debug('output_raw_data: %s', pformat(output_raw_data))
cf.append(serialisation_cf)
control_flow_node = CompositeNode(
"hlt2_line_output_persistence",
Loading