From 9703b4ec797497244e90dafa0100b9ea65741a36 Mon Sep 17 00:00:00 2001 From: LHCbDIRAC Bot <dirac.bot@cern.ch> Date: Mon, 4 Nov 2024 06:53:04 +0100 Subject: [PATCH] Merge branch 'roneil-run-local-multi-filetype' into 'master' [master] add a step splitter by input filetype and pydantic modernisations / squashing of warnings See merge request lhcb-dirac/LHCbDIRAC!1669 (cherry picked from commit b3b41d927e86d7430851b0df9dc4b55caf9bfc01) bd69f095 add a step splitter by input filetype. b5b038a6 add submission_info to ProductionBase 62c73d45 set also output filetype when splitting merge step. fa23fa94 working-ish splitting steps 527f8eeb ModuleBase refactoring to support splitting, merging workflows a7bd4865 Added a model validator for ProductionStep 6b27bb86 use values.steps cbeee057 use discriminated unions 3085a479 try simplified algo 9f877eee use a TypeAdapter fb4d1f5b Fixed annoying Serialization UserWarnings cc3feddb Set DataProduction discriminator field properly...again?! Co-authored-by: Chris Burr <christopher.burr@cern.ch> --- .../Core/Utilities/BookkeepingJobInfo.py | 6 +- .../Utilities/Models.py | 56 +++++++++++++++---- .../dirac_production_request_run_local.py | 31 ++++++++++ .../Workflow/Modules/BookkeepingReport.py | 22 ++++---- src/LHCbDIRAC/Workflow/Modules/ModuleBase.py | 46 +++++++-------- 5 files changed, 114 insertions(+), 47 deletions(-) diff --git a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py index b8879005e7..abdea41219 100644 --- a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py +++ b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py @@ -10,17 +10,15 @@ ############################################################################### import os import socket -from typing import Optional import psutil import xml.etree.ElementTree as ET from pydantic import BaseModel as _BaseModel -from pydantic import Extra import LHCbDIRAC -class BaseModel(_BaseModel, extra=Extra.forbid): +class BaseModel(_BaseModel, extra="forbid"): pass @@ -104,7 +102,7 @@ class BookkeepingJobInfo(BaseModel): root.extend( [ ET.Element("TypedParameter", Name=k, Value=str(v), Type="Info") - for k, v in self.typed_parameters.dict().items() + for k, v in self.typed_parameters.model_dump().items() if v is not None ] ) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 839831c534..e5326059dd 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -9,10 +9,18 @@ # or submit itself to any jurisdiction. # ############################################################################### from enum import Enum -from typing import Any, Literal, Annotated - -from pydantic import StringConstraints, BaseModel as _BaseModel, field_validator -from pydantic import Extra, Field, PositiveInt, validator +from typing import Any, Literal, Annotated, Union + +from pydantic import ( + ConfigDict, + StringConstraints, + BaseModel as _BaseModel, + field_validator, + model_validator, + Field, + PositiveInt, + TypeAdapter, +) from DIRAC.Core.Security.ProxyInfo import getProxyInfo from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise @@ -35,8 +43,7 @@ class ProductionStates(str, Enum): class BaseModel(_BaseModel): - class Config: - extra = Extra.forbid + model_config = ConfigDict(extra="forbid") class ProductionStep(BaseModel): @@ -141,6 +148,10 @@ class ProductionStep(BaseModel): return cleaned_data_pkgs +class InvalidStep(Exception): + pass + + class ProductionBase(BaseModel): type: str id: PositiveInt | None = None @@ -152,8 +163,29 @@ class ProductionBase(BaseModel): wg: str comment: str = "" state: ProductionStates = ProductionStates.NEW + steps: Annotated[list[ProductionStep], Field(min_length=1)] + # TODO: a model should be written for this + submission_info: Any | None = None + + @model_validator(mode="after") + def validate_steps(cls, values): # pylint: disable=no-self-argument + current_leaves = {ft.type for ft in values.steps[0].output} + for step in values.steps[1:]: + for input_filetype in step.input: + if input_filetype.type not in current_leaves: + raise InvalidStep(f"Failed to find input {input_filetype.type!r} for {step!r}") + current_leaves.remove(input_filetype.type) + + for output_filetype in step.output: + if output_filetype.type in current_leaves: + raise InvalidStep( + f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed" + ) + current_leaves.add(output_filetype.type) + return values + class SimulationProduction(ProductionBase): type: Literal["Simulation"] @@ -173,6 +205,8 @@ class SimulationProduction(ProductionBase): class DataProduction(ProductionBase): + type: Literal["Sprucing", "AnalysisProduction", "Stripping", "Reconstruction"] + class InputDataset(BaseModel): class BookkeepingQuery(BaseModel): configName: str @@ -193,7 +227,9 @@ class DataProduction(ProductionBase): input_dataset: InputDataset -def parse_obj(obj: Any) -> ProductionBase: - if obj.get("type") == "Simulation": - return SimulationProduction.parse_obj(obj) - return DataProduction.parse_obj(obj) +Production = Annotated[Union[SimulationProduction, DataProduction], Field(discriminator="type")] + + +def parse_obj(obj: Any) -> Production: + adapter = TypeAdapter(Production) + return adapter.validate_python(obj) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index db1b4638df..a183304423 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -39,6 +39,36 @@ from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import ( ) +def preprocess_spec(pr: ProductionBase): + """production-request-run-local does not support multiple input types so easily, so we need to split them""" + new_steps = [] + + for step in pr.steps: + if len(step.input) > 1: + print(f"Splitting step {step.name}") + # we need to split this step + for i, input_filetype in enumerate(step.input): + split_step = step.model_copy( + deep=True, + update={ + "name": f"{step.name}ft{i}", + "processing_pass": f"{step.processing_pass}{i}", + "input": [input_filetype], + "output": [input_filetype.model_copy(deep=True)], + }, + ) + new_steps.append(split_step) + print(split_step) + else: + new_steps.append(step) + return pr.model_copy( + deep=True, + update={ + "steps": new_steps, + }, + ) + + def parseArgs(): useCfgOverride = True inputFiles = None @@ -204,6 +234,7 @@ def main(): if spec.get("author") is None: # can occur when testing files produce by LbMCSubmit spec["author"] = "local_user" productionRequest = parse_obj(spec) + productionRequest = preprocess_spec(productionRequest) productionRequests[productionRequest.name] += [productionRequest] if name is None: diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py index bc799080c4..075fc26478 100644 --- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py +++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py @@ -235,11 +235,11 @@ class BookkeepingReport(ModuleBase): def __generateTypedParams(self, job_info): """Set fields in job_info.typed_parameters""" exectime, cputime = getStepCPUTimes(self.step_commons) - job_info.typed_parameters.CPUTIME = cputime - job_info.typed_parameters.ExecTime = exectime + job_info.typed_parameters.CPUTIME = str(cputime) + job_info.typed_parameters.ExecTime = str(exectime) try: - job_info.typed_parameters.WNMEMORY = self.xf_o.memory + job_info.typed_parameters.WNMEMORY = str(self.xf_o.memory) except AttributeError: pass @@ -248,11 +248,11 @@ class BookkeepingReport(ModuleBase): mjfPower = gConfig.getValue("/LocalSite/CPUScalingFactor", "0") # Trick to know that the value is obtained from MJF: # from diracPower if mjfPower != diracPower: - job_info.typed_parameters.WNMJFHS06 = mjfPower - job_info.typed_parameters.NumberOfProcessors = self.numberOfProcessors - job_info.typed_parameters.Production = self.production_id + job_info.typed_parameters.WNMJFHS06 = str(mjfPower) + job_info.typed_parameters.NumberOfProcessors = str(self.numberOfProcessors) + job_info.typed_parameters.Production = str(self.production_id) job_info.typed_parameters.DiracJobId = str(self.jobID) - job_info.typed_parameters.Name = self.step_id + job_info.typed_parameters.Name = str(self.step_id) job_info.typed_parameters.JobStart = f"{self.ldatestart} {self.ltimestart}" job_info.typed_parameters.JobEnd = f"{self.ldate} {self.ltime}" job_info.typed_parameters.Location = self.siteName @@ -261,11 +261,11 @@ class BookkeepingReport(ModuleBase): job_info.typed_parameters.ProgramName = self.applicationName job_info.typed_parameters.ProgramVersion = self.applicationVersion - job_info.typed_parameters.FirstEventNumber = 1 + job_info.typed_parameters.FirstEventNumber = str(1) - job_info.typed_parameters.StatisticsRequested = self.numberOfEvents + job_info.typed_parameters.StatisticsRequested = str(self.numberOfEvents) - job_info.typed_parameters.StepID = self.BKstepID + job_info.typed_parameters.StepID = str(self.BKstepID) try: noOfEvents = self.xf_o.inputEventsTotal if self.xf_o.inputEventsTotal else self.xf_o.outputEventsTotal @@ -275,7 +275,7 @@ class BookkeepingReport(ModuleBase): if not res["OK"]: raise AttributeError("Can't get the BKK file metadata") noOfEvents = sum(fileMeta["EventStat"] for fileMeta in res["Value"]["Successful"].values()) - job_info.typed_parameters.NumberOfEvents = noOfEvents + job_info.typed_parameters.NumberOfEvents = str(noOfEvents) ################################################################################ diff --git a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py index b83d1df775..49e2107b4b 100644 --- a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py +++ b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py @@ -822,31 +822,33 @@ class ModuleBase: """determine the input data for the step.""" if inputData == "previousStep": stepIndex = self.gaudiSteps.index(self.stepName) - previousStep = self.gaudiSteps[stepIndex - 1] - - outputList = self.workflow_commons.get("outputList", []) - if not outputList: - raise RuntimeError( - f"outputList was empty while finding input data for step {self.stepName} - " - "did the first step produce any output at all?" - ) - stepInputData = [] - for outputF in outputList: - try: - if ( - outputF["stepName"] == previousStep - and outputF["outputBKType"].lower() == self.inputDataType.lower() - ): - stepInputData.append(outputF["outputDataName"]) - except KeyError: - raise RuntimeError(f"Can't find output of step {previousStep}") + for previousStep in self.gaudiSteps[stepIndex - 1 :: -1]: + outputList = self.workflow_commons.get("outputList", []) + if not outputList: + raise RuntimeError( + f"outputList was empty while finding input data for step {self.stepName} - " + "did the first step produce any output at all?" + ) - # outputDataName is always lower case but the job output can vary - # Fix it if the file only exists in the current directory with mixed case - filenameMap = {fn.lower(): fn for fn in os.listdir(".")} - stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData] + for outputF in outputList: + try: + if ( + outputF["stepName"] == previousStep + and outputF["outputBKType"].lower() == self.inputDataType.lower() + ): + stepInputData.append(outputF["outputDataName"]) + except KeyError: + raise RuntimeError(f"Can't find output of step {previousStep}") + # outputDataName is always lower case but the job output can vary + # Fix it if the file only exists in the current directory with mixed case + filenameMap = {fn.lower(): fn for fn in os.listdir(".")} + stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData] + + # We found the output file so no need to look at earlier steps + if stepInputData: + break return stepInputData return [x.strip("LFN:") for x in inputData.split(";")] -- GitLab