diff --git a/Tools/PyJobTransforms/python/trfArgClasses.py b/Tools/PyJobTransforms/python/trfArgClasses.py index 5b6c6f97b3a84a24e20cd4eca4b6c3499936a167..7065e81f303e7dad069e8e100525416fe9951a34 100644 --- a/Tools/PyJobTransforms/python/trfArgClasses.py +++ b/Tools/PyJobTransforms/python/trfArgClasses.py @@ -1359,7 +1359,8 @@ class argPOOLFile(argAthenaFile): myMergeConf = executorConfig(myargdict, myDataDictionary) myMerger = athenaExecutor(name='POOLMergeAthenaMP{0}{1}'.format(self._subtype, counter), conf=myMergeConf, skeletonFile = 'RecJobTransforms/skeleton.MergePool_tf.py', - inData=set(['POOL_MRG_INPUT']), outData=set(['POOL_MRG_OUTPUT']), disableMP=True) + inData=set(['POOL_MRG_INPUT']), outData=set(['POOL_MRG_OUTPUT']), + disableMT=True, disableMP=True) myMerger.doAll(input=set(['POOL_MRG_INPUT']), output=set(['POOL_MRG_OUTPUT'])) # OK, if we got to here with no exceptions, we're good shape @@ -1396,7 +1397,8 @@ class argHITSFile(argPOOLFile): myMergeConf = executorConfig(myargdict, myDataDictionary) myMerger = athenaExecutor(name = mySubstepName, skeletonFile = 'SimuJobTransforms/skeleton.HITSMerge.py', conf=myMergeConf, - inData=set(['HITS']), outData=set(['HITS_MRG']), disableMP=True) + inData=set(['HITS']), outData=set(['HITS_MRG']), + disableMT=True, disableMP=True) myMerger.doAll(input=set(['HITS']), output=set(['HITS_MRG'])) # OK, if we got to here with no exceptions, we're good shape @@ -1434,7 +1436,8 @@ class argEVNT_TRFile(argPOOLFile): myMergeConf = executorConfig(myargdict, myDataDictionary) myMerger = athenaExecutor(name = mySubstepName, skeletonFile = 'SimuJobTransforms/skeleton.EVNT_TRMerge.py', conf=myMergeConf, - inData=set(['EVNT_TR']), outData=set(['EVNT_TR_MRG']), disableMP=True) + inData=set(['EVNT_TR']), outData=set(['EVNT_TR_MRG']), + disableMT=True, disableMP=True) myMerger.doAll(input=set(['EVNT_TR']), output=set(['EVNT_TR_MRG'])) # OK, if we got to here with no exceptions, we're good shape @@ -1471,7 +1474,8 @@ class argRDOFile(argPOOLFile): myMergeConf = executorConfig(myargdict, myDataDictionary) myMerger = athenaExecutor(name = 'RDOMergeAthenaMP{0}'.format(counter), skeletonFile = 'RecJobTransforms/skeleton.MergeRDO_tf.py', conf=myMergeConf, - inData=set(['RDO']), outData=set(['RDO_MRG']), disableMP=True) + inData=set(['RDO']), outData=set(['RDO_MRG']), + disableMT=True, disableMP=True) myMerger.doAll(input=set(['RDO']), output=set(['RDO_MRG'])) # OK, if we got to here with no exceptions, we're good shape @@ -1508,7 +1512,8 @@ class argEVNTFile(argPOOLFile): myMergeConf = executorConfig(myargdict, myDataDictionary) myMerger = athenaExecutor(name = mySubstepName, skeletonFile = 'PyJobTransforms/skeleton.EVNTMerge.py', conf=myMergeConf, - inData=set(['EVNT']), outData=set(['EVNT_MRG']), disableMP=True) + inData=set(['EVNT']), outData=set(['EVNT_MRG']), + disableMT=True, disableMP=True) myMerger.doAll(input=set(['EVNT']), output=set(['EVNT_MRG'])) # OK, if we got to here with no exceptions, we're good shape diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py index 1051be1aac635a2bb96a8771b69c288cd6ff7453..486810aabc6e68c68213e948561209d06aa105ff 100755 --- a/Tools/PyJobTransforms/python/trfExe.py +++ b/Tools/PyJobTransforms/python/trfExe.py @@ -833,6 +833,7 @@ class athenaExecutor(scriptExecutor): # executor to the workflow graph, run the executor manually with these data parameters (useful for # post-facto executors, e.g., for AthenaMP merging) # @param memMonitor Enable subprocess memory monitoring + # @param disableMT Ensure that AthenaMT is not used # @param disableMP Ensure that AthenaMP is not used # @note The difference between @c extraRunargs, @c runtimeRunargs and @c literalRunargs is that: @c extraRunargs # uses repr(), so the RHS is the same as the python object in the transform; @c runtimeRunargs uses str() so @@ -842,7 +843,7 @@ class athenaExecutor(scriptExecutor): inData = set(), outData = set(), inputDataTypeCountCheck = None, exe = 'athena.py', exeArgs = ['athenaopts'], substep = None, inputEventTest = True, perfMonFile = None, tryDropAndReload = True, extraRunargs = {}, runtimeRunargs = {}, literalRunargs = [], dataArgs = [], checkEventCount = False, errorMaskFiles = None, - manualDataDictionary = None, memMonitor = True, disableMP = False): + manualDataDictionary = None, memMonitor = True, disableMT = False, disableMP = False): self._substep = forceToAlphaNum(substep) self._inputEventTest = inputEventTest @@ -853,6 +854,7 @@ class athenaExecutor(scriptExecutor): self._dataArgs = dataArgs self._errorMaskFiles = errorMaskFiles self._inputDataTypeCountCheck = inputDataTypeCountCheck + self._disableMT = disableMT self._disableMP = disableMP self._skeletonCA=skeletonCA @@ -897,6 +899,14 @@ class athenaExecutor(scriptExecutor): @disableMP.setter def disableMP(self, value): self._disableMP = value + + @property + def disableMT(self): + return self._disableMT + + @disableMT.setter + def disableMT(self, value): + self._disableMT = value def preExecute(self, input = set(), output = set()): self.setPreExeStart() @@ -972,6 +982,9 @@ class athenaExecutor(scriptExecutor): # Try to detect AthenaMP mode and number of workers self._athenaMP = detectAthenaMPProcs(self.conf.argdict, self.name, legacyThreadingRelease) + if self._disableMT: + self._athenaMT = 0 + if self._disableMP: self._athenaMP = 0 else: @@ -1340,13 +1353,13 @@ class athenaExecutor(scriptExecutor): msg.info('Skipping test for "--drop-and-reload" in this executor') # For AthenaMT apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multithreaded - if self._athenaMT != 0 : + if self._athenaMT != 0 and not self._disableMT: if not ('athenaopts' in self.conf.argdict and any('--threads' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])): self._cmd.append('--threads=%s' % str(self._athenaMT)) - # For AthenaMP apply --threads=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess - if (self._athenaMP !=0 and not self._disableMP): + # For AthenaMP apply --nprocs=N if threads have been configured via ATHENA_CORE_NUMBER + multiprocess + if self._athenaMP != 0 and not self._disableMP: if not ('athenaopts' in self.conf.argdict and any('--nprocs' in opt for opt in self.conf.argdict['athenaopts'].value[currentSubstep])): self._cmd.append('--nprocs=%s' % str(self._athenaMP)) @@ -1403,6 +1416,8 @@ class athenaExecutor(scriptExecutor): print('export CORAL_DBLOOKUP_PATH={directory}'.format(directory = path.join(dbroot, 'XMLConfig')), file=wrapper) print('export TNS_ADMIN={directory}'.format(directory = path.join(dbroot, 'oracle-admin')), file=wrapper) print('DATAPATH={dbroot}:$DATAPATH'.format(dbroot = dbroot), file=wrapper) + if self._disableMT: + print("# AthenaMT explicitly disabled for this executor", file=wrapper) if self._disableMP: print("# AthenaMP explicitly disabled for this executor", file=wrapper) if self._envUpdate.len > 0: