diff --git a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
index 0f44d30303f8d7e88e5e5602fad4e9f32e114451..d6a867a9ead22bca6792b7093238144729dcf9ba 100644
--- a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
+++ b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
@@ -15,16 +15,36 @@ Launch Moore job to produce output for test checking how often the max candidate
 """
 
 import argparse
+import atexit
 import json
+import logging
 import os
+import shutil
 import subprocess
+import tempfile
+from datetime import datetime
 
 
-def run_gaudi_job(args, job_input):
+def is_remote(url):
+    return url.startswith("mdf:root:") or url.startswith("root:")
+
+
+# Prefer XDG_RUNTIME_DIR, which should be on tmpfs
+FALLBACK_CACHE_DIR = os.getenv("XDG_RUNTIME_DIR", tempfile.gettempdir())
+
+# The default cache dir is the current working directory, as this is most convenient for
+# the machine that the test runs on periodically. It assumes the working directory is not
+# cleaned up often, so the files remain available for subsequent jobs.
+DEFAULT_CACHE_DIR = "."
+
+
+def run_gaudi_job(args, job_inputs):
     # Build command line
     extra_options = [
+        f"n_threads = {args.threads}",
+        f"n_event_slots = {args.evtSlots}",
         f"evt_max = {args.evt_max}",
-        f"input_files = {job_input}",
+        f"input_files = {job_inputs}",
         "input_type = 'MDF'",
         f"set_conds_from_testfiledb({args.test_file_db_key!r})",
         "msg_svc_format = '% F%90W%S %7W%R%T %0W%M'",
@@ -58,13 +78,91 @@ if __name__ == "__main__":
         type=lambda x: int(round(float(x))),
         help="maximum nb of events to process per job",
     )
+    parser.add_argument(
+        "-d",
+        "--download-input-files",
+        action="store_true",
+        help="Download files to local disk before running Moore. Achieves a big speedup "
+        "(~5x) in Moore, but only worth it if the download is fast (probably only at CERN).",
+    )
+    parser.add_argument(
+        "--cache-dir",
+        default=DEFAULT_CACHE_DIR,
+        help="Directory where the input files will be cached (default is the "
+        "current working directory; a temporary directory under "
+        "$XDG_RUNTIME_DIR is used as a fallback).",
+    )
+    parser.add_argument(
+        "-t",
+        "--threads",
+        type=int,
+        default=1,
+        help="Number of threads per job (default: 1)",
+    )
+    parser.add_argument(
+        "-e",
+        "--evtSlots",
+        type=int,
+        default=None,
+        help="Number of event slots per job (defaults to "
+        "max(1.2 * # threads, 1 + # threads))",
+    )
     args = parser.parse_args()
 
     from PRConfig.TestFileDB import test_file_db
 
+    if args.evtSlots is None:
+        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)
+
     inputs_fns = test_file_db[args.test_file_db_key].filenames
+    job_inputs = inputs_fns
+
+    if args.download_input_files:
+        # Set up the local directory where inputs are cached
+        if not os.path.isdir(args.cache_dir):
+            fallback_dir = tempfile.mkdtemp(
+                prefix="max-candidates", dir=FALLBACK_CACHE_DIR
+            )
+            logging.warning(
+                "Default cache dir {!r} does not exist, using {}".format(
+                    args.cache_dir, fallback_dir
+                )
+            )
+            args.cache_dir = fallback_dir
+            # if we use the fallback directory, clean up after ourselves
+            atexit.register(shutil.rmtree, fallback_dir)
+
+        # Now download files
+        if all(is_remote(url) for url in job_inputs):
+            from Moore.qmtest.context import (
+                download_mdf_inputs_locally,
+            )
+
+            # download_mdf_inputs_locally only downloads if the files
+            # are not already available locally on the machine
+            before_copy = datetime.now()
+            logging.info(f"Downloading inputs for the max-candidates job to {args.cache_dir}")
+            logging.info(
+                f"There are {len(job_inputs)} input files: [{job_inputs[0]} "
+                + ("]" if len(job_inputs) < 2 else f"{job_inputs[1]}, ... ]")
+            )
+            avg_evt_size = 1000  # rough per-event size used to cap the total download
+            kB_to_MB = 1e3
+            job_inputs = download_mdf_inputs_locally(
+                job_inputs,
+                args.cache_dir,
+                max_size=avg_evt_size * kB_to_MB * args.evt_max,
+            )
+            logging.info(
+                f"Finished file downloads. This took: {datetime.now() - before_copy}"
+            )
+        elif any(is_remote(url) for url in job_inputs):
+            parser.error("inputs must either be all xrootd or all local")
+        else:
+            pass  # They're all local so don't worry about it...
+
     settings = {"test_file_db_key": args.test_file_db_key}
     with open("jobSettings.json", "w") as ofile:
         json.dump(settings, ofile)
 
-    run_gaudi_job(args, inputs_fns)
+    run_gaudi_job(args, job_inputs)
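For reference, a possible invocation of the patched script with the new options. This is illustrative only: the pre-existing test-file-db key and evt-max arguments are unchanged by this diff and shown as a placeholder, and the cache path is an example, not a prescribed location.

    python run_max_candidates_job.py <key and evt-max arguments as before> \
        --download-input-files --cache-dir /path/to/cache --threads 4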