# line-and-stream-rates.py
###############################################################################
# (c) Copyright 2000-2022 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################

import GaudiPython as GP
from GaudiConf.reading import do_unpacking
from Configurables import HltDecReportsDecoder
from PyConf.control_flow import CompositeNode, NodeLogic
from PyConf.application import configure_input, configure, default_raw_banks, create_or_reuse_rootIOAlg
import json
import argparse
import csv
from pprint import pprint
from dataclasses import dataclass, field
from typing import List, Dict
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml, guess_wg, KNOWN_WORKING_GROUPS
'''
    Run over MDF or DST file to compute:
        Per line (in form of single .csv table):
            1. Inclusive retention
            2. Inclusive rate
            3. Exclusive retention
            4. Exclusive rate
            5. Average event size (all banks in particular stream)
            6. Bandwidth
            7. Average DstData bank size
            8. DstData bandwidth
        Per stream (i.e. the whole file)
            1. Inclusive retention
            2. Inclusive rate
            3. Average event size (all banks in particular stream)
            4. Bandwidth
            5. Average DstData bank size
            6. DstData bandwidth
'''

# Shorthand for GP.gbl.LHCb, used below to query raw bank types
LHCb = GP.gbl.LHCb
# (index, name) pair for every raw bank type index below LHCb.RawBank.LastType
RAW_BANK_TYPES = [(bank_id, LHCb.RawBank.typeName(bank_id))
                  for bank_id in range(LHCb.RawBank.LastType)]
# Mirroring Moore.streams - passthrough sprucing drops all banks except these
PASSTHROUGH_BANK_TYPES = [
    (63, "VP"),
    (73, "VPRetinaCluster"),
    (66, "UT"),
    (64, "FTCluster"),
    (9, "Rich"),
    (13, "Muon"),
    (50, "MuonError"),
    (84, "Plume"),
    (77, "Calo"),
    (78, "CaloError"),
    (21, "EcalPacked"),
    (22, "HcalPacked"),
    (16, "ODIN"),
    (17, "HltDecReports"),
    (54, "HltSelReports"),
    (53, "HltRoutingBits"),
    (60, "DstData"),
]


@dataclass
class LineBW:
    """Running totals accumulated for a single trigger line."""
    # Identifiers (~event numbers) of the events in which this line fired
    triggered: List[int] = field(default_factory=list)
    # Whole-event size summed over every event that fires this line (inclusive)
    raw: float = 0
    # As `raw`, but counting only the DstData raw bank
    dst: float = 0


@dataclass
class FileBW:
    """Running totals accumulated for a whole file (i.e. one stream)."""
    # Mapping {event_no: n_times_triggered}
    triggered: Dict[int, int] = field(default_factory=dict)
    # Summed whole-event size
    raw: float = 0
    # Summed DstData raw bank size
    dst: float = 0


@dataclass
class WGBW:
    """Totals for one working group (WG) within a stream."""
    # Number of events attributed to this WG
    n_triggered: int = 0
    # Summed event size over those events
    raw: float = 0
    # Summed DstData raw bank size over those events
    dst: float = 0


def rawbank_sizes(rawevent, lst):
    """Return a list of (name, size) pairs, one per entry of `lst`.

    `lst` holds (bank type index, bank name) pairs. The size of a type is the
    sum of `totalSize()` over `rawevent.banks(index)`; when `rawevent` is
    falsy (e.g. missing) every size is reported as 0.
    """
    if not rawevent:
        return [(bank_name, 0) for _, bank_name in lst]

    return [(bank_name,
             sum(bank.totalSize() for bank in rawevent.banks(bank_id)))
            for bank_id, bank_name in lst]


