Skip to content
Snippets Groups Projects
run_bandwidth_test_jobs.py 10.9 KiB
Newer Older
#!/usr/bin/env python
###############################################################################
# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration           #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
""" Launch one Moore job to produce MDF/DST output for bandwidth measurement.

    The corresponding option files are under:
    $HLT2CONFROOT/tests/options/bandwidth/

"""

from __future__ import print_function, division
import argparse
import logging
import subprocess
import os
import tempfile
import atexit
import shutil
import json
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml
from datetime import datetime
# prefer XDG_RUNTIME_DIR which should be on tmpfs
FALLBACK_CACHE_DIR = os.getenv('XDG_RUNTIME_DIR', tempfile.gettempdir())
# Default cache dir is the current working directory as this is most convenient for the machine
# that the test runs on periodically. It assumes the working directory is not cleaned up often,
# and so the files remain available for subsequent jobs.
DEFAULT_CACHE_DIR = '.'


def is_remote(url):
    return url.startswith('mdf:root:') or url.startswith('root:')


def dump_fnames(process, stream_config, filenames):
    fname_helper = FileNameHelper(process, stream_config)
    ofile = fname_helper.input_info_json()
        json.dump({"fnames": filenames}, f)
    extra_options = [
        f"n_threads = {args.threads}", f"n_event_slots = {args.evtSlots}",
        f"input_raw_format = {config['input_raw_format']}",
        f"input_files = {job_input}"
    ]

    if "testfiledb_key" in config.keys():
        extra_options += [
            f"set_conds_from_testfiledb('{config['testfiledb_key']}')",
            f"input_type = '{config['input_type']}'"
        ]
        # The code below allows to overwrite the dddb_tag and conddb_tag from the
        # test file database when testfiledb_key is used. This option was added to allow
        # hlt2 lines that rely on a newer particle table version to work with older simulation
        # used in the bendwidth tests. See PRConfig!435 and Moore!3533 for more details.
        # This workaround is temporary and can be removed, once newer simulation samples,
        # based on dddb-20240427 directly, are available for these tests.
        for opt in ['conddb_tag', 'dddb_tag']:
            if opt in config.keys():
                extra_options += [f"{opt} = '{config[opt]}'"]
        extra_options += [f"simulation = {config['simulation']}"] + [
            f"{opt} = '{config[opt]}'"
            for opt in filter(lambda x: x in config.keys(), [
                'input_type', 'data_type', 'conddb_tag', 'dddb_tag',
                'conditions_version', 'geometry_version'
            ])
    if args.use_manifest:
        extra_options += [
            f'input_manifest_file = \'{config["input_manifest_file"]}\''
        ]

    extra_options = [f"options.{opt_str}" for opt_str in extra_options]
    extra_options.insert(0, "from Moore import options")

    # FIXME to be dropped when 2024-patches branch is gone
    if args.download_input_files and args.process != "spruce":
        extra_options += [
            "if 'use_iosvc' in options.__slots__: options.event_store = 'EvtStoreSvc'; options.use_iosvc = True"
        ]

    # Increase IOAlg buffer size to cope with large events - FIXME do by default when 2024-patches branch is gone
    extra_options += [
        f"if 'use_iosvc' not in options.__slots__: options.ioalg_buffer_nb_events = {os.environ['IOALG_BUFFER_EVENTS']}"
    cmd = ['gaudirun.py', '--option', "\n".join(extra_options)
           ] + [os.path.expandvars(x) for x in args.options]
    cmd.insert(1, '-T')

    # Log command
    logging.info("Launching bandwidth job with cmd: {}".format(" ".join(
        map(repr, cmd))))

    # run the test
    subprocess.run(cmd)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument('options', nargs='+', help='Gaudi options files.')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument(
        '-t',
        '--threads',
        type=int,
        default=1,
        help='Number of threads per job (defaults to # logical cpus')
    parser.add_argument(
        '-e',
        '--evtSlots',
        type=int,
        default=None,
        help='Number of event slots per job (defaults to max(1.2 * # threads, '
        '1 + # threads)')
    parser.add_argument(
        '-n',
        default=100,
        type=lambda x: int(round(float(x))),
        help='maximum nb of events to process per job')
    parser.add_argument(
        '-a',
        '--avg_evt_size',
        type=int,
        help='average event size in input file in kB',
        required=True)
    parser.add_argument(
        '--debug', action='store_true', help='Debugging output')
        '-d',
        '--download-input-files',
        action='store_true',
        help=
        "Download files to local disk before running Moore. Achieves big speedup (5x) in Moore, but only worth it if the downloading is fast (probably only true if you're at CERN.)"
    )
        '--cache-dir',
        default=DEFAULT_CACHE_DIR,
        help='Comma separated paths to directories, one per job, where the '
        'input files will be cached (default is hostname dependent or '
        '$XDG_RUNTIME_DIR).')
    parser.add_argument(
        '--digi',
        default=False,
        action='store_true',
        help='Flag to download digi files as opposed to the default mdf files')
    parser.add_argument(
        '-um',
        '--use-manifest',
        action='store_true',
        help=
        "Flag to access and include config[input_manifest_file] as an extra option in the job."
    )
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument('-sc', '--stream-config', type=str, required=True)
    args = parser.parse_args()

    logging.basicConfig(
        format='%(levelname)-7s %(message)s',
        level=(logging.DEBUG if args.debug else logging.INFO))

    if args.evtSlots is None:
        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)
    if args.use_manifest and "input_manifest_file" not in config.keys():
        raise KeyError(
            f'{args.config} does not provide "input_manifest_file" but --use-manifest is in use.'
        )

    # Always use config['input_files'] for inputs if available.
    # use config['testfiledb_key'] if it exists
    if "testfiledb_key" in config.keys():
        from PRConfig.TestFileDB import test_file_db
        tfdb_entry = test_file_db[config['testfiledb_key']]
        qualifiers = tfdb_entry.qualifiers
        if "input_files" not in config:
            config["input_files"] = tfdb_entry.filenames
        if "input_type" not in config:
            file_format = qualifiers['Format']
            config["input_type"] = 'ROOT' if file_format != 'MDF' else 'RAW'
        if "data_type" not in config:
            config["data_type"] = qualifiers['DataType']
        if "simulation" not in config:
            config["simulation"] = qualifiers['Simulation']
        if "dddb_tag" not in config:
            config["dddb_tag"] = qualifiers['DDDB']
        if "conddb_tag" not in config:
            config["conddb_tag"] = qualifiers['CondDB']
        if "GeometryVersion" in qualifiers and "geometry_version" not in config:
            config["geometry_version"] = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers and "conditions_version" not in config:
            config["conditions_version"] = qualifiers["ConditionsVersion"]
    if "input_files" in config.keys():
        inputs_fns = config["input_files"]
    else:
        raise KeyError(
            f'{args.config} does not provide either the "testfiledb_key" or "input_files".'
        )

        # Set up local directories where inputs are cached
        if not os.path.isdir(args.cache_dir):
            fallback_dir = tempfile.mkdtemp(
                prefix='bandwidth-', dir=FALLBACK_CACHE_DIR)
            logging.warning(
                'Default cache dir {!r} does not exist, using {}'.format(
                    args.cache_dir, fallback_dir))
            args.cache_dir = fallback_dir
            # if we use the fallback directory, clean up after ourselves
            atexit.register(shutil.rmtree, fallback_dir)
        if all(is_remote(url) for url in job_inputs):
            from Moore.qmtest.context import download_mdf_inputs_locally, download_digi_inputs_locally
            # download_inputs_locally only downloads if files
            # are not already available locally on the machine
            before_copy = datetime.now()
            logging.info(
                f'Downloading inputs for bandwidth job to {args.cache_dir}')
            logging.info(
                f'There are {len(job_inputs)} input files: [{job_inputs[0]} ' +
                ']' if len(job_inputs) < 2 else f'{job_inputs[1]}, ... ]')
            kB_to_GB = 1e3
            download_inputs_locally = download_digi_inputs_locally if args.digi else download_mdf_inputs_locally
            job_inputs = download_inputs_locally(
                job_inputs,
                args.cache_dir,
                max_size=args.avg_evt_size * kB_to_GB * args.evt_max)
            logging.info(
                f"Finished file downloads. This took: {datetime.now() - before_copy}"
            )
        elif any(is_remote(url) for url in job_inputs):
            parser.error('inputs must either be all xrootd or all local')
        else:
            pass  # They're all local so don't worry about it...
    # Dump the input file names so we can work out the rate denominator afterwards
    # Dump local file names that have been downloaded?
    dump_fnames(args.process, args.stream_config, job_inputs)

    run_gaudi_job(args, config, job_inputs)