# NOTE: web-viewer page residue, kept only as a comment (not part of the script):
#   Skip to content
#   Snippets Groups Projects
#   line-and-stream-rates.py 16.1 KiB
#   Newer Older
###############################################################################
# (c) Copyright 2000-2022 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################

import GaudiPython as GP
from GaudiConf.reading import do_unpacking
from Configurables import (ApplicationMgr, LHCbApp, IODataManager,
                           EventSelector, LHCb__UnpackRawEvent,
                           HltDecReportsDecoder, createODIN)
from GaudiConf import IOHelper
from PyConf.application import configured_ann_svc
import json
import argparse
import csv
from pprint import pprint
from dataclasses import dataclass, field
from typing import List, Dict
from PRConfig.bandwidth_helpers import (FileNameHelper, parse_yaml, guess_wg,
                                        KNOWN_WORKING_GROUPS,
                                        KNOWN_STREAM_CONFIGS)
'''
    Run over MDF or DST file to compute:
        Per line (in form of single .csv table):
            1. Inclusive retention
            2. Inclusive rate
            3. Exclusive retention
            4. Exclusive rate
            5. Average event size (all banks in particular stream)
            6. Bandwidth
            7. Average DstData bank size
            8. DstData bandwidth
        Per stream (i.e. the whole file)
            1. Inclusive retention
            2. Inclusive rate
            3. Average event size (all banks in particular stream)
            4. Bandwidth
            5. Average DstData bank size
            6. DstData bandwidth
'''

# Short alias for the LHCb namespace reached through GaudiPython's `gbl`.
LHCb = GP.gbl.LHCb
# (index, type name) pairs for every index below LHCb.RawBank.LastType; passed
# to rawbank_sizes() to sum the size of every bank type in an event.
RAW_BANK_TYPES = [(i, LHCb.RawBank.typeName(i))
                  for i in range(LHCb.RawBank.LastType)]
# Mirroring Moore.streams - passthrough sprucing drops all banks except these
# NOTE(review): the numeric IDs below are hard-coded; presumably they match the
# LHCb.RawBank enumeration used to build RAW_BANK_TYPES - confirm they stay in
# sync with it.
PASSTHROUGH_BANK_TYPES = [(63, "VP"), (73, "VPRetinaCluster"), (66, "UT"),
                          (68, "UTError"), (64, "FTCluster"), (9, "Rich"),
                          (13, "Muon"), (50, "MuonError"), (84, "Plume"),
                          (77, "Calo"), (78, "CaloError"), (21, "EcalPacked"),
                          (22, "HcalPacked"), (16, "ODIN"),
                          (17, "HltDecReports"), (54, 'HltSelReports'),
                          (53, 'HltRoutingBits'), (60, "DstData")]


@dataclass
class LineBW:
    """Running totals for a single line, summed over the events it fired on."""
    # Loop indices (~event numbers) of the events on which this line fired.
    triggered: List[int] = field(default_factory=list)
    # Summed whole-event size over every event that fired this line (inclusive).
    raw: float = 0
    # Summed DstData raw bank size over the same events.
    dst: float = 0


@dataclass
class FileBW:
    """Running totals for a whole file (stream)."""
    # Maps event_no -> n_times_triggered, i.e. how many lines fired on it.
    triggered: Dict[int, int] = field(default_factory=dict)
    # Summed whole-event size over the events recorded in `triggered`.
    raw: float = 0
    # Summed DstData raw bank size over the same events.
    dst: float = 0


@dataclass
class WGBW:
    # Per-working-group totals within one stream; built at the end of
    # rate_info_from_file() from the de-duplicated per-event tuples.
    n_triggered: int = 0  # number of distinct events attributed to the WG
    raw: float = 0  # summed event size over those events
    dst: float = 0  # summed DstData bank size over those events


def rawbank_sizes(rawevent, lst):
    """Pair each bank-type name in `lst` with its summed size in `rawevent`.

    `lst` holds (bank type index, name) pairs. For each one, the `totalSize()`
    of every bank returned by `rawevent.banks(index)` is added up. A falsy
    `rawevent` yields a size of 0 for every name.
    """
    if not rawevent:
        return [(name, 0) for _, name in lst]

    sizes = []
    for bank_type, name in lst:
        total = sum(bank.totalSize() for bank in rawevent.banks(bank_type))
        sizes.append((name, total))
    return sizes


