From bd69f095ee0eb41edf07ebca9606c628c922b0b0 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Mon, 28 Oct 2024 13:08:39 +0100 Subject: [PATCH 01/12] add a step splitter by input filetype. format --- .../Utilities/Models.py | 7 +++++-- .../dirac_production_request_run_local.py | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 839831c534..9129ce3710 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -195,5 +195,8 @@ class DataProduction(ProductionBase): def parse_obj(obj: Any) -> ProductionBase: if obj.get("type") == "Simulation": - return SimulationProduction.parse_obj(obj) - return DataProduction.parse_obj(obj) + return SimulationProduction.model_validate(obj) + + # TODO: should we really be validating the types first? + # if obj.get("type") in ["AnalysisProduction", "Sprucing"]: + return DataProduction.model_validate(obj) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index ad0366d69c..de0ced7960 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -39,6 +39,24 @@ from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import ( ) +def preprocess_spec(pr: ProductionBase): + """production-request-run-local does not support multiple input types so easily, so we need to split them""" + new_steps = [] + + for step in pr.steps: + if len(step.input) > 1: + # we need to split this step + for i, input_filetype in enumerate(step.input): + split_step = step.model_copy(deep=True) + split_step.name = f"{step.name}ft{i}" + split_step.input = [input_filetype] + new_steps.append(split_step) + else: + new_steps.append(step) + pr.steps = new_steps + return pr.model_validate() + + def parseArgs(): useCfgOverride = True inputFiles = None @@ -202,6 +220,7 @@ def main(): productionRequests = defaultdict(list) for spec in yaml.safe_load(yamlPath.read_text()): productionRequest = parse_obj(spec) + productionRequest = preprocess_spec(productionRequest) productionRequests[productionRequest.name] += [productionRequest] if name is None: -- GitLab From b5b038a6009456beb3864efec982153c87461251 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Mon, 28 Oct 2024 13:38:15 +0100 Subject: [PATCH 02/12] add submission_info to ProductionBase pre-commit :cry: --- .../ProductionManagementSystem/Utilities/Models.py | 12 +++++++----- .../scripts/dirac_production_request_run_local.py | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 9129ce3710..d4ffe2aae1 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -11,12 +11,12 @@ from enum import Enum from typing import Any, Literal, Annotated -from pydantic import StringConstraints, BaseModel as _BaseModel, field_validator -from pydantic import Extra, Field, PositiveInt, validator +from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator +from pydantic import Field, PositiveInt from DIRAC.Core.Security.ProxyInfo import getProxyInfo from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise -from typing import Annotated +from typing import Annotated, Optional FILETYPE_PATTERN = r"[A-Z0-9\.]+" @@ -35,8 +35,7 @@ class ProductionStates(str, Enum): class BaseModel(_BaseModel): - class Config: - extra = Extra.forbid + model_config = ConfigDict(extra="forbid") class ProductionStep(BaseModel): @@ -154,6 +153,9 @@ class ProductionBase(BaseModel): state: ProductionStates = ProductionStates.NEW steps: Annotated[list[ProductionStep], Field(min_length=1)] + # TODO: a model should be written for this + submission_info: Any | None = None + class SimulationProduction(ProductionBase): type: Literal["Simulation"] diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index de0ced7960..944d0c44e8 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -54,7 +54,7 @@ def preprocess_spec(pr: ProductionBase): else: new_steps.append(step) pr.steps = new_steps - return pr.model_validate() + return pr def parseArgs(): -- GitLab From 62c73d4503641b9c513a51dcb273baa02f18d823 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Mon, 28 Oct 2024 21:46:25 +0100 Subject: [PATCH 03/12] set also output filetype when splitting merge step. --- .../scripts/dirac_production_request_run_local.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index 944d0c44e8..d40a4c5634 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -50,6 +50,7 @@ def preprocess_spec(pr: ProductionBase): split_step = step.model_copy(deep=True) split_step.name = f"{step.name}ft{i}" split_step.input = [input_filetype] + split_step.output = [input_filetype.model_copy(deep=True)] new_steps.append(split_step) else: new_steps.append(step) -- GitLab From fa23fa94a6ccdf41f9eaa6d1068f42bda69d6905 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Wed, 30 Oct 2024 11:55:49 +0100 Subject: [PATCH 04/12] working-ish splitting steps --- .../dirac_production_request_run_local.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index d40a4c5634..c7deeb153b 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -45,17 +45,25 @@ def preprocess_spec(pr: ProductionBase): for step in pr.steps: if len(step.input) > 1: + print(f"Splitting step {step.name}") # we need to split this step for i, input_filetype in enumerate(step.input): - split_step = step.model_copy(deep=True) - split_step.name = f"{step.name}ft{i}" - split_step.input = [input_filetype] - split_step.output = [input_filetype.model_copy(deep=True)] + split_step = step.model_copy(deep=True, update={ + "name": f"{step.name}ft{i}", + "processing_pass": f"{step.processing_pass}{i}", + "input": [input_filetype], + "output": [input_filetype.model_copy(deep=True)], + }) new_steps.append(split_step) + print(split_step) else: new_steps.append(step) - pr.steps = new_steps - return pr + return pr.model_copy( + deep=True, + update={ + "steps": new_steps, + } + ) def parseArgs(): -- GitLab From 527f8eebacf82ae2a689d982bc10d1e26dbd8b3b Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Wed, 30 Oct 2024 16:02:55 +0100 Subject: [PATCH 05/12] ModuleBase refactoring to support splitting, merging workflows Ran pre-commit --- .../Utilities/Models.py | 2 + .../dirac_production_request_run_local.py | 17 ++++--- src/LHCbDIRAC/Workflow/Modules/ModuleBase.py | 46 ++++++++++--------- 3 files changed, 36 insertions(+), 29 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index d4ffe2aae1..762dd88c1e 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -151,6 +151,8 @@ class ProductionBase(BaseModel): wg: str comment: str = "" state: ProductionStates = ProductionStates.NEW + + # TODO: validate that steps only process input filetypes out of a given node only once steps: Annotated[list[ProductionStep], Field(min_length=1)] # TODO: a model should be written for this diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py index c7deeb153b..d8a47a03c3 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py @@ -48,12 +48,15 @@ def preprocess_spec(pr: ProductionBase): print(f"Splitting step {step.name}") # we need to split this step for i, input_filetype in enumerate(step.input): - split_step = step.model_copy(deep=True, update={ - "name": f"{step.name}ft{i}", - "processing_pass": f"{step.processing_pass}{i}", - "input": [input_filetype], - "output": [input_filetype.model_copy(deep=True)], - }) + split_step = step.model_copy( + deep=True, + update={ + "name": f"{step.name}ft{i}", + "processing_pass": f"{step.processing_pass}{i}", + "input": [input_filetype], + "output": [input_filetype.model_copy(deep=True)], + }, + ) new_steps.append(split_step) print(split_step) else: @@ -62,7 +65,7 @@ def preprocess_spec(pr: ProductionBase): deep=True, update={ "steps": new_steps, - } + }, ) diff --git a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py index b83d1df775..49e2107b4b 100644 --- a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py +++ b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py @@ -822,31 +822,33 @@ class ModuleBase: """determine the input data for the step.""" if inputData == "previousStep": stepIndex = self.gaudiSteps.index(self.stepName) - previousStep = self.gaudiSteps[stepIndex - 1] - - outputList = self.workflow_commons.get("outputList", []) - if not outputList: - raise RuntimeError( - f"outputList was empty while finding input data for step {self.stepName} - " - "did the first step produce any output at all?" - ) - stepInputData = [] - for outputF in outputList: - try: - if ( - outputF["stepName"] == previousStep - and outputF["outputBKType"].lower() == self.inputDataType.lower() - ): - stepInputData.append(outputF["outputDataName"]) - except KeyError: - raise RuntimeError(f"Can't find output of step {previousStep}") + for previousStep in self.gaudiSteps[stepIndex - 1 :: -1]: + outputList = self.workflow_commons.get("outputList", []) + if not outputList: + raise RuntimeError( + f"outputList was empty while finding input data for step {self.stepName} - " + "did the first step produce any output at all?" + ) - # outputDataName is always lower case but the job output can vary - # Fix it if the file only exists in the current directory with mixed case - filenameMap = {fn.lower(): fn for fn in os.listdir(".")} - stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData] + for outputF in outputList: + try: + if ( + outputF["stepName"] == previousStep + and outputF["outputBKType"].lower() == self.inputDataType.lower() + ): + stepInputData.append(outputF["outputDataName"]) + except KeyError: + raise RuntimeError(f"Can't find output of step {previousStep}") + # outputDataName is always lower case but the job output can vary + # Fix it if the file only exists in the current directory with mixed case + filenameMap = {fn.lower(): fn for fn in os.listdir(".")} + stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData] + + # We found the output file so no need to look at earlier steps + if stepInputData: + break return stepInputData return [x.strip("LFN:") for x in inputData.split(";")] -- GitLab From a7bd4865e875282cc4a9a6813d92582915fd4fde Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Wed, 30 Oct 2024 17:00:49 +0100 Subject: [PATCH 06/12] Added a model validator for ProductionStep --- .../Utilities/Models.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 762dd88c1e..cb2e1b60b5 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -11,7 +11,7 @@ from enum import Enum from typing import Any, Literal, Annotated -from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator +from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator, model_validator from pydantic import Field, PositiveInt from DIRAC.Core.Security.ProxyInfo import getProxyInfo @@ -152,12 +152,30 @@ class ProductionBase(BaseModel): comment: str = "" state: ProductionStates = ProductionStates.NEW - # TODO: validate that steps only process input filetypes out of a given node only once steps: Annotated[list[ProductionStep], Field(min_length=1)] # TODO: a model should be written for this submission_info: Any | None = None + @model_validator(mode="after") + def validate_steps(cls, values): + steps = values.get("steps", []) + + for current_idx, current_step in enumerate(steps): + for input_filetype in current_step.input: + # Check all previous steps for input type conflicts + for prev_idx in range(current_idx - 1, -1, -1): + prev_step = steps[prev_idx] + + # Check if previous step also processes the same input filetype + if any(ft.type == input_filetype.type for ft in prev_step.input): + # Ensure it produces output of the same type + if not any(ft.type == input_filetype.type for ft in prev_step.output): + raise ValueError( + f"Step {current_idx} wants to process {input_filetype!r}, " + f"but step {prev_idx} already processes it without producing output of the same type." + ) + return values class SimulationProduction(ProductionBase): type: Literal["Simulation"] -- GitLab From 6b27bb86dfce1c9179205de97c6bb52e7da1ec61 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Wed, 30 Oct 2024 17:07:44 +0100 Subject: [PATCH 07/12] use values.steps pre-commit! --- .../ProductionManagementSystem/Utilities/Models.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index cb2e1b60b5..b2294b7e76 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -159,13 +159,11 @@ class ProductionBase(BaseModel): @model_validator(mode="after") def validate_steps(cls, values): - steps = values.get("steps", []) - - for current_idx, current_step in enumerate(steps): + for current_idx, current_step in enumerate(values.steps): for input_filetype in current_step.input: # Check all previous steps for input type conflicts for prev_idx in range(current_idx - 1, -1, -1): - prev_step = steps[prev_idx] + prev_step = values.steps[prev_idx] # Check if previous step also processes the same input filetype if any(ft.type == input_filetype.type for ft in prev_step.input): @@ -177,6 +175,7 @@ class ProductionBase(BaseModel): ) return values + class SimulationProduction(ProductionBase): type: Literal["Simulation"] mc_config_version: str -- GitLab From cbeee0579bf5d970e682db71f6504c1db4432c4a Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Wed, 30 Oct 2024 22:56:23 +0100 Subject: [PATCH 08/12] use discriminated unions --- .../Utilities/Models.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index b2294b7e76..8f28dcdec1 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -9,7 +9,7 @@ # or submit itself to any jurisdiction. # ############################################################################### from enum import Enum -from typing import Any, Literal, Annotated +from typing import Any, Literal, Annotated, Union from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator, model_validator from pydantic import Field, PositiveInt @@ -158,7 +158,7 @@ class ProductionBase(BaseModel): submission_info: Any | None = None @model_validator(mode="after") - def validate_steps(cls, values): + def validate_steps(cls, values): # pylint: disable=no-self-argument for current_idx, current_step in enumerate(values.steps): for input_filetype in current_step.input: # Check all previous steps for input type conflicts @@ -194,6 +194,8 @@ class SimulationProduction(ProductionBase): class DataProduction(ProductionBase): + type: Literal["Sprucing", "AnalysisProduction", "Generic"] + class InputDataset(BaseModel): class BookkeepingQuery(BaseModel): configName: str @@ -214,10 +216,8 @@ class DataProduction(ProductionBase): input_dataset: InputDataset -def parse_obj(obj: Any) -> ProductionBase: - if obj.get("type") == "Simulation": - return SimulationProduction.model_validate(obj) +Production = Annotated[Union[SimulationProduction, DataProduction], Field(discriminator="type")] + - # TODO: should we really be validating the types first? - # if obj.get("type") in ["AnalysisProduction", "Sprucing"]: - return DataProduction.model_validate(obj) +def parse_obj(obj: Any) -> Production: + return Production.model_validate(obj) -- GitLab From 3085a4797247ff3b4e26ac91c72388e01584f887 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Thu, 31 Oct 2024 15:06:25 +0100 Subject: [PATCH 09/12] try simplified algo --- .../Utilities/Models.py | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 8f28dcdec1..9d7dfabbd9 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -140,6 +140,10 @@ class ProductionStep(BaseModel): return cleaned_data_pkgs +class InvalidStep(Exception): + pass + + class ProductionBase(BaseModel): type: str id: PositiveInt | None = None @@ -159,20 +163,15 @@ class ProductionBase(BaseModel): @model_validator(mode="after") def validate_steps(cls, values): # pylint: disable=no-self-argument - for current_idx, current_step in enumerate(values.steps): - for input_filetype in current_step.input: - # Check all previous steps for input type conflicts - for prev_idx in range(current_idx - 1, -1, -1): - prev_step = values.steps[prev_idx] - - # Check if previous step also processes the same input filetype - if any(ft.type == input_filetype.type for ft in prev_step.input): - # Ensure it produces output of the same type - if not any(ft.type == input_filetype.type for ft in prev_step.output): - raise ValueError( - f"Step {current_idx} wants to process {input_filetype!r}, " - f"but step {prev_idx} already processes it without producing output of the same type." - ) + current_leaves = {ft.type for ft in values.steps[0].output} + for step in values.steps[1:]: + for input_filetype in step.input: + if current_leaves.pop(input_filetype.type, None) is None: + raise InvalidStep(f"Failed to find input {input_filetype.type!r} for {step!r}") + for output_filetype in step.output: + if output_filetype.type in current_leaves: + raise InvalidStep(f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed") + output_filetype.add(output_filetype.type) return values -- GitLab From 9f877eee86e0871f3440156f3d022a08f6847d17 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Thu, 31 Oct 2024 15:17:22 +0100 Subject: [PATCH 10/12] use a TypeAdapter model_validate-->validate_python set.pop is special so use remove instead fix typo --- .../Utilities/Models.py | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index 9d7dfabbd9..e4b01bbad2 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -11,12 +11,20 @@ from enum import Enum from typing import Any, Literal, Annotated, Union -from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator, model_validator -from pydantic import Field, PositiveInt +from pydantic import ( + ConfigDict, + StringConstraints, + BaseModel as _BaseModel, + field_validator, + model_validator, + Field, + PositiveInt, + TypeAdapter, +) from DIRAC.Core.Security.ProxyInfo import getProxyInfo from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise -from typing import Annotated, Optional +from typing import Annotated FILETYPE_PATTERN = r"[A-Z0-9\.]+" @@ -166,12 +174,16 @@ class ProductionBase(BaseModel): current_leaves = {ft.type for ft in values.steps[0].output} for step in values.steps[1:]: for input_filetype in step.input: - if current_leaves.pop(input_filetype.type, None) is None: + if input_filetype.type not in current_leaves: raise InvalidStep(f"Failed to find input {input_filetype.type!r} for {step!r}") + current_leaves.remove(input_filetype.type) + for output_filetype in step.output: if output_filetype.type in current_leaves: - raise InvalidStep(f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed") - output_filetype.add(output_filetype.type) + raise InvalidStep( + f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed" + ) + current_leaves.add(output_filetype.type) return values @@ -219,4 +231,5 @@ Production = Annotated[Union[SimulationProduction, DataProduction], Field(discri def parse_obj(obj: Any) -> Production: - return Production.model_validate(obj) + adapter = TypeAdapter(Production) + return adapter.validate_python(obj) -- GitLab From fb4d1f5b4b7bf6c195239623d2535938d0666097 Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Thu, 31 Oct 2024 17:13:04 +0100 Subject: [PATCH 11/12] Fixed annoying Serialization UserWarnings Formatting --- .../Core/Utilities/BookkeepingJobInfo.py | 6 ++--- .../Workflow/Modules/BookkeepingReport.py | 22 +++++++++---------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py index e5b51b4635..c4e70e1300 100644 --- a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py +++ b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py @@ -10,17 +10,15 @@ ############################################################################### import os import socket -from typing import Optional import psutil import xml.etree.ElementTree as ET from pydantic import BaseModel as _BaseModel -from pydantic import Extra import LHCbDIRAC -class BaseModel(_BaseModel, extra=Extra.forbid): +class BaseModel(_BaseModel, extra="forbid"): pass @@ -104,7 +102,7 @@ class BookkeepingJobInfo(BaseModel): root.extend( [ ET.Element("TypedParameter", Name=k, Value=str(v), Type="Info") - for k, v in self.typed_parameters.dict().items() + for k, v in self.typed_parameters.model_dump().items() if v is not None ] ) diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py index ceeeed0938..f4ef668b86 100644 --- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py +++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py @@ -236,11 +236,11 @@ class BookkeepingReport(ModuleBase): def __generateTypedParams(self, job_info): """Set fields in job_info.typed_parameters""" exectime, cputime = getStepCPUTimes(self.step_commons) - job_info.typed_parameters.CPUTIME = cputime - job_info.typed_parameters.ExecTime = exectime + job_info.typed_parameters.CPUTIME = str(cputime) + job_info.typed_parameters.ExecTime = str(exectime) try: - job_info.typed_parameters.WNMEMORY = self.xf_o.memory + job_info.typed_parameters.WNMEMORY = str(self.xf_o.memory) except AttributeError: pass @@ -249,11 +249,11 @@ class BookkeepingReport(ModuleBase): mjfPower = gConfig.getValue("/LocalSite/CPUScalingFactor", "0") # Trick to know that the value is obtained from MJF: # from diracPower if mjfPower != diracPower: - job_info.typed_parameters.WNMJFHS06 = mjfPower - job_info.typed_parameters.NumberOfProcessors = self.numberOfProcessors - job_info.typed_parameters.Production = self.production_id + job_info.typed_parameters.WNMJFHS06 = str(mjfPower) + job_info.typed_parameters.NumberOfProcessors = str(self.numberOfProcessors) + job_info.typed_parameters.Production = str(self.production_id) job_info.typed_parameters.DiracJobId = str(self.jobID) - job_info.typed_parameters.Name = self.step_id + job_info.typed_parameters.Name = str(self.step_id) job_info.typed_parameters.JobStart = f"{self.ldatestart} {self.ltimestart}" job_info.typed_parameters.JobEnd = f"{self.ldate} {self.ltime}" job_info.typed_parameters.Location = self.siteName @@ -262,11 +262,11 @@ class BookkeepingReport(ModuleBase): job_info.typed_parameters.ProgramName = self.applicationName job_info.typed_parameters.ProgramVersion = self.applicationVersion - job_info.typed_parameters.FirstEventNumber = 1 + job_info.typed_parameters.FirstEventNumber = str(1) - job_info.typed_parameters.StatisticsRequested = self.numberOfEvents + job_info.typed_parameters.StatisticsRequested = str(self.numberOfEvents) - job_info.typed_parameters.StepID = self.BKstepID + job_info.typed_parameters.StepID = str(self.BKstepID) try: noOfEvents = self.xf_o.inputEventsTotal if self.xf_o.inputEventsTotal else self.xf_o.outputEventsTotal @@ -276,7 +276,7 @@ class BookkeepingReport(ModuleBase): if not res["OK"]: raise AttributeError("Can't get the BKK file metadata") noOfEvents = sum(fileMeta["EventStat"] for fileMeta in res["Value"]["Successful"].values()) - job_info.typed_parameters.NumberOfEvents = noOfEvents + job_info.typed_parameters.NumberOfEvents = str(noOfEvents) ################################################################################ -- GitLab From cc3feddbce1ff308e860298d68aa26440105cc0b Mon Sep 17 00:00:00 2001 From: Ryunosuke O'Neil <r.oneil@cern.ch> Date: Fri, 1 Nov 2024 13:59:55 +0100 Subject: [PATCH 12/12] Set DataProduction discriminator field properly...again?! --- src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py index e4b01bbad2..e5326059dd 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py @@ -205,7 +205,7 @@ class SimulationProduction(ProductionBase): class DataProduction(ProductionBase): - type: Literal["Sprucing", "AnalysisProduction", "Generic"] + type: Literal["Sprucing", "AnalysisProduction", "Stripping", "Reconstruction"] class InputDataset(BaseModel): class BookkeepingQuery(BaseModel): -- GitLab