From bd69f095ee0eb41edf07ebca9606c628c922b0b0 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Mon, 28 Oct 2024 13:08:39 +0100
Subject: [PATCH 01/12] add a step splitter by input filetype.

format
---
 .../Utilities/Models.py                       |  7 +++++--
 .../dirac_production_request_run_local.py     | 19 +++++++++++++++++++
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index 839831c534..9129ce3710 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -195,5 +195,8 @@ class DataProduction(ProductionBase):
 
 def parse_obj(obj: Any) -> ProductionBase:
     if obj.get("type") == "Simulation":
-        return SimulationProduction.parse_obj(obj)
-    return DataProduction.parse_obj(obj)
+        return SimulationProduction.model_validate(obj)
+
+    # TODO: should we really be validating the types first?
+    # if obj.get("type") in ["AnalysisProduction", "Sprucing"]:
+    return DataProduction.model_validate(obj)
diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
index ad0366d69c..de0ced7960 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
@@ -39,6 +39,24 @@ from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import (
 )
 
 
+def preprocess_spec(pr: ProductionBase):
+    """production-request-run-local does not support multiple input types so easily, so we need to split them"""
+    new_steps = []
+
+    for step in pr.steps:
+        if len(step.input) > 1:
+            # we need to split this step
+            for i, input_filetype in enumerate(step.input):
+                split_step = step.model_copy(deep=True)
+                split_step.name = f"{step.name}ft{i}"
+                split_step.input = [input_filetype]
+                new_steps.append(split_step)
+        else:
+            new_steps.append(step)
+    pr.steps = new_steps
+    return pr.model_validate()
+
+
 def parseArgs():
     useCfgOverride = True
     inputFiles = None
@@ -202,6 +220,7 @@ def main():
     productionRequests = defaultdict(list)
     for spec in yaml.safe_load(yamlPath.read_text()):
         productionRequest = parse_obj(spec)
+        productionRequest = preprocess_spec(productionRequest)
         productionRequests[productionRequest.name] += [productionRequest]
 
     if name is None:
-- 
GitLab


From b5b038a6009456beb3864efec982153c87461251 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Mon, 28 Oct 2024 13:38:15 +0100
Subject: [PATCH 02/12] add submission_info to ProductionBase

pre-commit

:cry:
---
 .../ProductionManagementSystem/Utilities/Models.py   | 12 +++++++-----
 .../scripts/dirac_production_request_run_local.py    |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index 9129ce3710..d4ffe2aae1 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -11,12 +11,12 @@
 from enum import Enum
 from typing import Any, Literal, Annotated
 
-from pydantic import StringConstraints, BaseModel as _BaseModel, field_validator
-from pydantic import Extra, Field, PositiveInt, validator
+from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator
+from pydantic import Field, PositiveInt
 
 from DIRAC.Core.Security.ProxyInfo import getProxyInfo
 from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
-from typing import Annotated
+from typing import Annotated, Optional
 
 FILETYPE_PATTERN = r"[A-Z0-9\.]+"
 
@@ -35,8 +35,7 @@ class ProductionStates(str, Enum):
 
 
 class BaseModel(_BaseModel):
-    class Config:
-        extra = Extra.forbid
+    model_config = ConfigDict(extra="forbid")
 
 
 class ProductionStep(BaseModel):
@@ -154,6 +153,9 @@ class ProductionBase(BaseModel):
     state: ProductionStates = ProductionStates.NEW
     steps: Annotated[list[ProductionStep], Field(min_length=1)]
 
+    # TODO: a model should be written for this
+    submission_info: Any | None = None
+
 
 class SimulationProduction(ProductionBase):
     type: Literal["Simulation"]
diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
index de0ced7960..944d0c44e8 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
@@ -54,7 +54,7 @@ def preprocess_spec(pr: ProductionBase):
         else:
             new_steps.append(step)
     pr.steps = new_steps
-    return pr.model_validate()
+    return pr
 
 
 def parseArgs():
-- 
GitLab


From 62c73d4503641b9c513a51dcb273baa02f18d823 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Mon, 28 Oct 2024 21:46:25 +0100
Subject: [PATCH 03/12] set also output filetype when splitting merge step.

---
 .../scripts/dirac_production_request_run_local.py                | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
