diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py index 584e20a9ce29e85ddd2d01716b3579c90996cdf8..684e28bbcb18cc45c8d9645ca7db31ba2c1cf9fc 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ModelCompatibility.py @@ -391,3 +391,49 @@ def retValToListOfDict(retVal) -> list[dict]: """Convert a Records-style bookkeeping response to a list of dictionaries""" retVal = returnValueOrRaise(retVal) return [dict(zip(retVal["ParameterNames"], record)) for record in retVal["Records"]] + + +def runs_to_input_query(runs): + """Convert a list of runs to a query for the bookkeeping system + + The list of runs can contain any combination of: + - single run numbers as a string or integer + - run ranges as a string in the form "start:end" (inclusive) + """ + for run in runs: + if isinstance(run, int): + run = str(run) + if ":" in run: + start, end = map(int, run.split(":")) + yield from map(str, range(start, end + 1)) + else: + yield run + + +def configure_input(pr, data, *, runs=None, startRun=None, endRun=None): + """Configure the input for a ProductionRequest object.""" + scd = json.loads(data["SimCondDetail"]) + import ast + + if "[" in scd["inDataQualityFlag"]: + scd["inDataQualityFlag"] = ",".join(ast.literal_eval(scd["inDataQualityFlag"])) + + pr.startRun = startRun or 0 + pr.endRun = endRun or 0 + if runs: + pr.runsList = ",".join(runs_to_input_query(runs)) + + pr.configName = scd["configName"] + pr.configVersion = scd["configVersion"] + pr.dqFlag = scd["inDataQualityFlag"] + if scd["inSMOG2State"]: + pr.smog2State = scd["inSMOG2State"] + pr.dataTakingConditions = data["SimCondition"] + pr.processingPass = scd["inProPass"] + pr.eventType = data["EventType"] + if scd["inProductionID"] == "ALL": + pr.bkFileType = scd["inFileType"] + pr.bkQueries = ["Full"] + else: + pr.previousProdID = scd["inProductionID"] + pr.bkQueries = ["fromPreviousProd"] diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py index f4a738333999eb2544d7def300041846f5e11419..7cf10935761f6c83303459772cfc6b13ea01b5ce 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/test/Test_production_models.py @@ -15,7 +15,10 @@ from pathlib import Path from DIRAC import S_OK from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import parse_obj -from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import step_to_step_manager_dict +from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import ( + step_to_step_manager_dict, + runs_to_input_query, +) EXAMPLE_YAML_FILES = list((Path(__file__).parent / "example_yamls").glob("*.yaml")) @@ -81,3 +84,19 @@ def test_step_to_step_manager_dict(yaml_path, monkeypatch): {"FileType": "B2OC.DST", "Visible": "N"}, {"FileType": "BNOC.DST", "Visible": "N"}, ] + + +@pytest.mark.parametrize( + "runs, expected", + [ + ([], ""), + ([1], "1"), + (["34"], "34"), + (["34", "35"], "34,35"), + (["34:36"], "34,35,36"), + (["34:36", "38"], "34,35,36,38"), + (["34:36", "38:40"], "34,35,36,38,39,40"), + ], +) +def test_runs_to_input_query(runs, expected): + assert ",".join(runs_to_input_query(runs)) == expected diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index 13a373a0f2c172b591c20a2909b86336d5e2c383..c11e47389b5d52dda64002413a86d09ec6d0c718 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -13,6 +13,7 @@ from __future__ import annotations import json import os +import random import shlex import subprocess import sys @@ -20,7 +21,6 @@ import tempfile from collections import defaultdict from pathlib import Path from textwrap import dedent -from typing import Optional import yaml @@ -28,6 +28,8 @@ from DIRAC import gLogger from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals from DIRAC.Core.Base.Script import Script from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise, convertToReturnValue + + from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import ( parse_obj, ProductionBase, @@ -42,6 +44,8 @@ def parseArgs(): inputFiles = None inputFileType = None ancestorDepth = None + numTestLFNs = 1 + testRunNumbers = None @convertToReturnValue def disableCfgOverride(_): @@ -63,11 +67,23 @@ def parseArgs(): nonlocal ancestorDepth ancestorDepth = int(s) + @convertToReturnValue + def setNumTestLFNs(s: str): + nonlocal numTestLFNs + numTestLFNs = int(s) + + @convertToReturnValue + def setTestRunNumbers(s: str): + nonlocal testRunNumbers + testRunNumbers = s.split(",") + switches = [ ("", "input-files=", "Comma separated list of input files (Data only)", setInputFiles), ("", "no-cfg-override", "Internal implementation detail", disableCfgOverride), ("", "input-file-type=", "Limit the file type for generic merge steps", setOutputFileType), ("", "ancestor-depth=", "Set the ancestor depth that should be included in the pool catalog", setAncestorDepth), + ("", "num-test-lfns=", "Number of LFNs to test with", setNumTestLFNs), + ("", "test-runs=", "Comma separated list of test runs to use", setTestRunNumbers), ] Script.registerSwitches(switches) Script.registerArgument("yaml_path: Path to the YAML file containing productions to submit") @@ -82,7 +98,17 @@ def parseArgs(): gLogger.fatal("Failed to contact CS, do you have a valid proxy?") sys.exit(1) - return Path(yaml_path), name, eventType, inputFiles, useCfgOverride, inputFileType, ancestorDepth + return ( + Path(yaml_path), + name, + eventType, + inputFiles, + useCfgOverride, + inputFileType, + ancestorDepth, + numTestLFNs, + testRunNumbers, + ) def _runWithConfigOverride(argv): @@ -122,7 +148,9 @@ def _runWithConfigOverride(argv): @Script() def main(): - yamlPath, name, eventType, inputFiles, useCfgOverride, inputFileType, ancestorDepth = parseArgs() + yamlPath, name, eventType, inputFiles, useCfgOverride, inputFileType, ancestorDepth, numTestLFNs, testRunNumbers = ( + parseArgs() + ) if useCfgOverride: return _runWithConfigOverride(sys.argv + ["--no-cfg-override"]) @@ -179,6 +207,8 @@ def main(): inputFiles=inputFiles, inputFileType=inputFileType, ancestorDepth=ancestorDepth, + numTestLFNs=numTestLFNs, + testRunNumbers=testRunNumbers, ) @@ -190,8 +220,15 @@ def testProductionRequest( inputFiles: list[str] | None = None, inputFileType: str | None = None, ancestorDepth=None, + numTestLFNs: int = 1, + testRunNumbers: list[str] | None = None, ): - from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import production_to_legacy_dict + from DIRAC.DataManagementSystem.Client.DataManager import DataManager + from LHCbDIRAC.BookkeepingSystem.Client.BookkeepingClient import BookkeepingClient + from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import ( + production_to_legacy_dict, + configure_input, + ) from LHCbDIRAC.ProductionManagementSystem.Client.ProductionRequest import ProductionRequest pr = ProductionRequest() @@ -211,7 +248,37 @@ def testProductionRequest( ) elif isinstance(productionRequest, DataProduction): if not inputFiles: - raise ValueError("No input files specified") + configure_input(pr, legacy_dict, runs=testRunNumbers) + bkQueryDict = pr._getBKKQuery() + result = returnValueOrRaise(BookkeepingClient().getFilesWithMetadata(bkQueryDict)) + if result["TotalRecords"] == 0: + raise ValueError("No input files found in the bookkeeping") + + # Remove the smallest 50% of files to avoid unusually small files + sizeIndex = result["ParameterNames"].index("FileSize") + records = sorted(result["Records"], key=lambda x: x[sizeIndex]) + if len(records) // 2 >= numTestLFNs: + records = records[len(records) // 2 :] + + # Shuffle the LFNs so we pick a random one + random.shuffle(records) + + # Only run tests with files which have available replicas + filenameIndex = result["ParameterNames"].index("FileName") + inputFiles = [] + for record in records: + lfn = record[filenameIndex] + result = returnValueOrRaise(DataManager().getReplicasForJobs([lfn])) + inputFiles.extend(result["Successful"]) + if len(inputFiles) == numTestLFNs: + break + if result["Failed"]: + gLogger.warn("Skipping LFN as it has no replicas for jobs", result["Failed"]) + else: + raise ValueError(f"Only {len(inputFiles)} input files found, requested {numTestLFNs}") + + if len(inputFiles) < numTestLFNs: + raise ValueError(f"Only {len(inputFiles)} input files found, requested {numTestLFNs}") kwargs |= dict( inputDataList=inputFiles,