def _compression_factor(process, filesize_path, file_totals, stream):
    """Return the factor used to scale event sizes for `process`.

    The JSON file at `filesize_path` is read and its `stream` entry supplies
    the 'default' and 'compressed' file sizes. `file_totals.raw` is the sum of
    all raw bank sizes found in the file.
    """
    with open(filesize_path, 'r') as size_file:
        stream_sizes = json.load(size_file)[stream]

    # Output of Hlt2 is uncompressed by default and then compressed by data movers
    if process == 'hlt2':
        return stream_sizes['compressed'] / stream_sizes['default']

    # Output of Spruce is compressed by default and we estimate the
    # uncompressed size by sum(all rawbanks in file)
    if process == 'spruce':
        summed_banks = file_totals.raw
        if not summed_banks:
            return 1.
        return stream_sizes['default'] / summed_banks

    # We chose not to scale Hlt1.
    return 1.


def rate_info_from_file(evt_max, lines, process, stream):
    '''Loop through the file to work out per-line and per-stream triggers and
    event sizes. Outputs used by functions below to compute rates and bandwidths.

    Steps through the input one event at a time using the module-level
    `appMgr` and `evt` objects, so it must be called after they are created.

    Args:
        evt_max: number of events to run over.
        lines: iterable of line names; 'Decision' is appended to each to form
            the name looked up in the DecReports.
        process: 'hlt1', 'hlt2' or 'spruce'. Selects the DecReports location
            and, for 'spruce', the stream-dependent RawEvent location.
            Per-WG information is only collected for 'hlt2'.
        stream: stream name, used in the RawEvent location for 'spruce'.

    Returns:
        Tuple (totals, line_totals, wg_bws, wg_event_numbers):
            totals: FileBW for the whole file (stream).
            line_totals: {decision name: LineBW}.
            wg_bws: {WG name: WGBW}; empty unless process is 'hlt2'.
            wg_event_numbers: {WG name: [event numbers]}; empty unless
                process is 'hlt2'.
    '''
    decisions = [line + 'Decision' for line in lines]
    totals = FileBW()  # Per file (stream) information
    line_totals = {dec: LineBW() for dec in decisions}  # per line
    # This block only used for hlt2
    event_info = {wg: set() for wg in KNOWN_WORKING_GROUPS}
    event_info['Other'] = set()
    event_info['TotalInclusive'] = set()
    unknown_lines = set()

    # Loop over all events
    analysed = 0
    while analysed < evt_max:
        analysed += 1

        # Run an event
        appMgr.run(1)
        report = evt[f'/Event/{process.capitalize()}/DecReports']
        # Sprucing has stream-dependent raw event locations - different to HLT1/2
        rawevent = evt[
            f'/Event/{stream if process == "spruce" else "DAQ"}/RawEvent']
        evtsize = sum(
            bank[1] for bank in rawbank_sizes(rawevent, RAW_BANK_TYPES))
        dstsize = sum(
            bank[1] for bank in rawbank_sizes(rawevent, [(60, 'DstData')]))
        spruce_passthrough_evt_size = sum(
            bank[1]
            for bank in rawbank_sizes(rawevent, PASSTHROUGH_BANK_TYPES))

        # No report (e.g. no more events in the input file): nothing to
        # record for this iteration. The loop itself still runs to evt_max.
        if not report:
            continue

        # Info per file/stream
        totals.triggered[analysed] = 0
        totals.raw += evtsize
        totals.dst += dstsize
        for dec, line_bw in line_totals.items():
            dec_report = report.decReport(dec)
            if not (dec_report and dec_report.decision() == 1):
                continue

            # Info per line
            totals.triggered[analysed] += 1
            line_bw.triggered.append(analysed)
            line_bw.raw += evtsize
            line_bw.dst += dstsize

            # Info per WG within the stream
            if process == "hlt2":
                # NOTE(review): `header` is not defined anywhere in the
                # visible source - presumably the event header is fetched on
                # a line that was lost; confirm before running with hlt2.
                info = (header.eventNumber(), spruce_passthrough_evt_size,
                        dstsize)
                # Sets de-duplicate events fired by several lines of one WG
                event_info['TotalInclusive'].add(info)
                wg = guess_wg(dec, process)
                event_info[wg].add(info)
                if wg == "Other":
                    unknown_lines.add(dec)

    if unknown_lines:
        print(f"Found {len(unknown_lines)} unknown lines. They are...")
        pprint(unknown_lines)

    # Remove trigger-less WGs, and transform out of tuple for easier & more-robust lookups later
    wg_event_numbers = {}
    wg_bws = {}
    if process == "hlt2":
        for wg, event_info_set in event_info.items():
            if not event_info_set:
                continue

            event_numbers = [info[0] for info in event_info_set]
            wg_event_numbers[wg] = event_numbers
            wg_bws[wg] = WGBW(
                n_triggered=len(event_numbers),
                raw=sum(info[1] for info in event_info_set),
                dst=sum(info[2] for info in event_info_set))

    return totals, line_totals, wg_bws, wg_event_numbers


