From 4689275a48d9950122ebbdf2f67d2ef51804c44c Mon Sep 17 00:00:00 2001
From: Daniel Magdalinski <daniel.magdalinski@cern.ch>
Date: Thu, 20 Feb 2025 14:41:30 +0100
Subject: [PATCH 1/4] Added options to multithread and download input to job
 setup

---
 .../Hlt2Conf/tests/run_max_candidates_job.py | 96 ++++++++++++++++++-
 1 file changed, 93 insertions(+), 3 deletions(-)

diff --git a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
index 0f44d30303f..fbe2ae95a84 100644
--- a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
+++ b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
@@ -18,13 +18,30 @@ import argparse
 import json
 import os
 import subprocess
+import logging
+import tempfile
+import shutil
+import atexit
+from datetime import datetime
 
+def is_remote(url):
+    return url.startswith("mdf:root:") or url.startswith("root:")
 
-def run_gaudi_job(args, job_input):
+# prefer XDG_RUNTIME_DIR which should be on tmpfs
+FALLBACK_CACHE_DIR = os.getenv("XDG_RUNTIME_DIR", tempfile.gettempdir())
+
+# Default cache dir is the current working directory as this is most convenient for the machine
+# that the test runs on periodically. It assumes the working directory is not cleaned up often,
+# and so the files remain available for subsequent jobs.
+DEFAULT_CACHE_DIR = "."
+
+def run_gaudi_job(args, job_inputs):
     # Build command line
     extra_options = [
+        f"n_threads = {args.threads}",
+        f"n_event_slots = {args.evtSlots}",
         f"evt_max = {args.evt_max}",
-        f"input_files = {job_input}",
+        f"input_files = {job_inputs}",
         "input_type = 'MDF'",
         f"set_conds_from_testfiledb({args.test_file_db_key!r})",
         "msg_svc_format = '% F%90W%S %7W%R%T %0W%M'",
@@ -58,13 +75,86 @@ if __name__ == "__main__":
         type=lambda x: int(round(float(x))),
         help="maximum nb of events to process per job",
     )
+    parser.add_argument(
+        "-d",
+        "--download-input-files",
+        action="store_true",
+        help="Download files to local disk before running Moore. Achieves a "
+        "big speedup (~5x) in Moore, but only worth it if the downloading "
+        "is fast (probably only true if you're at CERN).",
+    )
+    parser.add_argument(
+        "--cache-dir",
+        default=DEFAULT_CACHE_DIR,
+        help="Directory where the input files will be cached (default is "
+        "the current working directory; a temporary directory under "
+        "$XDG_RUNTIME_DIR is used as a fallback if it does not exist).",
+    )
+    parser.add_argument(
+        "-t",
+        "--threads",
+        type=int,
+        default=1,
+        help="Number of threads per job (default: 1)",
+    )
+    parser.add_argument(
+        "-e",
+        "--evtSlots",
+        type=int,
+        default=None,
+        help="Number of event slots per job (defaults to max(1.2 * threads, "
+        "1 + threads))",
+    )
     args = parser.parse_args()
 
     from PRConfig.TestFileDB import test_file_db
 
+    if args.evtSlots is None:
+        args.evtSlots = max(int(round(1.2 * args.threads)), 1 + args.threads)
+
     inputs_fns = test_file_db[args.test_file_db_key].filenames
+    job_inputs = inputs_fns
+
+    if args.download_input_files:
+        # Set up local directories where inputs are cached
+        if not os.path.isdir(args.cache_dir):
+            fallback_dir = tempfile.mkdtemp(prefix="max-candidates", dir=FALLBACK_CACHE_DIR)
+            logging.warning(
+                "Default cache dir {!r} does not exist, using {}".format(
+                    args.cache_dir, fallback_dir
+                )
+            )
+            args.cache_dir = fallback_dir
+            # if we use the fallback directory, clean up after ourselves
+            atexit.register(shutil.rmtree, fallback_dir)
+
+        # Now download files
+        if all(is_remote(url) for url in job_inputs):
+            from Moore.qmtest.context import (
+                download_mdf_inputs_locally,
+            )
+
+            # download_mdf_inputs_locally only downloads if files
+            # are not already available locally on the machine
+            before_copy = datetime.now()
+            logging.info(f"Downloading inputs for max candidates job to {args.cache_dir}")
+            logging.info(
+                f"There are {len(job_inputs)} input files: [{job_inputs[0]}"
+                + ("]" if len(job_inputs) < 2 else f", {job_inputs[1]}, ... ]")
+            )
+            job_inputs = download_mdf_inputs_locally(
+                job_inputs,
+                args.cache_dir
+            )
+            logging.info(
+                f"Finished file downloads. This took: {datetime.now() - before_copy}"
+            )
+        elif any(is_remote(url) for url in job_inputs):
+            parser.error("inputs must either be all xrootd or all local")
+        else:
+            pass  # They're all local so don't worry about it...
+
     settings = {"test_file_db_key": args.test_file_db_key}
     with open("jobSettings.json", "w") as ofile:
         json.dump(settings, ofile)
 
-    run_gaudi_job(args, inputs_fns)
+    run_gaudi_job(args, job_inputs)
-- 
GitLab
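A note on the event-slot default introduced in the patch above: when --evtSlots is not given, the script derives it from the thread count as max(round(1.2 * threads), 1 + threads), so a job always has at least one more slot than it has threads. A minimal standalone sketch of that rule (the function name default_evt_slots is illustrative, not part of the patch):

    # Sketch of the defaulting rule from PATCH 1/4: event slots scale as
    # 1.2x the thread count, but never drop below threads + 1.
    def default_evt_slots(n_threads: int) -> int:
        return max(int(round(1.2 * n_threads)), 1 + n_threads)

    for n in (1, 4, 8, 20):
        print(n, default_evt_slots(n))  # prints 2, 5, 10, 24
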
From ae9aad15f964ff2b65b67902cfa179f6d85c7068 Mon Sep 17 00:00:00 2001
From: Gitlab CI <noreply@cern.ch>
Date: Thu, 20 Feb 2025 13:50:21 +0000
Subject: [PATCH 2/4] pre-commit fixes

patch generated by https://gitlab.cern.ch/lhcb/Moore/-/jobs/51384288
---
 .../Hlt2Conf/tests/run_max_candidates_job.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
index fbe2ae95a84..6ae81bab363 100644
--- a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
+++ b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
@@ -15,18 +15,20 @@ Launch Moore job to produce output for test checking how often the max candidate
 """
 
 import argparse
+import atexit
 import json
+import logging
 import os
+import shutil
 import subprocess
-import logging
 import tempfile
-import shutil
-import atexit
 from datetime import datetime
 
+
 def is_remote(url):
     return url.startswith("mdf:root:") or url.startswith("root:")
 
+
 # prefer XDG_RUNTIME_DIR which should be on tmpfs
 FALLBACK_CACHE_DIR = os.getenv("XDG_RUNTIME_DIR", tempfile.gettempdir())
 
@@ -35,6 +37,7 @@ FALLBACK_CACHE_DIR = os.getenv("XDG_RUNTIME_DIR", tempfile.gettempdir())
 # and so the files remain available for subsequent jobs.
 DEFAULT_CACHE_DIR = "."
 
+
 def run_gaudi_job(args, job_inputs):
     # Build command line
     extra_options = [
@@ -117,7 +120,9 @@ if __name__ == "__main__":
     if args.download_input_files:
         # Set up local directories where inputs are cached
         if not os.path.isdir(args.cache_dir):
-            fallback_dir = tempfile.mkdtemp(prefix="max-candidates", dir=FALLBACK_CACHE_DIR)
+            fallback_dir = tempfile.mkdtemp(
+                prefix="max-candidates", dir=FALLBACK_CACHE_DIR
+            )
             logging.warning(
                 "Default cache dir {!r} does not exist, using {}".format(
                     args.cache_dir, fallback_dir
                 )
             )
@@ -142,10 +147,7 @@ if __name__ == "__main__":
             f"There are {len(job_inputs)} input files: [{job_inputs[0]}"
             + ("]" if len(job_inputs) < 2 else f", {job_inputs[1]}, ... ]")
         )
-        job_inputs = download_mdf_inputs_locally(
-            job_inputs,
-            args.cache_dir
-        )
+        job_inputs = download_mdf_inputs_locally(job_inputs, args.cache_dir)
         logging.info(
             f"Finished file downloads. This took: {datetime.now() - before_copy}"
         )
-- 
GitLab

From aa2ea831a6152e3653c569d13183d53c5616d3ce Mon Sep 17 00:00:00 2001
From: Daniel Magdalinski <daniel.magdalinski@cern.ch>
Date: Thu, 20 Feb 2025 16:41:15 +0100
Subject: [PATCH 3/4] Added maxsize of downloaded input files

---
 .../python/Hlt2Conf/tests/run_max_candidates_job.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
index 6ae81bab363..b08d8e23de7 100644
--- a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
+++ b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
@@ -147,7 +147,13 @@ if __name__ == "__main__":
             f"There are {len(job_inputs)} input files: [{job_inputs[0]}"
             + ("]" if len(job_inputs) < 2 else f", {job_inputs[1]}, ... ]")
         )
-        job_inputs = download_mdf_inputs_locally(job_inputs, args.cache_dir)
+        avg_evt_size = 1000
+        kB_to_MB=1e3
+        job_inputs = download_mdf_inputs_locally(
+            job_inputs, 
+            args.cache_dir,
+            max_size=avg_evt_size * kB_to_MB * args.evt_max,
+        )
         logging.info(
             f"Finished file downloads. This took: {datetime.now() - before_copy}"
         )
-- 
GitLab
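A note on the max_size cap introduced in PATCH 3/4 above: avg_evt_size is evidently an average event size in kB, and the factor of 1e3 (misleadingly named kB_to_MB) converts it to bytes, so the cap works out to roughly 1 MB per requested event. Below is a hedged sketch of that arithmetic; the helper name and the assumption that download_mdf_inputs_locally interprets max_size in bytes are not confirmed by the patch:

    # Hypothetical reconstruction of the download cap: ~1000 kB per event,
    # converted to bytes, times the number of events to be processed.
    def estimated_cap_bytes(evt_max: int, avg_evt_size_kb: float = 1000.0) -> float:
        return avg_evt_size_kb * 1e3 * evt_max

    print(estimated_cap_bytes(1000))  # 1e9, i.e. about 1 GB for 1000 events
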
From c6dd656958fc28cdffbee0d48027029f265caf3a Mon Sep 17 00:00:00 2001
From: Gitlab CI <noreply@cern.ch>
Date: Thu, 20 Feb 2025 15:42:11 +0000
Subject: [PATCH 4/4] pre-commit fixes

patch generated by https://gitlab.cern.ch/lhcb/Moore/-/jobs/51396855
---
 Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
index b08d8e23de7..d6a867a9ead 100644
--- a/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
+++ b/Hlt/Hlt2Conf/python/Hlt2Conf/tests/run_max_candidates_job.py
@@ -148,9 +148,9 @@ if __name__ == "__main__":
             + ("]" if len(job_inputs) < 2 else f", {job_inputs[1]}, ... ]")
         )
         avg_evt_size = 1000
-        kB_to_MB=1e3
+        kB_to_MB = 1e3
         job_inputs = download_mdf_inputs_locally(
-            job_inputs, 
+            job_inputs,
             args.cache_dir,
             max_size=avg_evt_size * kB_to_MB * args.evt_max,
         )
-- 
GitLab
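Taken together, the series downloads inputs only when every URL is remote and rejects mixed input lists up front. A standalone sketch of that validation logic (is_remote is copied from PATCH 1/4; classify and the example URLs are illustrative, not part of the patches):

    def is_remote(url):
        return url.startswith("mdf:root:") or url.startswith("root:")

    def classify(inputs):
        # All remote: eligible for local caching via the download step.
        if all(is_remote(u) for u in inputs):
            return "remote"
        # A mix of remote and local inputs is rejected, as in the script.
        if any(is_remote(u) for u in inputs):
            raise ValueError("inputs must either be all xrootd or all local")
        # All local: nothing to download.
        return "local"

    print(classify(["root://eos.example/a.mdf", "root://eos.example/b.mdf"]))  # remote
    print(classify(["/data/a.mdf"]))  # local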