def _compression_factor(process, filesize_path, file_totals, stream):
    with open(filesize_path, 'r') as ifile:
        fsizes = json.load(ifile)[stream]

    # Output of Hlt2 is uncompressed by default and then compressed by data movers
    # Output of Spruce is compresed by default and we estimate the uncompressed size by sum(all rawbanks in file)
    # We chose not to scale Hlt1.
    if process == 'hlt2':
        if fsizes['default'] > 0:
            return fsizes['compressed'] / fsizes['default']
        else:
            # If the file is empty, we assume the compression factor is 1
            if file_totals.triggered:
                raise RuntimeError(
                    "File size is 0 but we appear to have some triggered events. Should not happen."
                )
            return 1.
    elif process == 'spruce':
        return fsizes['default'] / file_totals.raw if file_totals.raw else 1.
    else:
        return 1.


def rate_info_from_file(evt_max, lines, process, stream):
    '''Loop through the file to work out per-line and per-stream triggers and
    event sizes. Outputs used by functions below to compute rates and bandwidths.

    Args:
        evt_max: number of events to step through; the loop runs exactly this
            many iterations.
        lines: iterable of line names; 'Decision' is appended to each to form
            the decision names looked up in the DecReports.
        process: used capitalised in the DecReports location and compared
            against "spruce" and "hlt2" to pick the raw event location and to
            enable the per-WG bookkeeping.
        stream: stream name; only used for the raw event location when
            process is "spruce".

    Returns:
        (totals, line_totals, wg_bws, wg_event_numbers): a FileBW for the
        stream, a dict of decision name -> LineBW, a dict of WG -> WGBW and a
        dict of WG -> list of event numbers. The last two stay empty unless
        process is "hlt2".

    NOTE(review): relies on the names `appMgr`, `evt` and `header` that are not
    assigned in this function. `appMgr` and `evt` are set in the script body;
    no assignment of `header` is visible - confirm where it comes from.
    '''
    decisions = [line + 'Decision' for line in list(lines)]
    totals = FileBW()  # Per file (stream) information
    line_totals = {dec: LineBW() for dec in decisions}  # per line
    # This block only used for hlt2
    event_info = {wg: set() for wg in KNOWN_WORKING_GROUPS}
    event_info['Other'] = set()
    event_info['TotalInclusive'] = set()
    unknown_lines = set()

    # Loop over all events
    analysed = 0
    while analysed < evt_max:
        # `analysed` is the 1-based loop index, used below as the event key.
        analysed += 1

        # Run an event
        appMgr.run(1)
        report = evt[f'/Event/{process.capitalize()}/DecReports']
        # Sprucing has stream-dependent raw event locations - different to HLT1/2
        rawevent = evt[
            f'/Event/{stream if process == "spruce" else "DAQ"}/RawEvent']
        # Sizes are summed over all bank types, over DstData only, and over
        # the bank types kept by passthrough sprucing, respectively.
        evtsize = sum(
            bank[1] for bank in rawbank_sizes(rawevent, RAW_BANK_TYPES))
        dstsize = sum(
            bank[1] for bank in rawbank_sizes(rawevent, [(60, 'DstData')]))
        spruce_passthrough_evt_size = sum(
            bank[1]
            for bank in rawbank_sizes(rawevent, PASSTHROUGH_BANK_TYPES))

        # Will quit running if there are no more events in the input file
        # NOTE(review): as written, a missing report only skips the accounting
        # below; the while loop itself still runs until evt_max.
        if report:
            # Info per file/stream
            # Every event with a report is registered with a count of 0, so
            # len(totals.triggered) counts events with a report, fired or not.
            totals.triggered.update({analysed: 0})
            totals.raw += evtsize
            totals.dst += dstsize
            for dec in line_totals.keys():
                # Info per line
                if report.decReport(dec) and report.decReport(
                        dec).decision() == 1:
                    totals.triggered[analysed] += 1
                    line_totals[dec].triggered.append(analysed)
                    line_totals[dec].raw += evtsize
                    line_totals[dec].dst += dstsize

                    # Info per WG within the stream
                    if process == "hlt2":
                        # Stored in sets, so an event firing several lines of
                        # the same WG contributes one identical tuple only.
                        info = (header.eventNumber(),
                                spruce_passthrough_evt_size, dstsize)
                        event_info['TotalInclusive'].add(info)
                        wg = guess_wg(dec, process)
                        event_info[wg].add(info)
                        if wg == "Other":
                            unknown_lines.add(dec)

    if len(unknown_lines):
        print(f"Found {len(unknown_lines)} unknown lines. They are...")
        pprint(unknown_lines)

    # Remove trigger-less WGs, and transform out of tuple for easier & more-robust lookups later
    wg_event_numbers = {}
    wg_bws = {}
    if process == "hlt2":
        for wg, event_info_set in event_info.items():
            event_numbers = [info[0] for info in event_info_set]
            if len(event_numbers) == 0: continue

            wg_event_numbers[wg] = event_numbers
            # info = (event number, passthrough event size, DstData size)
            wg_bws[wg] = WGBW(
                n_triggered=len(event_numbers),
                raw=sum([info[1] for info in event_info_set]),
                dst=sum([info[2] for info in event_info_set]))

    return totals, line_totals, wg_bws, wg_event_numbers


