Skip to content
Snippets Groups Projects
run_bandwidth_test_jobs.py 8.97 KiB
Newer Older
#!/usr/bin/env python
###############################################################################
# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration           #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
""" Launch one Moore job to produce MDF/DST output for bandwidth measurement.

    The corresponding option files are under:
    $HLT2CONFROOT/tests/options/bandwidth/

"""

from __future__ import print_function, division
import argparse
import logging
import subprocess
import os
import tempfile
import atexit
import shutil
import json
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml
from datetime import datetime
# prefer XDG_RUNTIME_DIR which should be on tmpfs
FALLBACK_CACHE_DIR = os.getenv('XDG_RUNTIME_DIR', tempfile.gettempdir())
# Default cache dir is the current working directory as this is most convenient for the machine
# that the test runs on periodically. It assumes the working directory is not cleaned up often,
# and so the files remain available for subsequent jobs.
DEFAULT_CACHE_DIR = '.'


def is_remote(url):
    """Return True if *url* starts with ``mdf:root:`` or ``root:``."""
    remote_prefixes = ('mdf:root:', 'root:')
    return url.startswith(remote_prefixes)


def dump_fnames(process, stream_config, filenames):
    """Write the input file names of the job to a JSON file.

    The file contains a single object of the form ``{"fnames": filenames}``.

    Args:
        process: Handed to ``FileNameHelper`` to build the helper that knows
            the output path.
        stream_config: Handed to ``FileNameHelper.input_info_json`` to obtain
            the output path.
        filenames: JSON-serialisable collection of input file names.
    """
    # NOTE(review): the helper is assumed to be constructed from `process`
    # alone -- confirm against PRConfig.bandwidth_helpers.FileNameHelper.
    fname_helper = FileNameHelper(process)
    ofile = fname_helper.input_info_json(stream_config)
    with open(ofile, 'w') as f:
        json.dump({"fnames": filenames}, f)


def run_gaudi_job(args, config, job_input):
    """Assemble the ``gaudirun.py`` command for one bandwidth job and run it.

    The job settings are passed to Gaudi as an inline ``--option`` string
    made of ``options.<name> = <value>`` statements, followed by the option
    files given on the command line.

    Args:
        args: Parsed command-line arguments. Reads ``threads``, ``evtSlots``,
            ``evt_max``, ``download_input_files``, ``process``,
            ``use_manifest`` and ``options``.
        config: Mapping with the input description. Must provide
            ``input_raw_format`` and ``input_type``; additionally either
            ``testfiledb_key`` or all of ``simulation``, ``data_type``,
            ``conddb_tag`` and ``dddb_tag``; and ``input_manifest_file`` when
            ``args.use_manifest`` is set.
        job_input: Input file names; interpolated verbatim into the
            ``input_files`` option.

    Raises:
        KeyError: If ``config`` lacks one of the required keys.
    """
    # Build command line
    extra_options = [
        f"n_threads = {args.threads}",
        f"n_event_slots = {args.evtSlots}",
        # Apply the -n/--evt-max limit to the job itself.
        # NOTE(review): assumes the option files do not already set evt_max.
        f"evt_max = {args.evt_max}",
        f"input_raw_format = {config['input_raw_format']}",
        f"input_files = {job_input}",
    ]

    if "testfiledb_key" in config.keys():
        # Conditions come from the TestFileDB entry.
        extra_options += [
            f"set_conds_from_testfiledb('{config['testfiledb_key']}')",
            f"input_type = '{config['input_type']}'"
        ]
    else:
        # No TestFileDB entry: every tag has to be spelled out in the config.
        extra_options += [f"simulation = {config['simulation']}"] + [
            f"{opt} = '{config[opt]}'"
            for opt in ['input_type', 'data_type', 'conddb_tag', 'dddb_tag']
        ]

    if args.download_input_files and args.process != "spruce":
        extra_options += ["event_store = 'EvtStoreSvc'", "use_iosvc = True"]

    if args.use_manifest:
        extra_options += [
            f'input_manifest_file = \'{config["input_manifest_file"]}\''
        ]

    # Every entry becomes an attribute assignment / call on Moore's options.
    extra_options = [f"options.{opt_str}" for opt_str in extra_options]
    extra_options.insert(0, "from Moore import options")

    cmd = ['gaudirun.py', '-T', '--option', "\n".join(extra_options)
           ] + [os.path.expandvars(x) for x in args.options]

    # Log command
    logging.info("Launching bandwidth job with cmd: {}".format(" ".join(
        map(repr, cmd))))

    # run the test
    subprocess.run(cmd)