index 944d0c44e8..d40a4c5634 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
@@ -50,6 +50,7 @@ def preprocess_spec(pr: ProductionBase):
                 split_step = step.model_copy(deep=True)
                 split_step.name = f"{step.name}ft{i}"
                 split_step.input = [input_filetype]
+                split_step.output = [input_filetype.model_copy(deep=True)]
                 new_steps.append(split_step)
         else:
             new_steps.append(step)
-- 
GitLab


From fa23fa94a6ccdf41f9eaa6d1068f42bda69d6905 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Wed, 30 Oct 2024 11:55:49 +0100
Subject: [PATCH 04/12] working-ish splitting steps

---
 .../dirac_production_request_run_local.py     | 20 +++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
index d40a4c5634..c7deeb153b 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
@@ -45,17 +45,25 @@ def preprocess_spec(pr: ProductionBase):
 
     for step in pr.steps:
         if len(step.input) > 1:
+            print(f"Splitting step {step.name}")
             # we need to split this step
             for i, input_filetype in enumerate(step.input):
-                split_step = step.model_copy(deep=True)
-                split_step.name = f"{step.name}ft{i}"
-                split_step.input = [input_filetype]
-                split_step.output = [input_filetype.model_copy(deep=True)]
+                split_step = step.model_copy(deep=True, update={
+                    "name": f"{step.name}ft{i}",
+                    "processing_pass": f"{step.processing_pass}{i}",
+                    "input": [input_filetype],
+                    "output": [input_filetype.model_copy(deep=True)],
+                })
                 new_steps.append(split_step)
+                print(split_step)
         else:
             new_steps.append(step)
-    pr.steps = new_steps
-    return pr
+    return pr.model_copy(
+        deep=True,
+        update={
+            "steps": new_steps,
+        }
+    )
 
 
 def parseArgs():
-- 
GitLab


From 527f8eebacf82ae2a689d982bc10d1e26dbd8b3b Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Wed, 30 Oct 2024 16:02:55 +0100
Subject: [PATCH 05/12] ModuleBase refactoring to support splitting, merging
 workflows

Ran pre-commit
---
 .../Utilities/Models.py                       |  2 +
 .../dirac_production_request_run_local.py     | 17 ++++---
 src/LHCbDIRAC/Workflow/Modules/ModuleBase.py  | 46 ++++++++++---------
 3 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index d4ffe2aae1..762dd88c1e 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -151,6 +151,8 @@ class ProductionBase(BaseModel):
     wg: str
     comment: str = ""
     state: ProductionStates = ProductionStates.NEW
+
+    # TODO: validate that steps only process input filetypes out of a given node only once
     steps: Annotated[list[ProductionStep], Field(min_length=1)]
 
     # TODO: a model should be written for this
diff --git a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
index c7deeb153b..d8a47a03c3 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/scripts/dirac_production_request_run_local.py
@@ -48,12 +48,15 @@ def preprocess_spec(pr: ProductionBase):
             print(f"Splitting step {step.name}")
             # we need to split this step
             for i, input_filetype in enumerate(step.input):
-                split_step = step.model_copy(deep=True, update={
-                    "name": f"{step.name}ft{i}",
-                    "processing_pass": f"{step.processing_pass}{i}",
-                    "input": [input_filetype],
-                    "output": [input_filetype.model_copy(deep=True)],
-                })
+                split_step = step.model_copy(
+                    deep=True,
+                    update={
+                        "name": f"{step.name}ft{i}",
+                        "processing_pass": f"{step.processing_pass}{i}",
+                        "input": [input_filetype],
+                        "output": [input_filetype.model_copy(deep=True)],
+                    },
+                )
                 new_steps.append(split_step)
                 print(split_step)
         else:
@@ -62,7 +65,7 @@ def preprocess_spec(pr: ProductionBase):
         deep=True,
         update={
             "steps": new_steps,
-        }
+        },
     )
 
 
diff --git a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py
index b83d1df775..49e2107b4b 100644
--- a/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py
+++ b/src/LHCbDIRAC/Workflow/Modules/ModuleBase.py
@@ -822,31 +822,33 @@ class ModuleBase:
         """determine the input data for the step."""
         if inputData == "previousStep":
             stepIndex = self.gaudiSteps.index(self.stepName)
