From 09c94ed754d44bb29a6aa16ea072d90684d37677 Mon Sep 17 00:00:00 2001 From: sesen <sevda.esen@cern.ch> Date: Thu, 9 Nov 2023 13:26:46 +0100 Subject: [PATCH] fix stream(s) updating, other small fixes --- .../tests/options/allen_fwd_tracks_tos.py | 4 +- .../Sprucing_2022_1_production.py | 5 +- .../bandwidth/hlt2_bandwidth_input_2023.py | 1 - .../hlt2_bandwidth_production_streams.py | 7 +-- Hlt/Moore/python/Moore/config.py | 6 +-- .../python/Moore/persistence/__init__.py | 7 --- Hlt/Moore/python/Moore/reports_writers.py | 2 +- Hlt/Moore/python/Moore/stream_writers.py | 49 ++++++++++++++----- Hlt/Moore/python/Moore/streams.py | 25 +++++++++- Hlt/Moore/python/Moore/streams_hlt2.py | 0 Hlt/Moore/python/Moore/streams_spruce.py | 10 ---- 11 files changed, 72 insertions(+), 44 deletions(-) delete mode 100644 Hlt/Moore/python/Moore/streams_hlt2.py delete mode 100644 Hlt/Moore/python/Moore/streams_spruce.py diff --git a/Hlt/Hlt1Conf/tests/options/allen_fwd_tracks_tos.py b/Hlt/Hlt1Conf/tests/options/allen_fwd_tracks_tos.py index fea37ccc57d..80aa91d9e04 100644 --- a/Hlt/Hlt1Conf/tests/options/allen_fwd_tracks_tos.py +++ b/Hlt/Hlt1Conf/tests/options/allen_fwd_tracks_tos.py @@ -9,13 +9,13 @@ # or submit itself to any jurisdiction. # ############################################################################### from Moore import options -from Moore.config import get_allen_hlt1_decision_ids, allen_detectors +from Moore.config import get_allen_hlt1_decision_ids from RecoConf.hlt1_allen import ( make_allen_sel_reports, make_allen_forward_tracks, call_allen_decision_logger, make_allen_decision, allen_gaudi_config) from PyConf.application import configure_input, configure from PyConf.control_flow import CompositeNode, NodeLogic -from Allen.config import setup_allen_non_event_data_service +from Allen.config import setup_allen_non_event_data_service, allen_detectors def make_tos_filter(line_name): diff --git a/Hlt/Hlt2Conf/python/Hlt2Conf/sprucing_settings/Sprucing_2022_1_production.py b/Hlt/Hlt2Conf/python/Hlt2Conf/sprucing_settings/Sprucing_2022_1_production.py index c4874805e86..0e2f4364acb 100644 --- a/Hlt/Hlt2Conf/python/Hlt2Conf/sprucing_settings/Sprucing_2022_1_production.py +++ b/Hlt/Hlt2Conf/python/Hlt2Conf/sprucing_settings/Sprucing_2022_1_production.py @@ -149,8 +149,7 @@ def turcal_spruce_production(options: Options): PassLine( name="Pass" + wg, hlt2_filter_code=turcallinedict[wg]) ], - detectors=DETECTORS) for wg in turcallinedict - if "raw" not in wg + detectors=[]) for wg in turcallinedict if "raw" not in wg ] streams += [ @@ -162,7 +161,7 @@ def turcal_spruce_production(options: Options): hlt2_filter_code=turcallinedict[wg], prescale=0.1) ], - detectors=[]) for wg in turcallinedict if "raw" in wg + detectors=DETECTORS) for wg in turcallinedict if "raw" in wg ] return Streams(streams=streams) diff --git a/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_input_2023.py b/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_input_2023.py index eb7d287bb03..753141f2441 100644 --- a/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_input_2023.py +++ b/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_input_2023.py @@ -32,7 +32,6 @@ options.input_raw_format = 0.5 options.input_files = [ 'mdf:root://eoslhcb.cern.ch//eos/lhcb/wg/rta/WP3/bandwidth_division/Beam6800GeV-expected-2023-VeloClosed10mm-MagDown-Nu1.4-25ns/hlt1_filtered/30000000/hlt1_filtered_pp_tuned_june13_0.mdf' ] - make_TrackBestTrackCreator_tracks.global_bind(max_chi2ndof=4.2) make_PrKalmanFilter_Velo_tracks.global_bind(max_chi2ndof=4.2) make_PrKalmanFilter_noUT_tracks.global_bind(max_chi2ndof=4.2) diff --git a/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_production_streams.py b/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_production_streams.py index e91735ef85f..a0bab539291 100644 --- a/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_production_streams.py +++ b/Hlt/Hlt2Conf/tests/options/bandwidth/hlt2_bandwidth_production_streams.py @@ -82,15 +82,16 @@ def make_module_streams(): if len(filtered) != len(stream.lines): print("Manually removed {} line from {} stream".format( line, stream.name)) - stream.lines = filtered - # Write out stream configuration to JSON file for use later in the test + stream.update(filtered) + streams.update() + + #Write out stream configuration to JSON file for use later in the test #with open( # fname_helper.stream_config_json_path(stream_config="production"), # 'w') as f: # json.dump({stream.name: stream.lines # for stream in streams.streams}, f) - #streams.print() return streams diff --git a/Hlt/Moore/python/Moore/config.py b/Hlt/Moore/python/Moore/config.py index 309fe0729b7..6f0476ae7a8 100644 --- a/Hlt/Moore/python/Moore/config.py +++ b/Hlt/Moore/python/Moore/config.py @@ -250,8 +250,8 @@ def run_moore(options, standard Moore control flow and builds the the data flow (by calling the global lines maker). - If ``make_streams`` returns a list a default stream definition is created - from it as ``{"default": lines}``. + If ``make_streams`` returns a list of lines, a default stream definition is created + from it named "default"``. Args: options (ApplicationOptions): holder of application options @@ -266,7 +266,7 @@ def run_moore(options, config = configure_input(options) # Then create the data (and control) flow for all streams. streams = (make_streams or options.lines_maker)() - # Create default streams definition if make_streams returned a list + # Create default streams definition if make_streams returned a list of lines if isinstance(streams, list): streams = make_default_streams(streams) diff --git a/Hlt/Moore/python/Moore/persistence/__init__.py b/Hlt/Moore/python/Moore/persistence/__init__.py index b0ada057f32..71ed829a04d 100644 --- a/Hlt/Moore/python/Moore/persistence/__init__.py +++ b/Hlt/Moore/python/Moore/persistence/__init__.py @@ -336,10 +336,3 @@ def persist_line_outputs( # are not serialised to avoid complication and to save space in the clone_mc=False case return control_flow_nodes, packer_mc_locations, bank_writers - - -sim_veto_list = [ - '/Event/pSim', '/Event/Calo', '/Event/Unstripped', '/Event/HC', - '/Event/Tracker', '/Event/PersistReco', '/Event/Velo', '/Event/Muon', - '/Event/Rich', '/Event/Trigger', '/Event/pRec', '/Event/Rec', '/Event/DAQ' -] diff --git a/Hlt/Moore/python/Moore/reports_writers.py b/Hlt/Moore/python/Moore/reports_writers.py index c2a116acaf9..15e815aa4f6 100644 --- a/Hlt/Moore/python/Moore/reports_writers.py +++ b/Hlt/Moore/python/Moore/reports_writers.py @@ -138,7 +138,7 @@ def report_writers_nodes(options, streams, process): elif process == "hlt2": # when writing dst, take care of the existing banks in same raw event - if options.output_type == ROOT_KEY: + if options.output_type == ROOT_KEY and options.input_raw_format < 4.0: if options.input_process == 'Hlt2': # Running with Hlt2 over data already processed with Hlt2, # Hlt2DecReports, RoutingBits and DstData diff --git a/Hlt/Moore/python/Moore/stream_writers.py b/Hlt/Moore/python/Moore/stream_writers.py index 67102e9589b..39435403c92 100644 --- a/Hlt/Moore/python/Moore/stream_writers.py +++ b/Hlt/Moore/python/Moore/stream_writers.py @@ -23,9 +23,7 @@ from PyConf.application import (MDF_KEY, ROOT_KEY, mdf_writer, online_writer, root_writer) from PyConf.utilities import ConfigurationError -from .persistence import sim_veto_list -#from .lines import Hlt2LuminosityLine -from .lines import HltLine, Hlt2Line, Hlt2LuminosityLine, SpruceLine # noqa: forward import +from .lines import HltLine, Hlt2Line, Hlt2LuminosityLine, SpruceLine from .config_tools import unique from .streams import bank_types_for_detectors @@ -42,6 +40,13 @@ log = logging.getLogger(__name__) ROOT_RAW_EVENT_LOCATION = '/Event/DAQ/RawEvent' +sim_veto_list = [ + '/Event/pSim', '/Event/Calo', '/Event/Unstripped', '/Event/HC', + '/Event/Tracker', '/Event/PersistReco', '/Event/Velo', '/Event/Muon', + '/Event/Rich', '/Event/Trigger', '/Event/pRec', '/Event/Rec', + '/Event/DAQ/RawEvent' +] + @configurable def stream_writer(options, @@ -91,7 +96,8 @@ def stream_writer(options, # only hlt2 or sprucing lines can request additional detector raw banks if process == "hlt1": - consolidate_views, veto_list = write_hlt1(stream, hlt_raw_banks) + consolidate_views, veto_list = write_hlt1(stream, hlt_raw_banks, + output_type) elif process == "hlt2": consolidate_views, veto_list = write_hlt2(stream, hlt_raw_banks, dec_reports, output_type) @@ -161,17 +167,36 @@ def stream_writer(options, return writer_setup, writers -def write_hlt1(stream, hlt_raw_banks): - required_raw_banks = stream.raw_banks +def write_hlt1(stream, hlt_raw_banks, output_type): + + already_in_dst_banks = [] + if output_type == "ROOT": + # For hlt2, when writing dst we might be overwriting the input event + # In this case, check if any of the raw banks are from DAQ/RawEvent + for l in stream.raw_banks: + if "UnpackRawEvent" in l.producer.name and l.producer.inputs[ + "RawEventLocation"].location == ROOT_RAW_EVENT_LOCATION: + already_in_dst_banks.append(l) + + required_raw_banks = [ + rb for rb in stream.raw_banks if rb not in already_in_dst_banks + ] for k, v in hlt_raw_banks.items(): required_raw_banks += v if isinstance(v, list) else v[stream.name] + veto_list = [] + # For file with raw_event_format > 4.0, raw banks are in different raw event + # requested raw banks are already selected, so ignore these events when writing + for l in required_raw_banks: + if "UnpackRawEvent" in l.producer.name: + veto_list.append(l.producer.inputs["RawEventLocation"].location) + consolidate_views = CombineRawBankViewsToRawEvent( name="CombineRawBanks_for_" + stream.name, RawBankViews=required_raw_banks, outputs={'RawEvent': force_location(f"/Event/{stream.name}/RawEvent")}) - return consolidate_views, None + return consolidate_views, veto_list def write_pass(stream, hlt_raw_banks, dec_reports, input_manifest_file, @@ -186,12 +211,10 @@ def write_pass(stream, hlt_raw_banks, dec_reports, input_manifest_file, for k, v in hlt_raw_banks.items(): required_raw_banks += v if isinstance(v, list) else v[stream.name] - consolidate_views = SelectiveCombineRawBankViewsToRawEvent( - name="SelectiveCombineRawBanks_for_" + stream.name, - DecReports=dec_reports, - MapLinesRawBanks={}, - RequiredRawBanks=required_raw_banks, - SelectiveRawBanks=[], + # Use SelectiveCombineRawBankViewsToRawEvent once proper streaming is implemented + consolidate_views = CombineRawBankViewsToRawEvent( + name="CombineRawBanks_for_" + stream.name, + RawBankViews=required_raw_banks, outputs={'RawEvent': force_location(f"/Event/{stream.name}/RawEvent")}) return consolidate_views, sim_veto_list diff --git a/Hlt/Moore/python/Moore/streams.py b/Hlt/Moore/python/Moore/streams.py index d84a857063a..3c4af7571b4 100644 --- a/Hlt/Moore/python/Moore/streams.py +++ b/Hlt/Moore/python/Moore/streams.py @@ -101,6 +101,13 @@ class Stream(object): ] return sorted(lumi_lines, key=lambda l: l.name) + def update(self, new_lines): + self.lines = new_lines + self.physics_lines = [l for l in self.lines if l.produces_output] + self.lumi_lines = [ + l for l in self.lines if isinstance(l, Hlt2LuminosityLine) + ] + class Streams(object): """Moore streams for a given process @@ -121,7 +128,6 @@ class Streams(object): """Function to return flat list of DecisionLines from streams""" lines = [] for stream in self.streams: - print(stream.lines) lines += stream.lines return unique(lines) @@ -139,6 +145,23 @@ class Streams(object): lines += stream.lumi_lines return unique(lines) + def update(self): + """Function to return flat list of DecisionLines from streams""" + lines = [] + for stream in self.streams: + lines += stream.lines + self.lines = unique(lines) + + physics_lines = [] + for stream in self.streams: + physics_lines += stream.physics_lines + self.physics_lines = unique(physics_lines) + + lumi_lines = [] + for stream in self.streams: + lumi_lines += stream.lumi_lines + self.lumi_lines = unique(lumi_lines) + def print(self): for stream in self.streams: print("stream: ", stream.name, " routing_bit: ", diff --git a/Hlt/Moore/python/Moore/streams_hlt2.py b/Hlt/Moore/python/Moore/streams_hlt2.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/Hlt/Moore/python/Moore/streams_spruce.py b/Hlt/Moore/python/Moore/streams_spruce.py deleted file mode 100644 index 0a2d8e2c89c..00000000000 --- a/Hlt/Moore/python/Moore/streams_spruce.py +++ /dev/null @@ -1,10 +0,0 @@ -############################################################################### -# (c) Copyright 2021 CERN for the benefit of the LHCb Collaboration # -# # -# This software is distributed under the terms of the GNU General Public # -# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". # -# # -# In applying this licence, CERN does not waive the privileges and immunities # -# granted to it by virtue of its status as an Intergovernmental Organization # -# or submit itself to any jurisdiction. # -############################################################################### -- GitLab