#!/usr/bin/env python
###############################################################################
# (c) Copyright 2022 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
""" Launch one Moore job to produce MDF/DST output for bandwidth measurement.
The corresponding option files are under:
$HLT2CONFROOT/tests/options/bandwidth/
Runs exactly one job
"""
from __future__ import print_function, division
import argparse
import logging
import subprocess
import os
import tempfile
import atexit
import shutil

import json
from PRConfig.bandwidth_helpers import FileNameHelper, parse_yaml
from datetime import datetime
# prefer XDG_RUNTIME_DIR which should be on tmpfs
# Used only when the requested cache dir does not exist (see main below).
FALLBACK_CACHE_DIR = os.getenv('XDG_RUNTIME_DIR', tempfile.gettempdir())
# Default cache dir is the current working directory as this is most convenient for the machine
# that the test runs on periodically. It assumes the working directory is not cleaned up often,
# and so the files remain available for subsequent jobs.
DEFAULT_CACHE_DIR = '.'
def is_remote(url):
    """Return True if *url* is a remote (xrootd) location.

    Remote inputs are identified by an ``mdf:root:`` or ``root:`` prefix;
    anything else is treated as a local path.
    """
    # str.startswith accepts a tuple of prefixes - one call covers both forms.
    return url.startswith(('mdf:root:', 'root:'))
def dump_fnames(process, stream_config, filenames):
    """Dump the job's input file names to a JSON file.

    The file names are needed afterwards to work out the rate denominator
    of the bandwidth measurement.

    Args:
        process: one of 'hlt1', 'hlt2' or 'spruce'.
        stream_config: stream configuration label (see FileNameHelper).
        filenames: list of input file paths/URLs used by the job.
    """
    # FileNameHelper centralises the output-path conventions of these tests.
    fname_helper = FileNameHelper(process, stream_config)
    ofile = fname_helper.input_info_json()
    with open(ofile, 'w') as f:
        json.dump({"fnames": filenames}, f)