-            previousStep = self.gaudiSteps[stepIndex - 1]
-
-            outputList = self.workflow_commons.get("outputList", [])
-            if not outputList:
-                raise RuntimeError(
-                    f"outputList was empty while finding input data for step {self.stepName} - "
-                    "did the first step produce any output at all?"
-                )
-
             stepInputData = []
-            for outputF in outputList:
-                try:
-                    if (
-                        outputF["stepName"] == previousStep
-                        and outputF["outputBKType"].lower() == self.inputDataType.lower()
-                    ):
-                        stepInputData.append(outputF["outputDataName"])
-                except KeyError:
-                    raise RuntimeError(f"Can't find output of step {previousStep}")
+            for previousStep in self.gaudiSteps[stepIndex - 1 :: -1]:
+                outputList = self.workflow_commons.get("outputList", [])
+                if not outputList:
+                    raise RuntimeError(
+                        f"outputList was empty while finding input data for step {self.stepName} - "
+                        "did the first step produce any output at all?"
+                    )
 
-            # outputDataName is always lower case but the job output can vary
-            # Fix it if the file only exists in the current directory with mixed case
-            filenameMap = {fn.lower(): fn for fn in os.listdir(".")}
-            stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData]
+                for outputF in outputList:
+                    try:
+                        if (
+                            outputF["stepName"] == previousStep
+                            and outputF["outputBKType"].lower() == self.inputDataType.lower()
+                        ):
+                            stepInputData.append(outputF["outputDataName"])
+                    except KeyError:
+                        raise RuntimeError(f"Can't find output of step {previousStep}")
 
+                # outputDataName is always lower case but the job output can vary
+                # Fix it if the file only exists in the current directory with mixed case
+                filenameMap = {fn.lower(): fn for fn in os.listdir(".")}
+                stepInputData = [fn if fn in os.listdir(".") else filenameMap.get(fn, fn) for fn in stepInputData]
+
+                # We found the output file so no need to look at earlier steps
+                if stepInputData:
+                    break
             return stepInputData
 
         return [x.strip("LFN:") for x in inputData.split(";")]
-- 
GitLab


From a7bd4865e875282cc4a9a6813d92582915fd4fde Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Wed, 30 Oct 2024 17:00:49 +0100
Subject: [PATCH 06/12] Added a model validator for ProductionBase steps

---
 .../Utilities/Models.py                       | 22 +++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index 762dd88c1e..cb2e1b60b5 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -11,7 +11,7 @@
 from enum import Enum
 from typing import Any, Literal, Annotated
 
-from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator
+from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator, model_validator
 from pydantic import Field, PositiveInt
 
 from DIRAC.Core.Security.ProxyInfo import getProxyInfo
@@ -152,12 +152,30 @@ class ProductionBase(BaseModel):
     comment: str = ""
     state: ProductionStates = ProductionStates.NEW
 
-    # TODO: validate that steps only process input filetypes out of a given node only once
     steps: Annotated[list[ProductionStep], Field(min_length=1)]
 
     # TODO: a model should be written for this
     submission_info: Any | None = None
 
+    @model_validator(mode="after")
+    def validate_steps(cls, values):
+        steps = values.get("steps", [])
+
+        for current_idx, current_step in enumerate(steps):
+            for input_filetype in current_step.input:
+                # Check all previous steps for input type conflicts
+                for prev_idx in range(current_idx - 1, -1, -1):
+                    prev_step = steps[prev_idx]
+
+                    # Check if previous step also processes the same input filetype
+                    if any(ft.type == input_filetype.type for ft in prev_step.input):
+                        # Ensure it produces output of the same type
+                        if not any(ft.type == input_filetype.type for ft in prev_step.output):
+                            raise ValueError(
+                                f"Step {current_idx} wants to process {input_filetype!r}, "
+                                f"but step {prev_idx} already processes it without producing output of the same type."
+                            )
+        return values
 
 class SimulationProduction(ProductionBase):
     type: Literal["Simulation"]
-- 
GitLab


From 6b27bb86dfce1c9179205de97c6bb52e7da1ec61 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Wed, 30 Oct 2024 17:07:44 +0100
Subject: [PATCH 07/12] use values.steps

pre-commit!
---
 .../ProductionManagementSystem/Utilities/Models.py         | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index cb2e1b60b5..b2294b7e76 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -159,13 +159,11 @@ class ProductionBase(BaseModel):
 
     @model_validator(mode="after")
     def validate_steps(cls, values):
