diff --git a/Control/AthenaConfiguration/python/AllConfigFlags.py b/Control/AthenaConfiguration/python/AllConfigFlags.py
index c4c9023a45662e0532c56110db477d755f416b80..b39c26cfcb1116d85c0b93563e86e85b34065106 100644
--- a/Control/AthenaConfiguration/python/AllConfigFlags.py
+++ b/Control/AthenaConfiguration/python/AllConfigFlags.py
@@ -23,8 +23,12 @@ def _createCfgFlags():
     from AthenaCommon.Constants import INFO
     acf.addFlag('Exec.OutputLevel',INFO) #Global Output Level
     acf.addFlag('Exec.MaxEvents',-1) 
-    acf.addFlag("Exec.SkipEvents",0)
-    acf.addFlag("Exec.DebugStage","")
+    acf.addFlag('Exec.SkipEvents',0)
+    acf.addFlag('Exec.DebugStage','')
+
+    acf.addFlag('ExecutorSplitting.TotalSteps', 0)
+    acf.addFlag('ExecutorSplitting.Step', -1)
+    acf.addFlag('ExecutorSplitting.TotalEvents', -1)
 
     #Flags describing the input data 
     acf.addFlag('Input.Files', ["_ATHENA_GENERIC_INPUTFILE_NAME_",] ) # former global.InputFiles
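The three ExecutorSplitting flags added above describe how a split substep sees itself: TotalSteps is the number of substeps the transform was split into, Step is this substep's index, and TotalEvents preserves the event count of the unsplit job. A minimal sketch of reading them downstream (flag names from this diff; the printed values are illustrative only):

    # Sketch: consuming the new ExecutorSplitting flags (names from this diff).
    from AthenaConfiguration.AllConfigFlags import ConfigFlags

    if ConfigFlags.ExecutorSplitting.TotalSteps > 1:
        # Exec.MaxEvents now holds the per-substep count, while
        # ExecutorSplitting.TotalEvents keeps the original full-job count.
        print('substep %d of %d: %d events out of %d' % (
            ConfigFlags.ExecutorSplitting.Step,
            ConfigFlags.ExecutorSplitting.TotalSteps,
            ConfigFlags.Exec.MaxEvents,
            ConfigFlags.ExecutorSplitting.TotalEvents))
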
diff --git a/Simulation/Digitization/python/PileUpUtils.py b/Simulation/Digitization/python/PileUpUtils.py
index 6c9cc6d7dd681d144876dc36013024f5dd6e2db6..992b9d3453b263a16d58699141723497b26d7030 100644
--- a/Simulation/Digitization/python/PileUpUtils.py
+++ b/Simulation/Digitization/python/PileUpUtils.py
@@ -134,6 +134,12 @@ def generatePileUpProfile(flags,
 
     jobNumber = flags.Digitization.JobNumber
     maxEvents = flags.Exec.MaxEvents
+    totalEvents = flags.Exec.MaxEvents
+    skipEvents = flags.Exec.SkipEvents
+
+    # executor splitting
+    if flags.ExecutorSplitting.TotalSteps > 1:
+        totalEvents = flags.ExecutorSplitting.TotalEvents
 
     if maxEvents == -1:
         raise SystemExit("maxEvents = %d is not supported! Please set this to the number of events per file times the number of files per job." % (
@@ -152,6 +158,10 @@ def generatePileUpProfile(flags,
     #  the number of events specified by this run is not evenly
     #  divisible by trfMaxEvents.
     generatedProfile = loadPileUpProfile(flags, profile)
+    # do executor step filtering
+    if flags.ExecutorSplitting.TotalSteps > 1:
+        generatedProfile = list(filter(lambda lb: 'step' not in lb or lb['step'] == flags.ExecutorSplitting.Step, generatedProfile))
+
     runMaxEvents = sum(lb["evts"] for lb in generatedProfile)
     logger.info("There are %d events in this run.", runMaxEvents)
     jobsPerRun = int(ceil(float(runMaxEvents)/corrMaxEvents))
@@ -172,13 +182,18 @@ def generatePileUpProfile(flags,
             jobnumber=(jobNumber-1),
             task=generatedProfile,
             maxEvents=maxEvents,
+            totalEvents=totalEvents,
+            skipEvents=skipEvents,
             sequentialEventNumbers=sequentialEventNumbers)
     else:
         # Load needed tools
         from Digitization.RunDependentMCTaskIterator import getRunLumiInfoFragment
         fragment = getRunLumiInfoFragment(
             jobnumber=(jobNumber-1),
-            task=generatedProfile, maxEvents=maxEvents,
+            task=generatedProfile,
+            maxEvents=maxEvents,
+            totalEvents=totalEvents,
+            skipEvents=skipEvents,
             sequentialEventNumbers=sequentialEventNumbers)
     
     # Remove lumiblocks with no events
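The filter above keeps only the lumiblocks tagged for the current substep; lumiblocks without a 'step' key survive every substep, so profiles that predate splitting keep working unchanged. A standalone illustration with a toy profile:

    # Toy illustration of the per-step profile filtering used above.
    profile = [
        {'run': 310000, 'lb': 1, 'evts': 1, 'step': 0},
        {'run': 310000, 'lb': 4, 'evts': 1, 'step': 1},
        {'run': 310000, 'lb': 6, 'evts': 1},  # no 'step' key: kept by every substep
    ]
    step = 1
    filtered = list(filter(lambda lb: 'step' not in lb or lb['step'] == step, profile))
    assert [lb['lb'] for lb in filtered] == [4, 6]
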
diff --git a/Simulation/Digitization/python/RunDependentMCTaskIterator.py b/Simulation/Digitization/python/RunDependentMCTaskIterator.py
index 1b120b71a871217b5e01831928d6312f0c26698b..39526e31a85bdf90f8426e7b1719e13d2559b720 100644
--- a/Simulation/Digitization/python/RunDependentMCTaskIterator.py
+++ b/Simulation/Digitization/python/RunDependentMCTaskIterator.py
@@ -11,7 +11,7 @@ import itertools
 import random
 from operator import itemgetter
 
-def getRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEventNumbers=False):
+def getRunLumiInfoFragment(jobnumber,task,maxEvents,totalEvents,skipEvents,sequentialEventNumbers=False):
     """Calculate the specific configuration of the current job in the digi
     task. Try to make each fragment utilize the same amount of CPU and
     Cache resources.  Exploits the fact that the task when sorted by
@@ -35,11 +35,11 @@ def getRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEventNumbers=False
     
     fragment=sorted(sum([hi_mu_frag,lo_mu_frag],[]),key=lambda job: job['run'])
     if sequentialEventNumbers:
-        return defineSequentialEventNumbers(jobnumber,fragment,maxEvents)
+        return defineSequentialEventNumbers(jobnumber,fragment,totalEvents,skipEvents)
     else:
         return fragment
 
-def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEventNumbers=False):
+def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,totalEvents,skipEvents,sequentialEventNumbers=False):
     """Calculate the specific configuration of the current job in the digi
     task. Sample the mu values randomly.
     """
@@ -52,7 +52,7 @@ def getRandomlySampledRunLumiInfoFragment(jobnumber,task,maxEvents,sequentialEve
 
     # sample mu profile
     new_frag = []
-    evt_nbr = jobnumber * maxEvents
+    evt_nbr = jobnumber * totalEvents + skipEvents
     for i in range(maxEvents):
         evt_nbr += 1
 
@@ -160,11 +160,11 @@ def taskLookupTable(task):
             table.append(i)
     return table
 
-def defineSequentialEventNumbers(jobnumber,fragment,maxEvents):
+def defineSequentialEventNumbers(jobnumber,fragment,totalEvents,skipEvents):
     """ Calculate sequential event numbers for the defined getFragment.
     """
     new_frag = []
-    evt_nbr = jobnumber * maxEvents
+    evt_nbr = jobnumber * totalEvents + skipEvents
     for t in fragment:
         for i in range(t['evts']):
             evt_nbr += 1
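Changing the offset from jobnumber * maxEvents to jobnumber * totalEvents + skipEvents keeps sequential event numbers globally unique after splitting: each job still owns a disjoint block of totalEvents numbers, and every substep starts inside that block at its own skip offset. A worked check with illustrative numbers (three substeps of 30/30/40 events out of 100):

    # Event-number bookkeeping after this change (illustrative values).
    jobnumber, totalEvents = 5, 100          # this job owns numbers 501..600
    steps = [(30, 0), (30, 30), (40, 60)]    # (maxEvents, skipEvents) per substep
    ranges = []
    for maxEvents, skipEvents in steps:
        first = jobnumber * totalEvents + skipEvents + 1
        ranges.append((first, first + maxEvents - 1))
    # The substeps tile the job's block without gaps or overlaps.
    assert ranges == [(501, 530), (531, 560), (561, 600)]
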
diff --git a/Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py b/Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py
new file mode 100644
index 0000000000000000000000000000000000000000..766f51020bef9c70a3ba433db4457645189741fa
--- /dev/null
+++ b/Simulation/RunDependentSim/RunDependentSimData/python/PileUpProfile_run310000_splitting.py
@@ -0,0 +1,19 @@
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+
+def setupProfile(flags, scaleTaskLength=1):
+
+  def _evts(x):
+    return int(scaleTaskLength * x)
+
+  return [
+    {'run':310000, 'lb':1, 'starttstamp':1550000000, 'dt':0.000, 'evts':_evts(1), 'mu':20.500, 'force_new':False, 'step': 0},
+    {'run':310000, 'lb':2, 'starttstamp':1550000060, 'dt':0.000, 'evts':_evts(1), 'mu':21.500, 'force_new':False, 'step': 0},
+    {'run':310000, 'lb':3, 'starttstamp':1550000120, 'dt':0.000, 'evts':_evts(1), 'mu':22.500, 'force_new':False, 'step': 0},
+    {'run':310000, 'lb':4, 'starttstamp':1550000180, 'dt':0.000, 'evts':_evts(1), 'mu':10.500, 'force_new':False, 'step': 1},
+    {'run':310000, 'lb':5, 'starttstamp':1550000240, 'dt':0.000, 'evts':_evts(1), 'mu':11.500, 'force_new':False, 'step': 1},
+    {'run':310000, 'lb':6, 'starttstamp':1550000300, 'dt':0.000, 'evts':_evts(1), 'mu':1.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':7, 'starttstamp':1550000360, 'dt':0.000, 'evts':_evts(1), 'mu':2.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':8, 'starttstamp':1550000420, 'dt':0.000, 'evts':_evts(1), 'mu':3.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':9, 'starttstamp':1550000480, 'dt':0.000, 'evts':_evts(1), 'mu':4.500, 'force_new':False, 'step': 2},
+    {'run':310000, 'lb':10, 'starttstamp':1550000540, 'dt':0.000, 'evts':_evts(1), 'mu':5.500, 'force_new':False, 'step': 2},
+  ]
diff --git a/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py b/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py
index fa4892859cbafc2e0eb9ee78433219aebec8d97e..d7682bf869dca328a034c6c9f69c83e3403af74a 100644
--- a/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py
+++ b/Simulation/RunDependentSim/RunDependentSimData/share/configCommon.py
@@ -8,6 +8,17 @@ if 'runArgs' in dir():
     if hasattr(runArgs,"jobNumber") and hasattr(runArgs,"maxEvents"):
         trfJobNumber = runArgs.jobNumber
         trfMaxEvents = runArgs.maxEvents
+        trfTotalEvents = runArgs.maxEvents
+        trfSkipEvents = runArgs.skipEvents if hasattr(runArgs, "skipEvents") else 0
+
+        # do executor step filtering
+        if hasattr(runArgs, "totalExecutorSteps") and runArgs.totalExecutorSteps > 1:
+            JobMaker = list(filter(lambda lb: 'step' not in lb or lb['step'] == runArgs.executorStep, JobMaker))
+            if runArgs.totalExecutorSteps != len(runArgs.executorEventCounts):
+                raise ValueError("Mismatch between total executor steps and event fractions size!")
+            trfMaxEvents = runArgs.executorEventCounts[runArgs.executorStep]
+            trfSkipEvents = runArgs.executorEventSkips[runArgs.executorStep]
+
         if runArgs.maxEvents==-1:
             raise SystemExit("maxEvents = %d is not supported! Please set this to the number of events per file times the number of files per job."%(runArgs.maxEvents,))
         if not 'DoNotCorrectMaxEvents' in dir():
@@ -21,6 +32,8 @@ else:
     #this is a test job not a trf job
     trfJobNumber=1
     trfMaxEvents=10
+    trfTotalEvents=10
+    trfSkipEvents=0
     corrMaxEvents=float(trfMaxEvents)
     
 #We may need to repeat this run for long production jobs.
@@ -45,11 +57,11 @@ if randomMuSampling:
     digilog.info('Mu values will be sampled randomly from the set profile.')
     #Load needed tools 
     from Digitization.RunDependentMCTaskIterator import getRandomlySampledRunLumiInfoFragment
-    fragment=getRandomlySampledRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,sequentialEventNumbers=sequentialEventNumbers)
+    fragment=getRandomlySampledRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,totalEvents=trfTotalEvents,skipEvents=trfSkipEvents,sequentialEventNumbers=sequentialEventNumbers)
 else:
     #Load needed tools 
     from Digitization.RunDependentMCTaskIterator import getRunLumiInfoFragment
-    fragment=getRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,sequentialEventNumbers=sequentialEventNumbers)
+    fragment=getRunLumiInfoFragment(jobnumber=(trfJobNumber-1),task=JobMaker,maxEvents=trfMaxEvents,totalEvents=trfTotalEvents,skipEvents=trfSkipEvents,sequentialEventNumbers=sequentialEventNumbers)
 
 from RunDependentSimComps.RunLumiConfigTools import condenseRunLumiInfoFragment
 digilog.info( 'Writing RunDMC trigger configuration fragment to file.  listOfRunsEvents = %s' %
diff --git a/Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py b/Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py
new file mode 100644
index 0000000000000000000000000000000000000000..05fe5f48cfeaf58620f42a1576798d47247af228
--- /dev/null
+++ b/Simulation/RunDependentSim/RunDependentSimData/share/configLumi_run310000_splitting.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+
+# We need to be able to adjust for different dataset sizes.
+if not 'ScaleTaskLength' in dir():   ScaleTaskLength = 1
+_evts = lambda x: int(ScaleTaskLength * x)
+
+if not 'logging' in dir(): import logging
+digilog = logging.getLogger('Digi_trf')
+digilog.info('doing RunLumiOverride configuration from file.')
+JobMaker=[
+  {'run':310000, 'lb':1, 'starttstamp':1550000000, 'dt':0.000, 'evts':_evts(1), 'mu':20.500, 'force_new':False, 'step': 0},
+  {'run':310000, 'lb':2, 'starttstamp':1550000060, 'dt':0.000, 'evts':_evts(1), 'mu':21.500, 'force_new':False, 'step': 0},
+  {'run':310000, 'lb':3, 'starttstamp':1550000120, 'dt':0.000, 'evts':_evts(1), 'mu':22.500, 'force_new':False, 'step': 0},
+  {'run':310000, 'lb':4, 'starttstamp':1550000180, 'dt':0.000, 'evts':_evts(1), 'mu':10.500, 'force_new':False, 'step': 1},
+  {'run':310000, 'lb':5, 'starttstamp':1550000240, 'dt':0.000, 'evts':_evts(1), 'mu':11.500, 'force_new':False, 'step': 1},
+  {'run':310000, 'lb':6, 'starttstamp':1550000300, 'dt':0.000, 'evts':_evts(1), 'mu':1.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':7, 'starttstamp':1550000360, 'dt':0.000, 'evts':_evts(1), 'mu':2.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':8, 'starttstamp':1550000420, 'dt':0.000, 'evts':_evts(1), 'mu':3.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':9, 'starttstamp':1550000480, 'dt':0.000, 'evts':_evts(1), 'mu':4.500, 'force_new':False, 'step': 2},
+  {'run':310000, 'lb':10, 'starttstamp':1550000540, 'dt':0.000, 'evts':_evts(1), 'mu':5.500, 'force_new':False, 'step': 2},
+#--> end hiding
+]
+
+include('RunDependentSimData/configCommon.py')
+
+#cleanup python memory
+if not "RunDMC_testing_configuration" in dir():
+    del JobMaker
diff --git a/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py b/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py
index 8bfa9167662b38b15d997e7ed0baa74da076e09c..f799fe221b527b42bc1801a2172b82de138ee4af 100644
--- a/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py
+++ b/Simulation/SimuJobTransforms/share/CommonSkeletonJobOptions.py
@@ -11,9 +11,15 @@ from AthenaCommon.AthenaCommonFlags import athenaCommonFlags
 
 ## Max/skip events
 if hasattr(runArgs,"skipEvents"):
-    athenaCommonFlags.SkipEvents.set_Value_and_Lock( runArgs.skipEvents )
+    if hasattr(runArgs,"totalExecutorSteps") and runArgs.totalExecutorSteps > 1:
+        athenaCommonFlags.SkipEvents.set_Value_and_Lock( runArgs.executorEventSkips[runArgs.executorStep] )
+    else:
+        athenaCommonFlags.SkipEvents.set_Value_and_Lock( runArgs.skipEvents )
 if hasattr(runArgs,"maxEvents"):
-    athenaCommonFlags.EvtMax.set_Value_and_Lock( runArgs.maxEvents )
+    if hasattr(runArgs,"totalExecutorSteps") and runArgs.totalExecutorSteps > 1:
+        athenaCommonFlags.EvtMax.set_Value_and_Lock( runArgs.executorEventCounts[runArgs.executorStep] )
+    else:
+        athenaCommonFlags.EvtMax.set_Value_and_Lock( runArgs.maxEvents )
 else:
     athenaCommonFlags.EvtMax=-1
 
diff --git a/Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh b/Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh
new file mode 100755
index 0000000000000000000000000000000000000000..c9fc9428395d5312029d8e3be67eccdf0827e77b
--- /dev/null
+++ b/Simulation/Tests/DigitizationTests/test/test_Digi_tf_multistep_presampling_CA_vs_CG.sh
@@ -0,0 +1,124 @@
+#!/bin/sh
+#
+# art-description: Run a digitization example to compare configuration between ConfGetter and the new ComponentAccumulator approach.
+# art-type: grid
+# art-include: master/Athena
+# art-output: multistep_presampling.CG.RDO.pool.root
+# art-output: multistep_presampling.CA.RDO.pool.root
+# art-output: log.*
+# art-output: DigiPUConfig*
+
+Events=100
+DigiOutFileNameCG="multistep_presampling.CG.RDO.pool.root"
+DigiOutFileNameCA="multistep_presampling.CA.RDO.pool.root"
+HSHitsFile="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayTests/mc16_13TeV.900149.PG_single_nu_Pt50.simul.HITS.e8307_s3482/HITS.24078104._234467.pool.root.1"
+HighPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.800831.Py8EG_minbias_inelastic_highjetphotonlepton.simul.HITS_FILT.e8341_s3687_s3704/*"
+LowPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.900311.Epos_minbias_inelastic_lowjetphoton.simul.HITS_FILT.e8341_s3687_s3704/*"
+
+
+# config only
+Digi_tf.py \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameCG} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);digitizationFlags.HighPtMinBiasInputColOffset=-1;" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 15 \
+--athenaopts '"--config-only=DigiPUConfigCG.pkl"'
+
+# full run
+Digi_tf.py \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameCG} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);digitizationFlags.HighPtMinBiasInputColOffset=-1;" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec 'HITtoRDO:job+=CfgMgr.JobOptsDumperAlg(FileName="DigiPUConfigCG.txt")' \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 15
+
+rc=$?
+status=$rc
+echo "art-result: $rc CGdigi"
+mv runargs.HITtoRDOExecutorStep0.py runargs.legacy.HITtoRDOExecutorStep0.py
+mv runargs.HITtoRDOExecutorStep1.py runargs.legacy.HITtoRDOExecutorStep1.py
+mv runargs.HITtoRDOExecutorStep2.py runargs.legacy.HITtoRDOExecutorStep2.py
+mv runargs.RDOMergeAthenaMP0.py runargs.legacy.RDOMergeAthenaMP0.py
+mv log.HITtoRDOExecutorStep0 log.legacy.HITtoRDOExecutorStep0
+mv log.HITtoRDOExecutorStep1 log.legacy.HITtoRDOExecutorStep1
+mv log.HITtoRDOExecutorStep2 log.legacy.HITtoRDOExecutorStep2
+mv log.RDOMergeAthenaMP0 log.legacy.RDOMergeAthenaMP0
+
+rc2=-9999
+if [ $rc -eq 0 ]
+then
+    Digi_tf.py \
+    --CA 'HITtoRDO:True' \
+    --splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+    --detectors Truth \
+    --PileUpPresampling True \
+    --conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+    --digiSeedOffset1 170 --digiSeedOffset2 170 \
+    --digiSteeringConf "StandardSignalOnlyTruth" \
+    --geometryVersion default:ATLAS-R2-2016-01-00-01 \
+    --inputHITSFile ${HSHitsFile} \
+    --inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+    --inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+    --jobNumber 568 \
+    --maxEvents ${Events} \
+    --outputRDOFile ${DigiOutFileNameCA} \
+    --numberOfLowPtMinBias 22.4 \
+    --numberOfHighPtMinBias 0.1 \
+    --postInclude 'HITtoRDO:PyJobTransforms.UseFrontier' 'HITtoRDO:Digitization.DigitizationSteering.DigitizationTestingPostInclude' \
+    --preInclude 'HITtoRDO:Campaigns.MC20e' \
+    --preExec 'HITtoRDO:ConfigFlags.Digitization.PU.ProfileConfig="RunDependentSimData.PileUpProfile_run310000_splitting";' \
+    --skipEvents 15
+
+    rc2=$?
+    status=$rc2
+fi
+
+echo  "art-result: $rc2 CAdigi"
+
+rc3=-9999
+if [ $rc2 -eq 0 ]
+then
+    acmd.py diff-root ${DigiOutFileNameCG} ${DigiOutFileNameCA} \
+        --mode=semi-detailed --error-mode resilient --order-trees \
+        --ignore-leaves RecoTimingObj_p1_HITStoRDO_timings index_ref
+    rc3=$?
+    status=$rc3
+fi
+
+echo  "art-result: $rc3 comparison"
+
+exit $status
diff --git a/Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh b/Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh
new file mode 100755
index 0000000000000000000000000000000000000000..900f3b9d1a542c023b45206b7893316cd904c346
--- /dev/null
+++ b/Simulation/Tests/DigitizationTestsMT/test/test_Digi_tf_multistep_MP_presampling_reproducibility.sh
@@ -0,0 +1,135 @@
+#!/bin/sh
+#
+# art-description: Run multistep pile-up presampling
+# art-type: grid
+# art-athena-mt: 8
+# art-include: master/Athena
+# art-output: multistep_presampling_SP.RDO.pool.root
+# art-output: multistep_presampling_MP_fork_evt0.RDO.pool.root
+# art-output: multistep_presampling_MP_fork_evt1.RDO.pool.root
+
+export ATHENA_CORE_NUMBER=8
+
+Events=100
+DigiOutFileNameSP="multistep_presampling_SP.RDO.pool.root"
+DigiOutFileNameMP0="multistep_presampling_MP_fork_evt0.RDO.pool.root"
+DigiOutFileNameMP1="multistep_presampling_MP_fork_evt1.RDO.pool.root"
+
+HSHitsFile="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/OverlayTests/mc16_13TeV.900149.PG_single_nu_Pt50.simul.HITS.e8307_s3482/HITS.24078104._234467.pool.root.1"
+HighPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.800831.Py8EG_minbias_inelastic_highjetphotonlepton.simul.HITS_FILT.e8341_s3687_s3704/*"
+LowPtMinbiasHitsFiles="/cvmfs/atlas-nightlies.cern.ch/repo/data/data-art/Tier0ChainTests/mc16_13TeV.900311.Epos_minbias_inelastic_lowjetphoton.simul.HITS_FILT.e8341_s3687_s3704/*"
+
+
+Digi_tf.py \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameSP} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec "HITtoRDO:ServiceMgr.PileUpEventLoopMgr.AllowSerialAndMPToDiffer=False" \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 0
+
+rc=$?
+status=$rc
+echo "art-result: $rc Digi_tf.py SP"
+
+Digi_tf.py \
+--multiprocess --athenaMPEventsBeforeFork 0 \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameMP0} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec "HITtoRDO:ServiceMgr.PileUpEventLoopMgr.AllowSerialAndMPToDiffer=False" \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 0
+
+rc2=$?
+if [ $status -eq 0 ]; then
+  status=$rc2
+fi
+echo "art-result: $rc2 Digi_tf.py MP fork after 0"
+
+Digi_tf.py \
+--multiprocess --athenaMPEventsBeforeFork 1 \
+--splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' \
+--detectors Truth \
+--PileUpPresampling True \
+--conditionsTag default:OFLCOND-MC16-SDR-RUN2-08 \
+--digiSeedOffset1 170 --digiSeedOffset2 170 \
+--digiSteeringConf "StandardSignalOnlyTruth" \
+--geometryVersion default:ATLAS-R2-2016-01-00-01 \
+--inputHITSFile ${HSHitsFile} \
+--inputHighPtMinbiasHitsFile ${HighPtMinbiasHitsFiles} \
+--inputLowPtMinbiasHitsFile ${LowPtMinbiasHitsFiles} \
+--jobNumber 568 \
+--maxEvents ${Events} \
+--outputRDOFile ${DigiOutFileNameMP1} \
+--numberOfLowPtMinBias 22.4 \
+--numberOfHighPtMinBias 0.1 \
+--pileupInitialBunch -32 \
+--pileupFinalBunch 6 \
+--preExec "HITtoRDO:from Digitization.DigitizationFlags import digitizationFlags;digitizationFlags.OldBeamSpotZSize.set_Value_and_Lock(0);" \
+--preInclude 'HITtoRDO:Campaigns/MC20e.py,RunDependentSimData/configEvtNbr_sequential.py,Digitization/ForceUseOfPileUpTools.py,SimulationJobOptions/preInlcude.PileUpBunchTrainsMC16c_2017_Config1.py,RunDependentSimData/configLumi_run310000_splitting.py' \
+--postExec "HITtoRDO:ServiceMgr.PileUpEventLoopMgr.AllowSerialAndMPToDiffer=False" \
+--postInclude 'default:PyJobTransforms/UseFrontier.py' \
+--skipEvents 0
+
+rc3=$?
+if [ $status -eq 0 ]; then
+  status=$rc3
+fi
+echo "art-result: $rc3 Digi_tf.py MP fork after 1"
+
+rc4=-9999
+if [ $status -eq 0 ] && [ $rc -eq 0 ] && [ $rc2 -eq 0 ]
+then
+    acmd.py diff-root ${DigiOutFileNameSP} ${DigiOutFileNameMP0} \
+        --mode=semi-detailed --error-mode resilient --order-trees \
+        --ignore-leaves RecoTimingObj_p1_Bkg_HITStoRDO_timings index_ref
+    rc4=$?
+    status=$rc4
+fi
+echo "art-result: $rc4 SP vs MP fork after 0"
+
+rc5=-9999
+if [ $status -eq 0 ] && [ $rc -eq 0 ] && [ $rc3 -eq 0 ]
+then
+    acmd.py diff-root ${DigiOutFileNameSP} ${DigiOutFileNameMP1} \
+        --mode=semi-detailed --error-mode resilient --order-trees \
+        --ignore-leaves RecoTimingObj_p1_Bkg_HITStoRDO_timings index_ref
+    rc5=$?
+    status=$rc5
+fi
+echo "art-result: $rc5 SP vs MP fork after 1"
+
+exit $status
diff --git a/Tools/Campaigns/python/MC21.py b/Tools/Campaigns/python/MC21.py
index 5e1b30bb38d77e1db4198ecee9c782e37cef5c7d..454a411b989576a9025b28322697d67409608070 100644
--- a/Tools/Campaigns/python/MC21.py
+++ b/Tools/Campaigns/python/MC21.py
@@ -33,3 +33,11 @@ def MC21NoPileUp(flags):
 
     flags.Tile.BestPhaseFromCOOL = False
     flags.Tile.correctTime = False
+
+
+def BeamspotSplitMC21a():
+    """MC21a beamspot splitting configuration"""
+    substeps = 3
+    event_fractions = [0.3, 0.3, 0.4]
+
+    return substeps, event_fractions
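A split configuration is simply a callable returning (substeps, event_fractions); the transform imports it by dotted name via loadSplitConfig from the new trfExeStepTools module introduced later in this diff. Usage sketch:

    # How --splitConfig 'HITtoRDO:Campaigns.BeamspotSplitMC21a' is resolved.
    from PyJobTransforms.trfExeStepTools import loadSplitConfig

    steps, fractions = loadSplitConfig('Campaigns.BeamspotSplitMC21a')
    assert steps == 3 and fractions == [0.3, 0.3, 0.4]
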
diff --git a/Tools/Campaigns/python/__init__.py b/Tools/Campaigns/python/__init__.py
index 1ea80b575a1a7e23eb9d3bc6316b5902a26044a1..8c972b6fa8de247813aa7d4125f778ee20de75f0 100644
--- a/Tools/Campaigns/python/__init__.py
+++ b/Tools/Campaigns/python/__init__.py
@@ -2,12 +2,12 @@
 
 from .MC16 import MC16a, MC16d, MC16e, MC16NoPileUp
 from .MC20 import MC20a, MC20d, MC20e, MC20NoPileUp
-from .MC21 import MC21a, MC21NoPileUp
+from .MC21 import MC21a, MC21NoPileUp, BeamspotSplitMC21a
 from .PhaseII import PhaseIIPileUp, PhaseIINoPileUp
 
 __all__ = [
   'MC16a', 'MC16d', 'MC16e', 'MC16NoPileUp',
   'MC20a', 'MC20d', 'MC20e', 'MC20NoPileUp',
-  'MC21a', 'MC21NoPileUp',
+  'MC21a', 'MC21NoPileUp', 'BeamspotSplitMC21a',
   'PhaseIIPileUp', 'PhaseIINoPileUp',
 ]
diff --git a/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py b/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py
index 02d7972ed5349f59ff49619aa7a7455ffc6a211b..f3962800818c5d62cea4ac85078ae0ea6958167a 100644
--- a/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py
+++ b/Tools/PyJobTransforms/python/CommonRunArgsToFlags.py
@@ -24,3 +24,17 @@ def commonRunArgsToFlags(runArgs,configFlags):
 
     if hasattr(runArgs,"concurrentEvents"):
         configFlags.Concurrency.NumConcurrentEvents = runArgs.concurrentEvents
+
+    ## Executor steps
+    if hasattr(runArgs,"totalExecutorSteps"):
+        configFlags.ExecutorSplitting.TotalSteps = runArgs.totalExecutorSteps
+
+    if hasattr(runArgs,"executorStep"):
+        configFlags.ExecutorSplitting.Step = runArgs.executorStep
+
+    if hasattr(runArgs,"executorEventCounts"):
+        configFlags.ExecutorSplitting.TotalEvents = configFlags.Exec.MaxEvents
+        configFlags.Exec.MaxEvents = runArgs.executorEventCounts[configFlags.ExecutorSplitting.Step]
+
+    if hasattr(runArgs,"executorEventSkips"):
+        configFlags.Exec.SkipEvents = runArgs.executorEventSkips[configFlags.ExecutorSplitting.Step]
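Ordering matters in the block above: ExecutorSplitting.TotalEvents must capture Exec.MaxEvents (already filled from runArgs.maxEvents earlier in this function) before Exec.MaxEvents is overwritten with the per-substep count. A condensed sketch of the handover, using a hypothetical stand-in for the generated runargs object:

    # Flag handover for one substep (hypothetical values; RunArgs is a stand-in).
    class RunArgs:
        totalExecutorSteps = 3
        executorStep = 1
        executorEventCounts = [30, 30, 40]
        executorEventSkips = [0, 30, 60]

    maxEvents = 100                    # Exec.MaxEvents, as set from runArgs.maxEvents
    totalEvents = maxEvents            # saved into ExecutorSplitting.TotalEvents first
    maxEvents = RunArgs.executorEventCounts[RunArgs.executorStep]   # then overwritten: 30
    skipEvents = RunArgs.executorEventSkips[RunArgs.executorStep]   # 30
    assert (totalEvents, maxEvents, skipEvents) == (100, 30, 30)
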
diff --git a/Tools/PyJobTransforms/python/transform.py b/Tools/PyJobTransforms/python/transform.py
index 431bf849ced1d00d6039a53fa5aa9584ff08fecf..946bfabe469d96464e379ac7fa6efe0b198e99ae 100644
--- a/Tools/PyJobTransforms/python/transform.py
+++ b/Tools/PyJobTransforms/python/transform.py
@@ -8,6 +8,7 @@
 #
 
 import argparse
+import copy
 import os
 import os.path as path
 import re
@@ -24,6 +25,7 @@ from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandle
 from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
 from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
 from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
+from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
 from PyJobTransforms.trfUtils import isInteractiveEnv, calcCpuTime, calcWallTime
@@ -436,6 +438,9 @@ class transform(object):
                             msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
                     self._dataDictionary[dataType].name = fileName
 
+            # Do splitting if required
+            self.setupSplitting()
+
             # Now we can set the final executor configuration properly, with the final dataDictionary
             for executor in self._executors:
                 executor.conf.setFromTransform(self)
@@ -517,6 +522,37 @@ class transform(object):
         self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
         self._executorGraph.doToposort()
     
+    ## @brief Setup executor splitting
+    def setupSplitting(self):
+        if 'splitConfig' not in self._argdict:
+            return
+
+        split = []
+        for executionStep in self._executorPath:
+            baseStepName = executionStep['name']
+            if baseStepName in split:
+                continue
+
+            baseExecutor = self._executorDictionary[baseStepName]
+            splitting = getTotalExecutorSteps(baseExecutor, argdict=self._argdict)
+            if splitting <= 1:
+                continue
+
+            msg.info('Splitting {0} into {1} substeps'.format(baseStepName, splitting))
+            index = self._executorPath.index(executionStep)
+            baseStep = self._executorPath.pop(index)
+            for i in range(splitting):
+                name = baseStepName + executorStepSuffix + str(i)
+                step = copy.deepcopy(baseStep)
+                step['name'] = name
+                self._executorPath.insert(index + i, step)
+                executor = copy.deepcopy(baseExecutor)
+                executor.name = name
+                executor.conf.executorStep = i
+                executor.conf.totalExecutorSteps = splitting
+                self._executors.add(executor)
+                self._executorDictionary[name] = executor
+                split.append(name)
     
     ## @brief Trace the path through the executor graph
     #  @note This function might need to be called again when the number of 'substeps' is unknown
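setupSplitting clones the executor once per substep and renames the copies with the shared suffix, so a step named HITtoRDO becomes HITtoRDOExecutorStep0, HITtoRDOExecutorStep1, and so on; commonExecutorStepName (from the new trfExeStepTools module below) strips the suffix again wherever substep-keyed arguments such as athenaopts are looked up. A quick round trip:

    # Substep naming round trip, using the helpers added in trfExeStepTools below.
    from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix

    names = ['HITtoRDO' + executorStepSuffix + str(i) for i in range(3)]
    assert names == ['HITtoRDOExecutorStep0', 'HITtoRDOExecutorStep1', 'HITtoRDOExecutorStep2']
    assert all(commonExecutorStepName(n) == 'HITtoRDO' for n in names)
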
diff --git a/Tools/PyJobTransforms/python/trfArgClasses.py b/Tools/PyJobTransforms/python/trfArgClasses.py
index 5b6c6f97b3a84a24e20cd4eca4b6c3499936a167..80cb57a6b7a595fdd2a4b9ae798d17aed207380c 100644
--- a/Tools/PyJobTransforms/python/trfArgClasses.py
+++ b/Tools/PyJobTransforms/python/trfArgClasses.py
@@ -20,6 +20,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 
 from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PRWEntries, urlType, ROOTGetSize
 from PyJobTransforms.trfUtils import call
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName
 from PyJobTransforms.trfExitCodes import trfExit as trfExit
 from PyJobTransforms.trfDecorators import timelimited
 from PyJobTransforms.trfAMI import getAMIClient
@@ -1928,7 +1929,9 @@ class argSubstep(argument):
             name = exe.name
             substep = exe.substep
             first = exe.conf.firstExecutor
-            
+
+        name = commonExecutorStepName(name)
+
         value = None
         ## @note First we see if we have an explicit name or substep match, then a special 'first' or 'default' match
         if name in self._value:
diff --git a/Tools/PyJobTransforms/python/trfArgs.py b/Tools/PyJobTransforms/python/trfArgs.py
index f3756915350e5c5e680fdea5e5e3eb63a4f136a2..bd7c4bd2256c9caddb31b0471ae5957b7f944545 100644
--- a/Tools/PyJobTransforms/python/trfArgs.py
+++ b/Tools/PyJobTransforms/python/trfArgs.py
@@ -79,6 +79,9 @@ def addAthenaArguments(parser, maxEventsDefaultSubstep='first', addValgrind=True
                         metavar='substep:POSTINCLUDE',
                         help='Python configuration fragment to include after main job options (can be optionally limited ' 
                         'to a single substep). Will split on commas: frag1.py,frag2.py is understood.')
+    parser.add_argument('--splitConfig', group = 'Athena', type=argFactory(trfArgClasses.argSubstepString),
+                        metavar='substep:SPLITCONFIG',
+                        help='Configuration file to internally split job into multiple parts (can be optionally limited to a single substep)')
     parser.add_argument('--maxEvents', group='Athena', type=argFactory(trfArgClasses.argSubstepInt, defaultSubstep=maxEventsDefaultSubstep), 
                         nargs='+', metavar='substep:maxEvents',
                         help='Set maximum events for each processing step (default substep is "{0}")'.format(maxEventsDefaultSubstep))
diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py
index 1051be1aac635a2bb96a8771b69c288cd6ff7453..ee0e080b918d222495f89abd004208955d1a6d95 100755
--- a/Tools/PyJobTransforms/python/trfExe.py
+++ b/Tools/PyJobTransforms/python/trfExe.py
@@ -26,6 +26,7 @@ from PyJobTransforms.trfJobOptions import JobOptionsTemplate
 from PyJobTransforms.trfUtils import asetupReport, asetupReleaseIsOlderThan, unpackDBRelease, setupDBRelease, \
     cvmfsDBReleaseCheck, forceToAlphaNum, \
     ValgrindCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic, reportEventsPassedSimFilter
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName, executorStepSuffix
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfLogger import stdLogLevels
 from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
@@ -64,7 +65,9 @@ class executorConfig(object):
         self._argdict = argdict
         self._dataDictionary = dataDictionary
         self._firstExecutor = firstExecutor
-       
+        self._executorStep = -1
+        self._totalExecutorSteps = 0
+
     @property
     def argdict(self):
         return self._argdict
@@ -88,7 +91,23 @@ class executorConfig(object):
     @firstExecutor.setter
     def firstExecutor(self, value):
         self._firstExecutor = value
-        
+
+    @property
+    def executorStep(self):
+        return self._executorStep
+
+    @executorStep.setter
+    def executorStep(self, value):
+        self._executorStep = value
+
+    @property
+    def totalExecutorSteps(self):
+        return self._totalExecutorSteps
+
+    @totalExecutorSteps.setter
+    def totalExecutorSteps(self, value):
+        self._totalExecutorSteps = value
+
     ## @brief Set configuration properties from the parent transform
     #  @note  It's not possible to set firstExecutor here as the transform holds
     #  the name of the first executor, which we don't know... (should we?)
@@ -190,7 +209,11 @@ class transformExecutor(object):
     @property
     def name(self):
         return self._name
-    
+
+    @name.setter
+    def name(self, value):
+        self._name = value
+
     @property
     def substep(self):
         if '_substep' in dir(self):
@@ -981,7 +1004,17 @@ class athenaExecutor(scriptExecutor):
                 msg.info("Disabling AthenaMP as number of input events to process is too low ({0} events for {1} workers)".format(expectedEvents, self._athenaMP))
                 self._disableMP = True
                 self._athenaMP = 0
-            
+
+        # Handle executor steps
+        if self.conf.totalExecutorSteps > 1:
+            for dataType in output:
+                if self.conf._dataDictionary[dataType].originalName:
+                    self.conf._dataDictionary[dataType].value[0] = self.conf._dataDictionary[dataType].originalName
+                else:
+                    self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
+                self.conf._dataDictionary[dataType].value[0] += "_{0}{1}".format(executorStepSuffix, self.conf.executorStep)
+                msg.info("Updated athena output filename for {0} to {1}".format(dataType, self.conf._dataDictionary[dataType].value[0]))
+
         # And if this is (still) athenaMP, then set some options for workers and output file report
         if self._athenaMP:
             self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep)
@@ -1021,7 +1054,8 @@ class athenaExecutor(scriptExecutor):
             # so that the mother process output file (if it exists) can be used directly
             # as soft linking can lead to problems in the PoolFileCatalog (see ATLASJT-317)
             for dataType in output:
-                self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
+                if self.conf.totalExecutorSteps <= 1:
+                    self.conf._dataDictionary[dataType].originalName = self.conf._dataDictionary[dataType].value[0]
                 if 'eventService' not in self.conf.argdict or 'eventService' in self.conf.argdict and self.conf.argdict['eventService'].value is False:
                     if 'sharedWriter' in self.conf.argdict and self.conf.argdict['sharedWriter'].value:
                         msg.info("SharedWriter: not updating athena output filename for {0}".format(dataType))
@@ -1040,12 +1074,13 @@ class athenaExecutor(scriptExecutor):
             outputFiles = dict()
             for dataType in output:
                 outputFiles[dataType] = self.conf.dataDictionary[dataType]
-                
+
             # See if we have any 'extra' file arguments
+            nameForFiles = commonExecutorStepName(self._name)
             for dataType, dataArg in self.conf.dataDictionary.items():
-                if dataArg.io == 'input' and self._name in dataArg.executor:
+                if dataArg.io == 'input' and nameForFiles in dataArg.executor:
                     inputFiles[dataArg.subtype] = dataArg
-                
+
             msg.debug('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
             
             # Get the list of top options files that will be passed to athena (=runargs file + all skeletons)
@@ -1105,8 +1140,34 @@ class athenaExecutor(scriptExecutor):
     def postExecute(self):
         super(athenaExecutor, self).postExecute()
 
+        # Handle executor substeps
+        if self.conf.totalExecutorSteps > 1:
+            if self._athenaMP:
+                outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
+                athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP, False, self.conf.argdict)
+            if self.conf.executorStep == self.conf.totalExecutorSteps - 1:
+                # first loop over datasets for the output
+                for dataType in self._output:
+                    newValue = []
+                    if self._athenaMP:
+                        # assume the same number of workers all the time
+                        for i in range(self.conf.totalExecutorSteps):
+                            for v in self.conf.dataDictionary[dataType].value:
+                                newValue.append(v.replace('_{0}{1}_'.format(executorStepSuffix, self.conf.executorStep),
+                                                          '_{0}{1}_'.format(executorStepSuffix, i)))
+                    else:
+                        self.conf.dataDictionary[dataType].multipleOK = True
+                        # just combine all executors
+                        for i in range(self.conf.totalExecutorSteps):
+                            newValue.append(self.conf.dataDictionary[dataType].originalName + '_{0}{1}'.format(executorStepSuffix, i))
+                    self.conf.dataDictionary[dataType].value = newValue
+
+                    # do the merging if needed
+                    if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1:
+                        self._smartMerge(self.conf.dataDictionary[dataType])
+
         # If this was an athenaMP run then we need to update output files
-        if self._athenaMP:
+        elif self._athenaMP:
             outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ])
             ## @note Update argFile values to have the correct outputs from the MP workers
             skipFileChecks=False
@@ -1262,8 +1323,9 @@ class athenaExecutor(scriptExecutor):
         # Find options for the current substep. Name is prioritised (e.g. RAWtoESD) over alias (e.g. r2e). Last look for 'all'
         currentSubstep = None
         if 'athenaopts' in self.conf.argdict:
-            if self.name in self.conf.argdict['athenaopts'].value:
-                currentSubstep = self.name
+            currentName = commonExecutorStepName(self.name)
+            if currentName in self.conf.argdict['athenaopts'].value:
+                currentSubstep = currentName
                 if self.substep in self.conf.argdict['athenaopts'].value:
                     msg.info('Athenaopts found for {0} and {1}, joining options. '
                              'Consider changing your configuration to use just the name or the alias of the substep.'
diff --git a/Tools/PyJobTransforms/python/trfExeStepTools.py b/Tools/PyJobTransforms/python/trfExeStepTools.py
new file mode 100644
index 0000000000000000000000000000000000000000..463d8911ad7a08a5b7572c4d8997d409ae57ef79
--- /dev/null
+++ b/Tools/PyJobTransforms/python/trfExeStepTools.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
+
+
+executorStepSuffix = 'ExecutorStep'
+
+
+def commonExecutorStepName(name):
+    """Remove executor step suffix from executor name."""
+    if executorStepSuffix in name:
+        name, _sep, _tail = name.rpartition(executorStepSuffix)
+    return name
+
+
+def loadSplitConfig(config):
+    """Load splitting configuration from file."""
+    parts = config.split('.')
+    if len(parts) < 2:
+        raise ValueError('Splitting config should be of the form Package.Module.Function or Package.Function if defined in __init__.py')
+
+    function = parts[-1]
+    module = '.'.join(parts[:-1])
+
+    from importlib import import_module
+    loaded_module = import_module(module)
+    function_def = getattr(loaded_module, function)
+    return function_def()
+
+
+def getTotalExecutorSteps(executor, argdict=None):
+    """Get total executor steps from executor."""
+    if not argdict:
+        argdict = executor.conf.argdict
+    if 'splitConfig' not in argdict:
+        return 0
+
+    splitConfig = argdict['splitConfig'].returnMyValue(exe=executor)
+    if not splitConfig:
+        return 0
+
+    steps, fractions = loadSplitConfig(splitConfig)
+    return steps
+
+
+def getExecutorStepEventCounts(executor, argdict=None):
+    """Get executor step event counts from executor config."""
+    if not argdict:
+        argdict = executor.conf.argdict
+    if 'splitConfig' not in argdict:
+        return [], []
+
+    maxEvents = argdict['maxEvents'].returnMyValue(exe=executor)
+    skipEvents = argdict['skipEvents'].returnMyValue(exe=executor)
+    splitConfig = argdict['splitConfig'].returnMyValue(exe=executor)
+    if not splitConfig:
+        return [], []
+
+    steps, fractions = loadSplitConfig(splitConfig)
+
+    if abs(sum(fractions) - 1) > 1e-9:  # tolerant check: the fractions are floats
+        raise ValueError('Event fractions should total to 1!')
+
+    counts = []
+    for i in range(len(fractions) - 1):
+        counts.append(round(maxEvents * fractions[i]))
+    counts.append(maxEvents - sum(counts))
+
+    sums = []
+    for i in range(len(fractions)):
+        sums.append(skipEvents + sum(counts[:i]))
+
+    return counts, sums
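The event arithmetic rounds every fraction except the last one, which absorbs the remainder so the counts always sum to maxEvents; the skip list is a running prefix sum starting from skipEvents. Worked numbers matching the ART tests above (100 events, skipEvents 15, MC21a fractions):

    # Worked example of getExecutorStepEventCounts' arithmetic.
    maxEvents, skipEvents, fractions = 100, 15, [0.3, 0.3, 0.4]

    counts = [round(maxEvents * f) for f in fractions[:-1]]
    counts.append(maxEvents - sum(counts))    # last step takes the remainder
    skips = [skipEvents + sum(counts[:i]) for i in range(len(fractions))]

    assert counts == [30, 30, 40]
    assert skips == [15, 45, 75]
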
diff --git a/Tools/PyJobTransforms/python/trfJobOptions.py b/Tools/PyJobTransforms/python/trfJobOptions.py
index ba10d6d1778a131bacd8cadab074189b834ef1bc..6e898353908c09be1ed29bef0c74593a52c92699 100644
--- a/Tools/PyJobTransforms/python/trfJobOptions.py
+++ b/Tools/PyJobTransforms/python/trfJobOptions.py
@@ -15,6 +15,7 @@ msg = logging.getLogger(__name__)
 
 import PyJobTransforms.trfArgClasses as trfArgClasses
 import PyJobTransforms.trfExceptions as trfExceptions
+from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
 from PyJobTransforms.trfExitCodes import trfExit
 
 from PyJobTransforms.trfUtils import findFile
@@ -208,6 +209,21 @@ class JobOptionsTemplate(object):
                         print(f"AthenaMPJobProps.AthenaMPFlags.UseSharedWriter={self._exe.conf.argdict['sharedWriter'].value}", file=runargsFile)
                     if 'parallelCompression' in self._exe.conf.argdict:
                         print(f"AthenaMPJobProps.AthenaMPFlags.UseParallelCompression={self._exe.conf.argdict['parallelCompression'].value}", file=runargsFile)
+
+                # Executor substeps
+                print(os.linesep, '# Executor flags', file=runargsFile)
+                msg.debug('Adding runarg {0!s}={1!r}'.format('totalExecutorSteps', self._exe.conf.totalExecutorSteps))
+                print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'totalExecutorSteps', self._exe.conf.totalExecutorSteps), file=runargsFile)
+                if self._exe.conf.executorStep >= 0:
+                    msg.debug('Adding runarg {0!s}={1!r}'.format('executorStep', self._exe.conf.executorStep))
+                    print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorStep', self._exe.conf.executorStep), file=runargsFile)
+                    executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._exe)
+                    msg.debug('Adding runarg {0!s}={1!r}'.format('executorEventCounts', executorEventCounts))
+                    print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorEventCounts', executorEventCounts), file=runargsFile)
+                    msg.debug('Adding runarg {0!s}={1!r}'.format('executorEventSkips', executorEventSkips))
+                    print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorEventSkips', executorEventSkips), file=runargsFile)
+
+                # CA
                 if self._exe._isCAEnabled():
                     print(os.linesep, '# Threading flags', file=runargsFile)
                     #Pass the number of threads
@@ -223,7 +239,7 @@ class JobOptionsTemplate(object):
                     print('fromRunArgs({0})'.format(self._runArgsName),file=runargsFile)
 
                 msg.info('Successfully wrote runargs file {0}'.format(self._runArgsFile))
-                
+
             except (IOError, OSError) as e:
                 errMsg = 'Got an error when writing JO template {0}: {1}'.format(self._runArgsFile, e)
                 msg.error(errMsg)
@@ -251,7 +267,9 @@ class JobOptionsTemplate(object):
     #  @param input Input file list
     #  @param output Output file list
     #  @return List of runargs and skeletons to be processed by athena
-    def getTopOptions(self, input = dict(), output = dict()): 
+    def getTopOptions(self, input = dict(), output = dict()):
+        # Regenerate the runargs file name, since splitting may have renamed the executor
+        self._runArgsFile = 'runargs.' + self._exe.name + '.py'
         # First Make the runArgs file:
         self.writeRunArgs(input = input, output = output)
         # Make sure runArgs and skeleton are valid
diff --git a/Tools/PyJobTransforms/python/trfMPTools.py b/Tools/PyJobTransforms/python/trfMPTools.py
index 926816bde4edc8fa88f09ea7f9444846b87da267..125f47696cc8e3aab0fdd4fd93111a2224e03c8d 100644
--- a/Tools/PyJobTransforms/python/trfMPTools.py
+++ b/Tools/PyJobTransforms/python/trfMPTools.py
@@ -16,6 +16,7 @@ msg = logging.getLogger(__name__)
 
 from xml.etree import ElementTree
 
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName
 from PyJobTransforms.trfExitCodes import trfExit
 
 import PyJobTransforms.trfExceptions as trfExceptions
@@ -25,6 +26,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 #  @return Integer with the number of processes, N.B. 0 means non-MP serial mode
 def detectAthenaMPProcs(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
     athenaMPProcs = 0
+    currentSubstep = commonExecutorStepName(currentSubstep)
     
     # Try and detect if any AthenaMP has been enabled 
     try:
diff --git a/Tools/PyJobTransforms/python/trfMTTools.py b/Tools/PyJobTransforms/python/trfMTTools.py
index c025e04c986047220c8d2330d44bbfdb75194fbc..1b476b75f8f17545599492d1551e1e601c53f299 100644
--- a/Tools/PyJobTransforms/python/trfMTTools.py
+++ b/Tools/PyJobTransforms/python/trfMTTools.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration
+# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
 
 ## @package PyJobTransforms.trfMTTools
 #
@@ -12,6 +12,7 @@ import os
 import logging
 msg = logging.getLogger(__name__)
 
+from PyJobTransforms.trfExeStepTools import commonExecutorStepName
 from PyJobTransforms.trfExitCodes import trfExit
 
 import PyJobTransforms.trfExceptions as trfExceptions
@@ -22,6 +23,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 def detectAthenaMTThreads(argdict = {}, currentSubstep = '', legacyThreadingRelease = False):
     athenaMTThreads = 0
     athenaConcurrentEvents = 0
+    currentSubstep = commonExecutorStepName(currentSubstep)
 
     if legacyThreadingRelease:
         return athenaMTThreads, athenaConcurrentEvents
diff --git a/Tools/PyJobTransforms/python/trfValidation.py b/Tools/PyJobTransforms/python/trfValidation.py
index 9545c52c6511eb8ef3de61ba9149431f1e8b2c4e..238507b896ec28ac9ecbfac1506ea49539211c5e 100644
--- a/Tools/PyJobTransforms/python/trfValidation.py
+++ b/Tools/PyJobTransforms/python/trfValidation.py
@@ -22,6 +22,7 @@ msg = logging.getLogger(__name__)
 
 from PyUtils import RootUtils
 
+from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfLogger import stdLogLevels
 from PyJobTransforms.trfArgClasses import argFile
@@ -989,6 +990,12 @@ class eventMatch(object):
             else:
                 self._maxEvents = None
 
+            # Executor substeps handling
+            if self._executor.conf.totalExecutorSteps > 1 and self._executor.conf.executorStep < self._executor.conf.totalExecutorSteps - 1:
+                executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._executor)
+                self._maxEvents = executorEventCounts[self._executor.conf.executorStep]
+                self._skipEvents = executorEventSkips[self._executor.conf.executorStep]
+
             # Global eventAcceptanceEfficiency set?
             if "eventAcceptanceEfficiency" in self._executor.conf.argdict:
                 self._evAccEff = self._executor.conf.argdict['eventAcceptanceEfficiency'].returnMyValue(exe=self._executor)
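For intermediate substeps the validator checks the per-substep slice of events; the final substep is left at the full-job maxEvents, consistent with postExecute in trfExe.py above merging all per-step outputs back into a single file at the last step. Illustrative expectations for the MC21a split of 100 events with skipEvents 15:

    # Per-substep event expectations used by eventMatch (illustrative values).
    counts, skips, maxEvents = [30, 30, 40], [15, 45, 75], 100
    for step in range(3):
        if step < 2:
            expected = counts[step]   # intermediate substep: validate the slice
        else:
            expected = maxEvents      # final substep: outputs already merged
        print('substep', step, 'expects', expected, 'events')
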