def run_gaudi_job(args, config, job_input):
    """Build the gaudirun.py command line for one Moore job and run it.

    Args:
        args: parsed command-line namespace (threads, evtSlots, evt_max,
            options, use_manifest, download_input_files, process, ...).
        config: dict parsed from the yaml config file given via --config.
        job_input: list of input file paths/URLs for this job.
    """
    # Build the python snippet passed to Moore via `gaudirun.py --option`.
    extra_options = [
        f"n_threads = {args.threads}",
        f"n_event_slots = {args.evtSlots}",
        f"evt_max = {args.evt_max}",
        f"input_raw_format = {config['input_raw_format']}",
        f"input_files = {job_input}",
    ]
    if "testfiledb_key" in config:
        extra_options += [
            f"set_conds_from_testfiledb('{config['testfiledb_key']}')",
            f"input_type = '{config['input_type']}'",
        ]
    # The code below allows to overwrite the dddb_tag and conddb_tag from the
    # test file database when testfiledb_key is used. This option was added to allow
    # hlt2 lines that rely on a newer particle table version to work with older simulation
    # used in the bandwidth tests. See PRConfig!435 and Moore!3533 for more details.
    # This workaround is temporary and can be removed, once newer simulation samples,
    # based on dddb-20240427 directly, are available for these tests.
    for opt in ('conddb_tag', 'dddb_tag'):
        if opt in config:
            extra_options += [f"{opt} = '{config[opt]}'"]
    # Forward any remaining per-sample settings that are present in the config.
    extra_options += [f"simulation = {config['simulation']}"] + [
        f"{opt} = '{config[opt]}'" for opt in [
            'input_type', 'data_type', 'conddb_tag', 'dddb_tag',
            'conditions_version', 'geometry_version'
        ] if opt in config
    ]
    if args.use_manifest:
        extra_options += [
            f'input_manifest_file = \'{config["input_manifest_file"]}\''
        ]
    # Every option string becomes an attribute assignment on Moore's options.
    extra_options = [f"options.{opt_str}" for opt_str in extra_options]
    extra_options.insert(0, "from Moore import options")

    # FIXME to be dropped when 2024-patches branch is gone
    if args.download_input_files and args.process != "spruce":
        extra_options += [
            "if 'use_iosvc' in options.__slots__: options.event_store = 'EvtStoreSvc'; options.use_iosvc = True"
        ]
    # Increase IOAlg buffer size to cope with large events - FIXME do by default when 2024-patches branch is gone
    extra_options += [
        f"if 'use_iosvc' not in options.__slots__: options.ioalg_buffer_nb_events = {os.environ['IOALG_BUFFER_EVENTS']}"
    ]

    cmd = ['gaudirun.py', '--option', "\n".join(extra_options)
           ] + [os.path.expandvars(x) for x in args.options]
    # '-T' goes right after the executable name: tee output to the terminal.
    cmd.insert(1, '-T')

    # Log command
    logging.info("Launching bandwidth job with cmd: {}".format(" ".join(
        map(repr, cmd))))

    # run the test; the return code is intentionally not checked here -
    # downstream analysis scripts inspect the job output instead.
    subprocess.run(cmd)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument('options', nargs='+', help='Gaudi options files.')
    parser.add_argument(
        '-c',
        '--config',
        type=str,
        required=True,
        help='Path to yaml config file defining the input.')
    parser.add_argument(
        '-t',
        '--threads',
        type=int,
        default=1,
        help='Number of threads per job (defaults to # logical cpus)')
    parser.add_argument(
        '-e',
        '--evtSlots',
        type=int,
        default=None,
        help='Number of event slots per job (defaults to max(1.2 * # threads, '
        '1 + # threads)')
    parser.add_argument(
        '-n',
        '--evt-max',
        default=100,
        # accept e.g. "1e4" on the command line, rounding to an int
        type=lambda x: int(round(float(x))),
        help='maximum nb of events to process per job')
    parser.add_argument(
        '-a',
        '--avg_evt_size',
        type=int,
        help='average event size in input file in kB',
        required=True)
    parser.add_argument(
        '--debug', action='store_true', help='Debugging output')
    parser.add_argument(
        '-d',
        '--download-input-files',
        action='store_true',
        help=
        "Download files to local disk before running Moore. Achieves big speedup (5x) in Moore, but only worth it if the downloading is fast (probably only true if you're at CERN.)"
    )
    parser.add_argument(
        '--cache-dir',
        default=DEFAULT_CACHE_DIR,
        help='Comma separated paths to directories, one per job, where the '
        'input files will be cached (default is hostname dependent or '
        '$XDG_RUNTIME_DIR).')
    parser.add_argument(
        '--digi',
        default=False,
        action='store_true',
        help='Flag to download digi files as opposed to the default mdf files')
    parser.add_argument(
        '-um',
        '--use-manifest',
        action='store_true',
        help=
        "Flag to access and include config[input_manifest_file] as an extra option in the job."
    )
    parser.add_argument(
        '-p',
        '--process',
        type=str,
        help='Compute for Hlt1, Hlt2 or Sprucing lines',
        choices=['hlt1', 'hlt2', 'spruce'],
        required=True)
    parser.add_argument('-sc', '--stream-config', type=str, required=True)
    args = parser.parse_args()

    logging.basicConfig(
        format='%(levelname)-7s %(message)s',
        level=(logging.DEBUG if args.debug else logging.INFO))

    if args.evtSlots is None:
        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)

    config = parse_yaml(args.config)

    if args.use_manifest and "input_manifest_file" not in config:
        raise KeyError(
            f'{args.config} does not provide "input_manifest_file" but --use-manifest is in use.'
        )

    # Always use config['input_files'] for inputs if available.
    # use config['testfiledb_key'] if it exists
    if "testfiledb_key" in config:
        from PRConfig.TestFileDB import test_file_db
        tfdb_entry = test_file_db[config['testfiledb_key']]
        qualifiers = tfdb_entry.qualifiers
        # Each of these is a fallback: an explicit value in the yaml config
        # always wins over the TestFileDB qualifier.
        if "input_files" not in config:
            config["input_files"] = tfdb_entry.filenames
        if "input_type" not in config:
            file_format = qualifiers['Format']
            config["input_type"] = 'ROOT' if file_format != 'MDF' else 'RAW'
        if "data_type" not in config:
            config["data_type"] = qualifiers['DataType']
        if "simulation" not in config:
            config["simulation"] = qualifiers['Simulation']
        if "dddb_tag" not in config:
            config["dddb_tag"] = qualifiers['DDDB']
        if "conddb_tag" not in config:
            config["conddb_tag"] = qualifiers['CondDB']
        if "GeometryVersion" in qualifiers and "geometry_version" not in config:
            config["geometry_version"] = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers and "conditions_version" not in config:
            config["conditions_version"] = qualifiers["ConditionsVersion"]

    if "input_files" in config:
        inputs_fns = config["input_files"]
    else:
        raise KeyError(
            f'{args.config} does not provide either the "testfiledb_key" or "input_files".'
        )
    job_inputs = inputs_fns

    if args.download_input_files:
        # Set up local directories where inputs are cached
        if not os.path.isdir(args.cache_dir):
            fallback_dir = tempfile.mkdtemp(
                prefix='bandwidth-', dir=FALLBACK_CACHE_DIR)
            logging.warning(
                'Default cache dir {!r} does not exist, using {}'.format(
                    args.cache_dir, fallback_dir))
            args.cache_dir = fallback_dir
            # if we use the fallback directory, clean up after ourselves
            atexit.register(shutil.rmtree, fallback_dir)

        # Now download files
        if all(is_remote(url) for url in job_inputs):
            from Moore.qmtest.context import download_mdf_inputs_locally, download_digi_inputs_locally
            # download_inputs_locally only downloads if files
            # are not already available locally on the machine
            before_copy = datetime.now()
            logging.info(
                f'Downloading inputs for bandwidth job to {args.cache_dir}')
            # BUGFIX: the conditional expression must be parenthesised -
            # previously the ternary bound over the whole concatenation, so
            # for >= 2 inputs the "There are N input files" prefix was lost.
            logging.info(
                f'There are {len(job_inputs)} input files: [{job_inputs[0]} ' +
                (']' if len(job_inputs) < 2 else f'{job_inputs[1]}, ... ]'))
            # NOTE(review): factor of 1e3 converts kB to bytes, despite the
            # name suggesting GB - verify against download helper's units.
            kB_to_GB = 1e3
            download_inputs_locally = download_digi_inputs_locally if args.digi else download_mdf_inputs_locally
            job_inputs = download_inputs_locally(
                job_inputs,
                args.cache_dir,
                max_size=args.avg_evt_size * kB_to_GB * args.evt_max)
            logging.info(
                f"Finished file downloads. This took: {datetime.now() - before_copy}"
            )
        elif any(is_remote(url) for url in job_inputs):
            parser.error('inputs must either be all xrootd or all local')
        else:
            pass  # They're all local so don't worry about it...

    # Dump the input file names so we can work out the rate denominator afterwards
    # Dump local file names that have been downloaded?
    dump_fnames(args.process, args.stream_config, job_inputs)
    run_gaudi_job(args, config, job_inputs)