-        steps = values.get("steps", [])
-
-        for current_idx, current_step in enumerate(steps):
+        for current_idx, current_step in enumerate(values.steps):
             for input_filetype in current_step.input:
                 # Check all previous steps for input type conflicts
                 for prev_idx in range(current_idx - 1, -1, -1):
-                    prev_step = steps[prev_idx]
+                    prev_step = values.steps[prev_idx]
 
                     # Check if previous step also processes the same input filetype
                     if any(ft.type == input_filetype.type for ft in prev_step.input):
@@ -177,6 +175,7 @@ class ProductionBase(BaseModel):
                             )
         return values
 
+
 class SimulationProduction(ProductionBase):
     type: Literal["Simulation"]
     mc_config_version: str
-- 
GitLab


From cbeee0579bf5d970e682db71f6504c1db4432c4a Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Wed, 30 Oct 2024 22:56:23 +0100
Subject: [PATCH 08/12] use discriminated unions

---
 .../Utilities/Models.py                          | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index b2294b7e76..8f28dcdec1 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -9,7 +9,7 @@
 # or submit itself to any jurisdiction.                                       #
 ###############################################################################
 from enum import Enum
-from typing import Any, Literal, Annotated
+from typing import Any, Literal, Annotated, Union
 
 from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator, model_validator
 from pydantic import Field, PositiveInt
@@ -158,7 +158,7 @@ class ProductionBase(BaseModel):
     submission_info: Any | None = None
 
     @model_validator(mode="after")
-    def validate_steps(cls, values):
+    def validate_steps(cls, values):  # pylint: disable=no-self-argument
         for current_idx, current_step in enumerate(values.steps):
             for input_filetype in current_step.input:
                 # Check all previous steps for input type conflicts
@@ -194,6 +194,8 @@ class SimulationProduction(ProductionBase):
 
 
 class DataProduction(ProductionBase):
+    type: Literal["Sprucing", "AnalysisProduction", "Generic"]
+
     class InputDataset(BaseModel):
         class BookkeepingQuery(BaseModel):
             configName: str
@@ -214,10 +216,8 @@ class DataProduction(ProductionBase):
     input_dataset: InputDataset
 
 
-def parse_obj(obj: Any) -> ProductionBase:
-    if obj.get("type") == "Simulation":
-        return SimulationProduction.model_validate(obj)
+Production = Annotated[Union[SimulationProduction, DataProduction], Field(discriminator="type")]
+
 
-    # TODO: should we really be validating the types first?
-    # if obj.get("type") in ["AnalysisProduction", "Sprucing"]:
-    return DataProduction.model_validate(obj)
+def parse_obj(obj: Any) -> Production:
+    return Production.model_validate(obj)
-- 
GitLab


From 3085a4797247ff3b4e26ac91c72388e01584f887 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Thu, 31 Oct 2024 15:06:25 +0100
Subject: [PATCH 09/12] try simplified algo

---
 .../Utilities/Models.py                       | 27 +++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index 8f28dcdec1..9d7dfabbd9 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -140,6 +140,10 @@ class ProductionStep(BaseModel):
         return cleaned_data_pkgs
 
 
+class InvalidStep(Exception):
+    pass
+
+
 class ProductionBase(BaseModel):
     type: str
     id: PositiveInt | None = None
@@ -159,20 +163,15 @@ class ProductionBase(BaseModel):
 
     @model_validator(mode="after")
     def validate_steps(cls, values):  # pylint: disable=no-self-argument
