From ce1982e12f650940ffae5b04191fda7a3660b2c5 Mon Sep 17 00:00:00 2001
From: Tadej Novak <tadej.novak@cern.ch>
Date: Fri, 10 Sep 2021 17:35:27 +0200
Subject: [PATCH] Support executor substeps in transforms

---
 .../python/AllConfigFlags.py                  |   8 +-
 Simulation/Digitization/python/PileUpUtils.py |  17 ++-
 .../python/RunDependentMCTaskIterator.py      |  12 +-
 .../PileUpProfile_run310000_splitting.py      |  19 +++
 .../RunDependentSimData/share/configCommon.py |  16 ++-
 .../share/configLumi_run310000_splitting.py   |  28 ++++
 .../share/CommonSkeletonJobOptions.py         |  10 +-
 ..._Digi_tf_multistep_presampling_CA_vs_CG.sh | 124 ++++++++++++++++
 ...ultistep_MP_presampling_reproducibility.sh | 135 ++++++++++++++++++
 Tools/Campaigns/python/MC21.py                |   8 ++
 Tools/Campaigns/python/__init__.py            |   4 +-
 .../python/CommonRunArgsToFlags.py            |  14 ++
 Tools/PyJobTransforms/python/transform.py     |  36 +++++
 Tools/PyJobTransforms/python/trfArgClasses.py |   5 +-
 Tools/PyJobTransforms/python/trfArgs.py       |   3 +
 Tools/PyJobTransforms/python/trfExe.py        |  84 +++++++++--
 .../PyJobTransforms/python/trfExeStepTools.py |  71 +++++++++
 Tools/PyJobTransforms/python/trfJobOptions.py |  22 ++-
 Tools/PyJobTransforms/python/trfMPTools.py    |   2 +
 Tools/PyJobTransforms/python/trfMTTools.py    |   4 +-
 Tools/PyJobTransforms/python/trfValidation.py |   7 +
 21 files changed, 599 insertions(+), 30 deletions(-)
 create mode 100644 Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py
 create mode 100644 Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py
 create mode 100755 Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh
 create mode 100755 Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh
 create mode 100644 Tools/PyJobTransforms/python/trfExeStepTools.py

diff --git a/Control/AthenaConfiguration/python/AllConfigFlags.py b/Control/AthenaConfiguration/python/AllConfigFlags.py
index c4c9023a4566..b39c26cfcb11 100644
--- a/Control/AthenaConfiguration/python/AllConfigFlags.py
+++ b/Control/AthenaConfiguration/python/AllConfigFlags.py
@@ -23,8 +23,12 @@ def _createCfgFlags():
     from AthenaCommon.Constants import INFO
     acf.addFlag('Exec.OutputLevel',INFO) #Global Output Level
     acf.addFlag('Exec.MaxEvents',-1) 
-    acf.addFlag("Exec.SkipEvents",0)
-    acf.addFlag("Exec.DebugStage","")
+    acf.addFlag('Exec.SkipEvents',0)
+    acf.addFlag('Exec.DebugStage','')
+
+    acf.addFlag('ExecutorSplitting.TotalSteps', 0)
+    acf.addFlag('ExecutorSplitting.Step', -1)
+    acf.addFlag('ExecutorSplitting.TotalEvents', -1)
 
     #Flags describing the input data 
     acf.addFlag('Input.Files', ["_ATHENA_GENERIC_INPUTFILE_NAME_",] ) # former global.InputFiles
