###############################################################################
# (c) Copyright 2000-2022 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
import GaudiPython as GP
from GaudiConf.reading import do_unpacking
from Configurables import (ApplicationMgr, LHCbApp, IODataManager,
EventSelector, LHCb__UnpackRawEvent,
HltDecReportsDecoder, createODIN)
from GaudiConf import IOHelper
from PyConf.application import configured_ann_svc
import json
import argparse
import csv
from pprint import pprint
from dataclasses import dataclass, field
from typing import List, Dict
from PRConfig.bandwidth_helpers import (FileNameHelper, parse_yaml, guess_wg,
KNOWN_WORKING_GROUPS,
KNOWN_STREAM_CONFIGS)
'''Run over an MDF or DST file to compute:
Per line (in the form of a single .csv table):
1. Inclusive retention
2. Inclusive rate
3. Exclusive retention
4. Exclusive rate
5. Average event size (all banks in particular stream)
6. Bandwidth
7. Average DstData bank size
8. DstData bandwidth
Per stream (i.e. the whole file):
1. Inclusive retention
2. Inclusive rate
3. Average event size (all banks in particular stream)
4. Bandwidth
5. Average DstData bank size
6. DstData bandwidth
'''
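# A line's 'inclusive' figures count every event in which it fired; its
# 'exclusive' figures count only events in which no other line in the stream
# fired (see the file_totals.triggered[evt] == 1 check in rates_per_line).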
LHCb = GP.gbl.LHCb
RAW_BANK_TYPES = [(i, LHCb.RawBank.typeName(i))
for i in range(LHCb.RawBank.LastType)]
B_to_kB = 1e-3
MBps_to_GBps = 1e-3
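# Worked example of the conversions used below: a line that fires in 1% of
# events at a 1000 kHz input rate has rate = 0.01 * 1000 = 10 kHz; with an
# average event size of 50 kB, its bandwidth is 10 * 50 * MBps_to_GBps = 0.5 GB/s.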
# Mirroring Moore.streams - passthrough sprucing drops all banks except these
PASSTHROUGH_BANK_TYPES = [(63, "VP"), (73, "VPRetinaCluster"), (66, "UT"),
(68, "UTError"), (64, "FTCluster"), (9, "Rich"),
(13, "Muon"), (50, "MuonError"), (84, "Plume"),
(77, "Calo"), (78, "CaloError"), (21, "EcalPacked"),
(22, "HcalPacked"), (16, "ODIN"),
(17, "HltDecReports"), (54, 'HltSelReports'),
(53, 'HltRoutingBits'), (60, "DstData")]
@dataclass
class LineBW:
triggered: List[int] = field(
default_factory=lambda: []) # list of ~event numbers
raw: float = 0 # Stores whole event size for every event that fires this line (inclusive)
dst: float = 0 # Just DstData raw bank size, as above
@dataclass
class FileBW:
triggered: Dict[int, int] = field(
default_factory=lambda: dict()) # {event_no: n_times_triggered}
raw: float = 0
dst: float = 0
@dataclass
class WGBW:
n_triggered: int = 0
raw: float = 0
dst: float = 0
def rawbank_sizes(rawevent, lst):
"""Return (name, size) for each raw bank type."""
if rawevent:
def size(i):
return sum(bank.totalSize() for bank in rawevent.banks(i))
else:
def size(i):
return 0
return [(name, size(i)) for i, name in lst]
def _compression_factor(process, filesize_path, file_totals, stream):
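    '''Return the ratio of on-disk (compressed) file size to raw in-memory
    size for this stream, read from the file-size JSON. Used to scale raw
    bank sizes into realistic on-disk event sizes; Hlt1 output and empty
    files are not scaled (factor of 1).
    '''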
with open(filesize_path, 'r') as ifile:
fsizes = json.load(ifile)[stream]
# Output of Hlt2 is uncompressed by default and then compressed by data movers
    # Output of Spruce is compressed by default and we estimate the uncompressed size by sum(all rawbanks in file)
# We chose not to scale Hlt1.
if process == 'hlt2':
if fsizes['default'] > 0:
return fsizes['compressed'] / fsizes['default']
else:
# If the file is empty, we assume the compression factor is 1
if file_totals.triggered:
raise RuntimeError(
"File size is 0 but we appear to have some triggered events. Should not happen."
)
return 1.
elif process == 'spruce':
return fsizes['default'] / file_totals.raw if file_totals.raw else 1.
else:
return 1.
def rate_info_from_file(evt_max, lines, process, stream):
'''Loop through the file to work out per-line and per-stream triggers and
    event sizes. Outputs used by functions below to compute rates and bandwidths.
    '''
decisions = [line + 'Decision' for line in list(lines)]
totals = FileBW() # Per file (stream) information
line_totals = {dec: LineBW() for dec in decisions} # per line
# This block only used for hlt2
event_info = {wg: set() for wg in KNOWN_WORKING_GROUPS}
event_info['Other'] = set()
event_info['TotalInclusive'] = set()
unknown_lines = set()
# Loop over all events
analysed = 0
while analysed < evt_max:
analysed += 1
# Run an event
appMgr.run(1)
report = evt[f'/Event/{process.capitalize()}/DecReports']
# Sprucing has stream-dependent raw event locations - different to HLT1/2
rawevent = evt[
f'/Event/{stream if process == "spruce" else "DAQ"}/RawEvent']
header = evt["/Event/myODIN"]
evtsize = sum(
bank[1] for bank in rawbank_sizes(rawevent, RAW_BANK_TYPES))
dstsize = sum(
bank[1] for bank in rawbank_sizes(rawevent, [(60, 'DstData')]))
spruce_passthrough_evt_size = sum(
bank[1]
for bank in rawbank_sizes(rawevent, PASSTHROUGH_BANK_TYPES))
        # `report` is None once we have run off the end of the input file
if report:
# Info per file/stream
totals.triggered.update({analysed: 0})
totals.raw += evtsize
totals.dst += dstsize
for dec in line_totals.keys():
# Info per line
if report.decReport(dec) and report.decReport(
dec).decision() == 1:
totals.triggered[analysed] += 1
line_totals[dec].triggered.append(analysed)
line_totals[dec].raw += evtsize
line_totals[dec].dst += dstsize
# Info per WG within the stream
if process == "hlt2":
info = (header.eventNumber(),
spruce_passthrough_evt_size, dstsize)
event_info['TotalInclusive'].add(info)
wg = guess_wg(dec, process)
event_info[wg].add(info)
if wg == "Other":
unknown_lines.add(dec)
if len(unknown_lines):
print(f"Found {len(unknown_lines)} unknown lines. They are...")
pprint(unknown_lines)
    # Drop WGs with no triggers, and unpack the per-event tuples for easier, more robust lookups later
wg_event_numbers = {}
wg_bws = {}
if process == "hlt2":
for wg, event_info_set in event_info.items():
event_numbers = [info[0] for info in event_info_set]
if len(event_numbers) == 0: continue
wg_event_numbers[wg] = event_numbers
wg_bws[wg] = WGBW(
n_triggered=len(event_numbers),
raw=sum([info[1] for info in event_info_set]),
dst=sum([info[2] for info in event_info_set]))
return totals, line_totals, wg_bws, wg_event_numbers
def _write_to_csv(output_path, rows):
    with open(output_path, 'w') as f:
        csv_out = csv.writer(f)
        for row in rows:
            csv_out.writerow(row)
def rates_per_line(line_totals: dict[str, LineBW], file_totals: FileBW,
input_rate: float, output_file_path: str,
compression_factor: float, n_events: int) -> None:
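    '''Compute per-line metrics and write one .csv row per line:
    [line, retention (%), rate (kHz), exclusive retention (%),
    exclusive rate (kHz), avg event size (kB), bandwidth (GB/s),
    avg DstData size (kB), DstData bandwidth (GB/s)].
    '''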
rows = []
if not n_events:
print("No denominator for rates, writing empty .csv of rates.")
_write_to_csv(output_file_path, [])
return
for line, bw_info in line_totals.items():
n_fired = len(bw_info.triggered)
retention = n_fired / n_events
rate = retention * input_rate # kHz, inclusive
n_fired_excl = len([
evt for evt in bw_info.triggered if file_totals.triggered[evt] == 1
])
excl_retention = n_fired_excl / n_events
excl_rate = excl_retention * input_rate # kHz
avg_evt_size = compression_factor * bw_info.raw * B_to_kB / n_fired if n_fired else 0 # kB
avg_dst_size = compression_factor * bw_info.dst * B_to_kB / n_fired if n_fired else 0 # kB
bw = rate * avg_evt_size * MBps_to_GBps # GB/s
dst_bw = rate * avg_dst_size * MBps_to_GBps # GB/s
rows.append([
line, retention * 100, rate, excl_retention * 100, excl_rate,
avg_evt_size, bw, avg_dst_size, dst_bw
])
_write_to_csv(output_file_path, rows)
def rates_per_stream(file_totals: FileBW, stream: str, input_rate: float,
output_file_path: str, compression_factor: float,
n_events: int) -> None:
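    '''Compute per-stream metrics and write a single .csv row:
    [stream, retention (%), rate (kHz), avg event size (kB),
    bandwidth (GB/s), avg DstData size (kB), DstData bandwidth (GB/s)].
    '''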
if not n_events:
print("No denominator for rates, writing empty .csv of rates.")
_write_to_csv(output_file_path, [])
return
n_fired = len(file_totals.triggered)
retention = n_fired / n_events
rate = retention * input_rate
avg_evt_size = compression_factor * file_totals.raw * B_to_kB / n_fired if n_fired else 0
avg_dst_size = compression_factor * file_totals.dst * B_to_kB / n_fired if n_fired else 0
bw = rate * avg_evt_size * MBps_to_GBps # GB/s
dst_bw = rate * avg_dst_size * MBps_to_GBps # GB/s
row = (stream, retention * 100, rate, avg_evt_size, bw, avg_dst_size,
dst_bw)
_write_to_csv(output_file_path, [row])
def rates_per_wg_intra_stream(wg_bws: dict[str, WGBW], input_rate: float,
output_file_path: str, compression_factor: float,
n_events: int) -> None:
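    '''Write one .csv row per WG: [wg, rate (kHz), bandwidth (GB/s),
    DstData bandwidth (GB/s)], plus a 'SumWGs' row summing every WG except
    'TotalInclusive'; comparing the two exposes the overlap between WGs.
    '''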
# Get inclusive rates/bandwidths of each WG within this stream
if not n_events:
print("No denominator for rates, writing empty .csv of rates.")
_write_to_csv(output_file_path, [])
return
wg_bw_infos = dict()
for wg, bw_info in wg_bws.items():
rate = bw_info.n_triggered * input_rate / n_events # kHz
wg_bw_infos[wg] = [
rate,
compression_factor * input_rate * (bw_info.raw * B_to_kB) *
            MBps_to_GBps / n_events,  # bandwidth, GB/s
compression_factor * input_rate * (bw_info.dst * B_to_kB) *
MBps_to_GBps / n_events, # dst bandwidth GB/s
]
if wg_bw_infos:
        # Empty (falsy) for hlt1/spruce, or if no WG recorded any triggers
n_metrics = len(wg_bw_infos['TotalInclusive'])
wg_bw_infos['SumWGs'] = [
sum(bw_info[i] for wg, bw_info in wg_bw_infos.items()
if wg != "TotalInclusive") for i in range(n_metrics)
]
rows = [[wg] + bw_info for wg, bw_info in wg_bw_infos.items()]
_write_to_csv(output_file_path, rows)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Inspect Moore output')
parser.add_argument(
'-c',
'--config',
type=str,
required=True,
help='Path to yaml config file defining the input.')
    parser.add_argument('-s', '--stream', type=str, required=True)
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument(
        '--stream-config',
        type=str,
        choices=KNOWN_STREAM_CONFIGS,
        required=True)
parser.add_argument(
'--file-type',
choices=("ROOT", "MDF"),
required=True,
help=
"File type of incoming Moore output - ROOT for .dst or MDF for .mdf")
args = parser.parse_args()
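    # Example invocation (script name and option values illustrative):
    #   python line_rates.py --config input.yaml --process hlt2 \
    #       --stream full --stream-config turbo --file-type MDF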

fname_helper = FileNameHelper(args.process, args.stream_config)

n_events = int(parse_yaml(fname_helper.input_info_json())['n_evts'])
input_config = parse_yaml(args.config)
LHCbApp(DataType="Upgrade", Simulation=True, EvtMax=n_events)
EventSelector().PrintFreq = 10000
IODataManager(DisablePFNWarning=True)

with open(fname_helper.stream_config_json_path()) as f:
lines = json.load(f)[args.stream]
file_ext = fname_helper.input_type_to_file_ext(args.file_type)

IOHelper(args.file_type).inputFiles(
[fname_helper.mdfdst_fname_for_reading(args.stream, file_ext)])
    # We have to configure the algorithms manually instead of using `do_unpacking`
    # alone, because we need to set `input_process='Hlt2'` in `unpack_rawevent`
    # to read MDF/DST output from Sprucing.
    # Hlt1 requires different unpacking from hlt2/sprucing.
    # TODO: might be able to absorb into do_unpacking now

if args.stream_config == "turbo":
input_process = "Turbo"
else:
input_process = args.process.capitalize()
if args.process == "hlt1":
unpacker = LHCb__UnpackRawEvent(
"UnpackRawEvent",
RawBankLocations=["DAQ/RawBanks/HltDecReports"],
BankTypes=["HltDecReports"])
decDec = HltDecReportsDecoder(
"HltDecReportsDecoder/Hlt1DecReportsDecoder",
OutputHltDecReportsLocation="/Event/Hlt1/DecReports",
SourceID="Hlt1",
DecoderMapping="TCKANNSvc",
RawBanks=unpacker.RawBankLocations[0])
appMgr = ApplicationMgr(TopAlg=[unpacker, decDec])
appMgr.ExtSvc += [configured_ann_svc(name='TCKANNSvc')]
else:
raw_event_unpacker_kwargs = dict(
input_process=input_process, simulation=True)
if args.process == "spruce":
raw_event_unpacker_kwargs["stream"] = args.stream
        # TODO: do_unpacking will try to unpack HLT2 dec banks for spruce input,
        # which leads to annoying but harmless ERROR messages.
algs = do_unpacking(**raw_event_unpacker_kwargs)
algs += [
LHCb__UnpackRawEvent(
RawEventLocation=
f"{args.stream if args.process == 'spruce' else 'DAQ'}/RawEvent",
BankTypes=['ODIN'],
RawBankLocations=["DAQ/RawBanks/ODIN"]),
createODIN(ODIN="myODIN"),
]
appMgr = ApplicationMgr(TopAlg=algs)

appMgr.ExtSvc += [configured_ann_svc(json_file=fname_helper.tck())]
appMgr = GP.AppMgr()
evt = appMgr.evtsvc()
input_rate = float(input_config['input_rate'])
file_totals, line_totals, wg_bws_for_hlt2, wg_event_numbers_for_hlt2 = rate_info_from_file(
LHCbApp().EvtMax, lines, args.process, args.stream)
    compression_factor = _compression_factor(
        process=args.process,
        filesize_path=fname_helper.filesize_path(),
        file_totals=file_totals,
        stream=args.stream)
n_evts = LHCbApp().EvtMax
# Calculate key quantities per stream

rates_per_stream(file_totals, args.stream, input_rate,
fname_helper.tmp_rate_table_per_stream_path(args.stream),
compression_factor, n_evts)
# Calculate key quantities per line

rates_per_line(line_totals, file_totals, input_rate,
fname_helper.tmp_rate_table_per_line_path(args.stream),
compression_factor, n_evts)
if args.process == "hlt2":
# Save the event numbers for WG-by-WG overlaps
        event_number_file = fname_helper.intra_stream_event_no_fname(
            args.stream)
with open(event_number_file, 'w') as f:
json.dump(wg_event_numbers_for_hlt2, f)
# Calculate rates/bws within a stream
        rates_per_wg_intra_stream(
            wg_bws_for_hlt2, input_rate,
            fname_helper.tmp_rate_table_intra_stream_path(args.stream),
            compression_factor, n_evts)