Moving `Job finalization` step from the workflow to the `JobWrapper`: Transition Plan for Enhancing HPC Exploitation in DIRAC/LHCbDIRAC

The objective of this transition plan is to enhance the exploitation of HPCs with no external connectivity in DIRAC/LHCbDIRAC. This involves a phased approach to integrate new functionalities for job processing, ensuring minimal disruption to existing workflows and providing a smooth transition.

Problems with the Current Solution

Because a few LHCbDIRAC workflow modules depend on external services (mostly present in the Job finalization step), we currently need to submit the application directly from the workflow. This prevents us from scaling the solution up:

  • High Memory Consumption: each job execution involves multiple processes (JobWrapper, dirac-jobexec, and others), leading to high memory consumption. For instance, each JobWrapper consumes about 77MB, and dirac-jobexec takes around 90MB. This results in a substantial memory footprint when running multiple jobs simultaneously (e.g., 300 jobs require about 51GB of RAM).
  • Lack of Scalability: each CE interface within the workflow (RemoteRunner) handles only one job ID, preventing bulk processing of jobs.
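
As a rough check of the memory figures above, the footprint of the two named processes can be estimated with a few lines of Python. This is only a back-of-the-envelope sketch: the helper name is hypothetical, and it ignores the other processes mentioned above, which presumably account for the gap to the quoted ~51GB.

```python
# Per-job memory of the two processes named above, in MB.
JOB_WRAPPER_MB = 77
JOBEXEC_MB = 90


def estimate_footprint_mb(n_jobs: int) -> int:
    """Hypothetical helper: memory used by n_jobs concurrent jobs,
    counting only JobWrapper and dirac-jobexec."""
    return n_jobs * (JOB_WRAPPER_MB + JOBEXEC_MB)


total_mb = estimate_footprint_mb(300)
print(f"{total_mb} MB ~ {total_mb / 1000:.1f} GB")  # prints "50100 MB ~ 50.1 GB"
```

The cost grows linearly with the number of jobs, which is why removing one process per job matters at this scale.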

Transition Plan (targeting MC Simulation but could be expanded to other types of jobs)

I would like to move the Job finalization step from the workflow to the JobWrapper.

Phase 1: Development and simple testing

  • DIRAC Pull Request:

    • Modify the JobWrapper class to include preProcess() and postProcess() methods. These methods should be designed to be overridden, allowing for customized behavior per community needs.
     class JobWrapper:
         ...

         def preProcess(self):
             """Hook called before the payload starts; does nothing by default."""

         def postProcess(self):
             """Hook called after the payload has finished running; does nothing by default."""

         def execute(self):
             """Pre-process, execute the workflow, and post-process."""
             self.preProcess()
             self.process()  # the current execute()
             self.postProcess()
    • The JobWrapper implementation could be selected in the CS, for example:
     JobWrapper
     {
     	Implementation = JobWrapper
     }
    • JobWrapperTemplate would then load it as:
     class JobWrapperTemplate:
         ...

         def execute(self):
             # `implementation` is the value of the JobWrapper/Implementation option in the CS
             jobWrapper = ObjectLoader().loadObject(f"DIRAC.WMS.JobWrapper.{implementation}")
    • Remove the DIRAC UploadOutputs module (because not usable in its current state).
    • Update PushJobAgent to call preProcess() and postProcess() and to submit jobs to HPCs accordingly. In practice, the content of JobWrapperTemplate would be distributed across the PushJobAgent methods as follows:
     class PushJobAgent:
         ...

     # Current solution
     jobWrapper = createJobWrapper()
     ce.submitJob(jobWrapper)

     # Proposed solution
     # Essentially the content of JobWrapperTemplate
     # JobWrapperTemplate could actually be reused here to avoid duplicating the logic

     # This would be done in execute()
     # `implementation` is the value of the JobWrapper/Implementation option in the CS
     jobWrapper = ObjectLoader().loadObject(f"DIRAC.WMS.JobWrapper.{implementation}")
     jobWrapper.initialize()
     jobWrapper.transferInputSandbox()
     jobWrapper.resolveInputData()
     jobWrapper.preProcess()
    
     # This would be done in _submitJob()
     command = f"dirac-jobexec {jobWrapper.workflow}"
     ce.submitJob(command)
     jobWrapper.postProcess()
    
     # This would be done in execute() once we have results
     jobWrapper.processJobOutputs()
     jobWrapper.finalize()
    • Introduce feature flags or configuration parameters to toggle the new functionality. This allows for selective enablement in production environments and provides a fallback in case of issues.
      • SubmissionPolicy in PushJobAgent: JobWrapper/Workflow, to switch from one solution to the other.
  • LHCbDIRAC MR:

    • In LHCbDIRAC, override postProcess() to handle the 'Job finalization' step, moving logic from the workflow to this method.
     class LHCbJobWrapper(JobWrapper):
         ...

         def preProcess(self):
             if not externalConnectivity and submissionPolicy == "Workflow":
                 # remove Job finalization content from the workflow
                 ...

         def postProcess(self):
             if "Job finalization" not in workflow:
                 # Job finalization content
                 ...

Initial state: the old behaviour remains the default, so preProcess() and postProcess() are not used.
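
The hook mechanism of Phase 1 can be illustrated with a self-contained sketch. It does not use DIRAC: the classes below are simplified stand-ins, the `calls` list exists only to make the call order visible, and the `IMPLEMENTATIONS` registry with `loadJobWrapper()` is a hypothetical replacement for the CS option and the ObjectLoader lookup.

```python
class JobWrapper:
    """Stand-in for the DIRAC JobWrapper; only the call order is modelled."""

    def __init__(self):
        self.calls = []  # records the steps that actually did something

    def preProcess(self):
        """Hook called before the payload starts; does nothing by default."""

    def postProcess(self):
        """Hook called after the payload has finished; does nothing by default."""

    def process(self):
        """Placeholder for the logic of the current execute()."""
        self.calls.append("process")

    def execute(self):
        self.preProcess()
        self.process()
        self.postProcess()
        return self.calls


class LHCbJobWrapper(JobWrapper):
    """Hypothetical community override that adds a finalization step."""

    def postProcess(self):
        self.calls.append("job finalization")


# Hypothetical registry standing in for the CS option and the ObjectLoader lookup.
IMPLEMENTATIONS = {"JobWrapper": JobWrapper, "LHCbJobWrapper": LHCbJobWrapper}


def loadJobWrapper(implementation="JobWrapper"):
    return IMPLEMENTATIONS[implementation]()


print(loadJobWrapper().execute())                  # ['process']
print(loadJobWrapper("LHCbJobWrapper").execute())  # ['process', 'job finalization']
```

With the default implementation the hooks are no-ops, which matches the initial state described above: nothing changes until a community class overrides them.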

Phase 2: Configuration and "production" testing

  • LHCbDIRAC configuration: enable postProcess() execution only for the PushJobAgent first.
    • Switch SubmissionPolicy to Workflow
    • Monitor the system closely for any issues.
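
The effect of the SubmissionPolicy switch can be sketched as follows. This is a simplified illustration, not PushJobAgent code: `FakeCE`, `FakeJobWrapper` and `submit()` are hypothetical, and the mapping of JobWrapper to the current solution and Workflow to the proposed one is an assumption read from the Phase 1 pseudocode.

```python
class FakeCE:
    """Hypothetical computing element that only records what it receives."""

    def __init__(self):
        self.submitted = []

    def submitJob(self, payload):
        self.submitted.append(payload)


class FakeJobWrapper:
    """Hypothetical wrapper exposing the two hooks introduced in Phase 1."""

    workflow = "job.xml"  # placeholder workflow name

    def __init__(self):
        self.steps = []

    def preProcess(self):
        self.steps.append("preProcess")

    def postProcess(self):
        self.steps.append("postProcess")


def submit(jobWrapper, ce, submissionPolicy="JobWrapper"):
    """Dispatch on the policy; the default keeps the old behaviour."""
    if submissionPolicy == "JobWrapper":
        # Assumed current solution: the wrapper itself is sent, hooks unused.
        ce.submitJob("JobWrapper")
    elif submissionPolicy == "Workflow":
        # Assumed proposed solution: hooks run locally, only the workflow is sent.
        jobWrapper.preProcess()
        ce.submitJob(f"dirac-jobexec {jobWrapper.workflow}")
        jobWrapper.postProcess()
    else:
        raise ValueError(f"Unknown SubmissionPolicy: {submissionPolicy}")


ce, wrapper = FakeCE(), FakeJobWrapper()
submit(wrapper, ce, "Workflow")
print(ce.submitted, wrapper.steps)
```

Keeping JobWrapper as the default means that switching the option back is enough to fall back to the old path if monitoring reveals a problem.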

Phase 3: Full Deployment

  • LHCbDIRAC configuration: once the initial deployment phase has proven stable and performant,
    • Modify ProductionRequest:
     import random
     from DIRAC import gConfig

     # With a probability of N/100, add the finalization step with the explicit module list
     chanceToRunWithoutFinalizationStep = gConfig.getValue("/Operations/ChanceToRunWithoutFinalizationStep", 0)
     if random.randint(1, 100) <= chanceToRunWithoutFinalizationStep:
         prod.addFinalizationStep(["UploadOutputData", "RemoveInputData", "UploadLogFile", "UploadMC", "FailoverRequest"])
     else:
         prod.addFinalizationStep()
  • Gradually roll out the changes.
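
The percentage-based selection used for the gradual rollout can be sketched in isolation. The function name `takesNewPath` is hypothetical, and a seeded `random.Random` is used only to make the simulation reproducible; production code would read the percentage from the configuration instead.

```python
import random


def takesNewPath(chancePercent: int, rng: random.Random) -> bool:
    """Return True with probability chancePercent/100.

    randint(1, 100) is uniform over 100 values, of which exactly
    chancePercent are less than or equal to chancePercent.
    """
    return rng.randint(1, 100) <= chancePercent


rng = random.Random(42)  # fixed seed, for a reproducible illustration only
for chance in (0, 10, 50, 100):
    hits = sum(takesNewPath(chance, rng) for _ in range(10_000))
    print(f"{chance:3d}% configured -> {hits / 100:.1f}% observed")
```

A value of 0 never selects the new path and a value of 100 always does, so the rollout can be advanced or reverted by changing a single number.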

Phase 4: Decommissioning of Old Logic

  • Decommissioning:
    • Once the new system proves to be stable and effective, start decommissioning the old logic related to job finalization within the workflows.
      • Remove the flags ChanceToRunWithoutFinalizationStep and SubmissionPolicy
      • Remove the preProcess() method in LHCbJobWrapper

Any opinions on this transition plan?