def _write_to_csv(output_path, rows):
    with open(output_path, 'w') as f:
        csv_out = csv.writer(f)
        for row in rows:
            csv_out.writerow(row)


def rates_per_line(line_totals: dict[str, LineBW], file_totals: FileBW,
                   input_rate: float, output_file_path: str,
                   compression_factor: float, n_events: int) -> None:
    """Write one .csv row of rate and bandwidth figures per line.

    Each row holds, in order: line name, inclusive retention (%), inclusive
    rate, exclusive retention (%), exclusive rate, average event size,
    bandwidth, average DstData size and DstData bandwidth.

    Args:
        line_totals: per-line accumulators keyed by decision name.
        file_totals: per-stream accumulator; its `triggered` mapping gives the
            number of lines that fired on each event, which is what defines
            an "exclusive" firing (exactly one line fired).
        input_rate: rate that the retentions are multiplied by.
        output_file_path: destination .csv path.
        compression_factor: multiplicative scale applied to the summed sizes.
        n_events: denominator for the retentions; if falsy, an empty .csv is
            written and nothing is computed.

    Relies on the module-level conversion constants `B_to_kB` and
    `MBps_to_GBps`.
    """
    if not n_events:
        print("No denominator for rates, writing empty .csv of rates.")
        _write_to_csv(output_file_path, [])
        return

    # Accumulates one row per line; must exist before the loop appends to it.
    rows = []
    for line, bw_info in line_totals.items():

        n_fired = len(bw_info.triggered)
        retention = n_fired / n_events
        rate = retention * input_rate  # kHz, inclusive

        # Exclusive: events on which this was the only line that fired.
        n_fired_excl = sum(1 for evt_no in bw_info.triggered
                           if file_totals.triggered[evt_no] == 1)
        excl_retention = n_fired_excl / n_events
        excl_rate = excl_retention * input_rate  # kHz

        # Averages are over the events that fired this line; 0 if none did.
        avg_evt_size = compression_factor * bw_info.raw * B_to_kB / n_fired if n_fired else 0  # kB
        avg_dst_size = compression_factor * bw_info.dst * B_to_kB / n_fired if n_fired else 0  # kB
        bw = rate * avg_evt_size * MBps_to_GBps  # GB/s
        dst_bw = rate * avg_dst_size * MBps_to_GBps  # GB/s

        rows.append([
            line, retention * 100, rate, excl_retention * 100, excl_rate,
            avg_evt_size, bw, avg_dst_size, dst_bw
        ])

    _write_to_csv(output_file_path, rows)
def rates_per_stream(file_totals: FileBW, stream: str, input_rate: float,
                     output_file_path: str, compression_factor: float,
                     n_events: int) -> None:
    """Write a single-row .csv summarising the whole stream.

    The row holds, in order: stream name, retention (%), rate, average event
    size, bandwidth, average DstData size and DstData bandwidth. If `n_events`
    is falsy an empty .csv is written instead.

    Relies on the module-level conversion constants `B_to_kB` and
    `MBps_to_GBps`.
    """
    if not n_events:
        print("No denominator for rates, writing empty .csv of rates.")
        _write_to_csv(output_file_path, [])
        return

    n_selected = len(file_totals.triggered)
    fraction_kept = n_selected / n_events
    stream_rate = fraction_kept * input_rate

    # Average sizes are per recorded event; fall back to 0 when there are none.
    if n_selected:
        mean_evt_size = compression_factor * file_totals.raw * B_to_kB / n_selected
        mean_dst_size = compression_factor * file_totals.dst * B_to_kB / n_selected
    else:
        mean_evt_size = 0
        mean_dst_size = 0

    total_bw = stream_rate * mean_evt_size * MBps_to_GBps  # GB/s
    dstdata_bw = stream_rate * mean_dst_size * MBps_to_GBps  # GB/s

    summary = (stream, fraction_kept * 100, stream_rate, mean_evt_size,
               total_bw, mean_dst_size, dstdata_bw)
    _write_to_csv(output_file_path, [summary])


