Skip to content
Snippets Groups Projects
run_bandwidth_test_jobs.py 9.99 KiB
Newer Older
#!/usr/bin/env python
###############################################################################
# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration           #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
""" Launch one Moore job to produce MDF output for bandwidth measurement.

    The corresponding option files are under:
    $HLT2CONFROOT/tests/options/bandwidth/

"""

from __future__ import print_function, division
import argparse
import logging
import subprocess
import os
import socket
import tempfile
import atexit
import shutil
import json
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml
from MooreTests.list_event_numbers import input_nevts
from datetime import datetime
# Default cache dir is the current working directory as this is most convenient for the machine
# that the test runs on periodically. It assumes the working directory is not cleaned up often,
# and so the files remain available for subsequent jobs.
DEFAULT_CACHE_DIRS = {'default': ['.']}
# prefer XDG_RUNTIME_DIR which should be on tmpfs
FALLBACK_CACHE_DIR = os.getenv('XDG_RUNTIME_DIR', tempfile.gettempdir())

def default_cache_dirs():
    """Return the cache directories configured for the current host.

    The fully qualified hostname is looked up in ``DEFAULT_CACHE_DIRS``;
    hosts without a dedicated entry get the ``'default'`` entry.
    """
    return DEFAULT_CACHE_DIRS.get(socket.getfqdn(),
                                  DEFAULT_CACHE_DIRS['default'])


def is_remote(url):
    """Return True if `url` begins with 'mdf:root:' or 'root:'."""
    remote_prefixes = ('mdf:root:', 'root:')
    return url.startswith(remote_prefixes)


def dump_nevts(n_evts, process):
    """Write the number of input events for `process` to a JSON file.

    The output path comes from ``FileNameHelper(process).input_nevts_json()``
    and the payload is ``{"n_evts": "<n_evts>"}``; note that the count is
    stored as a string.

    Args:
        n_evts: Number of events to record.
        process: Process name handed to ``FileNameHelper``.

    Returns:
        Always 0.
    """
    fname_helper = FileNameHelper(process)
    ofile = fname_helper.input_nevts_json()
    with open(ofile, 'w') as f:
        json.dump({"n_evts": f"{n_evts}"}, f)
    return 0


def run_gaudi_job(args, config, job_input):
    """Assemble the ``gaudirun.py`` command for one bandwidth job and run it.

    Args:
        args: Parsed command-line arguments. Reads ``evt_max``, ``digi``,
            ``process``, ``threads``, ``evtSlots``, ``download_input_files``,
            ``use_manifest`` and ``options``.
        config: Mapping loaded from the yaml config file.
        job_input: Input file names (or URLs) for this job.
    """
    n_evts = input_nevts(
        input_files=job_input, evtmax=args.evt_max, isdigi=args.digi)
    # Persist the event count; dump_nevts is the only consumer of n_evts.
    dump_nevts(n_evts, args.process)
    # NOTE(review): the event cap is not forwarded to the job options below;
    # confirm whether an `evt_max` option should be set from n_evts.

    extra_options = [
        f"n_threads = {args.threads}", f"n_event_slots = {args.evtSlots}",
        f"input_raw_format = {config['input_raw_format']}",
        f"input_files = {job_input}"
    ]

    if "testfiledb_key" in config.keys():
        # Conditions are taken from the TestFileDB entry, so only the input
        # type has to be given explicitly.
        extra_options += [
            f"set_conds_from_testfiledb('{config['testfiledb_key']}')",
            f"input_type = '{config['input_type']}'"
        ]
    else:
        # Without a TestFileDB key every tag must be spelled out in the config.
        extra_options += [f"simulation = {config['simulation']}"] + [
            f"{opt} = '{config[opt]}'"
            for opt in ['input_type', 'data_type', 'conddb_tag', 'dddb_tag']
        ]

    if args.download_input_files:
        extra_options += ["event_store = 'EvtStoreSvc'", "use_iosvc = True"]

    if args.use_manifest:
        extra_options += [
            f'input_manifest_file = \'{config["input_manifest_file"]}\''
        ]

    # Every entry becomes an attribute assignment / method call on
    # Moore's `options` object.
    extra_options = [f"options.{opt_str}" for opt_str in extra_options]
    extra_options.insert(0, "from Moore import options")

    option_files = [os.path.expandvars(x) for x in args.options]
    cmd = ['gaudirun.py', '-T', '--option', "\n".join(extra_options)
           ] + option_files

    # Log command
    logging.info("Launching bandwidth job with cmd: {}".format(" ".join(
        map(repr, cmd))))

    # run the test (the exit status of gaudirun.py is not checked here)
    subprocess.run(cmd)