if __name__ == '__main__':
    # ---- Command-line interface -------------------------------------------
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument('options', nargs='+', help='Gaudi options files.')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument(
        '-t',
        '--threads',
        type=int,
        default=1,
        help='Number of threads per job (defaults to 1)')
    parser.add_argument(
        '-e',
        '--evtSlots',
        type=int,
        default=None,
        help='Number of event slots per job (defaults to max(1.2 * # threads, '
        '1 + # threads))')
    parser.add_argument(
        '-n',
        '--evt-max',
        default=100,
        # Accept values such as "1e4" by going through float first.
        type=lambda x: int(round(float(x))),
        help='maximum nb of events to process per job')
    parser.add_argument(
        '-a',
        '--avg_evt_size',
        type=int,
        help='average event size in input file in kB',
        required=True)
    parser.add_argument(
        '--debug', action='store_true', help='Debugging output')
    parser.add_argument(
        '-d',
        '--download-input-files',
        action='store_true',
        help=
        "Download files to local disk before running Moore. Achieves big speedup (5x) in Moore, but only worth it if the downloading is fast (probably only true if you're at CERN.)"
    )
    parser.add_argument(
        '--cache-dir',
        default=DEFAULT_CACHE_DIR,
        help='Directory where the input files will be cached (defaults to '
        'the current working directory; if it does not exist, a temporary '
        'directory under $XDG_RUNTIME_DIR or the system temp dir is used).')
    parser.add_argument(
        '--digi',
        default=False,
        action='store_true',
        help='Flag to download digi files as opposed to the default mdf files')
    parser.add_argument(
        '-um',
        '--use-manifest',
        action='store_true',
        help=
        "Flag to access and include config[input_manifest_file] as an extra option in the job."
    )
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument('-sc', '--stream-config', type=str, required=True)
    args = parser.parse_args()

    logging.basicConfig(
        format='%(levelname)-7s %(message)s',
        level=(logging.DEBUG if args.debug else logging.INFO))

    # ---- Validate arguments and load the input description ----------------
    if args.evt_max == -1 or args.evt_max > 1e5:
        raise RuntimeError(
            "The BW tests are limited to 1e5 events to keep them to a reasonable runtime. Please re-configure"
        )

    if args.evtSlots is None:
        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)

    config = parse_yaml(args.config)

    if args.use_manifest and "input_manifest_file" not in config.keys():
        raise KeyError(
            f'{args.config} does not provide "input_manifest_file" but --use-manifest is in use.'
        )

    # Always use config['input_files'] for inputs if available.
    # Otherwise, use config['testfiledb_key'] for inputs.
    if "input_files" in config.keys():
        inputs_fns = config["input_files"]
    elif "testfiledb_key" in config.keys():
        from PRConfig.TestFileDB import test_file_db
        inputs_fns = test_file_db[config['testfiledb_key']].filenames
    else:
        raise KeyError(
            f'{args.config} does not provide either the "testfiledb_key" or "input_files".'
        )

    job_inputs = inputs_fns

    # ---- Optionally copy remote inputs to a local cache -------------------
    if args.download_input_files:

        # Set up local directories where inputs are cached
        if not os.path.isdir(args.cache_dir):
            fallback_dir = tempfile.mkdtemp(
                prefix='bandwidth-', dir=FALLBACK_CACHE_DIR)
            logging.warning(
                'Default cache dir {!r} does not exist, using {}'.format(
                    args.cache_dir, fallback_dir))
            args.cache_dir = fallback_dir
            # if we use the fallback directory, clean up after ourselves
            atexit.register(shutil.rmtree, fallback_dir)

        if all(is_remote(url) for url in job_inputs):
            from Moore.qmtest.context import download_mdf_inputs_locally, download_digi_inputs_locally
            # download_inputs_locally only downloads if files
            # are not already available locally on the machine
            before_copy = datetime.now()
            logging.info(
                f'Downloading inputs for bandwidth job to {args.cache_dir}')
            # Parenthesised so the conditional only selects the tail of the
            # message rather than replacing the whole message.
            file_preview = f'[{job_inputs[0]} ' + (
                ']' if len(job_inputs) < 2 else f'{job_inputs[1]}, ... ]')
            logging.info(
                f'There are {len(job_inputs)} input files: {file_preview}')
            # avg_evt_size is given in kB, so this factor yields bytes.
            # NOTE(review): assumes max_size is expected in bytes -- confirm
            # against Moore.qmtest.context.
            kB_to_B = 1e3
            download_inputs_locally = download_digi_inputs_locally if args.digi else download_mdf_inputs_locally
            job_inputs = download_inputs_locally(
                job_inputs,
                args.cache_dir,
                max_size=args.avg_evt_size * kB_to_B * args.evt_max)
            logging.info(
                f"Finished file downloads. This took: {datetime.now() - before_copy}"
            )
        elif any(is_remote(url) for url in job_inputs):
            parser.error('inputs must either be all xrootd or all local')
        else:
            pass  # They're all local so don't worry about it...

    # Dump the input file names so we can work out the rate denominator afterwards
    # Dump local file names that have been downloaded?
    dump_fnames(args.process, args.stream_config, job_inputs)

    run_gaudi_job(args, config, job_inputs)