def rates_per_wg_intra_stream(wg_bws: dict[str, WGBW], input_rate: float,
                              output_file_path: str, compression_factor: float,
                              n_events: int) -> None:
    """Write inclusive rate and bandwidths of each WG within this stream.

    Each .csv row holds: WG name, rate, bandwidth and DstData bandwidth. When
    at least one WG is present, an extra 'SumWGs' row is appended that sums
    every metric over all entries except 'TotalInclusive'.

    Args:
        wg_bws: per-WG totals keyed by WG name; when non-empty it is expected
            to contain a 'TotalInclusive' entry.
        input_rate: rate that the per-event fractions are multiplied by.
        output_file_path: destination .csv path.
        compression_factor: multiplicative scale applied to the summed sizes.
        n_events: denominator; if falsy, an empty .csv is written.

    Relies on the module-level conversion constants `B_to_kB` and
    `MBps_to_GBps`.
    """
    # Get inclusive rates/bandwidths of each WG within this stream
    if not n_events:
        print("No denominator for rates, writing empty .csv of rates.")
        _write_to_csv(output_file_path, [])
        return

    wg_bw_infos = dict()
    for wg, bw_info in wg_bws.items():
        rate = bw_info.n_triggered * input_rate / n_events  # kHz
        wg_bw_infos[wg] = [
            rate,
            compression_factor * input_rate * (bw_info.raw * B_to_kB) *
            MBps_to_GBps / n_events,  # bandwidth, GB/s
            compression_factor * input_rate * (bw_info.dst * B_to_kB) *
            MBps_to_GBps / n_events,  # dst bandwidth GB/s
        ]

    if wg_bw_infos:
        # Will be fals-y if no triggers for any WG
        n_metrics = len(wg_bw_infos['TotalInclusive'])
        # The sum is fully evaluated before 'SumWGs' is inserted, so the new
        # entry never contributes to itself.
        wg_bw_infos['SumWGs'] = [
            sum(bw_info[i] for wg, bw_info in wg_bw_infos.items()
                if wg != "TotalInclusive") for i in range(n_metrics)
        ]

    rows = [[wg] + bw_info for wg, bw_info in wg_bw_infos.items()]

    _write_to_csv(output_file_path, rows)
