###############################################################################
# (c) Copyright 2000-2022 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import GaudiPython as GP
from GaudiConf.reading import do_unpacking
from Configurables import (ApplicationMgr, LHCbApp, IODataManager,
                           EventSelector, LHCb__UnpackRawEvent,
                           HltDecReportsDecoder, createODIN)
from GaudiConf import IOHelper
from PyConf.application import configured_ann_svc
import json
import argparse
import csv
from pprint import pprint
from dataclasses import dataclass, field
from typing import List, Dict
from PRConfig.bandwidth_helpers import (FileNameHelper, parse_yaml, guess_wg,
                                        KNOWN_WORKING_GROUPS,
                                        KNOWN_STREAM_CONFIGS)
'''
Run over an MDF or DST file to compute:

Per line (as a single .csv table):
    1. Inclusive retention
    2. Inclusive rate
    3. Exclusive retention
    4. Exclusive rate
    5. Average event size (all banks in the stream)
    6. Bandwidth
    7. Average DstData bank size
    8. DstData bandwidth

Per stream (i.e. the whole file):
    1. Inclusive retention
    2. Inclusive rate
    3. Average event size (all banks in the stream)
    4. Bandwidth
    5. Average DstData bank size
    6. DstData bandwidth
'''

LHCb = GP.gbl.LHCb
RAW_BANK_TYPES = [(i, LHCb.RawBank.typeName(i))
                  for i in range(LHCb.RawBank.LastType)]
B_to_kB = 1e-3
MBps_to_GBps = 1e-3

# Mirroring Moore.streams - passthrough sprucing drops all banks except these
PASSTHROUGH_BANK_TYPES = [(63, "VP"), (73, "VPRetinaCluster"), (66, "UT"),
                          (68, "UTError"), (64, "FTCluster"), (9, "Rich"),
                          (13, "Muon"), (50, "MuonError"), (84, "Plume"),
                          (77, "Calo"), (78, "CaloError"), (21, "EcalPacked"),
                          (22, "HcalPacked"), (16, "ODIN"),
                          (17, "HltDecReports"), (54, 'HltSelReports'),
                          (53, 'HltRoutingBits'), (60, "DstData")]


@dataclass
class LineBW:
    triggered: List[int] = field(
        default_factory=lambda: [])  # list of ~event numbers
    raw: float = 0  # Sum of whole-event sizes over all events that fire this line (inclusive)
    dst: float = 0  # Sum of DstData raw bank sizes, as above


@dataclass
class FileBW:
    triggered: Dict[int, int] = field(
        default_factory=lambda: dict())  # {event_no: n_times_triggered}
    raw: float = 0
    dst: float = 0


@dataclass
class WGBW:
    n_triggered: int = 0
    raw: float = 0
    dst: float = 0


def rawbank_sizes(rawevent, lst):
    """Return (name, size) for each raw bank type."""
    if rawevent:

        def size(i):
            return sum(bank.totalSize() for bank in rawevent.banks(i))
    else:

        def size(i):
            return 0

    return [(name, size(i)) for i, name in lst]


def _compression_factor(process, filesize_path, file_totals, stream):
    with open(filesize_path, 'r') as ifile:
        fsizes = json.load(ifile)[stream]

    # Output of Hlt2 is uncompressed by default and then compressed by the data movers.
    # Output of Sprucing is compressed by default, so we estimate the uncompressed
    # size as the sum of all raw banks in the file.
    # We chose not to scale Hlt1.
    if process == 'hlt2':
        if fsizes['default'] > 0:
            return fsizes['compressed'] / fsizes['default']
        else:
            # If the file is empty, we assume the compression factor is 1
            if file_totals.triggered:
                raise RuntimeError(
                    "File size is 0 but we appear to have some triggered events. "
                    "Should not happen.")
            return 1.
    elif process == 'spruce':
        return fsizes['default'] / file_totals.raw if file_totals.raw else 1.
    else:
        return 1.
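

# Worked example of the compression factor (a sketch with illustrative numbers,
# not taken from any real file): for an hlt2 stream whose file-size JSON holds
# {"default": 100e6, "compressed": 80e6}, the factor is 80e6 / 100e6 = 0.8, and
# every event/line size below is scaled by 0.8 before bandwidths are computed.
# For sprucing the file on disk is already compressed, so the factor is instead
# <file size on disk> / <sum of all raw-bank sizes seen in the event loop>.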


def rate_info_from_file(evt_max, lines, process, stream):
    '''Loop through the file to work out per-line and per-stream triggers and event sizes.

    The outputs are used by the functions below to compute rates and bandwidths.
    '''
    decisions = [line + 'Decision' for line in list(lines)]

    totals = FileBW()  # Per-file (stream) information
    line_totals = {dec: LineBW() for dec in decisions}  # Per line

    # This block is only used for hlt2
    event_info = {wg: set() for wg in KNOWN_WORKING_GROUPS}
    event_info['Other'] = set()
    event_info['TotalInclusive'] = set()
    unknown_lines = set()

    # Loop over all events
    analysed = 0
    while analysed < evt_max:
        analysed += 1

        # Run an event
        appMgr.run(1)
        report = evt[f'/Event/{process.capitalize()}/DecReports']
        # Sprucing has stream-dependent raw event locations - different to HLT1/2
        rawevent = evt[
            f'/Event/{stream if process == "spruce" else "DAQ"}/RawEvent']
        header = evt["/Event/myODIN"]

        evtsize = sum(
            bank[1] for bank in rawbank_sizes(rawevent, RAW_BANK_TYPES))
        dstsize = sum(
            bank[1] for bank in rawbank_sizes(rawevent, [(60, 'DstData')]))
        spruce_passthrough_evt_size = sum(
            bank[1]
            for bank in rawbank_sizes(rawevent, PASSTHROUGH_BANK_TYPES))

        # The report is falsy once there are no more events in the input file,
        # in which case we quit the loop.
        if report:
            # Info per file/stream
            totals.triggered.update({analysed: 0})
            totals.raw += evtsize
            totals.dst += dstsize

            for dec in line_totals.keys():
                # Info per line
                if report.decReport(dec) and report.decReport(
                        dec).decision() == 1:
                    totals.triggered[analysed] += 1
                    line_totals[dec].triggered.append(analysed)
                    line_totals[dec].raw += evtsize
                    line_totals[dec].dst += dstsize

                    # Info per WG within the stream
                    if process == "hlt2":
                        info = (header.eventNumber(),
                                spruce_passthrough_evt_size, dstsize)
                        event_info['TotalInclusive'].add(info)
                        wg = guess_wg(dec, process)
                        event_info[wg].add(info)
                        if wg == "Other":
                            unknown_lines.add(dec)
        else:
            break

    if len(unknown_lines):
        print(f"Found {len(unknown_lines)} unknown lines. They are...")
        pprint(unknown_lines)

    # Remove trigger-less WGs, and transform out of tuples for easier and
    # more robust lookups later
    wg_event_numbers = {}
    wg_bws = {}
    if process == "hlt2":
        for wg, event_info_set in event_info.items():
            event_numbers = [info[0] for info in event_info_set]
            if len(event_numbers) == 0:
                continue
            wg_event_numbers[wg] = event_numbers
            wg_bws[wg] = WGBW(
                n_triggered=len(event_numbers),
                raw=sum([info[1] for info in event_info_set]),
                dst=sum([info[2] for info in event_info_set]))

    return totals, line_totals, wg_bws, wg_event_numbers
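

# Sketch of the rate/bandwidth arithmetic used by the functions below (the
# numbers here are illustrative only): with input_rate = 500 kHz and
# n_events = 1e5 analysed events, a line that fires on 50 events has
#     retention = 50 / 1e5 = 5e-4 and rate = 5e-4 * 500 kHz = 0.25 kHz;
# if those 50 events total 5e6 B after scaling by the compression factor,
#     avg_evt_size = 5e6 * B_to_kB / 50 = 100 kB and
#     bw = 0.25 kHz * 100 kB * MBps_to_GBps = 0.025 GB/s.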


def _write_to_csv(output_path, rows):
    with open(output_path, 'w') as f:
        csv_out = csv.writer(f)
        for row in rows:
            csv_out.writerow(row)


def rates_per_line(line_totals: dict[str, LineBW], file_totals: FileBW,
                   input_rate: float, output_file_path: str,
                   compression_factor: float, n_events: int) -> None:
    rows = []
    if not n_events:
        print("No denominator for rates, writing empty .csv of rates.")
        _write_to_csv(output_file_path, [])
        return

    for line, bw_info in line_totals.items():
        n_fired = len(bw_info.triggered)
        retention = n_fired / n_events
        rate = retention * input_rate  # kHz, inclusive
        n_fired_excl = len([
            evt for evt in bw_info.triggered
            if file_totals.triggered[evt] == 1
        ])
        excl_retention = n_fired_excl / n_events
        excl_rate = excl_retention * input_rate  # kHz
        avg_evt_size = compression_factor * bw_info.raw * B_to_kB / n_fired if n_fired else 0  # kB
        avg_dst_size = compression_factor * bw_info.dst * B_to_kB / n_fired if n_fired else 0  # kB
        bw = rate * avg_evt_size * MBps_to_GBps  # GB/s
        dst_bw = rate * avg_dst_size * MBps_to_GBps  # GB/s
        rows.append([
            line, retention * 100, rate, excl_retention * 100, excl_rate,
            avg_evt_size, bw, avg_dst_size, dst_bw
        ])

    _write_to_csv(output_file_path, rows)
    return


def rates_per_stream(file_totals: FileBW, stream: str, input_rate: float,
                     output_file_path: str, compression_factor: float,
                     n_events: int) -> None:
    if not n_events:
        print("No denominator for rates, writing empty .csv of rates.")
        _write_to_csv(output_file_path, [])
        return

    n_fired = len(file_totals.triggered)
    retention = n_fired / n_events
    rate = retention * input_rate  # kHz, inclusive
    avg_evt_size = compression_factor * file_totals.raw * B_to_kB / n_fired if n_fired else 0  # kB
    avg_dst_size = compression_factor * file_totals.dst * B_to_kB / n_fired if n_fired else 0  # kB
    bw = rate * avg_evt_size * MBps_to_GBps  # GB/s
    dst_bw = rate * avg_dst_size * MBps_to_GBps  # GB/s
    row = (stream, retention * 100, rate, avg_evt_size, bw, avg_dst_size,
           dst_bw)
    _write_to_csv(output_file_path, [row])
    return


def rates_per_wg_intra_stream(wg_bws: dict[str, WGBW], input_rate: float,
                              output_file_path: str,
                              compression_factor: float,
                              n_events: int) -> None:
    # Get inclusive rates/bandwidths of each WG within this stream
    if not n_events:
        print("No denominator for rates, writing empty .csv of rates.")
        _write_to_csv(output_file_path, [])
        return

    wg_bw_infos = dict()
    for wg, bw_info in wg_bws.items():
        rate = bw_info.n_triggered * input_rate / n_events  # kHz
        wg_bw_infos[wg] = [
            rate,
            compression_factor * input_rate * (bw_info.raw * B_to_kB) *
            MBps_to_GBps / n_events,  # bandwidth, GB/s
            compression_factor * input_rate * (bw_info.dst * B_to_kB) *
            MBps_to_GBps / n_events,  # DstData bandwidth, GB/s
        ]

    if wg_bw_infos:  # Will be falsy if there were no triggers for any WG
        n_metrics = len(wg_bw_infos['TotalInclusive'])
        wg_bw_infos['SumWGs'] = [
            sum(bw_info[i] for wg, bw_info in wg_bw_infos.items()
                if wg != "TotalInclusive") for i in range(n_metrics)
        ]

    rows = [[wg] + bw_info for wg, bw_info in wg_bw_infos.items()]
    _write_to_csv(output_file_path, rows)
    return
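

# Column layout of the .csv tables written above (no header row is written):
#   per line:     line, retention (%), rate (kHz), exclusive retention (%),
#                 exclusive rate (kHz), avg. event size (kB), bandwidth (GB/s),
#                 avg. DstData size (kB), DstData bandwidth (GB/s)
#   per stream:   stream, retention (%), rate (kHz), avg. event size (kB),
#                 bandwidth (GB/s), avg. DstData size (kB), DstData bandwidth (GB/s)
#   intra-stream: WG (plus the special rows 'TotalInclusive' and 'SumWGs'),
#                 rate (kHz), bandwidth (GB/s), DstData bandwidth (GB/s)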


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Inspect Moore output')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument('-s', '--stream', type=str, required=True)
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument(
        '--stream-config',
        type=str,
        choices=KNOWN_STREAM_CONFIGS,
        required=True)
    parser.add_argument(
        '--file-type',
        choices=("ROOT", "MDF"),
        required=True,
        help=
        "File type of incoming Moore output - ROOT for .dst or MDF for .mdf")
    args = parser.parse_args()

    fname_helper = FileNameHelper(args.process, args.stream_config)
    n_events = int(parse_yaml(fname_helper.input_info_json())['n_evts'])
    input_config = parse_yaml(args.config)

    LHCbApp(DataType="Upgrade", Simulation=True, EvtMax=n_events)
    EventSelector().PrintFreq = 10000
    IODataManager(DisablePFNWarning=True)

    with open(fname_helper.stream_config_json_path()) as f:
        lines = json.load(f)[args.stream]

    file_ext = fname_helper.input_type_to_file_ext(args.file_type)
    IOHelper(args.file_type).inputFiles(
        [fname_helper.mdfdst_fname_for_reading(args.stream, file_ext)])

    # we have to configure the algorithms manually instead of `do_unpacking`
    # because we need to set `input_process='Hlt2'` in `unpack_rawevent`
    # to read MDF/DST output from Sprucing
    # Hlt1 requires different unpacking than hlt2/sprucing.
    # TODO might be able to absorb into do_unpacking now
    if args.stream_config == "turbo":
        input_process = "Turbo"
    else:
        input_process = args.process.capitalize()

    if args.process == "hlt1":
        unpacker = LHCb__UnpackRawEvent(
            "UnpackRawEvent",
            RawBankLocations=["DAQ/RawBanks/HltDecReports"],
            BankTypes=["HltDecReports"])
        decDec = HltDecReportsDecoder(
            "HltDecReportsDecoder/Hlt1DecReportsDecoder",
            OutputHltDecReportsLocation="/Event/Hlt1/DecReports",
            SourceID="Hlt1",
            DecoderMapping="TCKANNSvc",
            RawBanks=unpacker.RawBankLocations[0])
        appMgr = ApplicationMgr(TopAlg=[unpacker, decDec])
        appMgr.ExtSvc += [configured_ann_svc(name='TCKANNSvc')]
    else:
        raw_event_unpacker_kwargs = dict(
            input_process=input_process, simulation=True)
        if args.process == "spruce":
            raw_event_unpacker_kwargs["stream"] = args.stream
        # TODO do_unpacking will try to unpack HLT2 dec banks if spruce -
        # leads to annoying ERROR messages that do nothing.
        algs = do_unpacking(**raw_event_unpacker_kwargs)
        algs += [
            LHCb__UnpackRawEvent(
                RawEventLocation=
                f"{args.stream if args.process == 'spruce' else 'DAQ'}/RawEvent",
                BankTypes=['ODIN'],
                RawBankLocations=["DAQ/RawBanks/ODIN"]),
            createODIN(ODIN="myODIN"),
        ]
        appMgr = ApplicationMgr(TopAlg=algs)
        appMgr.ExtSvc += [configured_ann_svc(json_file=fname_helper.tck())]

    appMgr = GP.AppMgr()
    evt = appMgr.evtsvc()

    input_rate = float(input_config['input_rate'])
    file_totals, line_totals, wg_bws_for_hlt2, wg_event_numbers_for_hlt2 = rate_info_from_file(
        LHCbApp().EvtMax, lines, args.process, args.stream)
    compression_factor = _compression_factor(
        process=args.process,
        filesize_path=fname_helper.filesize_path(),
        file_totals=file_totals,
        stream=args.stream)
    n_evts = LHCbApp().EvtMax

    # Calculate key quantities per stream
    rates_per_stream(file_totals, args.stream, input_rate,
                     fname_helper.tmp_rate_table_per_stream_path(args.stream),
                     compression_factor, n_evts)

    # Calculate key quantities per line
    rates_per_line(line_totals, file_totals, input_rate,
                   fname_helper.tmp_rate_table_per_line_path(args.stream),
                   compression_factor, n_evts)

    if args.process == "hlt2":
        # Save the event numbers for WG-by-WG overlaps
        event_number_file = fname_helper.intra_stream_event_no_fname(
            args.stream)
        with open(event_number_file, 'w') as f:
            json.dump(wg_event_numbers_for_hlt2, f)

        # Calculate rates/bws within a stream
        rates_per_wg_intra_stream(
            wg_bws_for_hlt2, input_rate,
            fname_helper.tmp_rate_table_intra_stream_path(args.stream),
            compression_factor, n_evts)
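

# Example invocation (a sketch only: the script filename, YAML path, stream
# name and stream-config value below are illustrative, not prescriptive):
#
#   python <this_script>.py -c input_config.yaml -s full -p hlt2 \
#       --stream-config production --file-type MDF
#
# The YAML passed via -c must provide at least 'input_rate'; the number of
# events to process is read from FileNameHelper.input_info_json().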