def _write_to_csv(output_path, rows):
    """Write `rows` (an iterable of row sequences) to `output_path` as CSV.

    Any existing file at `output_path` is overwritten.
    """
    # newline='' lets the csv writer control line endings itself, as the csv
    # module documentation requires; otherwise platforms that translate '\n'
    # on write end up with an extra '\r' on every row.
    with open(output_path, 'w', newline='') as f:
        csv.writer(f).writerows(rows)


def rates_per_line(line_totals: dict[str, LineBW], file_totals: FileBW,
                   input_rate: float, output_file_path: str,
                   compression_factor: float, n_events: int) -> None:
    '''Compute per-line rates and bandwidths and write them to a CSV file.

    One row is written per line, with the columns: line name, inclusive
    retention (%), inclusive rate, exclusive retention (%), exclusive rate,
    average event size, bandwidth, average DstData size, DstData bandwidth.

    Args:
        line_totals: per-line totals, keyed by line (decision) name.
        file_totals: totals for the whole file; its `triggered` mapping is
            used to find events in which exactly one line fired.
        input_rate: input rate that retentions are multiplied by.
        output_file_path: path of the CSV file to write.
        compression_factor: factor applied to the summed event sizes.
        n_events: number of events the retentions are normalised to.
    '''
    rows = []
    for line, bw_info in line_totals.items():

        n_fired = len(bw_info.triggered)
        retention = n_fired / n_events
        rate = retention * input_rate  # kHz, inclusive

        # Exclusive: events in which this was the only line to fire
        n_fired_excl = sum(1 for evt_id in bw_info.triggered
                           if file_totals.triggered[evt_id] == 1)
        excl_retention = n_fired_excl / n_events
        excl_rate = excl_retention * input_rate  # kHz

        # Average sizes are defined as 0 for lines that never fired
        avg_evt_size = compression_factor * bw_info.raw * B_to_kB / n_fired if n_fired else 0  # kB
        avg_dst_size = compression_factor * bw_info.dst * B_to_kB / n_fired if n_fired else 0  # kB
        bw = rate * avg_evt_size * MBps_to_GBps  # GB/s
        dst_bw = rate * avg_dst_size * MBps_to_GBps  # GB/s

        rows.append([
            line, retention * 100, rate, excl_retention * 100, excl_rate,
            avg_evt_size, bw, avg_dst_size, dst_bw
        ])

    _write_to_csv(output_file_path, rows)
