diff --git a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py index e5b51b463536250a5575a056862e191e852f45ad..c4e70e1300672424b5cd823d729abd716231062c 100644 --- a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py +++ b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py @@ -10,17 +10,15 @@ ############################################################################### import os import socket -from typing import Optional import psutil import xml.etree.ElementTree as ET from pydantic import BaseModel as _BaseModel -from pydantic import Extra import LHCbDIRAC -class BaseModel(_BaseModel, extra=Extra.forbid): +class BaseModel(_BaseModel, extra="forbid"): pass @@ -104,7 +102,7 @@ class BookkeepingJobInfo(BaseModel): root.extend( [ ET.Element("TypedParameter", Name=k, Value=str(v), Type="Info") - for k, v in self.typed_parameters.dict().items() + for k, v in self.typed_parameters.model_dump().items() if v is not None ] ) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 839831c534a56077dd61bbe94492296511cc7769..e5326059dd4dd1b7078635d2f3a44d28bb6d46f0 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -9,10 +9,18 @@ # or submit itself to any jurisdiction. # ############################################################################### from enum import Enum -from typing import Any, Literal, Annotated - -from pydantic import StringConstraints, BaseModel as _BaseModel, field_validator -from pydantic import Extra, Field, PositiveInt, validator +from typing import Any, Literal, Annotated, Union + +from pydantic import ( + ConfigDict, + StringConstraints, + BaseModel as _BaseModel, + field_validator, + model_validator, + Field, + PositiveInt, + TypeAdapter, +) from DIRAC.Core.Security.ProxyInfo import getProxyInfo from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise @@ -35,8 +43,7 @@ class ProductionStates(str, Enum): class BaseModel(_BaseModel): - class Config: - extra = Extra.forbid + model_config = ConfigDict(extra="forbid") class ProductionStep(BaseModel): @@ -141,6 +148,10 @@ class ProductionStep(BaseModel): return cleaned_data_pkgs +class InvalidStep(Exception): + pass + + class ProductionBase(BaseModel): type: str id: PositiveInt | None = None @@ -152,8 +163,29 @@ class ProductionBase(BaseModel): wg: str comment: str = "" state: ProductionStates = ProductionStates.NEW + steps: Annotated[list[ProductionStep], Field(min_length=1)] + # TODO: a model should be written for this + submission_info: Any | None = None + + @model_validator(mode="after") + def validate_steps(cls, values): # pylint: disable=no-self-argument + current_leaves = {ft.type for ft in values.steps[0].output} + for step in values.steps[1:]: + for input_filetype in step.input: + if input_filetype.type not in current_leaves: + raise InvalidStep(f"Failed to find input {input_filetype.type!r} for {step!r}") + current_leaves.remove(input_filetype.type) + + for output_filetype in step.output: + if output_filetype.type in current_leaves: + raise InvalidStep( + f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed" + ) + current_leaves.add(output_filetype.type) + return values + class SimulationProduction(ProductionBase): type: Literal["Simulation"] @@ -173,6 +205,8 @@ class SimulationProduction(ProductionBase): class DataProduction(ProductionBase): + type: Literal["Sprucing", "AnalysisProduction", "Stripping", "Reconstruction"] + class InputDataset(BaseModel): class BookkeepingQuery(BaseModel): configName: str @@ -193,7 +227,9 @@ class DataProduction(ProductionBase): input_dataset: InputDataset -def parse_obj(obj: Any) -> ProductionBase: - if obj.get("type") == "Simulation": - return SimulationProduction.parse_obj(obj) - return DataProduction.parse_obj(obj) +Production = Annotated[Union[SimulationProduction, DataProduction], Field(discriminator="type")] + + +def parse_obj(obj: Any) -> Production: + adapter = TypeAdapter(Production) + return adapter.validate_python(obj) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index ad0366d69cf64f65917668e0911bfd9f90c4aae2..d8a47a03c3443064004d0a55f0a84a3a872ddec0 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -39,6 +39,36 @@ from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import ( ) +def preprocess_spec(pr: ProductionBase): + """production-request-run-local does not support multiple input types so easily, so we need to split them""" + new_steps = [] + + for step in pr.steps: + if len(step.input) > 1: + print(f"Splitting step {step.name}") + # we need to split this step + for i, input_filetype in enumerate(step.input): + split_step = step.model_copy( + deep=True, + update={ + "name": f"{step.name}ft{i}", + "processing_pass": f"{step.processing_pass}{i}", + "input": [input_filetype], + "output": [input_filetype.model_copy(deep=True)], + }, + ) + new_steps.append(split_step) + print(split_step) + else: + new_steps.append(step) + return pr.model_copy( + deep=True, + update={ + "steps": new_steps, + }, + ) + + def parseArgs(): useCfgOverride = True inputFiles = None @@ -202,6 +232,7 @@ def main(): productionRequests = defaultdict(list) for spec in yaml.safe_load(yamlPath.read_text()): productionRequest = parse_obj(spec) + productionRequest = preprocess_spec(productionRequest) productionRequests[productionRequest.name] += [productionRequest] if name is None: diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py index ceeeed0938eec62916f568f94276a86027846721..f4ef668b86c7c96e8b275872a28d887470e3b3ee 100644 --- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py +++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py @@ -236,11 +236,11 @@ class BookkeepingReport(ModuleBase): def __generateTypedParams(self, job_info): """Set fields in job_info.typed_parameters""" exectime, cputime = getStepCPUTimes(self.step_commons) - job_info.typed_parameters.CPUTIME = cputime - job_info.typed_parameters.ExecTime = exectime + job_info.typed_parameters.CPUTIME = str(cputime) + job_info.typed_parameters.ExecTime = str(exectime) try: - job_info.typed_parameters.WNMEMORY = self.xf_o.memory + job_info.typed_parameters.WNMEMORY = str(self.xf_o.memory) except AttributeError: pass @@ -249,11 +249,11 @@ class BookkeepingReport(ModuleBase): mjfPower = gConfig.getValue("/LocalSite/CPUScalingFactor", "0") # Trick to know that the value is obtained from MJF: # from diracPower if mjfPower != diracPower: - job_info.typed_parameters.WNMJFHS06 = mjfPower - job_info.typed_parameters.NumberOfProcessors = self.numberOfProcessors - job_info.typed_parameters.Production = self.production_id + job_info.typed_parameters.WNMJFHS06 = str(mjfPower) + job_info.typed_parameters.NumberOfProcessors = str(self.numberOfProcessors) + job_info.typed_parameters.Production = str(self.production_id) job_info.typed_parameters.DiracJobId = str(self.jobID) - job_info.typed_parameters.Name = self.step_id + job_info.typed_parameters.Name = str(self.step_id) job_info.typed_parameters.JobStart = f"{self.ldatestart} {self.ltimestart}" job_info.typed_parameters.JobEnd = f"{self.ldate} {self.ltime}" job_info.typed_parameters.Location = self.siteName @@ -262,11 +262,11 @@ class BookkeepingReport(ModuleBase): job_info.typed_parameters.ProgramName = self.applicationName job_info.typed_parameters.ProgramVersion = self.applicationVersion - job_info.typed_parameters.FirstEventNumber = 1 + job_info.typed_parameters.FirstEventNumber = str(1) - job_info.typed_parameters.StatisticsRequested = self.numberOfEvents + job_info.typed_parameters.StatisticsRequested = str(self.numberOfEvents) - job_info.typed_parameters.StepID = self.BKstepID + job_info.typed_parameters.StepID = str(self.BKstepID) try: noOfEvents = self.xf_o.inputEventsTotal if self.xf_o.inputEventsTotal else self.xf_o.outputEventsTotal @@ -276,7 +276,7 @@ class BookkeepingReport(ModuleBase): if not res["OK"]: raise AttributeError("Can't get the BKK file metadata") noOfEvents = sum(fileMeta["EventStat"] for fileMeta in res["Value"]["Successful"].values()) - job_info.typed_parameters.NumberOfEvents = noOfEvents + job_info.typed_parameters.NumberOfEvents = str(noOfEvents) ################################################################################ diff --git a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py index b83d1df77566edbedc58bd729a944ad271fff419..49e2107b4b883bf03f5a037cab95d44387fcfd1a 100644 --- a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py +++ b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py @@ -822,31 +822,33 @@ class ModuleBase: """determine the input data for the step.""" if inputData == "previousStep": stepIndex = self.gaudiSteps.index(self.stepName) - previousStep = self.gaudiSteps[stepIndex - 1] - - outputList = self.workflow_commons.get("outputList", []) - if not outputList: - raise RuntimeError( - f"outputList was empty while finding input data for step {self.stepName} - " - "did the first step produce any output at all?" - ) - stepInputData = [] - for outputF in outputList: - try: - if ( - outputF["stepName"] == previousStep - and outputF["outputBKType"].lower() == self.inputDataType.lower() - ): - stepInputData.append(outputF["outputDataName"]) - except KeyError: - raise RuntimeError(f"Can't find output of step {previousStep}") + for previousStep in self.gaudiSteps[stepIndex - 1 :: -1]: + outputList = self.workflow_commons.get("outputList", []) + if not outputList: + raise RuntimeError( + f"outputList was empty while finding input data for step {self.stepName} - " + "did the first step produce any output at all?" + ) - # outputDataName is always lower case but the job output can vary - # Fix it if the file only exists in the current directory with mixed case - filenameMap = {fn.lower(): fn for fn in os.listdir(".")} - stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData] + for outputF in outputList: + try: + if ( + outputF["stepName"] == previousStep + and outputF["outputBKType"].lower() == self.inputDataType.lower() + ): + stepInputData.append(outputF["outputDataName"]) + except KeyError: + raise RuntimeError(f"Can't find output of step {previousStep}") + # outputDataName is always lower case but the job output can vary + # Fix it if the file only exists in the current directory with mixed case + filenameMap = {fn.lower(): fn for fn in os.listdir(".")} + stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData] + + # We found the output file so no need to look at earlier steps + if stepInputData: + break return stepInputData return [x.strip("LFN:") for x in inputData.split(";")]