#!/usr/bin/env python
###############################################################################
# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
""" Launch one Moore job to produce MDF output for bandwidth measurement.
The corresponding option files are under:
$HLT2CONFROOT/tests/options/bandwidth/
Runs exactly one job
"""
from __future__ import print_function, division
import argparse
import logging
import subprocess
import os
import socket
import tempfile
import atexit
import shutil

import json
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml
from MooreTests.list_event_numbers import input_nevts
from datetime import datetime
# Default cache dir is the current working directory as this is most convenient for the machine
# that the test runs on periodically. It assumes the working directory is not cleaned up often,
# and so the files remain available for subsequent jobs.
DEFAULT_CACHE_DIRS = {'default': ['.']}
# prefer XDG_RUNTIME_DIR which should be on tmpfs
FALLBACK_CACHE_DIR = os.getenv('XDG_RUNTIME_DIR', tempfile.gettempdir())
def default_cache_dirs():
    """Return the list of cache directories configured for this host.

    Looks up the fully-qualified hostname in DEFAULT_CACHE_DIRS and falls
    back to the 'default' entry when the host has no dedicated entry.
    """
    host = socket.getfqdn()
    return DEFAULT_CACHE_DIRS.get(host, DEFAULT_CACHE_DIRS['default'])
def is_remote(url):
    """Return True if *url* is an xrootd location (to be downloaded),
    False if it looks like a local path."""
    # str.startswith accepts a tuple of prefixes: one call instead of an
    # `or` chain of two startswith calls.
    return url.startswith(('mdf:root:', 'root:'))

def dump_nevts(n_evts, process):
    """Record the number of input events for *process* in its JSON bookkeeping file.

    The count is written as a string under the "n_evts" key to the path
    given by FileNameHelper.input_nevts_json(). Returns 0.
    """
    helper = FileNameHelper(process)
    with open(helper.input_nevts_json(), 'w') as out:
        json.dump({"n_evts": str(n_evts)}, out)
    return 0