def rates_per_stream(file_totals: FileBW, stream: str, input_rate: float,
                     output_file_path: str, compression_factor: float,
                     n_events: int) -> None:
    '''Compute the rate and bandwidth of a whole stream and write one CSV row.

    The row holds: stream name, retention (%), rate, average event size,
    bandwidth, average DstData size, DstData bandwidth.

    Args:
        file_totals: totals for the whole file (stream).
        stream: name of the stream, written as the first column.
        input_rate: input rate that the retention is multiplied by.
        output_file_path: path of the CSV file to write.
        compression_factor: factor applied to the summed event sizes.
        n_events: number of events the retention is normalised to.
    '''
    n_fired = len(file_totals.triggered)
    retention = n_fired / n_events
    rate = retention * input_rate
    # Average sizes are defined as 0 when no event was recorded
    avg_evt_size = compression_factor * file_totals.raw * B_to_kB / n_fired if n_fired else 0
    avg_dst_size = compression_factor * file_totals.dst * B_to_kB / n_fired if n_fired else 0
    bw = rate * avg_evt_size * MBps_to_GBps  # GB/s
    dst_bw = rate * avg_dst_size * MBps_to_GBps  # GB/s
    row = (stream, retention * 100, rate, avg_evt_size, bw, avg_dst_size,
           dst_bw)
    _write_to_csv(output_file_path, [row])


def rates_per_wg_intra_stream(wg_bws: dict[str, WGBW], input_rate: float,
                              output_file_path: str, compression_factor: float,
                              n_events: int) -> None:
    '''Write the inclusive rate and bandwidths of each WG within this stream.

    One CSV row is written per entry of `wg_bws` (WG name, rate, bandwidth,
    DstData bandwidth), followed by a 'SumWGs' row that sums every entry
    except 'TotalInclusive'.

    Args:
        wg_bws: per-WG totals, keyed by WG name.
        input_rate: input rate used to scale counts and sizes.
        output_file_path: path of the CSV file to write.
        compression_factor: factor applied to the summed event sizes.
        n_events: number of events the results are normalised to.
    '''
    wg_bw_infos = {}
    for wg, bw_info in wg_bws.items():
        rate = bw_info.n_triggered * input_rate / n_events  # kHz
        wg_bw_infos[wg] = [
            rate,
            compression_factor * input_rate * (bw_info.raw * B_to_kB) *
            MBps_to_GBps / n_events,  # bandwidth, GB/s
            compression_factor * input_rate * (bw_info.dst * B_to_kB) *
            MBps_to_GBps / n_events,  # dst bandwidth GB/s
        ]

    # Three metrics per WG (see the list built above). Using a fixed count,
    # rather than looking it up from the 'TotalInclusive' entry, avoids a
    # KeyError when `wg_bws` is empty; 'SumWGs' is then a row of zeros.
    n_metrics = 3
    wg_bw_infos['SumWGs'] = [
        sum(bw_info[i] for wg, bw_info in wg_bw_infos.items()
            if wg != "TotalInclusive") for i in range(n_metrics)
    ]
    rows = [[wg] + bw_info for wg, bw_info in wg_bw_infos.items()]

    _write_to_csv(output_file_path, rows)