if __name__ == '__main__':
    # Command-line driver: parse the arguments, resolve the input files from
    # the yaml config, optionally cache remote inputs locally, then launch the
    # Moore job via run_gaudi_job.
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument('options', nargs='+', help='Gaudi options files.')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument(
        '-t',
        '--threads',
        type=int,
        default=1,
        help='Number of threads per job (defaults to 1)')
    parser.add_argument(
        '-e',
        '--evtSlots',
        type=int,
        default=None,
        help='Number of event slots per job (defaults to max(1.2 * # threads, '
        '1 + # threads))')
    # The long flag is required so that the value is stored as args.evt_max,
    # which is the attribute the rest of the script reads.
    parser.add_argument(
        '-n',
        '--evt-max',
        default=100,
        type=lambda x: int(round(float(x))),
        help='maximum nb of events to process per job')
    parser.add_argument(
        '-a',
        '--avg_evt_size',
        type=int,
        help='average event size in input file in kB',
        required=True)
    parser.add_argument(
        '--debug', action='store_true', help='Debugging output')
    parser.add_argument(
        '-d',
        '--download-input-files',
        action='store_true',
        help=
        "Download files to local disk before running Moore. Achieves big speedup (5x) in Moore, but only worth it if the downloading is fast (probably only true if you're at CERN.)"
    )
    parser.add_argument(
        '--cache-dirs',
        default=None,
        help='Comma separated paths to directories, one per job, where the '
        'input files will be cached (default is hostname dependent or '
        '$XDG_RUNTIME_DIR).')
    parser.add_argument(
        '--digi',
        default=False,
        action='store_true',
        help='Flag to download digi files as opposed to the default mdf files')
    parser.add_argument(
        '-um',
        '--use-manifest',
        action='store_true',
        help=
        "Flag to access and include config[input_manifest_file] as an extra option in the job."
    )
    parser.add_argument(
        '--read-evt-max-from-config',
        action='store_true',
        help="Flag to replace args.evtmax with config[n_evts]")
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    args = parser.parse_args()

    logging.basicConfig(
        format='%(levelname)-7s %(message)s',
        level=(logging.DEBUG if args.debug else logging.INFO))

    # The config is needed on every path below, so load it unconditionally.
    config = parse_yaml(args.config)

    if args.read_evt_max_from_config:
        if args.process != "spruce":
            raise RuntimeError(
                'read_evt_max_from_config only makes sense for Sprucing jobs with config = metadata generated about Hlt2 BW job outputs.'
            )
        # Never process more events than the config says are available.
        args.evt_max = min(args.evt_max, int(config['n_evts']))

    if args.evt_max == -1 or args.evt_max > 1e5:
        raise RuntimeError(
            "The BW tests are limited to 1e5 events to keep them to a reasonable runtime. Please re-configure"
        )

    if args.evtSlots is None:
        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)
    if args.use_manifest and "input_manifest_file" not in config.keys():
        raise KeyError(
            f'{args.config} does not provide "input_manifest_file" but --use-manifest is in use.'
        )

    # Always use config['input_files'] for inputs if available.
    # Otherwise, use config['testfiledb_key'] for inputs.
    if "input_files" in config.keys():
        inputs_fns = config["input_files"]
    elif "testfiledb_key" in config.keys():
        from PRConfig.TestFileDB import test_file_db
        inputs_fns = test_file_db[config['testfiledb_key']].filenames
    else:
        raise KeyError(
            f'{args.config} does not provide either the "testfiledb_key" or "input_files".'
        )
    job_inputs = [
        inputs_fns
    ]  # This is a list to allow for possible NUMA extension: see discussion on !316.

    # Set up local directories where inputs are cached
    if args.download_input_files:
        if args.cache_dirs:
            args.cache_dirs = args.cache_dirs.split(',')
        else:
            args.cache_dirs = default_cache_dirs()
            if any(not os.path.isdir(d) for d in args.cache_dirs):
                fallback_dir = tempfile.mkdtemp(
                    prefix='bandwidth-', dir=FALLBACK_CACHE_DIR)
                logging.warning(
                    'not all default cache dirs {!r} exist, using {}'.format(
                        args.cache_dirs, fallback_dir))
                args.cache_dirs = [fallback_dir]
                # if we use the fallback directory, clean up after ourselves
                atexit.register(shutil.rmtree, fallback_dir)

        # --avg_evt_size is given in kB; 1 kB = 1e-6 GB.
        kB_to_GB = 1e-6

        for i, inputs in enumerate(job_inputs):
            if all(is_remote(url) for url in inputs):
                from Moore.qmtest.context import download_mdf_inputs_locally, download_digi_inputs_locally
                # download_inputs_locally only downloads if files
                # are not already available locally on the machine
                before_copy = datetime.now()
                logging.info(
                    f'Downloading inputs for bandwidth job to {args.cache_dirs[i]}'
                )
                # Show at most the first two file names; safe for empty lists.
                preview = ', '.join(str(url) for url in inputs[:2])
                if len(inputs) > 2:
                    preview += ', ...'
                logging.info(
                    f'There are {len(inputs)} input files: [{preview}]')
                download_inputs_locally = download_digi_inputs_locally if args.digi else download_mdf_inputs_locally
                # NOTE(review): the positional arguments (file list, then
                # destination directory) are assumed from the log messages
                # above - confirm against Moore.qmtest.context.
                job_inputs[i] = download_inputs_locally(
                    inputs,
                    args.cache_dirs[i],
                    max_size=args.avg_evt_size * kB_to_GB * args.evt_max)
                logging.info(
                    f"Finished file downloads. This took: {datetime.now() - before_copy}"
                )
            elif any(is_remote(url) for url in inputs):
                parser.error('inputs must either be all xrootd or all local')
            else:
                pass  # They're all local so don't worry about it...

            run_gaudi_job(args, config, job_inputs[i])
    else:
        run_gaudi_job(args, config, job_inputs[0])