-        for current_idx, current_step in enumerate(values.steps):
-            for input_filetype in current_step.input:
-                # Check all previous steps for input type conflicts
-                for prev_idx in range(current_idx - 1, -1, -1):
-                    prev_step = values.steps[prev_idx]
-
-                    # Check if previous step also processes the same input filetype
-                    if any(ft.type == input_filetype.type for ft in prev_step.input):
-                        # Ensure it produces output of the same type
-                        if not any(ft.type == input_filetype.type for ft in prev_step.output):
-                            raise ValueError(
-                                f"Step {current_idx} wants to process {input_filetype!r}, "
-                                f"but step {prev_idx} already processes it without producing output of the same type."
-                            )
+        current_leaves = {ft.type for ft in values.steps[0].output}
+        for step in values.steps[1:]:
+            for input_filetype in step.input:
+                if current_leaves.pop(input_filetype.type, None) is None:
+                    raise InvalidStep(f"Failed to find input {input_filetype.type!r} for {step!r}")
+            for output_filetype in step.output:
+                if output_filetype.type in current_leaves:
+                    raise InvalidStep(f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed")
+                output_filetype.add(output_filetype.type)
         return values
 
 
-- 
GitLab


From 9f877eee86e0871f3440156f3d022a08f6847d17 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Thu, 31 Oct 2024 15:17:22 +0100
Subject: [PATCH 10/12] use a TypeAdapter

model_validate-->validate_python

set.pop() takes no arguments and removes an arbitrary element, so check membership and use set.remove() instead

fix typo: add output filetypes to current_leaves rather than calling .add on the filetype object
---
 .../Utilities/Models.py                       | 27 ++++++++++++++-----
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index 9d7dfabbd9..e4b01bbad2 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -11,12 +11,20 @@
 from enum import Enum
 from typing import Any, Literal, Annotated, Union
 
-from pydantic import ConfigDict, StringConstraints, BaseModel as _BaseModel, field_validator, model_validator
-from pydantic import Field, PositiveInt
+from pydantic import (
+    ConfigDict,
+    StringConstraints,
+    BaseModel as _BaseModel,
+    field_validator,
+    model_validator,
+    Field,
+    PositiveInt,
+    TypeAdapter,
+)
 
 from DIRAC.Core.Security.ProxyInfo import getProxyInfo
 from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise
-from typing import Annotated, Optional
+from typing import Annotated
 
 FILETYPE_PATTERN = r"[A-Z0-9\.]+"
 
@@ -166,12 +174,16 @@ class ProductionBase(BaseModel):
         current_leaves = {ft.type for ft in values.steps[0].output}
         for step in values.steps[1:]:
             for input_filetype in step.input:
-                if current_leaves.pop(input_filetype.type, None) is None:
+                if input_filetype.type not in current_leaves:
                     raise InvalidStep(f"Failed to find input {input_filetype.type!r} for {step!r}")
+                current_leaves.remove(input_filetype.type)
+
             for output_filetype in step.output:
                 if output_filetype.type in current_leaves:
-                    raise InvalidStep(f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed")
-                output_filetype.add(output_filetype.type)
+                    raise InvalidStep(
+                        f"Found producer for {output_filetype.type!r} despite previous instance having not been consumed"
+                    )
+                current_leaves.add(output_filetype.type)
         return values
 
 
@@ -219,4 +231,5 @@ Production = Annotated[Union[SimulationProduction, DataProduction], Field(discri
 
 
 def parse_obj(obj: Any) -> Production:
-    return Production.model_validate(obj)
+    adapter = TypeAdapter(Production)
+    return adapter.validate_python(obj)
-- 
GitLab


From fb4d1f5b4b7bf6c195239623d2535938d0666097 Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Thu, 31 Oct 2024 17:13:04 +0100
Subject: [PATCH 11/12] Fixed annoying Serialization UserWarnings

Formatting
---
 .../Core/Utilities/BookkeepingJobInfo.py      |  6 ++---
 .../Workflow/Modules/BookkeepingReport.py     | 22 +++++++++----------
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py
index e5b51b4635..c4e70e1300 100644
--- a/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py
+++ b/src/LHCbDIRAC/Core/Utilities/BookkeepingJobInfo.py
@@ -10,17 +10,15 @@
 ###############################################################################
 import os
 import socket
-from typing import Optional
 
 import psutil
 import xml.etree.ElementTree as ET
 from pydantic import BaseModel as _BaseModel
-from pydantic import Extra
 
 import LHCbDIRAC
 
 
-class BaseModel(_BaseModel, extra=Extra.forbid):
+class BaseModel(_BaseModel, extra="forbid"):
     pass
 
 
@@ -104,7 +102,7 @@ class BookkeepingJobInfo(BaseModel):
         root.extend(
             [
                 ET.Element("TypedParameter", Name=k, Value=str(v), Type="Info")
-                for k, v in self.typed_parameters.dict().items()
+                for k, v in self.typed_parameters.model_dump().items()
                 if v is not None
             ]
         )
diff --git a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
index ceeeed0938..f4ef668b86 100644
--- a/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
+++ b/src/LHCbDIRAC/Workflow/Modules/BookkeepingReport.py
@@ -236,11 +236,11 @@ class BookkeepingReport(ModuleBase):
     def __generateTypedParams(self, job_info):
         """Set fields in job_info.typed_parameters"""
         exectime, cputime = getStepCPUTimes(self.step_commons)
-        job_info.typed_parameters.CPUTIME = cputime
-        job_info.typed_parameters.ExecTime = exectime
+        job_info.typed_parameters.CPUTIME = str(cputime)
+        job_info.typed_parameters.ExecTime = str(exectime)
 
         try:
-            job_info.typed_parameters.WNMEMORY = self.xf_o.memory
+            job_info.typed_parameters.WNMEMORY = str(self.xf_o.memory)
         except AttributeError:
             pass
 
@@ -249,11 +249,11 @@ class BookkeepingReport(ModuleBase):
         mjfPower = gConfig.getValue("/LocalSite/CPUScalingFactor", "0")
         # Trick to know that the value is obtained from MJF: # from diracPower
         if mjfPower != diracPower:
-            job_info.typed_parameters.WNMJFHS06 = mjfPower
-        job_info.typed_parameters.NumberOfProcessors = self.numberOfProcessors
-        job_info.typed_parameters.Production = self.production_id
+            job_info.typed_parameters.WNMJFHS06 = str(mjfPower)
+        job_info.typed_parameters.NumberOfProcessors = str(self.numberOfProcessors)
+        job_info.typed_parameters.Production = str(self.production_id)
         job_info.typed_parameters.DiracJobId = str(self.jobID)
-        job_info.typed_parameters.Name = self.step_id
+        job_info.typed_parameters.Name = str(self.step_id)
         job_info.typed_parameters.JobStart = f"{self.ldatestart} {self.ltimestart}"
         job_info.typed_parameters.JobEnd = f"{self.ldate} {self.ltime}"
         job_info.typed_parameters.Location = self.siteName
@@ -262,11 +262,11 @@ class BookkeepingReport(ModuleBase):
         job_info.typed_parameters.ProgramName = self.applicationName
         job_info.typed_parameters.ProgramVersion = self.applicationVersion
 
-        job_info.typed_parameters.FirstEventNumber = 1
+        job_info.typed_parameters.FirstEventNumber = str(1)
 
-        job_info.typed_parameters.StatisticsRequested = self.numberOfEvents
+        job_info.typed_parameters.StatisticsRequested = str(self.numberOfEvents)
 
-        job_info.typed_parameters.StepID = self.BKstepID
+        job_info.typed_parameters.StepID = str(self.BKstepID)
 
         try:
             noOfEvents = self.xf_o.inputEventsTotal if self.xf_o.inputEventsTotal else self.xf_o.outputEventsTotal
@@ -276,7 +276,7 @@ class BookkeepingReport(ModuleBase):
             if not res["OK"]:
                 raise AttributeError("Can't get the BKK file metadata")
             noOfEvents = sum(fileMeta["EventStat"] for fileMeta in res["Value"]["Successful"].values())
-        job_info.typed_parameters.NumberOfEvents = noOfEvents
+        job_info.typed_parameters.NumberOfEvents = str(noOfEvents)
 
     ################################################################################
 
-- 
GitLab


From cc3feddbce1ff308e860298d68aa26440105cc0b Mon Sep 17 00:00:00 2001
From: Ryunosuke O'Neil <r.oneil@cern.ch>
Date: Fri, 1 Nov 2024 13:59:55 +0100
Subject: [PATCH 12/12] Set DataProduction discriminator field
 properly...again?!

---
 src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
index e4b01bbad2..e5326059dd 100644
--- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
+++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/Models.py
@@ -205,7 +205,7 @@ class SimulationProduction(ProductionBase):
 
 
 class DataProduction(ProductionBase):
-    type: Literal["Sprucing", "AnalysisProduction", "Generic"]
+    type: Literal["Sprucing", "AnalysisProduction", "Stripping", "Reconstruction"]
 
     class InputDataset(BaseModel):
         class BookkeepingQuery(BaseModel):
-- 
GitLab