if __name__ == '__main__':
    # Kept as module-level script code rather than a main() function:
    # rate_info_from_file reads `appMgr` and `evt` as module globals.

    parser = argparse.ArgumentParser(description='Inspect Moore output')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument('-s', '--stream', type=str, required=True)
    # NOTE(review): the flag names of this argument were lost from the visible
    # source. '--process' is implied by the uses of `args.process` below; the
    # short '-p' alias is an assumption - confirm against callers.
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    # NOTE(review): the end of this call was lost from the visible source;
    # `required=True` is assumed because `args.stream_config` is used
    # unconditionally below.
    parser.add_argument(
        '--stream-config',
        type=str,
        help='Choose production, per-WG or streamless stream configuration',
        choices=['streamless', 'production', 'wg', 'wgpass', 'turcal'],
        required=True)
    parser.add_argument(
        '--file-type',
        choices=("ROOT", "MDF"),
        required=True,
        help=
        "File type of incoming Moore output - ROOT for .dst or MDF for .mdf")
    args = parser.parse_args()

    fname_helper = FileNameHelper(args.process)

    n_events = int(
        parse_yaml(fname_helper.input_info_json(args.stream_config))['n_evts'])
    input_config = parse_yaml(args.config)

    # Names of the lines belonging to the requested stream
    with open(fname_helper.stream_config_json_path(args.stream_config)) as f:
        lines = json.load(f)[args.stream]
    file_ext = fname_helper.input_type_to_file_ext(args.file_type)

    # NOTE(review): `options` is not defined or imported anywhere in the
    # visible source - presumably it comes from a line that was lost; confirm.
    options.dddb_tag = input_config['dddb_tag']
    options.conddb_tag = input_config['conddb_tag']
    options.input_files = [
        fname_helper.mdfdst_fname_for_reading(args.stream_config, args.stream,
                                              file_ext)
    ]
    options.input_type = args.file_type
    options.evt_max = n_events
    options.simulation = True
    options.gaudipython_mode = True
    config = configure_input(options)

    # we have to configure the algorithms manually instead of `do_unpacking`
    # because we need to set `input_process='Hlt2'` in `unpack_rawevent`
    # to read MDF/DST output from Sprucing
    # Hlt1 requires different unpacking than hlt2/sprucing.
    # TODO might be able to absorb into do_unpacking now
    if args.stream_config == "wgpass":
        input_process = "Turbo"
    else:
        input_process = args.process.capitalize()

    if args.process == "hlt1":
        decDec = HltDecReportsDecoder(
            "HltDecReportsDecoder/Hlt1DecReportsDecoder",
            OutputHltDecReportsLocation="/Event/Hlt1/DecReports",
            SourceID="Hlt1",
            DecoderMapping="TCKANNSvc",
            RawBanks=default_raw_banks("HltDecReports"))
        algs = [decDec]
    else:
        raw_event_unpacker_kwargs = dict(input_process=input_process)
        if args.process == "spruce":
            raw_event_unpacker_kwargs["stream"] = args.stream
        options.stream = args.stream
        # need to create RootIOAlg for do_unpacking to work without
        # specifying options there
        create_or_reuse_rootIOAlg(options)
        # TODO do_unpacking will try to unpack HLT2 dec banks if spruce
        # leads to annoying ERROR messages that do nothing.
        algs = do_unpacking(**raw_event_unpacker_kwargs)

    cf_node = CompositeNode('TopAlg', algs)
    config.update(configure(options, cf_node))

    # Module-level on purpose: rate_info_from_file uses these two names
    appMgr = GP.AppMgr()
    evt = appMgr.evtsvc()

    input_rate = float(input_config['input_rate'])

    file_totals, line_totals, wg_bws_for_hlt2, wg_event_numbers_for_hlt2 = rate_info_from_file(
        options.evt_max, lines, args.process, args.stream)

    compression_factor = _compression_factor(
        process=args.process,
        filesize_path=fname_helper.filesize_path(args.stream_config),
        file_totals=file_totals,
        stream=args.stream)

    # Calculate key quantities per stream
    rates_per_stream(
        file_totals, args.stream, input_rate,
        fname_helper.tmp_rate_table_per_stream_path(args.stream_config,
                                                    args.stream),
        compression_factor, options.evt_max)

    # Calculate key quantities per line
    rates_per_line(
        line_totals, file_totals, input_rate,
        fname_helper.tmp_rate_table_per_line_path(args.stream_config,
                                                  args.stream),
        compression_factor, options.evt_max)

    if args.process == "hlt2":
        # Save the event numbers for WG-by-WG overlaps
        event_number_file = fname_helper.intra_stream_event_no_fname(
            args.stream_config, args.stream)
        with open(event_number_file, 'w') as f:
            json.dump(wg_event_numbers_for_hlt2, f)

        # Calculate rates/bws within a stream
        rates_per_wg_intra_stream(
            wg_bws_for_hlt2, input_rate,
            fname_helper.tmp_rate_table_intra_stream_path(
                args.stream_config, args.stream), compression_factor,
            options.evt_max)