Commit 49e4d655, authored by Ella Godiva Noomen, committed by Sascha Stahl

download event files locally for BW test speedup


Co-authored-by: Gitlab CI <noreply@cern.ch>
parent 59367aa3
Merge request !312: download event files locally for BW test speedup
@@ -14,6 +14,7 @@
 The corresponding option files are under:
 $HLT2CONFROOT/tests/options/bandwidth/
+Runs exactly one job
 """
 from __future__ import print_function, division
@@ -21,28 +22,73 @@ import argparse
 import logging
 import subprocess
 import os
+import socket
+import tempfile
+import atexit
+import shutil
+
+# Default cache dir is the current working directory as this is most convenient for the machine
+# that the test runs on periodically. It assumes the working directory is not cleaned up often,
+# and so the files remain available for subsequent jobs.
+DEFAULT_CACHE_DIRS = {'default': ['.']}
+
+# prefer XDG_RUNTIME_DIR which should be on tmpfs
+FALLBACK_CACHE_DIR = os.getenv('XDG_RUNTIME_DIR', tempfile.gettempdir())
+
+
+def default_cache_dirs():
+    hostname = socket.getfqdn()
+    dirs = DEFAULT_CACHE_DIRS.get(hostname, DEFAULT_CACHE_DIRS['default'])
+    return dirs
+
+
+def is_remote(url):
+    return url.startswith('mdf:root:') or url.startswith('root:')
 
-def run_gaudi_job(args, n_events):
-    # build command line
-    extra_options = """
-from Moore import options
-options.n_threads = {n_threads!r}
-options.n_event_slots = {evt_slots!r}
-options.evt_max = {n_evts!r}
-options.input_type = 'MDF'
-options.set_input_and_conds_from_testfiledb({TFDBkey!r})
-options.use_iosvc = True
-options.event_store = 'EvtStoreSvc'
-""".format(
-    n_evts=n_events,
-    evt_slots=args.evtSlots,
-    n_threads=args.threads,
-    TFDBkey=args.test_file_db_key)
+
+def run_gaudi_job(args, n_events, job_input):
+    # Build command line
+    # Case 1: No input is given, only test_file_db_key
+    if not args.input and args.test_file_db_key:
+        extra_options = f"""
+from Moore import options
+options.n_threads = {args.threads}
+options.n_event_slots = {args.evtSlots}
+options.evt_max = {args.events}
+options.event_store = 'EvtStoreSvc'
+options.set_input_and_conds_from_testfiledb('{args.test_file_db_key}')
+options.use_iosvc = True
+options.input_files = {job_input}
+"""
+    # Case 2: Input and conditions are specified
+    # If both input and test_file_db_key are specified, key is ignored
+    # as checking that input and conditions match is non-trivial
+    elif all([args.input, args.condDB, args.DDDB, args.form, args.simulation]):
+        extra_options = f"""
+from Moore import options
+options.n_threads = {args.threads}
+options.n_event_slots = {args.evtSlots}
+options.evt_max = {args.events}
+options.input_type = '{args.form}'
+options.input_files = {args.input.split(',')}
+options.use_iosvc = True
+options.event_store = 'EvtStoreSvc'
+options.conddb_tag = '{args.condDB}'
+options.dddb_tag = '{args.DDDB}'
+options.simulation = {args.simulation}
+"""
+    else:
+        logging.info("Incorrect configuration of Moore")
 
     cmd = ['gaudirun.py', '--option', extra_options
            ] + [os.path.expandvars(x) for x in args.options]
     cmd.insert(1, '-T')
 
     # Log command
     logging.info("Launching bandwidth job with cmd: {}".format(" ".join(
         map(repr, cmd))))
 
     # run the test
     subprocess.run(cmd)
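
The new is_remote helper is what later decides whether inputs need downloading: anything addressed via xrootd (with or without the mdf: prefix) counts as remote. A quick illustration with hypothetical file URLs (not part of the commit):

    def is_remote(url):  # same definition as in the commit above
        return url.startswith('mdf:root:') or url.startswith('root:')

    # Hypothetical URLs: xrootd inputs are remote, plain paths are local.
    assert is_remote('mdf:root://eoslhcb.cern.ch//eos/lhcb/events.mdf')
    assert is_remote('root://eoslhcb.cern.ch//eos/lhcb/events.mdf')
    assert not is_remote('./cache/events.mdf')
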
@@ -69,12 +115,37 @@ if __name__ == '__main__':
         default=100,
         type=lambda x: int(round(float(x))),
         help='nb of events to process per job')
+    parser.add_argument(
+        '-a',
+        '--avg_evt_size',
+        type=int,
+        help='average event size in input file in kB',
+        required=True)
     parser.add_argument(
         '--test-file-db-key',
         default='UpgradeHLT1FilteredWithGEC',
         help='TestFileDB key defining input files and tags.')
     parser.add_argument(
         '--debug', action='store_true', help='Debugging output')
+    parser.add_argument(
+        '-f',
+        '--input',
+        help='Names of input files, multiple names possible (defaults to '
+        'files from TestFileDB entry if not given)')
+    parser.add_argument('-s', '--simulation', default=None)
+    parser.add_argument('-c', '--condDB', type=str, default=None)
+    parser.add_argument('-d', '--DDDB', type=str, default=None)
+    parser.add_argument('-r', '--form', type=str, default=None)
+    parser.add_argument(
+        '--cache-dirs',
+        default=None,
+        help='Comma separated paths to directories, one per job, where the '
+        'input files will be cached (default is hostname dependent or '
+        '$XDG_RUNTIME_DIR).')
     args = parser.parse_args()
 
     logging.basicConfig(
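
Together, these flags select one of the two configuration branches in run_gaudi_job: either everything comes from a TestFileDB key, or an explicit file list must be accompanied by all conditions flags. A minimal sketch of that selection logic (a hypothetical helper, not part of the commit):

    from argparse import Namespace

    def config_case(args):
        if not args.input and args.test_file_db_key:
            return 1  # inputs and tags resolved from TestFileDB
        if all([args.input, args.condDB, args.DDDB, args.form, args.simulation]):
            return 2  # explicit file list plus explicit conditions
        return None   # falls through to "Incorrect configuration of Moore"

    assert config_case(Namespace(
        input=None, test_file_db_key='UpgradeHLT1FilteredWithGEC',
        condDB=None, DDDB=None, form=None, simulation=None)) == 1
    assert config_case(Namespace(
        input='f1.mdf,f2.mdf', test_file_db_key=None,
        condDB='cond-tag', DDDB='dddb-tag', form='MDF', simulation='True')) == 2
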
@@ -87,5 +158,47 @@ if __name__ == '__main__':
     if args.evtSlots is None:
         args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)
 
+    # Make sure input files are available locally
+    # (5x speed-up compared to using online)
+    if args.input:
+        inputs_fns = args.input
+    else:
+        from PRConfig.TestFileDB import test_file_db
+        inputs_fns = test_file_db[args.test_file_db_key].filenames
+    logging.info(inputs_fns)
+
+    # Set up local directories where inputs are cached
+    if args.cache_dirs:
+        args.cache_dirs = args.cache_dirs.split(',')
+    else:
+        args.cache_dirs = default_cache_dirs()
+        if any(not os.path.isdir(d) for d in args.cache_dirs):
+            fallback_dir = tempfile.mkdtemp(
+                prefix='bandwidth-', dir=FALLBACK_CACHE_DIR)
+            logging.warning(
+                'not all default cache dirs {!r} exist, using {}'.format(
+                    args.cache_dirs, fallback_dir))
+            args.cache_dirs = [fallback_dir]
+            # if we use the fallback directory, clean up after ourselves
+            atexit.register(shutil.rmtree, fallback_dir)
+
+    job_inputs = [inputs_fns]
+    for i, inputs in enumerate(job_inputs):
+        logging.info('Downloading input files {}'.format(inputs))
+        if all(is_remote(url) for url in inputs):
+            from Moore.qmtest.context import download_mdf_inputs_locally
+            # download_mdf_inputs_locally only downloads if files
+            # are not already available locally on the machine
+            logging.info(
+                'Downloading inputs for bandwidth job to {}'.format(
+                    args.cache_dirs[i]))
+            job_inputs[i] = download_mdf_inputs_locally(
+                inputs,
+                args.cache_dirs[i],
+                max_size=args.avg_evt_size * 1e3 * args.events)
+            logging.info(inputs)
+        elif any(is_remote(url) for url in inputs_fns):
+            parser.error('inputs must either be all xrootd or all local')
 
-    run_gaudi_job(args, n_events)
+        run_gaudi_job(args, n_events, job_inputs[i])
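
The max_size argument caps the download at roughly the data volume the job will actually read: average event size in kB, times 1e3 to convert to bytes, times the number of events. A worked example with the values used in the scripts below (a sketch; the flag values are taken from the script changes that follow):

    avg_evt_size_kb = 100   # -a=100 for the HLT2 jobs below
    n_events = 1e5          # -n=$EVTS_STREAMLESS=1e5
    max_size = avg_evt_size_kb * 1e3 * n_events  # bytes
    assert max_size == 1e10  # i.e. ~10 GB cached locally
    # The sprucing jobs below use -a=220 (larger persist-reco events),
    # so their cap is ~22 GB for 1e5 events.
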
@@ -14,7 +14,7 @@
 # Run with ./Moore_hlt2_bandwidth.sh 2>&1 | tee <path-for-output.txt> to collect all output as a log file
 export MOORE_THREADS=$(nproc)
-export EVTS_STREAMLESS=1e4 # Used for both streamless and 16-stream (per WG) config
+export EVTS_STREAMLESS=1e5 # Used for both streamless and 16-stream (per WG) config
 export EVTS_FIVECONFIG=1e5
 
 mkdir -p tmp/MDF
@@ -23,15 +23,15 @@ mkdir -p tmp/Output/Inter
 # 1. Run options files for baseline models
 echo 'Running trigger to obtain MDF files for comparison'
-time python -m MooreTests.run_bandwidth_test_jobs -n=$EVTS_STREAMLESS -t=$MOORE_THREADS --test-file-db-key=UpgradeHLT1FilteredWithGEC '$HLT2CONFROOT/tests/options/bandwidth/hlt2_bandwidth_streamless.py' # No streaming
-time python -m MooreTests.run_bandwidth_test_jobs -n=$EVTS_FIVECONFIG -t=$MOORE_THREADS --test-file-db-key=UpgradeHLT1FilteredWithGEC '$HLT2CONFROOT/tests/options/bandwidth/hlt2_bandwidth_5streams.py' # Turbo/Turcal/Full/Monitoring
-time python -m MooreTests.run_bandwidth_test_jobs -n=$EVTS_STREAMLESS -t=$MOORE_THREADS --test-file-db-key=UpgradeHLT1FilteredWithGEC '$HLT2CONFROOT/tests/options/bandwidth/hlt2_bandwidth_16streams.py' # Streaming per module
+time python -m MooreTests.run_bandwidth_test_jobs -n=$EVTS_STREAMLESS -t=$MOORE_THREADS -a=100 --test-file-db-key='UpgradeHLT1FilteredWithGEC' '$HLT2CONFROOT/tests/options/bandwidth/hlt2_bandwidth_streamless.py' # No streaming
+time python -m MooreTests.run_bandwidth_test_jobs -n=$EVTS_FIVECONFIG -t=$MOORE_THREADS -a=100 --test-file-db-key='UpgradeHLT1FilteredWithGEC' '$HLT2CONFROOT/tests/options/bandwidth/hlt2_bandwidth_5streams.py' # Turbo/Turcal/Full/Monitoring/IFT
+time python -m MooreTests.run_bandwidth_test_jobs -n=$EVTS_STREAMLESS -t=$MOORE_THREADS -a=100 --test-file-db-key='UpgradeHLT1FilteredWithGEC' '$HLT2CONFROOT/tests/options/bandwidth/hlt2_bandwidth_16streams.py' # Streaming per module
 
 # 2. Compute line descriptives: persist reco, extra output
 echo 'Obtaining line descriptives'
 time gaudirun.py $PRCONFIGROOT/python/MooreTests/line-descriptives.py
 
 # 3. Compute similarity matrix for all lines based on streamless file
 echo 'Obtaining similarity matrix and rates for all lines computed using streamless MDF file'
 time python $PRCONFIGROOT/python/MooreTests/line-similarity.py -p Hlt2 -i tmp/MDF/baseline-streamless-all.mdf -t tmp/MDF/baseline-streamless-all.tck.json
@@ -21,8 +21,8 @@ mkdir -p tmp/Output/Inter
 # 1. Run options files for baseline models
 echo 'Running trigger to obtain MDF files for comparison'
-time python -m MooreTests.run_bandwidth_test_jobs -n=1e4 -t=$MOORE_THREADS --test-file-db-key=upgrade-minbias-hlt2-persistreco-output '$HLT2CONFROOT/tests/options/bandwidth/spruce_bandwidth_streamless.py' # No streaming
-time python -m MooreTests.run_bandwidth_test_jobs -n=1e5 -t=$MOORE_THREADS --test-file-db-key=upgrade-minbias-hlt2-persistreco-output '$HLT2CONFROOT/tests/options/bandwidth/spruce_bandwidth_wg_streams.py' # One stream per WG
+time python -m MooreTests.run_bandwidth_test_jobs -n=1e4 -t=$MOORE_THREADS -a=220 --test-file-db-key=upgrade-minbias-hlt2-persistreco-output '$HLT2CONFROOT/tests/options/bandwidth/spruce_bandwidth_streamless.py' # No streaming
+time python -m MooreTests.run_bandwidth_test_jobs -n=1e5 -t=$MOORE_THREADS -a=220 --test-file-db-key=upgrade-minbias-hlt2-persistreco-output '$HLT2CONFROOT/tests/options/bandwidth/spruce_bandwidth_wg_streams.py' # One stream per WG
 
 # 2. Compute line descriptives: persist reco, extra output
 echo 'Obtaining line descriptives'
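
Note that because --avg_evt_size is declared with required=True, every invocation of the module now has to pass -a, which is why each script call above gained the flag. A minimal reproduction of the failure mode for an old-style call (standard argparse behavior, not output from this commit):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--avg_evt_size', type=int, required=True)
    try:
        parser.parse_args([])  # old-style invocation without -a
    except SystemExit:
        # argparse exits with: "the following arguments are required: -a/--avg_evt_size"
        pass
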