diff --git a/Simulation/Digitization/python/PileUpUtils.py b/Simulation/Digitization/python/PileUpUtils.py
index 6c9cc6d7dd68..992b9d3453b2 100644
--- a/Simulation/Digitization/python/PileUpUtils.py
+++ b/Simulation/Digitization/python/PileUpUtils.py
@@ -134,6 +134,12 @@ def generatePileUpProfile(flags,
 
     jobNumber = flags.Digitization.JobNumber
     maxEvents = flags.Exec.MaxEvents
+    totalEvents = flags.Exec.MaxEvents
+    skipEvents = flags.Exec.SkipEvents
+
+    # executor splitting
+    if flags.ExecutorSplitting.TotalSteps > 1:
+        totalEvents = flags.ExecutorSplitting.TotalEvents
 
     if maxEvents == -1:
         raise SystemExit("maxEvents = %d is not supported! Please set this to the number of events per file times the number of files per job." % (
@@ -152,6 +158,10 @@ def generatePileUpProfile(flags,
     #  the number of events specified by this run is not evenly
     #  divisible by trfMaxEvents.
     generatedProfile = loadPileUpProfile(flags, profile)
+    # do executor step filtering
+    if flags.ExecutorSplitting.TotalSteps > 1:
+        generatedProfile = list(filter(lambda lb: 'step' not in lb or lb['step'] == flags.ExecutorSplitting.Step, generatedProfile))
+
     runMaxEvents = sum(lb["evts"] for lb in generatedProfile)
     logger.info("There are %d events in this run.", runMaxEvents)
     jobsPerRun = int(ceil(float(runMaxEvents)/corrMaxEvents))
@@ -172,13 +182,18 @@ def generatePileUpProfile(flags,
             jobnumber=(jobNumber-1),
             task=generatedProfile,
             maxEvents=maxEvents,
+            totalEvents=totalEvents,
+            skipEvents=skipEvents,
             sequentialEventNumbers=sequentialEventNumbers)
     else:
         # Load needed tools
         from Digitization.RunDependentMCTaskIterator import getRunLumiInfoFragment
         fragment = getRunLumiInfoFragment(
             jobnumber=(jobNumber-1),
-            task=generatedProfile, maxEvents=maxEvents,
+            task=generatedProfile,
+            maxEvents=maxEvents,
+            totalEvents=totalEvents,
+            skipEvents=skipEvents,
             sequentialEventNumbers=sequentialEventNumbers)
     
     # Remove lumiblocks with no events
diff --git a/Simulation/Digitization/python/RunDependentMCTaskIterator.py b/Simulation/Digitization/python/RunDependentMCTaskIterator.py
index 1b120b71a871..39526e31a85b 100644
--- a/Simulation/Digitization/python/RunDependentMCTaskIterator.py
+++ b/Simulation/Digitization/python/RunDependentMCTaskIterator.py
@@ -11,7 +11,7 @@ import itertools
 import random
 from operator import itemgetter
 
-def getRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEventNumbers=False):
+def getRunLumiInfoFragment(jobnumber,task,maxEvents,totalEvents,skipEvents,sequentialEventNumbers=False):
     """Calculate the specific configuration of the current job in the digi
     task. Try to make each fragment utilize the same amount of CPU and
     Cache resources.  Exploits the fact that the task when sorted by
@@ -35,11 +35,11 @@ def getRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEventNumbers=False
     
     fragment=sorted(sum([hi_mu_frag,lo_mu_frag],[]),key=lambda job: job['run'])
     if sequentialEventNumbers:
-        return defineSequentialEventNumbers(jobnumber,fragment,maxEvents)
+        return defineSequentialEventNumbers(jobnumber,fragment,totalEvents,skipEvents)
     else:
         return fragment
 
-def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEventNumbers=False):
+def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,totalEvents,skipEvents,sequentialEventNumbers=False):
     """Calculate the specific configuration of the current job in the digi
     task. Sample the mu values randomly.
     """
@@ -52,7 +52,7 @@ def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEve
 
     # sample mu profile
     new_frag = []
-    evt_nbr = jobnumber * maxEvents
+    evt_nbr = jobnumber * totalEvents + skipEvents
     for i in range(maxEvents):
         evt_nbr += 1
 
@@ -160,11 +160,11 @@ def taskLookupTable(task):
             table.append(i)
     return table
 
-def defineSequentialEventNumbers(jobnumber,fragment,maxEvents):
+def defineSequentialEventNumbers(jobnumber,fragment,totalEvents,skipEvents):
     """ Calculate sequential event numbers for the defined getFragment.
     """
     new_frag = []
-    evt_nbr = jobnumber * maxEvents
+    evt_nbr = jobnumber * totalEvents + skipEvents
     for t in fragment:
         for i in range(t['evts']):
             evt_nbr += 1
diff --git a/Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py b/Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py
new file mode 100644
index 000000000000..766f51020bef
--- /dev/null
+++ b/Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py
@@ -0,0 +1,19 @@
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+
+def setupProfile(flags, scaleTaskLength=1):
+
+  def _evts(x):
+    return int(scaleTaskLength * x)
+
+  return [
+    {'run':310000, 'lb':1, 'starttstamp':1550000000, 'dt':0.000, 'evts':_evts(1), 'mu':20.500, 'force_new':False, 'step': 0},
+    {'run':310000, 'lb':2, 'starttstamp':1550000060, 'dt':0.000, 'evts':_evts(1), 'mu':21.500, 'force_new':False, 'step': 0},
+    {'run':310000, 'lb':3, 'starttstamp':1550000120, 'dt':0.000, 'evts':_evts(1), 'mu':22.500, 'force_new':False, 'step': 0},
+    {'run':310000, 'lb':4, 'starttstamp':1550000180, 'dt':0.000, 'evts':_evts(1), 'mu':10.500, 'force_new':False, 'step': 1},
+    {'run':310000, 'lb':5, 'starttstamp':1550000240, 'dt':0.000, 'evts':_evts(1), 'mu':11.500, 'force_new':False, 'step': 1},
+    {'run':310000, 'lb':6, 'starttstamp':1550000300, 'dt':0.000, 'evts':_evts(1), 'mu':1.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':7, 'starttstamp':1550000360, 'dt':0.000, 'evts':_evts(1), 'mu':2.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':8, 'starttstamp':1550000420, 'dt':0.000, 'evts':_evts(1), 'mu':3.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':9, 'starttstamp':1550000480, 'dt':0.000, 'evts':_evts(1), 'mu':4.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':10, 'starttstamp':1550000540, 'dt':0.000, 'evts':_evts(1), 'mu':5.500, 'force_new':False, 'step': 2},
+  ]
diff --git a/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py b/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py
index fa4892859cba..d7682bf869dc 100644
--- a/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py
+++ b/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py
@@ -8,6 +8,17 @@ if 'runArgs' in dir():
     if hasattr(runArgs,"jobNumber") and hasattr(runArgs,"maxEvents"):
         trfJobNumber = runArgs.jobNumber
         trfMaxEvents = runArgs.maxEvents
+        trfTotalEvents = runArgs.maxEvents
+        trfSkipEvents = runArgs.skipEvents if hasattr(runArgs, "skipEvents") else 0
+
+        # do executor step filtering
+        if hasattr(runArgs, "totalExecutorSteps") and runArgs.totalExecutorSteps > 1:
+            JobMaker = list(filter(lambda lb: 'step' not in lb or lb['step'] == runArgs.executorStep, JobMaker))
+            if runArgs.totalExecutorSteps != len(runArgs.executorEventCounts):
+                raise ValueError("Mismatch between total executor steps and event fractions size!")
+            trfMaxEvents = runArgs.executorEventCounts[runArgs.executorStep]
+            trfSkipEvents = runArgs.executorEventSkips[runArgs.executorStep]
+
         if runArgs.maxEvents==-1:
             raise SystemExit("maxEvents = %d is not supported! Please set this to the number of events per file times the number of files per job."%(runArgs.maxEvents,))
         if not 'DoNotCorrectMaxEvents' in dir():
@@ -21,6 +32,7 @@ else:
     #this is a test job not a trf job
     trfJobNumber=1
     trfMaxEvents=10
+    trfTotalEvents=10
     corrMaxEvents=float(trfMaxEvents)
     
 #We may need to repeat this run for long production jobs.
@@ -45,11 +57,11 @@ if randomMuSampling:
     digilog.info('Mu values will be sampled randomly from the set profile.')
     #Load needed tools 
     from Digitization.RunDependentMCTaskIterator import getRandomlySampledRunLumiInfoFragment
-    fragment=getRandomlySampledRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,sequentialEventNumbers=sequentialEventNumbers)
+    fragment=getRandomlySampledRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,totalEvents=trfTotalEvents,skipEvents=trfSkipEvents,sequentialEventNumbers=sequentialEventNumbers)
 else:
     #Load needed tools 
     from Digitization.RunDependentMCTaskIterator import getRunLumiInfoFragment
-    fragment=getRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,sequentialEventNumbers=sequentialEventNumbers)
+    fragment=getRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,totalEvents=trfTotalEvents,skipEvents=trfSkipEvents,sequentialEventNumbers=sequentialEventNumbers)
 
 from RunDependentSimComps.RunLumiConfigTools import condenseRunLumiInfoFragment
 digilog.info( 'Writing RunDMC trigger configuration fragment to file.  listOfRunsEvents = %s' %
diff --git a/Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py b/Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py
new file mode 100644
index 000000000000..05fe5f48cfea
--- /dev/null
+++ b/Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+
+# We need to be able to adjust for different dataset sizes.
+if not 'ScaleTaskLength' in dir():   ScaleTaskLength = 1
+_evts = lambda x: int(ScaleTaskLength * x)
+
+if not 'logging' in dir(): import logging
+digilog = logging.getLogger('Digi_trf')
+digilog.info('doing RunLumiOverride configuration from file.')
+JobMaker=[
+  {'run':310000, 'lb':1, 'starttstamp':1550000000, 'dt':0.000, 'evts':_evts(1), 'mu':20.500, 'force_new':False, 'step': 0},
+  {'run':310000, 'lb':2, 'starttstamp':1550000060, 'dt':0.000, 'evts':_evts(1), 'mu':21.500, 'force_new':False, 'step': 0},
+  {'run':310000, 'lb':3, 'starttstamp':1550000120, 'dt':0.000, 'evts':_evts(1), 'mu':22.500, 'force_new':False, 'step': 0},
+  {'run':310000, 'lb':4, 'starttstamp':1550000180, 'dt':0.000, 'evts':_evts(1), 'mu':10.500, 'force_new':False, 'step': 1},
+  {'run':310000, 'lb':5, 'starttstamp':1550000240, 'dt':0.000, 'evts':_evts(1), 'mu':11.500, 'force_new':False, 'step': 1},
+  {'run':310000, 'lb':6, 'starttstamp':1550000300, 'dt':0.000, 'evts':_evts(1), 'mu':1.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':7, 'starttstamp':1550000360, 'dt':0.000, 'evts':_evts(1), 'mu':2.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':8, 'starttstamp':1550000420, 'dt':0.000, 'evts':_evts(1), 'mu':3.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':9, 'starttstamp':1550000480, 'dt':0.000, 'evts':_evts(1), 'mu':4.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':10, 'starttstamp':1550000540, 'dt':0.000, 'evts':_evts(1), 'mu':5.500, 'force_new':False, 'step': 2},
+#--> end hiding
+]
+
+include('RunDependentSimData/configCommon.py')
+
+#cleanup python memory
+if not "RunDMC_testing_configuration" in dir():
+    del JobMaker
diff --git a/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py b/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py
index 8bfa9167662b..f799fe221b52 100644
--- a/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py
+++ b/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py
@@ -11,9 +11,15 @@ from AthenaCommon.AthenaCommonFlags import athenaCommonFlags
 
 ## Max/skip events
 if hasattr(runArgs,"skipEvents"):
-    athenaCommonFlags.SkipEvents.set_Value_and_Lock( runArgs.skipEvents )
+    if hasattr(runArgs,"totalExecutorSteps") and runArgs.totalExecutorSteps > 1:
+        athenaCommonFlags.SkipEvents.set_Value_and_Lock( runArgs.executorEventSkips[runArgs.executorStep] )
+    else:
+        athenaCommonFlags.SkipEvents.set_Value_and_Lock( runArgs.skipEvents )
 if hasattr(runArgs,"maxEvents"):
-    athenaCommonFlags.EvtMax.set_Value_and_Lock( runArgs.maxEvents )
+    if hasattr(runArgs,"totalExecutorSteps") and runArgs.totalExecutorSteps > 1:
+        athenaCommonFlags.EvtMax.set_Value_and_Lock( runArgs.executorEventCounts[runArgs.executorStep] )
+    else:
+        athenaCommonFlags.EvtMax.set_Value_and_Lock( runArgs.maxEvents )
 else:
     athenaCommonFlags.EvtMax=-1
 
diff --git a/Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh b/Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh
new file mode 100755
index 000000000000..c9fc9428395d
--- /dev/null
+++ b/Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh
@@ -0,0 +1,124 @@
+#!/bin/sh
+#
+# art-description: Run a digitization example to compare configuration between ConfGetter and the new ComponentAccumulator approach.
+# art-type: grid
+# art-include: master/Athena
+# art-output: multistep_presampling.CG.RDO.pool.root
+# art-output: multistep_presampling.CA.RDO.pool.root
+# art-output: log.*
+# art-output: DigiPUConfig*
+
+Events=100
+DigiOutFileNameCG="multistep_presampling.CG.RDO.pool.root"
+DigiOutFileNameCA="multistep_presampling.CA.RDO.pool.root"
+HSHitsFile="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayTests/mc16_13TeV.900149.PG_single_nu_Pt50.simul.HITS.e8307_s3482/HITS.24078104._234467.pool.root.1"
+HighPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.800831.Py8EG_minbias_inelastic_highjetphotonlepton.simul.HITS_FILT.e8341_s3687_s3704/*"
+LowPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.900311.Epos_minbias_inelastic_lowjetphoton.simul.HITS_FILT.e8341_s3687_s3704/*"
+
+
+# config only
+Digi_tf.py \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameCG} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);digitizationFlags.HighPtMinBiasInputColOffset=-1;" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 15 \
+--athenaopts '"--config-only=DigiPUConfigCG.pkl"'
+
+# full run
+Digi_tf.py \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameCG} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);digitizationFlags.HighPtMinBiasInputColOffset=-1;" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec 'HITtoRDO:job+=CfgMgr.JobOptsDumperAlg(FileName="DigiPUConfigCG.txt")' \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 15
+
+rc=$?
+status=$rc
+echo "art-result: $rc CGdigi"
+mv runargs.HITtoRDOExecutorStep0.py runargs.legacy.HITtoRDOExecutorStep0.py 
+mv runargs.HITtoRDOExecutorStep1.py runargs.legacy.HITtoRDOExecutorStep1.py 
+mv runargs.HITtoRDOExecutorStep2.py runargs.legacy.HITtoRDOExecutorStep2.py 
+mv runargs.RDOMergeAthenaMP0.py runargs.legacy.RDOMergeAthenaMP0.py
+mv log.HITtoRDOExecutorStep0 legacy.HITtoRDOExecutorStep0
+mv log.HITtoRDOExecutorStep1 legacy.HITtoRDOExecutorStep1
+mv log.HITtoRDOExecutorStep2 legacy.HITtoRDOExecutorStep2
+mv log.RDOMergeAthenaMP0 legacy.RDOMergeAthenaMP0
+
+rc2=-9999
+if [ $rc -eq 0 ]
+then
+    Digi_tf.py \
+    --CA 'HITtoRDO:True' \
+    --splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+    --detectors Truth \
+    --PileUpPresampling True \
+    --conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+    --digiSeedOffset1 170 --digiSeedOffset2 170 \
+    --digiSteeringConf "StandardSignalOnlyTruth" \
+    --geometryVersion default:ATLAS-R2-2016-01-00-01 \
+    --inputHITSFile ${HSHitsFile} \
+    --inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+    --inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+    --jobNumber 568 \
+    --maxEvents ${Events} \
+    --outputRDOFile ${DigiOutFileNameCA} \
+    --numberOfLowPtMinBias 22.4 \
+    --numberOfHighPtMinBias 0.1 \
+    --postInclude 'HITtoRDO:PyJobTransforms.UseFrontier' 'HITtoRDO:Digitization.DigitizationSteering.DigitizationTestingPostInclude' \
+    --preInclude 'HITtoRDO:Campaigns.MC20e' \
+    --preExec 'HITtoRDO:ConfigFlags.Digitization.PU.ProfileConfig="RunDependentSimData.PileUpProfile_run310000_splitting";' \
+    --skipEvents 15
+
+    rc2=$?
+    status=$rc2
+fi
+
+echo  "art-result: $rc2 CAdigi"
+
+rc3=-9999
+if [ $rc2 -eq 0 ]
+then
+    acmd.py diff-root ${DigiOutFileNameCG} ${DigiOutFileNameCA} \
+        --mode=semi-detailed --error-mode resilient --order-trees \
+        --ignore-leaves RecoTimingObj_p1_HITStoRDO_timings index_ref
+    rc3=$?
+    status=$rc3
+fi
+
+echo  "art-result: $rc3 comparison"
+
+exit $status
diff --git a/Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh b/Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh
new file mode 100755
index 000000000000..900f3b9d1a54
--- /dev/null
+++ b/Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh
@@ -0,0 +1,135 @@
+#!/bin/sh
+#
+# art-description: Run multistep pile-up presampling
+# art-type: grid
+# art-athena-mt: 8
+# art-include: master/Athena
+# art-output: multistep_presampling_SP.RDO.pool.root
+# art-output: multistep_presampling_MP_fork_evt0.RDO.pool.root
+# art-output: multistep_presampling_MP_fork_evt1.RDO.pool.root
+
+export ATHENA_CORE_NUMBER=8
+
+Events=100
+DigiOutFileNameSP="multistep_presampling_SP.RDO.pool.root"
+DigiOutFileNameMP0="multistep_presampling_MP_fork_evt0.RDO.pool.root"
+DigiOutFileNameMP1="multistep_presampling_MP_fork_evt1.RDO.pool.root"
+
+HSHitsFile="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayTests/mc16_13TeV.900149.PG_single_nu_Pt50.simul.HITS.e8307_s3482/HITS.24078104._234467.pool.root.1"
+HighPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.800831.Py8EG_minbias_inelastic_highjetphotonlepton.simul.HITS_FILT.e8341_s3687_s3704/*"
+LowPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.900311.Epos_minbias_inelastic_lowjetphoton.simul.HITS_FILT.e8341_s3687_s3704/*"
+
+
+Digi_tf.py \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameSP} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec "HITtoRDO:ServiceMgr.PileUpEventLoopMgr.AllowSerialAndMPToDiffer=False" \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 0
+
+rc=$?
+status=$rc
+echo "art-result: $rc Digi_tf.py SP"
+
+Digi_tf.py \
+--multiprocess --athenaMPEventsBeforeFork 0 \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameMP0} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec "HITtoRDO:ServiceMgr.PileUpEventLoopMgr.AllowSerialAndMPToDiffer=False" \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 0
+
+rc2=$?
+if [ $status -eq 0 ]; then
+  status=$rc2
+fi
+echo "art-result: $rc2 Digi_tf.py MP fork after 0"
+
+Digi_tf.py \
+--multiprocess --athenaMPEventsBeforeFork 1 \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameMP1} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec "HITtoRDO:ServiceMgr.PileUpEventLoopMgr.AllowSerialAndMPToDiffer=False" \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 0
+
+rc3=$?
+if [ $status -eq 0 ]; then
+  status=$rc3
+fi
+echo "art-result: $rc3 Digi_tf.py MP fork after 1"
+
+rc4=-9999
+if [ $status -eq 0 ] && [ $rc -eq 0 ] && [ $rc2 -eq 0 ]
+then
+    acmd.py diff-root ${DigiOutFileNameSP} ${DigiOutFileNameMP0} \
+        --mode=semi-detailed --error-mode resilient --order-trees \
+        --ignore-leaves RecoTimingObj_p1_Bkg_HITStoRDO_timings index_ref
+    rc4=$?
+    status=$rc4
+fi
+echo "art-result: $rc4 SP vs MP fork after 0"
+
+rc5=-9999
+if [ $status -eq 0 ] && [ $rc -eq 0 ] && [ $rc3 -eq 0 ]
+then
+    acmd.py diff-root ${DigiOutFileNameSP} ${DigiOutFileNameMP1} \
+        --mode=semi-detailed --error-mode resilient --order-trees \
+        --ignore-leaves RecoTimingObj_p1_Bkg_HITStoRDO_timings index_ref
+    rc5=$?
+    status=$rc5
+fi
+echo "art-result: $rc5 SP vs MP fork after 1"
+
+exit $status
diff --git a/Tools/Campaigns/python/MC21.py b/Tools/Campaigns/python/MC21.py
index 5e1b30bb38d7..454a411b9895 100644
--- a/Tools/Campaigns/python/MC21.py
+++ b/Tools/Campaigns/python/MC21.py
@@ -33,3 +33,11 @@ def MC21NoPileUp(flags):
 
     flags.Tile.BestPhaseFromCOOL = False
     flags.Tile.correctTime = False
+
+
+def BeamspotSplitMC21a():
+    """MC21a beamspot splitting configuration"""
+    substeps = 3
+    event_fractions = [0.3, 0.3, 0.4]
+
+    return substeps, event_fractions
diff --git a/Tools/Campaigns/python/__init__.py b/Tools/Campaigns/python/__init__.py
index 1ea80b575a1a..8c972b6fa8de 100644
--- a/Tools/Campaigns/python/__init__.py
+++ b/Tools/Campaigns/python/__init__.py
@@ -2,12 +2,12 @@
 
 from .MC16 import MC16a, MC16d, MC16e, MC16NoPileUp
 from .MC20 import MC20a, MC20d, MC20e, MC20NoPileUp
-from .MC21 import MC21a, MC21NoPileUp
+from .MC21 import MC21a, MC21NoPileUp, BeamspotSplitMC21a
 from .PhaseII import PhaseIIPileUp, PhaseIINoPileUp
 
 __all__ = [
   'MC16a', 'MC16d', 'MC16e', 'MC16NoPileUp',
   'MC20a', 'MC20d', 'MC20e', 'MC20NoPileUp',
-  'MC21a', 'MC21NoPileUp',
+  'MC21a', 'MC21NoPileUp', 'BeamspotSplitMC21a',
   'PhaseIIPileUp', 'PhaseIINoPileUp',
 ]
diff --git a/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py b/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py
index 02d7972ed534..f3962800818c 100644
--- a/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py
+++ b/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py
@@ -24,3 +24,17 @@ def commonRunArgsToFlags(runArgs,configFlags):
 
     if hasattr(runArgs,"concurrentEvents"):
         configFlags.Concurrency.NumConcurrentEvents = runArgs.concurrentEvents
+
+    ## Executor steps
+    if hasattr(runArgs,"totalExecutorSteps"):
+        configFlags.ExecutorSplitting.TotalSteps = runArgs.totalExecutorSteps
+
+    if hasattr(runArgs,"executorStep"):
+        configFlags.ExecutorSplitting.Step = runArgs.executorStep
+
+    if hasattr(runArgs,"executorEventCounts"):
+        configFlags.ExecutorSplitting.TotalEvents = configFlags.Exec.MaxEvents
+        configFlags.Exec.MaxEvents = runArgs.executorEventCounts[configFlags.ExecutorSplitting.Step]
+
+    if hasattr(runArgs,"executorEventSkips"):
+        configFlags.Exec.SkipEvents = runArgs.executorEventSkips[configFlags.ExecutorSplitting.Step]
diff --git a/Tools/PyJobTransforms/python/transform.py b/Tools/PyJobTransforms/python/transform.py
index 431bf849ced1..946bfabe469d 100644
--- a/Tools/PyJobTransforms/python/transform.py
+++ b/Tools/PyJobTransforms/python/transform.py
@@ -8,6 +8,7 @@
 #
 
 import argparse
+import copy
 import os
 import os.path as path
 import re
@@ -24,6 +25,7 @@ from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandle
 from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
 from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
 from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
+from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
 from PyJobTransforms.trfUtils import isInteractiveEnv, calcCpuTime, calcWallTime
@@ -436,6 +438,9 @@ class transform(object):
                             msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                     self._dataDictionary[dataType].name = fileName
 
+            # Do splitting if required
+            self.setupSplitting()
+
             # Now we can set the final executor configuration properly, with the final dataDictionary
             for executor in self._executors:
                 executor.conf.setFromTransform(self)
@@ -517,6 +522,37 @@ class transform(object):
         self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
         self._executorGraph.doToposort()
     
+    ## @brief Setup executor splitting
+    def setupSplitting(self):
+        if 'splitConfig' not in self._argdict:
+            return
+
+        split = []
+        for executionStep in self._executorPath:
+            baseStepName = executionStep['name']
+            if baseStepName in split:
+                continue
+
+            baseExecutor = self._executorDictionary[baseStepName]
+            splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
+            if splitting <= 1:
+                continue
+
+            msg.info('Splitting {0} into {1} substeps'.format(baseStepName, splitting))
+            index = self._executorPath.index(executionStep)
+            baseStep = self._executorPath.pop(index)
+            for i in range(splitting):
+                name = baseStepName + executorStepSuffix + str(i)
+                step = copy.deepcopy(baseStep)
+                step['name'] = name
+                self._executorPath.insert(index + i, step)
+                executor = copy.deepcopy(baseExecutor)
+                executor.name = name
+                executor.conf.executorStep = i
+                executor.conf.totalExecutorSteps = splitting
+                self._executors.add(executor)
+                self._executorDictionary[name] = executor
+                split.append(name)
     
     ## @brief Trace the path through the executor graph
     #  @note This function might need to be called again when the number of 'substeps' is unknown
diff --git a/Tools/PyJobTransforms/python/trfArgClasses.py b/Tools/PyJobTransforms/python/trfArgClasses.py
index 5b6c6f97b3a8..80cb57a6b7a5 100644
--- a/Tools/PyJobTransforms/python/trfArgClasses.py
+++ b/Tools/PyJobTransforms/python/trfArgClasses.py
@@ -20,6 +20,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 
 from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PRWEntries, urlType, ROOTGetSize
 from PyJobTransforms.trfUtils import call
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName
 from PyJobTransforms.trfExitCodes import trfExit as trfExit
 from PyJobTransforms.trfDecorators import timelimited
 from PyJobTransforms.trfAMI import getAMIClient
@@ -1928,7 +1929,9 @@ class argSubstep(argument):
             name = exe.name
             substep = exe.substep
             first = exe.conf.firstExecutor
-            
+
+        name = commonExecutorStepName(name)
+
         value = None
         ## @note First we see if we have an explicit name or substep match, then a special 'first' or 'default' match
         if name in self._value:
diff --git a/Tools/PyJobTransforms/python/trfArgs.py b/Tools/PyJobTransforms/python/trfArgs.py
index f3756915350e..bd7c4bd2256c 100644
--- a/Tools/PyJobTransforms/python/trfArgs.py
+++ b/Tools/PyJobTransforms/python/trfArgs.py
@@ -79,6 +79,9 @@ def addAthenaArguments(parser, maxEventsDefaultSubstep='first', addValgrind=True
                         metavar='substep:POSTINCLUDE',
                         help='Python configuration fragment to include after main job options (can be optionally limited ' 
                         'to a single substep). Will split on commas: frag1.py,frag2.py is understood.')
+    parser.add_argument('--splitConfig', group = 'Athena', type=argFactory(trfArgClasses.argSubstepString),
+                        metavar='substep:SPLITCONFIG',
+                        help='Configuration file to internally split job into multiple parts (can be optionally limited to a single substep)')
     parser.add_argument('--maxEvents', group='Athena', type=argFactory(trfArgClasses.argSubstepInt, defaultSubstep=maxEventsDefaultSubstep), 
                         nargs='+', metavar='substep:maxEvents',
                         help='Set maximum events for each processing step (default substep is "{0}")'.format(maxEventsDefaultSubstep))
diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py
index 1051be1aac63..ee0e080b918d 100755
--- a/Tools/PyJobTransforms/python/trfExe.py
+++ b/Tools/PyJobTransforms/python/trfExe.py
@@ -26,6 +26,7 @@ from PyJobTransforms.trfJobOptions import JobOptionsTemplate
 from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
     cvmfsDBReleaseCheck, forceToAlphaNum, \
     ValgrindCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfLogger import stdLogLevels
 from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
@@ -64,7 +65,9 @@ class executorConfig(object):
         self._argdict = argdict
         self._dataDictionary = dataDictionary
         self._firstExecutor = firstExecutor
-       
+        self._executorStep = -1
+        self._totalExecutorSteps = 0
+
     @property
     def argdict(self):
         return self._argdict
@@ -88,7 +91,23 @@ class executorConfig(object):
     @firstExecutor.setter
     def firstExecutor(self, value):
         self._firstExecutor = value
-        
+
+    @property
+    def executorStep(self):
+        return self._executorStep
+
+    @executorStep.setter
+    def executorStep(self, value):
+        self._executorStep = value
+
+    @property
+    def totalExecutorSteps(self):
+        return self._totalExecutorSteps
+
+    @totalExecutorSteps.setter
+    def totalExecutorSteps(self, value):
+        self._totalExecutorSteps = value
+
     ## @brief Set configuration properties from the parent transform
     #  @note  It's not possible to set firstExecutor here as the transform holds
     #  the name of the first executor, which we don't know... (should we?)
@@ -190,7 +209,11 @@ class transformExecutor(object):
     @property
     def name(self):
         return self._name
-    
+
+    @name.setter
+    def name(self, value):
+        self._name = value
+
     @property
     def substep(self):
         if '_substep' in dir(self):
@@ -981,7 +1004,17 @@ class athenaExecutor(scriptExecutor):
                 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
                 self._disableMP = True
                 self._athenaMP = 0
-            
+
+        # Handle executor steps
+        if self.conf.totalExecutorSteps > 1:
+            for dataType in output:
+                if self.conf._dataDictionary[dataType].originalName:
+                    self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
+                else:
+                    self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
+                self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
+                msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
+
         # And if this is (still) athenaMP, then set some options for workers and output file report
         if self._athenaMP:
             self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
@@ -1021,7 +1054,8 @@ class athenaExecutor(scriptExecutor):
             # so that the mother process output file (if it exists) can be used directly
             # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
             for dataType in output:
-                self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
+                if self.conf.totalExecutorSteps <= 1:
+                    self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
                 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
                     if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                         msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
@@ -1040,12 +1074,13 @@ class athenaExecutor(scriptExecutor):
             outputFiles = dict()
             for dataType in output:
                 outputFiles[dataType] = self.conf.dataDictionary[dataType]
-                
+
             # See if we have any 'extra' file arguments
+            nameForFiles = commonExecutorStepName(self._name)
             for dataType, dataArg in self.conf.dataDictionary.items():
-                if dataArg.io == 'input' and self._name in dataArg.executor:
+                if dataArg.io == 'input' and nameForFiles in dataArg.executor:
                     inputFiles[dataArg.subtype] = dataArg
-                
+
             msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
             
             # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
@@ -1105,8 +1140,34 @@ class athenaExecutor(scriptExecutor):
     def postExecute(self):
         super(athenaExecutor, self).postExecute()
 
+        # Handle executor substeps
+        if self.conf.totalExecutorSteps > 1:
+            if self._athenaMP:
+                outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
+                athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
+            if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
+                # first loop over datasets for the output
+                for dataType in self._output:
+                    newValue = []
+                    if self._athenaMP:
+                        # assume the same number of workers all the time
+                        for i in range(self.conf.totalExecutorSteps):
+                            for v in self.conf.dataDictionary[dataType].value:
+                                newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
+                                                          '_{0}{1}_'.format(executorStepSuffix, i)))
+                    else:
+                        self.conf.dataDictionary[dataType].multipleOK = True
+                        # just combine all executors
+                        for i in range(self.conf.totalExecutorSteps):
+                            newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
+                    self.conf.dataDictionary[dataType].value = newValue
+
+                    # do the merging if needed
+                    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
+                        self._smartMerge(self.conf.dataDictionary[dataType])
+
         # If this was an athenaMP run then we need to update output files
-        if self._athenaMP:
+        elif self._athenaMP:
             outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
             ## @note Update argFile values to have the correct outputs from the MP workers
             skipFileChecks=False
@@ -1262,8 +1323,9 @@ class athenaExecutor(scriptExecutor):
         # Find options for the current substep. Name is prioritised (e.g. RAWtoESD) over alias (e.g. r2e). Last look for 'all'
         currentSubstep = None
         if 'athenaopts' in self.conf.argdict:
-            if self.name in self.conf.argdict['athenaopts'].value:
-                currentSubstep = self.name
+            currentName = commonExecutorStepName(self.name)
+            if currentName in self.conf.argdict['athenaopts'].value:
+                currentSubstep = currentName
                 if self.substep in self.conf.argdict['athenaopts'].value:
                     msg.info('Athenaopts found for {0} and {1}, joining options. '
                              'Consider changing your configuration to use just the name or the alias of the substep.'
diff --git a/Tools/PyJobTransforms/python/trfExeStepTools.py b/Tools/PyJobTransforms/python/trfExeStepTools.py
new file mode 100644
index 000000000000..463d8911ad7a
--- /dev/null
+++ b/Tools/PyJobTransforms/python/trfExeStepTools.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+
+
+executorStepSuffix = 'ExecutorStep'
+
+
+def commonExecutorStepName(name):
+    """Remove executor step suffix from executor name."""
+    if executorStepSuffix in name:
+        name, _sep, _tail = name.rpartition(executorStepSuffix)
+    return name
+
+
+def loadSplitConfig(config):
+    """Load splitting configuration from file."""
+    parts = config.split('.')
+    if len(parts) < 2:
+        raise ValueError('Splitting config should be of the form Package.Module.Function or Package.Function if defined in __init__.py')
+
+    function = parts[-1]
+    module = '.'.join(parts[:-1])
+
+    from importlib import import_module
+    loaded_module = import_module(module)
+    function_def = getattr(loaded_module, function)
+    return function_def()
+
+
+def getTotalExecutorSteps(executor, argdict=None):
+    """Get total executor steps from executor."""
+    if not argdict:
+        argdict = executor.conf.argdict
+    if 'splitConfig' not in argdict:
+        return 0
+
+    splitConfig = argdict['splitConfig'].returnMyValue(exe=executor)
+    if not splitConfig:
+        return 0
+
+    steps, fractions = loadSplitConfig(splitConfig)
+    return steps
+
+
+def getExecutorStepEventCounts(executor, argdict=None):
+    """Get executor step event counts from executor config."""
+    if not argdict:
+        argdict = executor.conf.argdict
+    if 'splitConfig' not in argdict:
+        return [], []
+
+    maxEvents = argdict['maxEvents'].returnMyValue(exe=executor)
+    skipEvents = argdict['skipEvents'].returnMyValue(exe=executor)
+    splitConfig = argdict['splitConfig'].returnMyValue(exe=executor)
+    if not splitConfig:
+        return [], []
+
+    steps, fractions = loadSplitConfig(splitConfig)
+
+    if abs(sum(fractions) - 1) > 1e-9:  # tolerate float rounding in the fractions
+        raise ValueError('Event fractions should total to 1!')
+
+    counts = []
+    for i in range(len(fractions) - 1):
+        counts.append(round(maxEvents * fractions[i]))
+    counts.append(maxEvents - sum(counts))
+
+    sums = []
+    for i in range(len(fractions)):
+        sums.append(skipEvents + sum(counts[:i]))
+
+    return counts, sums
diff --git a/Tools/PyJobTransforms/python/trfJobOptions.py b/Tools/PyJobTransforms/python/trfJobOptions.py
index ba10d6d1778a..6e898353908c 100644
--- a/Tools/PyJobTransforms/python/trfJobOptions.py
+++ b/Tools/PyJobTransforms/python/trfJobOptions.py
@@ -15,6 +15,7 @@ msg = logging.getLogger(__name__)
 
 import PyJobTransforms.trfArgClasses as trfArgClasses
 import PyJobTransforms.trfExceptions as trfExceptions
+from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
 from PyJobTransforms.trfExitCodes import trfExit
 
 from PyJobTransforms.trfUtils import findFile
@@ -208,6 +209,21 @@ class JobOptionsTemplate(object):
                         print(f"AthenaMPJobProps.AthenaMPFlags.UseSharedWriter={self._exe.conf.argdict['sharedWriter'].value}", file=runargsFile)
                     if 'parallelCompression' in self._exe.conf.argdict:
                         print(f"AthenaMPJobProps.AthenaMPFlags.UseParallelCompression={self._exe.conf.argdict['parallelCompression'].value}", file=runargsFile)
+
+                # Executor substeps
+                print(os.linesep, '# Executor flags', file=runargsFile)
+                msg.debug('Adding runarg {0!s}={1!r}'.format('totalExecutorSteps', self._exe.conf.totalExecutorSteps))
+                print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'totalExecutorSteps', self._exe.conf.totalExecutorSteps), file=runargsFile)
+                if self._exe.conf.executorStep >= 0:
+                    msg.debug('Adding runarg {0!s}={1!r}'.format('executorStep', self._exe.conf.executorStep))
+                    print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorStep', self._exe.conf.executorStep), file=runargsFile)
+                    executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._exe)
+                    msg.debug('Adding runarg {0!s}={1!r}'.format('executorEventCounts', executorEventCounts))
+                    print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorEventCounts', executorEventCounts), file=runargsFile)
+                    msg.debug('Adding runarg {0!s}={1!r}'.format('executorEventSkips', executorEventSkips))
+                    print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorEventSkips', executorEventSkips), file=runargsFile)
+
+                # CA
                 if self._exe._isCAEnabled():
                     print(os.linesep, '# Threading flags', file=runargsFile)
                     #Pass the number of threads
@@ -223,7 +239,7 @@ class JobOptionsTemplate(object):
                     print('fromRunArgs({0})'.format(self._runArgsName),file=runargsFile)
 
                 msg.info('Successfully wrote runargs file {0}'.format(self._runArgsFile))
-                
+
             except (IOError, OSError) as e:
                 errMsg = 'Got an error when writing JO template {0}: {1}'.format(self._runArgsFile, e)
                 msg.error(errMsg)
@@ -251,7 +267,9 @@ class JobOptionsTemplate(object):
     #  @param input Input file list
     #  @param output Output file list
     #  @return List of runargs and skeletons to be processed by athena
-    def getTopOptions(self, input = dict(), output = dict()): 
+    def getTopOptions(self, input = dict(), output = dict()):
+        # Update the output name
+        self._runArgsFile = 'runargs.' + self._exe.name + '.py'
         # First Make the runArgs file:
         self.writeRunArgs(input = input, output = output)
         # Make sure runArgs and skeleton are valid
diff --git a/Tools/PyJobTransforms/python/trfMPTools.py b/Tools/PyJobTransforms/python/trfMPTools.py
index 926816bde4ed..125f47696cc8 100644
--- a/Tools/PyJobTransforms/python/trfMPTools.py
+++ b/Tools/PyJobTransforms/python/trfMPTools.py
@@ -16,6 +16,7 @@ msg = logging.getLogger(__name__)
 
 from xml.etree import ElementTree
 
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName
 from PyJobTransforms.trfExitCodes import trfExit
 
 import PyJobTransforms.trfExceptions as trfExceptions
@@ -25,6 +26,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 #  @return Integer with the number of processes, N.B. 0 means non-MP serial mode
 def detectAthenaMPProcs(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
     athenaMPProcs = 0
+    currentSubstep = commonExecutorStepName(currentSubstep)
     
     # Try and detect if any AthenaMP has been enabled 
     try:
diff --git a/Tools/PyJobTransforms/python/trfMTTools.py b/Tools/PyJobTransforms/python/trfMTTools.py
index c025e04c9860..1b476b75f8f1 100644
--- a/Tools/PyJobTransforms/python/trfMTTools.py
+++ b/Tools/PyJobTransforms/python/trfMTTools.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
 
 ## @package PyJobTransforms.trfMTTools
 #
@@ -12,6 +12,7 @@ import os
 import logging
 msg = logging.getLogger(__name__)
 
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName
 from PyJobTransforms.trfExitCodes import trfExit
 
 import PyJobTransforms.trfExceptions as trfExceptions
@@ -22,6 +23,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 def detectAthenaMTThreads(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
     athenaMTThreads = 0
     athenaConcurrentEvents = 0
+    currentSubstep = commonExecutorStepName(currentSubstep)
 
     if legacyThreadingRelease:
         return athenaMTThreads, athenaConcurrentEvents
diff --git a/Tools/PyJobTransforms/python/trfValidation.py b/Tools/PyJobTransforms/python/trfValidation.py
index 9545c52c6511..238507b896ec 100644
--- a/Tools/PyJobTransforms/python/trfValidation.py
+++ b/Tools/PyJobTransforms/python/trfValidation.py
@@ -22,6 +22,7 @@ msg = logging.getLogger(__name__)
 
 from PyUtils import RootUtils
 
+from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfLogger import stdLogLevels
 from PyJobTransforms.trfArgClasses import argFile
@@ -989,6 +990,12 @@ class eventMatch(object):
             else:
                 self._maxEvents = None
 
+            # Executor substeps handling
+            if self._executor.conf.totalExecutorSteps > 1 and self._executor.conf.executorStep < self._executor.conf.totalExecutorSteps - 1:
+                executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._executor)
+                self._maxEvents = executorEventCounts[self._executor.conf.executorStep]
+                self._skipEvents = executorEventSkips[self._executor.conf.executorStep]
+
             # Global eventAcceptanceEfficiency set?
             if "eventAcceptanceEfficiency" in self._executor.conf.argdict:
                 self._evAccEff = self._executor.conf.argdict['eventAcceptanceEfficiency'].returnMyValue(exe=self._executor)
-- 
GitLab