def run_gaudi_job(args, config, job_input):
    """Build and launch one gaudirun.py Moore job over *job_input*.

    Moore options are assembled from the parsed command-line *args* and the
    yaml *config*, passed inline via ``gaudirun.py --option``, followed by
    the user-supplied option files.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments of this script.
    config : dict
        Parsed yaml config describing the input data.
    job_input : list
        Input file paths/URLs for this job.
    """
    # Count how many events are actually available (capped at args.evt_max)
    # and record that number for downstream bandwidth bookkeeping.
    n_evts = input_nevts(
        input_files=job_input, evtmax=args.evt_max, isdigi=args.digi)
    dump_nevts(n_evts, args.process)

    # Build command line: each string becomes an `options.<attr> = ...`
    # assignment executed by gaudirun.py before the option files.
    extra_options = [
        f"n_threads = {args.threads}", f"n_event_slots = {args.evtSlots}",
        f"evt_max = {n_evts}",
        f"input_raw_format = {config['input_raw_format']}",
        f"input_files = {job_input}"
    ]
    if "testfiledb_key" in config.keys():
        extra_options += [
            f"set_conds_from_testfiledb('{config['testfiledb_key']}')",
            f"input_type = '{config['input_type']}'"
        ]

    # NOTE(review): when testfiledb_key is present, input_type is set twice
    # (above and here) with the same value; kept as-is to preserve behavior.
    extra_options += [f"simulation = {config['simulation']}"] + [
        f"{opt} = '{config[opt]}'"
        for opt in ['input_type', 'data_type', 'conddb_tag', 'dddb_tag']
    ]

    if args.download_input_files:
        extra_options += ["event_store = 'EvtStoreSvc'", "use_iosvc = True"]

    if args.use_manifest:
        extra_options += [
            f'input_manifest_file = \'{config["input_manifest_file"]}\''
        ]

    extra_options = [f"options.{opt_str}" for opt_str in extra_options]
    extra_options.insert(0, "from Moore import options")
    cmd = ['gaudirun.py', '--option', "\n".join(extra_options)
           ] + [os.path.expandvars(x) for x in args.options]
    cmd.insert(1, '-T')

    # Log command
    logging.info("Launching bandwidth job with cmd: {}".format(" ".join(
        map(repr, cmd))))

    # run the test
    subprocess.run(cmd)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument('options', nargs='+', help='Gaudi options files.')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument(
        '-t',
        '--threads',
        type=int,
        default=1,
        help='Number of threads per job (defaults to # logical cpus')
    parser.add_argument(
        '-e',
        '--evtSlots',
        type=int,
        default=None,
        help='Number of event slots per job (defaults to max(1.2 * # threads, '
        '1 + # threads)')
    parser.add_argument(
        '-n',
        '--evt-max',
        default=100,
        type=lambda x: int(round(float(x))),
        help='maximum nb of events to process per job')
    parser.add_argument(
        '-a',
        '--avg_evt_size',
        type=int,
        help='average event size in input file in kB',
        required=True)
    parser.add_argument(
        '--debug', action='store_true', help='Debugging output')
    parser.add_argument(
        '-d',
        '--download-input-files',
        action='store_true',
        help=
        "Download files to local disk before running Moore. Achieves big speedup (5x) in Moore, but only worth it if the downloading is fast (probably only true if you're at CERN.)"
    )
    parser.add_argument(
        '--cache-dirs',
        default=None,
        help='Comma separated paths to directories, one per job, where the '
        'input files will be cached (default is hostname dependent or '
        '$XDG_RUNTIME_DIR).')
    parser.add_argument(
        '--digi',
        default=False,
        action='store_true',
        help='Flag to download digi files as opposed to the default mdf files')
    parser.add_argument(
        '-um',
        '--use-manifest',
        action='store_true',
        help=
        "Flag to access and include config[input_manifest_file] as an extra option in the job."
    )
    parser.add_argument(
        '--read-evt-max-from-config',
        action='store_true',
        help="Flag to replace args.evtmax with config[n_evts]")
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    args = parser.parse_args()
    logging.basicConfig(
        format='%(levelname)-7s %(message)s',
        level=(logging.DEBUG if args.debug else logging.INFO))

    if args.read_evt_max_from_config:
        if args.process != "spruce":
            raise RuntimeError(
                'read_evt_max_from_config only makes sense for Sprucing jobs with config = metadata generated about Hlt2 BW job outputs.'
            )
        config = parse_yaml(args.config)
        # Never ask for more events than the upstream Hlt2 job produced.
        args.evt_max = min(args.evt_max, int(config['n_evts']))

    if args.evt_max == -1 or args.evt_max > 1e5:
        raise RuntimeError(
            "The BW tests are limited to 1e5 events to keep them to a reasonable runtime. Please re-configure"
        )
    if args.evtSlots is None:
        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)

    config = parse_yaml(args.config)

    if args.use_manifest and "input_manifest_file" not in config.keys():
        raise KeyError(
            f'{args.config} does not provide "input_manifest_file" but --use-manifest is in use.'
        )

    # Always use config['input_files'] for inputs if available.
    # Otherwise, use config['testfiledb_key'] for inputs.
    if "input_files" in config.keys():
        inputs_fns = config["input_files"]
    elif "testfiledb_key" in config.keys():
        from PRConfig.TestFileDB import test_file_db
        inputs_fns = test_file_db[config['testfiledb_key']].filenames
    else:
        raise KeyError(
            f'{args.config} does not provide either the "testfiledb_key" or "input_files".'
        )

    job_inputs = [
        inputs_fns
    ]  # This is a list to allow for possible NUMA extension: see discussion on !316.

    # Set up local directories where inputs are cached
    if args.download_input_files:
        if args.cache_dirs:
            args.cache_dirs = args.cache_dirs.split(',')
        else:
            args.cache_dirs = default_cache_dirs()
            if any(not os.path.isdir(d) for d in args.cache_dirs):
                fallback_dir = tempfile.mkdtemp(
                    prefix='bandwidth-', dir=FALLBACK_CACHE_DIR)
                logging.warning(
                    'not all default cache dirs {!r} exist, using {}'.format(
                        args.cache_dirs, fallback_dir))
                args.cache_dirs = [fallback_dir]
                # if we use the fallback directory, clean up after ourselves
                atexit.register(shutil.rmtree, fallback_dir)

        # Now download files
        for i, inputs in enumerate(job_inputs):
            if all(is_remote(url) for url in inputs):
                from Moore.qmtest.context import download_mdf_inputs_locally, download_digi_inputs_locally
                # download_inputs_locally only downloads if files
                # are not already available locally on the machine
                before_copy = datetime.now()
                logging.info(
                    f'Downloading inputs for bandwidth job to {args.cache_dirs[i]}'
                )
                # Parenthesize the conditional so it only selects the tail of
                # the message (precedence bug fix), and use an f-string so the
                # second file name is actually interpolated.
                logging.info(
                    f'There are {len(inputs)} input files: [{inputs[0]}' +
                    (']' if len(inputs) < 2 else f', {inputs[1]}, ... ]'))
                # NOTE(review): avg_evt_size is in kB, so * 1e3 converts to
                # bytes; the name kB_to_GB looks wrong but the value is kept
                # as-is — confirm the units expected by max_size.
                kB_to_GB = 1e3
                download_inputs_locally = download_digi_inputs_locally if args.digi else download_mdf_inputs_locally
                job_inputs[i] = download_inputs_locally(
                    inputs,
                    args.cache_dirs[i],
                    max_size=args.avg_evt_size * kB_to_GB * args.evt_max)
                logging.info(
                    f"Finished file downloads. This took: {datetime.now() - before_copy}"
                )
            elif any(is_remote(url) for url in inputs_fns):
                parser.error('inputs must either be all xrootd or all local')
            else:
                pass  # They're all local so don't worry about it...
            run_gaudi_job(args, config, job_inputs[i])
    else:
        run_gaudi_job(args, config, job_inputs[0])