###############################################################################
# (c) Copyright 2000-2022 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import GaudiPython as GP
from GaudiConf.reading import do_unpacking
from Configurables import (ApplicationMgr, LHCbApp, IODataManager,
                           EventSelector, LHCb__UnpackRawEvent,
                           HltDecReportsDecoder, createODIN)
from GaudiConf import IOHelper
from PyConf.application import configured_ann_svc
import json
import argparse
import csv
from pprint import pprint
from dataclasses import dataclass, field
from typing import List, Dict
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml, guess_wg, KNOWN_WORKING_GROUPS
'''
Run over MDF or DST file to compute:
Per line (in the form of a single .csv table):
1. Inclusive retention
2. Inclusive rate
3. Exclusive retention
4. Exclusive rate
5. Average event size (all banks in particular stream)
6. Bandwidth
7. Average DstData bank size
8. DstData bandwidth
Per stream (i.e. the whole file):
1. Inclusive retention
2. Inclusive rate
3. Average event size (all banks in particular stream)
4. Bandwidth
5. Average DstData bank size
6. DstData bandwidth
'''
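# Every quantity above reduces to two per-event measurements: which lines
# fired, and the summed raw-bank sizes. Rates are retention * input_rate, and
# bandwidths are rate * mean (whole-event or DstData) size.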
LHCb = GP.gbl.LHCb
RAW_BANK_TYPES = [(i, LHCb.RawBank.typeName(i))
for i in range(LHCb.RawBank.LastType)]
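# RAW_BANK_TYPES enumerates (id, name) for every known raw bank type; the
# event loop below sums bank sizes over this list to get whole-event sizes.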
B_to_kB = 1e-3
MBps_to_GBps = 1e-3
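# Sizes are read in bytes and rates computed in kHz, so rate [kHz] * size [kB]
# is already MB/s; multiplying by MBps_to_GBps then gives GB/s.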
@dataclass
class LineBW:
    triggered: List[int] = field(
        default_factory=list)  # list of ~event numbers
raw: float = 0 # Stores whole event size for every event that fires this line (inclusive)
dst: float = 0 # Just DstData raw bank size, as above
@dataclass
class FileBW:
    triggered: Dict[int, int] = field(
        default_factory=dict)  # {event_no: n_times_triggered}
raw: float = 0
dst: float = 0
@dataclass
class WGBW:
n_triggered: int = 0
raw: float = 0
dst: float = 0
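# Illustrative fill pattern in rate_info_from_file (values are hypothetical):
#   LineBW.triggered -> [3, 17, ...]        # event indices where the line fired
#   FileBW.triggered -> {3: 2, 17: 1, ...}  # event index -> n lines that fired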
def rawbank_sizes(rawevent, lst):
"""Return (name, size) for each raw bank type."""
if rawevent:
def size(i):
return sum(bank.totalSize() for bank in rawevent.banks(i))
else:
def size(i):
return 0
return [(name, size(i)) for i, name in lst]
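# e.g. rawbank_sizes(rawevent, [(60, 'DstData')]) -> [('DstData', n_bytes)];
# this is how the DstData bank size is extracted in the event loop below.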
def rate_info_from_file(evt_max, lines, process, stream):
    '''Loop through the file to work out per-line and per-stream triggers and
    event sizes. Outputs used by functions below to compute rates and bandwidths.
    '''
decisions = [line + 'Decision' for line in list(lines)]
totals = FileBW() # Per file (stream) information
line_totals = {dec: LineBW() for dec in decisions} # per line
# This block only used for hlt2
event_info = {wg: set() for wg in KNOWN_WORKING_GROUPS}
event_info['Other'] = set()
event_info['TotalInclusive'] = set()
unknown_lines = set()
# Loop over all events
analysed = 0
while analysed < evt_max:
analysed += 1
# Run an event
appMgr.run(1)
report = evt[f'/Event/{process.capitalize()}/DecReports']
# Sprucing has stream-dependent raw event locations - different to HLT1/2
rawevent = evt[
f'/Event/{stream if process == "spruce" else "DAQ"}/RawEvent']
header = evt["/Event/myODIN"]
evtsize = sum(
bank[1] for bank in rawbank_sizes(rawevent, RAW_BANK_TYPES))
dstsize = sum(
bank[1] for bank in rawbank_sizes(rawevent, [(60, 'DstData')]))
# Will quit running if there are no more events in the input file
if report:
# Info per file/stream
totals.triggered.update({analysed: 0})
totals.raw += evtsize
totals.dst += dstsize
for dec in line_totals.keys():
# Info per line
if report.decReport(dec) and report.decReport(
dec).decision() == 1:
totals.triggered[analysed] += 1
line_totals[dec].triggered.append(analysed)
line_totals[dec].raw += evtsize
line_totals[dec].dst += dstsize
# Info per WG within the stream
if process == "hlt2":
info = (header.eventNumber(), evtsize, dstsize)
event_info['TotalInclusive'].add(info)
wg = guess_wg(dec, process)
event_info[wg].add(info)
if wg == "Other":
unknown_lines.add(dec)
if len(unknown_lines):
print(f"Found {len(unknown_lines)} unknown lines. They are...")
pprint(unknown_lines)
# Remove trigger-less WGs, and transform out of tuple for easier & more-robust lookups later
wg_event_numbers = {}
wg_bws = {}
if process == "hlt2":
for wg, event_info_set in event_info.items():
event_numbers = [info[0] for info in event_info_set]
if len(event_numbers) == 0: continue
wg_event_numbers[wg] = event_numbers
wg_bws[wg] = WGBW(
n_triggered=len(event_numbers),
raw=sum([info[1] for info in event_info_set]),
dst=sum([info[2] for info in event_info_set]))
return totals, line_totals, wg_bws, wg_event_numbers
def _write_to_csv(output_path, rows):
    with open(output_path, 'w') as f:
        csv_out = csv.writer(f)
        for row in rows:
            csv_out.writerow(row)
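    # NB: no header row is written; column names are presumably attached by
    # the downstream tooling that aggregates these tables.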
def rates_per_line(line_totals: dict[str, LineBW], file_totals: FileBW,
input_rate: float, output_file_path: str) -> None:
    rows = []
    n_events = LHCbApp().EvtMax
    for line, bw_info in line_totals.items():
        n_fired = len(bw_info.triggered)
retention = n_fired / n_events
rate = retention * input_rate # kHz, inclusive
n_fired_excl = len([
evt for evt in bw_info.triggered if file_totals.triggered[evt] == 1
])
excl_retention = n_fired_excl / n_events
excl_rate = excl_retention * input_rate # kHz
avg_evt_size = bw_info.raw * B_to_kB / n_fired if n_fired else 0 # kB
avg_dst_size = bw_info.dst * B_to_kB / n_fired if n_fired else 0 # kB
bw = rate * avg_evt_size * MBps_to_GBps # GB/s
dst_bw = rate * avg_dst_size * MBps_to_GBps # GB/s
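        # Columns: line, retention (%), rate (kHz), exclusive retention (%),
        # exclusive rate (kHz), avg event size (kB), bandwidth (GB/s),
        # avg DstData size (kB), DstData bandwidth (GB/s).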
rows.append([
line, retention * 100, rate, excl_retention * 100, excl_rate,
avg_evt_size, bw, avg_dst_size, dst_bw
])
_write_to_csv(output_file_path, rows)
def rates_per_stream(file_totals: FileBW, stream: str, input_rate: float,
output_file_path: str) -> None:
n_events = LHCbApp().EvtMax
n_fired = len(file_totals.triggered)
retention = n_fired / n_events
rate = retention * input_rate
avg_evt_size = file_totals.raw * B_to_kB / n_fired if n_fired else 0
avg_dst_size = file_totals.dst * B_to_kB / n_fired if n_fired else 0
bw = rate * avg_evt_size * MBps_to_GBps # GB/s
dst_bw = rate * avg_dst_size * MBps_to_GBps # GB/s
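    # Columns: stream, retention (%), rate (kHz), avg event size (kB),
    # bandwidth (GB/s), avg DstData size (kB), DstData bandwidth (GB/s).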
row = (stream, retention * 100, rate, avg_evt_size, bw, avg_dst_size,
dst_bw)
_write_to_csv(output_file_path, [row])
return
def rates_per_wg_intra_stream(wg_bws: dict[str, WGBW], input_rate: float,
output_file_path: str) -> None:
# Get inclusive rates/bandwidths of each WG within this stream
n_events = LHCbApp().EvtMax
wg_bw_infos = dict()
for wg, bw_info in wg_bws.items():
rate = bw_info.n_triggered * input_rate / n_events # kHz
wg_bw_infos[wg] = [
rate,
            input_rate * (bw_info.raw * B_to_kB) * MBps_to_GBps /
            n_events,  # bandwidth, GB/s
input_rate * (bw_info.dst * B_to_kB) * MBps_to_GBps /
n_events, # dst bandwidth GB/s
]
n_metrics = len(wg_bw_infos['TotalInclusive'])
wg_bw_infos['SumWGs'] = [
sum(bw_info[i] for wg, bw_info in wg_bw_infos.items()
if wg != "TotalInclusive") for i in range(n_metrics)
]
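    # NB: an event firing lines from several WGs is counted once per WG, so
    # the 'SumWGs' row can exceed 'TotalInclusive'; the difference reflects
    # the overlap between WGs within this stream.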
rows = [[wg] + bw_info for wg, bw_info in wg_bw_infos.items()]
_write_to_csv(output_file_path, rows)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Inspect Moore output')
parser.add_argument(
'-c',
'--config',
type=str,
required=True,
help='Path to yaml config file defining the input.')
    parser.add_argument('-s', '--stream', type=str, required=True)
    parser.add_argument(
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument(
        '--stream-config',
        type=str,
        help='Choose production, per-WG or streamless stream configuration',
        choices=['streamless', 'production', 'wg'],
        required=True)
parser.add_argument(
'--file-type',
choices=("ROOT", "MDF"),
required=True,
help=
"File type of incoming Moore output - ROOT for .dst or MDF for .mdf")
args = parser.parse_args()
fname_helper = FileNameHelper(args.process)

n_events = int(parse_yaml(fname_helper.input_nevts_json())['n_evts'])
input_config = parse_yaml(args.config)
if args.process == "spruce" and args.stream_config != "wg":
'"production" and "streamless" stream configs are not defined for sprucing. Please use "wg".'
if args.process == "hlt1" and args.stream_config != "streamless":
raise RuntimeError(
'"production" and "wg" stream configs are not defined for hlt1. Please use "streamless".'
)
LHCbApp(DataType="Upgrade", Simulation=True, EvtMax=n_events)
EventSelector().PrintFreq = 10000
IODataManager(DisablePFNWarning=True)
with open(fname_helper.stream_config_json_path(args.stream_config)) as f:
lines = json.load(f)[args.stream]
file_ext = fname_helper.input_type_to_file_ext(args.file_type)
IOHelper(args.file_type).inputFiles([
fname_helper.mdfdst_fname_for_reading(args.stream_config, args.stream,
file_ext)
])
    # Hlt1 requires different unpacking than hlt2/sprucing: its DecReports are
    # decoded by manually-configured algorithms below, whereas hlt2/sprucing
    # go through `do_unpacking` (with `input_process` set appropriately so
    # that MDF/DST output from Sprucing can be read).
    # TODO might be able to absorb into do_unpacking now
input_process = args.process.capitalize()
if args.process == "hlt1":
unpacker = LHCb__UnpackRawEvent(
"UnpackRawEvent",
RawBankLocations=["DAQ/RawBanks/HltDecReports"],
BankTypes=["HltDecReports"])
decDec = HltDecReportsDecoder(
"HltDecReportsDecoder/Hlt1DecReportsDecoder",
OutputHltDecReportsLocation="/Event/Hlt1/DecReports",
SourceID="Hlt1",
DecoderMapping="TCKANNSvc",
RawBanks=unpacker.RawBankLocations[0])
appMgr = ApplicationMgr(TopAlg=[unpacker, decDec])
appMgr.ExtSvc += [configured_ann_svc(name='TCKANNSvc')]
else:
raw_event_unpacker_kwargs = dict(
input_process=input_process, simulation=True)
if args.process == "spruce":
raw_event_unpacker_kwargs["stream"] = args.stream
        # TODO: do_unpacking will try to unpack HLT2 dec banks when running on
        # sprucing output, which produces harmless but annoying ERROR messages.
algs = do_unpacking(**raw_event_unpacker_kwargs)
algs += [
LHCb__UnpackRawEvent(
RawEventLocation=
f"{args.stream if args.process == 'spruce' else 'DAQ'}/RawEvent",
BankTypes=['ODIN'],
RawBankLocations=["DAQ/RawBanks/ODIN"]),
createODIN(ODIN="myODIN"),
]
appMgr = ApplicationMgr(TopAlg=algs)
appMgr.ExtSvc += [
configured_ann_svc(json_file=fname_helper.tck(args.stream_config))
]
appMgr = GP.AppMgr()
evt = appMgr.evtsvc()
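    # NB: rate_info_from_file relies on these module-level `appMgr` and `evt`
    # handles to drive the event loop (appMgr.run(1)) and read event data.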
input_rate = int(input_config['input_rate'])
file_totals, line_totals, wg_bws_for_hlt2, wg_event_numbers_for_hlt2 = rate_info_from_file(
LHCbApp().EvtMax, lines, args.process, args.stream)
# Calculate key quantities per stream
rates_per_stream(
file_totals, args.stream, input_rate,
fname_helper.tmp_rate_table_per_stream_path(args.stream_config,
args.stream))
# Calculate key quantities per line
rates_per_line(
line_totals, file_totals, input_rate,
fname_helper.tmp_rate_table_per_line_path(args.stream_config,
args.stream))
if args.process == "hlt2":
# Save the event numbers for WG-by-WG overlaps
event_number_file = fname_helper.intra_stream_event_no_fname(
args.stream_config, args.stream)
with open(event_number_file, 'w') as f:
json.dump(wg_event_numbers_for_hlt2, f)
# Calculate rates/bws within a stream
rates_per_wg_intra_stream(
wg_bws_for_hlt2, input_rate,
fname_helper.tmp_rate_table_intra_stream_path(
args.stream_config, args.stream))