diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/actions.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/actions.py index ded484a78b37403943ef67ba85636be2ada84922..362c5cb6c7ac240581448260960546d27eae2be5 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/actions.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/actions.py @@ -85,6 +85,22 @@ class SubmitProductionsAction(Action): start_productions(logbook, self.issue) +class SubmitRemovalTransformationAction(Action): + def __init__(self, issue, request_id: int): + self.issue = issue + self.request_id = request_id + + def message(self): + return f"{self.__class__.__name__}({self.issue})" + + def run(self): + from .launching import start_removal_transformation + from .integrations import OperationsLogbook + + logbook = OperationsLogbook() + start_removal_transformation(logbook, self.issue, self.request_id) + + class CheckValidationAction(Action): def __init__(self, issue, request_id): self.issue = issue @@ -179,7 +195,6 @@ class AddLabelAction(Action): if project_label.startswith(prefix): break else: - breakpoint() raise ValueError(f"Unknown label prefix {prefix}") self.repo.project.labels.create({"name": self.label, "color": color}) # Clear the cache diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/checks.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/checks.py index f88d59940188abd6b765ac3de129fe6b0f0ec2e3..52b211590f9e88f391bdd78b6858badad191d2e0 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/checks.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/checks.py @@ -67,7 +67,9 @@ async def pm_checks(console, logbook, issue, request_id, transform_id): if choice == "yes": issue.metadata.requests[request_id].checks.pm_check[transform_id] = True issue.update_metadata() - maybe_post_logbook(console, logbook, issue.url, transform_id, choice == "yes", [(cmd, output)], ["Sprucing"]) + maybe_post_logbook( + console, logbook, issue.url, transform_id, choice == "yes", [(cmd, output)], ["Sprucing", "Production"] + ) async def dm_checks(console, logbook, issue, request_id, transform_id): @@ -90,7 +92,7 @@ async def dm_checks(console, logbook, issue, request_id, transform_id): transform_id, choice == "yes", [(cmd, output), (cmd2, output2)], - ["Sprucing", "Data Management"], + ["Sprucing", "Data Management", "Production"], ) @@ -202,7 +204,13 @@ async def buffer_check_removal(console, logbook, issue, request_id): issue.metadata.requests[request_id].checks.dm_clean[request_id] = True issue.update_metadata() maybe_post_logbook( - console, logbook, issue.url, transform_id, choice == "yes", cmd_out, ["Sprucing", "Data Management"] + console, + logbook, + issue.url, + transform_id, + choice == "yes", + cmd_out, + ["Sprucing", "Data Management", "Production"], ) diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/integrations.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/integrations.py index f4ebf410a42ebef24961c3a4e07d41bf3cb1ae57..0326b4fdea2833cb2d69aa9d3069dae717e548c1 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/integrations.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/integrations.py @@ -168,7 +168,7 @@ class RepoIssue: self.run_yaml = self._parse_run_yaml_blob(self.run_yaml_blob) self.request_yaml = self._parse_request_yaml_blob(self.request_yaml_blob) try: - self.metadata = self._extract_metadata() + self.metadata: RepoIssueMetadata = self._extract_metadata() except Exception: print(f"Failed to extract metadata for {self}") raise diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/launching.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/launching.py index 83d3e388dba60284994c968f4b17e423477b2a3b..2c3d843654bf5e6a9d9887752faf075bdeb1255b 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/launching.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/launching.py @@ -15,16 +15,18 @@ __all__ = ["start_productions", "check_validation"] import traceback import json + from LHCbDIRAC.ProductionManagementSystem.Utilities.Models import parse_obj from LHCbDIRAC.ProductionManagementSystem.scripts.dirac_production_request_submit import submitProductionRequests from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise +from LHCbDIRAC.TransformationSystem.Client.Transformation import Transformation from LHCbDIRAC.TransformationSystem.Client.TransformationClient import TransformationClient from LHCbDIRAC.ProductionManagementSystem.Client.ProductionRequest import ProductionRequest from LHCbDIRAC.BookkeepingSystem.Client.BookkeepingClient import BookkeepingClient from LHCbDIRAC.ProductionManagementSystem.Client.ProductionRequestClient import ProductionRequestClient from LHCbDIRAC.ProductionManagementSystem.Utilities.ModelCompatibility import configure_input, runs_to_input_query -from .integrations import Validation, Request +from .integrations import Validation, Request, RepoIssue def get_request_state(request_id): @@ -112,7 +114,7 @@ def start_productions(logbook, issue): f"{request_id}-extra.yaml": issue.run_yaml_blob, } try: - logbook.create_post(issue.url, transform_ids, subject, body, attachments, ["Sprucing"]) + logbook.create_post(issue.url, transform_ids, subject, body, attachments, ["Sprucing", "Production"]) except Exception: print("Failed to post to logbook") traceback.print_exc() @@ -124,6 +126,68 @@ def start_productions(logbook, issue): traceback.print_exc() +def start_removal_transformation(logbook, issue: RepoIssue, request_id: int): + if "state::running" not in issue.labels: + raise NotImplementedError("Only submitting removals for issues in state::running") + + request_metadata = issue.metadata.requests[request_id] + sprucing_tid, merging_tid = request_metadata.transform_ids + + tc = TransformationClient() + bkquery = returnValueOrRaise(tc.getBookkeepingQuery(sprucing_tid)) + bkquery["ProcessingPass"] = "/" + bkquery["ProcessingPass"] + extra_param = returnValueOrRaise(tc.getAdditionalParameters(sprucing_tid)) + removal_trans = Transformation() + removal_trans.setType("Removal") + + trans_name = "/".join( + [ + "Removal-", + bkquery["ConfigName"], + bkquery["ConfigVersion"], + bkquery["DataTakingConditions"], + bkquery["ProcessingPass"], + str(bkquery["EventType"]), + f"{bkquery['FileType']}-issue-{issue.issue.iid}", + ] + ) + removal_trans.setBkQuery(bkquery) + removal_trans.setTransformationName(trans_name) + removal_trans.setTransformationGroup("RemoveReplicasWhenProcessed") + removal_trans.setPlugin("RemoveReplicasWhenProcessed") + removal_trans.setBody("removal;RemoveReplica") + removal_trans.setSEParam("FromSEs", "['Tier1-Buffer']") + long_name = f"RemoveReplicasWhenProcessed for issue {issue.issue.iid}" + removal_trans.setDescription(long_name[:255]) + removal_trans.setLongDescription(long_name[:255]) + removal_trans.setAdditionalParam("ProcessingPasses", extra_param["groupDescription"].strip("/")) + # removal_trans.setStatus("Active") + removal_trans.setAgentType("Automatic") + returnValueOrRaise(removal_trans.addTransformation()) + removal_id = returnValueOrRaise(removal_trans.getTransformationID()) + print(f"Submitted removal transforation {request_id=} {removal_id=}", request_id) + + try: + + issue.metadata.requests[request_id].removal = removal_id + issue.update_metadata() + except Exception: + print("Failed to update metadata in issue", issue.metadata) + traceback.print_exc() + + productionRequest = parse_obj(issue.request_yaml) + subject = f"Removal for {productionRequest.name}" + body = f"Removal transformation was submitted: {removal_id}\n\n" + + body += json.dumps(removal_trans.paramValues, indent=3) + + try: + logbook.create_post(issue.url, [removal_id], subject, body, {}, ["Sprucing", "Data Management", "Production"]) + except Exception: + print("Failed to post to logbook") + traceback.print_exc() + + def check_validation(issue, prod_id): request_metadata = issue.metadata.validations[prod_id] diff --git a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/monitoring.py b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/monitoring.py index 4e970679f4ad64733276e05c6f2ccf29a4009e1d..a96cc4022e3d58cdb6fdfe4b411e470113655688 100644 --- a/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/monitoring.py +++ b/src/LHCbDIRAC/ProductionManagementSystem/Utilities/ProductionTools/monitoring.py @@ -31,6 +31,7 @@ from .actions import ( AddLabelAction, UpdateTransformationStatusAction, UpdateRequestStateAction, + SubmitRemovalTransformationAction, ) from .integrations import EVENTTYPE_TO_STREAM @@ -165,6 +166,9 @@ def analyse_active_productions(repo, *, states: set[str] = ACTIVE_PRODUCTION_STA tables_data[config][proc_pass][event_type][tinfo[main_tid]["Type"]].append(row_data) if state == "running": + if not request_meta.removal: + actions[issue].append(SubmitRemovalTransformationAction(issue, request_id)) + all_finished = True for tid in request_meta.transform_ids + [request_meta.removal]: file_status = request_meta.file_status.get(tid, {})