Moving `Job finalization` step from the workflow to the `JobWrapper`: Transition Plan for Enhancing HPC Exploitation in DIRAC/LHCbDIRAC
The objective of this transition plan is to improve the exploitation of HPCs with no external connectivity in DIRAC/LHCbDIRAC. It follows a phased approach to integrate new job-processing functionality, ensuring minimal disruption to existing workflows and a smooth transition.
Problems with the Current Solution
Because a few LHCbDIRAC workflow modules depend on external services (mostly those in the `Job finalization` step), we currently need to submit the application directly from the workflow. This prevents us from scaling the solution up:
- High Memory Consumption: each job execution involves multiple processes (`JobWrapper`, `dirac-jobexec`, and others), leading to high memory consumption. For instance, each `JobWrapper` consumes about 77MB, and `dirac-jobexec` takes around 90MB. This results in a substantial memory footprint when running multiple jobs simultaneously (e.g., 300 jobs require about 51GB of RAM).
- Lack of Scalability: each CE interface within the workflow (`RemoteRunner`) handles only one job ID, preventing bulk processing of jobs.
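A back-of-the-envelope check of these figures, counting only `JobWrapper` and `dirac-jobexec` (the "other" processes are left out, and 1 GB is taken as 1000 MB):

```python
# Per-job memory figures quoted above (MB)
JOB_WRAPPER_MB = 77
DIRAC_JOBEXEC_MB = 90


def wrapperFootprintGB(nJobs: int) -> float:
    """Memory taken by JobWrapper + dirac-jobexec alone for nJobs concurrent jobs (1 GB = 1000 MB)."""
    return nJobs * (JOB_WRAPPER_MB + DIRAC_JOBEXEC_MB) / 1000


if __name__ == "__main__":
    print(wrapperFootprintGB(300))  # 50.1
```

These two processes alone account for about 50GB for 300 jobs, so the remaining ~1GB of the quoted 51GB presumably comes from the other processes.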
Transition Plan (targeting MC Simulation but could be expanded to other types of jobs)
Basically, I would like to move the `Job finalization` step from the workflow to the `JobWrapper`.
Phase 1: Development and simple testing
- DIRAC Pull Request:
  - Modify the `JobWrapper` class to include `preProcess()` and `postProcess()` methods. These methods should be designed to be overridden, allowing for customized behavior per community needs.
    ```python
    class JobWrapper:
        ...

        def preProcess(self):
            """This method is called before the payload starts."""
            pass

        def postProcess(self):
            """This method is called after the payload has finished running."""
            pass

        def execute(self):
            """Pre-process, execute the workflow, and post-process"""
            self.preProcess()
            self.process()  # the current execute()
            self.postProcess()
    ```
    - `JobWrapper` could be configured in the CS, such as:
      ```
      JobWrapper
      {
        Implementation = JobWrapper
      }
      ```
    - `JobWrapperTemplate` should then use it as:
      ```python
      from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader


      class JobWrapperTemplate:
          ...

          def execute(self):
              # `implementation` is the value of the CS option `Implementation` (e.g. "JobWrapper")
              # NB: loadObject() returns an S_OK/S_ERROR structure wrapping the class
              jobWrapper = ObjectLoader().loadObject(f"DIRAC.WMS.JobWrapper.{implementation}")
      ```
  - Remove the DIRAC `UploadOutputs` module (it is not usable in its current state).
  - Update `PushJobAgent` to use the `preProcess()` and `postProcess()` methods, and submit jobs to HPCs accordingly. Basically, the content of `JobWrapperTemplate` should be spread within `PushJobAgent` such as:
    ```python
    class PushJobAgent:
        ...
        # Current solution
        jobWrapper = createJobWrapper()
        ce.submitJob(jobWrapper)

        # ---

        # Proposed solution
        # Basically the content of JobWrapperTemplate
        # JobWrapperTemplate could actually be reused here to avoid duplicating the logic

        # This would be done in execute()
        jobWrapper = ObjectLoader().loadObject(f"DIRAC.WMS.JobWrapper.{implementation}")
        jobWrapper.initialize()
        jobWrapper.transferInputSandbox()
        jobWrapper.resolveInputData()
        jobWrapper.preProcess()

        # This would be done in _submitJob()
        command = f"dirac-jobexec {jobWrapper.workflow}"
        ce.submitJob(command)
        jobWrapper.postProcess()

        # This would be done in execute() once we have results
        jobWrapper.processJobOutputs()
        jobWrapper.finalize()
    ```
  - Introduce feature flags or configuration parameters to toggle the new functionality. This allows for selective enablement in production environments and provides a fallback in case of issues.
    - `SubmissionPolicy` in `PushJobAgent`: `JobWrapper`/`Workflow`, to switch from one solution to the other.
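- LHCbDIRAC MR:
  - In `LHCbDIRAC`, override `postProcess()` to handle the 'Job finalization' step, moving logic from the workflow to this method. During the transition, also override `preProcess()` to strip that step from the workflow when the job has no external connectivity and `SubmissionPolicy` is `Workflow`.
    ```python
    from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import JobWrapper


    class LHCbJobWrapper(JobWrapper):
        ...
        # externalConnectivity, submissionPolicy and workflow stand for values available to the wrapper

        def preProcess(self):
            if not externalConnectivity and submissionPolicy == "Workflow":
                # remove Job finalization content from the workflow
                ...

        def postProcess(self):
            if "Job finalization" not in workflow:
                # Job finalization content
                ...
    ```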
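To make the intended control flow concrete, here is a self-contained toy model of the hooks described above. It is not DIRAC code: the workflow is reduced to a list of step names, and the dictionary `IMPLEMENTATIONS` together with `loadJobWrapper()` (both hypothetical) stand in for the CS `Implementation` option resolved through `ObjectLoader`. The step name "Simulation" is only an example.

```python
from typing import Dict, List, Type

FINALIZATION = "Job finalization"


class JobWrapper:
    """Toy stand-in for DIRAC's JobWrapper, reduced to the proposed hooks."""

    def __init__(self, workflow: List[str], externalConnectivity: bool = True,
                 submissionPolicy: str = "JobWrapper") -> None:
        self.workflow = list(workflow)  # hypothetical: a workflow as a list of step names
        self.externalConnectivity = externalConnectivity
        self.submissionPolicy = submissionPolicy
        self.executed: List[str] = []  # records what actually ran

    def preProcess(self) -> None:
        """Called before the payload starts; no-op by default."""

    def process(self) -> None:
        """Stands in for the current execute(): run every step of the workflow."""
        self.executed.extend(self.workflow)

    def postProcess(self) -> None:
        """Called after the payload has finished; no-op by default."""

    def execute(self) -> None:
        self.preProcess()
        self.process()
        self.postProcess()


class LHCbJobWrapper(JobWrapper):
    def preProcess(self) -> None:
        # Strip the finalization step if the payload cannot reach external services
        if not self.externalConnectivity and self.submissionPolicy == "Workflow":
            self.workflow = [step for step in self.workflow if step != FINALIZATION]

    def postProcess(self) -> None:
        # Perform finalization here only when the workflow no longer does it
        if FINALIZATION not in self.workflow:
            self.executed.append(f"postProcess: {FINALIZATION}")


# Hypothetical stand-in for the CS option JobWrapper/Implementation + ObjectLoader lookup
IMPLEMENTATIONS: Dict[str, Type[JobWrapper]] = {
    "JobWrapper": JobWrapper,
    "LHCbJobWrapper": LHCbJobWrapper,
}


def loadJobWrapper(implementation: str, **kwargs) -> JobWrapper:
    return IMPLEMENTATIONS[implementation](**kwargs)


if __name__ == "__main__":
    steps = ["Simulation", FINALIZATION]
    offline = loadJobWrapper("LHCbJobWrapper", workflow=steps,
                             externalConnectivity=False, submissionPolicy="Workflow")
    offline.execute()
    print(offline.executed)  # ['Simulation', 'postProcess: Job finalization']

    online = loadJobWrapper("LHCbJobWrapper", workflow=steps)
    online.execute()
    print(online.executed)  # ['Simulation', 'Job finalization']
```

With no external connectivity and `SubmissionPolicy = Workflow`, finalization moves from the workflow into `postProcess()`; otherwise the workflow is left untouched and `postProcess()` does nothing, which is what keeps the change backward compatible.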
Initial state: the old behaviour is enabled by default, i.e. `preProcess()` and `postProcess()` are not used.
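The `SubmissionPolicy` switch in `PushJobAgent` could be sketched as follows, with the old behaviour as the default. `FakeCE`, `StubJobWrapper`, `pushJob()` and the `jobDescription.xml` file name are hypothetical, and submission is treated as synchronous to keep the example short; on a real HPC, `postProcess()` would have to wait until the payload results are available.

```python
from typing import List


class FakeCE:
    """Hypothetical CE that only records what it is asked to run."""

    def __init__(self) -> None:
        self.submitted: list = []

    def submitJob(self, payload: object) -> None:
        self.submitted.append(payload)


class StubJobWrapper:
    """Hypothetical wrapper exposing only the hooks PushJobAgent would call."""

    def __init__(self, workflow: str) -> None:
        self.workflow = workflow
        self.calls: List[str] = []

    def preProcess(self) -> None:
        self.calls.append("preProcess")

    def postProcess(self) -> None:
        self.calls.append("postProcess")


def pushJob(jobWrapper: StubJobWrapper, ce: FakeCE, submissionPolicy: str = "JobWrapper") -> None:
    """Dispatch one job according to the proposed SubmissionPolicy option."""
    if submissionPolicy == "JobWrapper":
        # Old behaviour (default): submit the whole wrapper, hooks unused
        ce.submitJob(jobWrapper)
    elif submissionPolicy == "Workflow":
        # New behaviour: hooks run on the agent side, only the payload goes to the CE
        jobWrapper.preProcess()
        ce.submitJob(f"dirac-jobexec {jobWrapper.workflow}")
        jobWrapper.postProcess()  # assumes the payload has completed at this point
    else:
        raise ValueError(f"Unknown SubmissionPolicy: {submissionPolicy}")


if __name__ == "__main__":
    ce = FakeCE()
    legacy = StubJobWrapper("jobDescription.xml")
    pushJob(legacy, ce)
    new = StubJobWrapper("jobDescription.xml")
    pushJob(new, ce, submissionPolicy="Workflow")
    print(ce.submitted[1])  # dirac-jobexec jobDescription.xml
    print(legacy.calls, new.calls)  # [] ['preProcess', 'postProcess']
```

Keeping `JobWrapper` as the default value means that deploying the code changes nothing until the option is explicitly switched.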
Phase 2: Configuration and "production" testing
- LHCbDIRAC configuration: enable `postProcess()` execution only for the `PushJobAgent` first.
  - Switch `SubmissionPolicy` to `Workflow`.
- Monitor the system closely for any issues.
Phase 3: Full Deployment
- LHCbDIRAC configuration: after ensuring stability and performance in the initial deployment phase, gradually roll out the changes.
  - Modify `ProductionRequest`:
    ```python
    import random

    from DIRAC import gConfig

    # N/100 to add the finalization step, with N = ChanceToRunWithoutFinalizationStep (0-100)
    chanceToRunWithoutFinalizationStep = gConfig.getValue("/Operations/ChanceToRunWithoutFinalizationStep", 0)
    # randint(1, 100) is uniform, so this branch is taken with probability N/100
    if random.randint(1, 100) <= chanceToRunWithoutFinalizationStep:
        prod.addFinalizationStep(["UploadOutputData", "RemoveInputData", "UploadLogFile", "UploadMC", "FailoverRequest"])
    else:
        prod.addFinalizationStep()
    ```
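The percentage-based switch driven by `ChanceToRunWithoutFinalizationStep` can be isolated in a small helper, which makes the rollout rate easy to test. `runsWithoutFinalizationStep()` is a hypothetical name; it assumes the option holds an integer percentage between 0 and 100 and draws uniformly from 1 to 100, so it returns `True` with probability N/100.

```python
import random
from typing import Optional


def runsWithoutFinalizationStep(chancePercent: int, rng: Optional[random.Random] = None) -> bool:
    """Return True for about chancePercent% of calls (hypothetical helper)."""
    if not 0 <= chancePercent <= 100:
        raise ValueError("chancePercent must be an integer between 0 and 100")
    rng = rng if rng is not None else random.Random()
    # randint(1, 100) is uniform over 100 values, hence P(draw <= N) = N / 100
    return rng.randint(1, 100) <= chancePercent


if __name__ == "__main__":
    rng = random.Random(42)  # seeded: reproducible estimate
    draws = [runsWithoutFinalizationStep(10, rng) for _ in range(10_000)]
    print(sum(draws) / len(draws))  # roughly 0.1
```

Passing a seeded `random.Random` makes the decision reproducible in tests, while production code can rely on the default generator.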
Phase 4: Decommissioning of Old Logic
- Decommissioning:
  - Once the new system proves to be stable and effective, start decommissioning the old logic related to job finalization within the workflows.
    - Remove the flags `ChanceToRunWithoutFinalizationStep` and `SubmissionPolicy`.
    - Remove the `preProcess()` method in `LHCbJobWrapper`.
Any opinion about this transition plan?