# Script entry point: parse the command line, configure the Gaudi application
# to read one stream's Moore output, loop over it with rate_info_from_file()
# and write the per-stream, per-line and (hlt2 only) per-WG rate tables.
# NOTE(review): several statements of this section appear to be missing from
# this copy of the file; each visible gap is flagged below rather than guessed.
if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Inspect Moore output')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument('-s', '--stream', type=str, required=True)
    parser.add_argument(
        # NOTE(review): no option flags are visible for this argument;
        # `args.process` is read below, so presumably this is the process
        # option - confirm and restore the flags.
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument(
        '--stream-config',
    # NOTE(review): the '--stream-config' call above is never closed in this
    # copy; its remaining keyword arguments are not visible.
    parser.add_argument(
        '--file-type',
        choices=("ROOT", "MDF"),
        required=True,
        help=
        "File type of incoming Moore output - ROOT for .dst or MDF for .mdf")
    # NOTE(review): `args` is used from here on but no parse_args() call is
    # visible.
    fname_helper = FileNameHelper(args.process, args.stream_config)
    n_events = int(parse_yaml(fname_helper.input_info_json())['n_evts'])
    input_config = parse_yaml(args.config)

    LHCbApp(DataType="Upgrade", Simulation=True, EvtMax=n_events)
    EventSelector().PrintFreq = 10000
    IODataManager(DisablePFNWarning=True)

    # NOTE(review): the `with` below has no visible body. `lines`, which is
    # passed to rate_info_from_file() further down, is not assigned anywhere
    # visible - presumably it is loaded from this JSON file; confirm.
    with open(fname_helper.stream_config_json_path()) as f:
    file_ext = fname_helper.input_type_to_file_ext(args.file_type)
    IOHelper(args.file_type).inputFiles(
        [fname_helper.mdfdst_fname_for_reading(args.stream, file_ext)])
    # we have to configure the algorithms manually instead of `do_unpacking`
    # because we need to set `input_process='Hlt2'` in `unpack_rawevent`
    # to read MDF/DST output from Sprucing
    # Hlt1 requires different unpacking than hlt2/sprucing.
    # TODO might be able to absorb into do_unpacking now
    # NOTE(review): the `if` headers that select between the branches below
    # are not visible; the indentation shows at least one `if`/`else` pair is
    # missing around the "Turbo" assignment and between the Hlt1-specific
    # decoder setup and the `do_unpacking` setup.
        input_process = "Turbo"
    else:
        input_process = args.process.capitalize()
        # Decode only the HltDecReports bank and publish the decoded reports
        # at /Event/Hlt1/DecReports.
        unpacker = LHCb__UnpackRawEvent(
            "UnpackRawEvent",
            RawBankLocations=["DAQ/RawBanks/HltDecReports"],
            BankTypes=["HltDecReports"])
        decDec = HltDecReportsDecoder(
            "HltDecReportsDecoder/Hlt1DecReportsDecoder",
            OutputHltDecReportsLocation="/Event/Hlt1/DecReports",
            SourceID="Hlt1",
            DecoderMapping="TCKANNSvc",
            RawBanks=unpacker.RawBankLocations[0])
        appMgr = ApplicationMgr(TopAlg=[unpacker, decDec])
        appMgr.ExtSvc += [configured_ann_svc(name='TCKANNSvc')]
        raw_event_unpacker_kwargs = dict(
            input_process=input_process, simulation=True)
        if args.process == "spruce":
            raw_event_unpacker_kwargs["stream"] = args.stream
        # TODO do_unpacking will try to unpack HLT2 dec banks if spruce
        # leads to annoying ERROR messages that do nothing.
        algs = do_unpacking(**raw_event_unpacker_kwargs)
        # Also unpack the ODIN bank and create the ODIN object as "myODIN".
        algs += [
            LHCb__UnpackRawEvent(
                RawEventLocation=
                f"{args.stream if args.process == 'spruce' else 'DAQ'}/RawEvent",
                BankTypes=['ODIN'],
                RawBankLocations=["DAQ/RawBanks/ODIN"]),
            createODIN(ODIN="myODIN"),
        ]
        appMgr = ApplicationMgr(TopAlg=algs)
        appMgr.ExtSvc += [configured_ann_svc(json_file=fname_helper.tck())]
    # `appMgr` and `evt` are module-level names read by rate_info_from_file().
    appMgr = GP.AppMgr()
    evt = appMgr.evtsvc()
    input_rate = float(input_config['input_rate'])
    file_totals, line_totals, wg_bws_for_hlt2, wg_event_numbers_for_hlt2 = rate_info_from_file(
        LHCbApp().EvtMax, lines, args.process, args.stream)
    # NOTE(review): _compression_factor() also takes `filesize_path`, which is
    # not passed in the call as visible here.
    compression_factor = _compression_factor(
        process=args.process,
        file_totals=file_totals,
        stream=args.stream)

    # NOTE(review): `n_evts` is used below but only `n_events` is assigned in
    # the visible code - confirm which name is intended.
    # Calculate key quantities per stream
    rates_per_stream(file_totals, args.stream, input_rate,
                     fname_helper.tmp_rate_table_per_stream_path(args.stream),
                     compression_factor, n_evts)
    # Calculate key quantities per line
    rates_per_line(line_totals, file_totals, input_rate,
                   fname_helper.tmp_rate_table_per_line_path(args.stream),
                   compression_factor, n_evts)

    if args.process == "hlt2":
        # Save the event numbers for WG-by-WG overlaps
        # NOTE(review): the call below is never closed in this copy; its
        # arguments are not visible.
        event_number_file = fname_helper.intra_stream_event_no_fname(
        with open(event_number_file, 'w') as f:
            json.dump(wg_event_numbers_for_hlt2, f)

        # Calculate rates/bws within a stream
        rates_per_wg_intra_stream(
            wg_bws_for_hlt2, input_rate,
            fname_helper.tmp_rate_table_intra_stream_path(args.stream),
            compression_factor, n_evts)