From ae96466b05a166e18cf75d5ab8ec15b3232ed14f Mon Sep 17 00:00:00 2001
From: Vakho Tsulaia <vakhtang.tsulaia@cern.ch>
Date: Fri, 23 Oct 2020 02:37:29 +0200
Subject: [PATCH] Allow running hybrid MPMT in the BSRDOtoRAW step of Job
 Transforms

Fixed detectAthenaMPProcs() and detectAthenaMTThreads() so that they perform
--athenaopts checks only for the current substep. If MPMT mode is detected
for the BSRDOtoRAW substep, then the job is not aborted.

Addresses: ATR-21933
---
 Tools/PyJobTransforms/python/trfExe.py     |  6 +--
 Tools/PyJobTransforms/python/trfMPTools.py | 28 +++++++------
 Tools/PyJobTransforms/python/trfMTTools.py | 46 +++++++++++-----------
 3 files changed, 42 insertions(+), 38 deletions(-)

diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py
index a4544e53e31..9fd45fb0576 100755
--- a/Tools/PyJobTransforms/python/trfExe.py
+++ b/Tools/PyJobTransforms/python/trfExe.py
@@ -970,14 +970,14 @@ class athenaExecutor(scriptExecutor):
                                                             'either --multithreaded nor --multiprocess command line option provided but ATHENA_CORE_NUMBER environment has not been set')
 
         # Try to detect AthenaMT mode, number of threads and number of concurrent events
-        self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict)
+        self._athenaMT, self._athenaConcurrentEvents = detectAthenaMTThreads(self.conf.argdict,self.name)
 
         # Try to detect AthenaMP mode and number of workers
-        self._athenaMP = detectAthenaMPProcs(self.conf.argdict)
+        self._athenaMP = detectAthenaMPProcs(self.conf.argdict,self.name)
 
         # Another constistency check: make sure we don't have a configuration like follows:
         # ... --multithreaded --athenaopts=--nprocs=N
-        if (self._athenaMT != 0 and self._athenaMP != 0):
+        if (self.name != 'BSRDOtoRAW' and self._athenaMT != 0 and self._athenaMP != 0):
             raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_SETUP'),
                                                             'transform configured to run Athena in both MT and MP modes. Only one parallel mode at a time must be used')
 
diff --git a/Tools/PyJobTransforms/python/trfMPTools.py b/Tools/PyJobTransforms/python/trfMPTools.py
index b49903be9cf..4b1e261ac4d 100644
--- a/Tools/PyJobTransforms/python/trfMPTools.py
+++ b/Tools/PyJobTransforms/python/trfMPTools.py
@@ -26,25 +26,27 @@ import PyJobTransforms.trfExceptions as trfExceptions
 ## @brief Detect if AthenaMP has been requested
 #  @param argdict Argument dictionary, used to access athenaopts for the job
 #  @return Integer with the number of processes, N.B. 0 means non-MP serial mode
-def detectAthenaMPProcs(argdict = {}):
+def detectAthenaMPProcs(argdict = {}, currentSubstep = ''):
     athenaMPProcs = 0
     
     # Try and detect if any AthenaMP has been enabled 
     try:
         if 'athenaopts' in argdict:
             for substep in argdict['athenaopts'].value:
-                procArg = [opt.replace("--nprocs=", "") for opt in argdict['athenaopts'].value[substep] if '--nprocs' in opt]
-                if len(procArg) == 0:
-                    athenaMPProcs = 0
-                elif len(procArg) == 1:
-                    if 'multiprocess' in argdict:
-                        raise ValueError("Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
-                    athenaMPProcs = int(procArg[0])
-                    if athenaMPProcs < -1:
-                        raise ValueError("--nprocs was set to a value less than -1")
-                else:
-                    raise ValueError("--nprocs was set more than once in 'athenaopts'")
-                msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(athenaMPProcs,substep))
+                if substep == 'all' or substep == currentSubstep:
+                    procArg = [opt.replace("--nprocs=", "") for opt in argdict['athenaopts'].value[substep] if '--nprocs' in opt]
+                    if len(procArg) == 0:
+                        athenaMPProcs = 0
+                    elif len(procArg) == 1:
+                        if 'multiprocess' in argdict:
+                            raise ValueError("Detected conflicting methods to configure AthenaMP: --multiprocess and --nprocs=N (via athenaopts). Only one method must be used")
+                        athenaMPProcs = int(procArg[0])
+                        if athenaMPProcs < -1:
+                            raise ValueError("--nprocs was set to a value less than -1")
+                    else:
+                        raise ValueError("--nprocs was set more than once in 'athenaopts'")
+                    if athenaMPProcs > 0:
+                        msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(athenaMPProcs,substep))
         if (athenaMPProcs == 0 and
             'ATHENA_CORE_NUMBER' in os.environ and
             'multiprocess' in argdict):
diff --git a/Tools/PyJobTransforms/python/trfMTTools.py b/Tools/PyJobTransforms/python/trfMTTools.py
index 1575df067d7..8ac57ee4f29 100644
--- a/Tools/PyJobTransforms/python/trfMTTools.py
+++ b/Tools/PyJobTransforms/python/trfMTTools.py
@@ -19,7 +19,7 @@ import PyJobTransforms.trfExceptions as trfExceptions
 ## @brief Detect if AthenaMT has been requested
 #  @param argdict Argument dictionary, used to access athenaopts for the job
 #  @return Two integers with the number of threads and number of concurrent events, N.B. 0 means non-MT serial mode
-def detectAthenaMTThreads(argdict = {}):
+def detectAthenaMTThreads(argdict = {}, currentSubstep = ''):
     athenaMTThreads = 0
     athenaConcurrentEvents = 0
 
@@ -27,27 +27,29 @@ def detectAthenaMTThreads(argdict = {}):
     try:
         if 'athenaopts' in argdict:
             for substep in argdict['athenaopts'].value:
-                threadArg = [opt.replace("--threads=", "") for opt in argdict['athenaopts'].value[substep] if '--threads' in opt]
-                if len(threadArg) == 0:
-                    athenaMTThreads = 0
-                elif len(threadArg) == 1:
-                    if 'multithreaded' in argdict:
-                        raise ValueError("Detected conflicting methods to configure AthenaMT: --multithreaded and --threads=N (via athenaopts). Only one method must be used")
-                    athenaMTThreads = int(threadArg[0])
-                    if athenaMTThreads < -1:
-                        raise ValueError("--threads was set to a value less than -1")
-                else:
-                    raise ValueError("--threads was set more than once in 'athenaopts'")
-                msg.info('AthenaMT detected from "threads" setting with {0} threads for substep {1}'.format(athenaMTThreads,substep))
-
-                concurrentEventsArg = [opt.replace("--concurrent-events=", "") for opt in argdict['athenaopts'].value[substep] if '--concurrent-events' in opt]
-                if len(concurrentEventsArg) == 1:
-                    athenaConcurrentEvents = int(concurrentEventsArg[0])
-                    if athenaConcurrentEvents < -1:
-                        raise ValueError("--concurrent-events was set to a value less than -1")
-                    msg.info('Custom concurrent event setting read from "concurrent-events" with {0} events for substep {1}'.format(athenaConcurrentEvents,substep))
-                else:
-                    athenaConcurrentEvents = athenaMTThreads
+                if substep == 'all' or substep == currentSubstep:
+                    threadArg = [opt.replace("--threads=", "") for opt in argdict['athenaopts'].value[substep] if '--threads' in opt]
+                    if len(threadArg) == 0:
+                        athenaMTThreads = 0
+                    elif len(threadArg) == 1:
+                        if 'multithreaded' in argdict:
+                            raise ValueError("Detected conflicting methods to configure AthenaMT: --multithreaded and --threads=N (via athenaopts). Only one method must be used")
+                        athenaMTThreads = int(threadArg[0])
+                        if athenaMTThreads < -1:
+                            raise ValueError("--threads was set to a value less than -1")
+                    else:
+                        raise ValueError("--threads was set more than once in 'athenaopts'")
+                    if athenaMTThreads > 0:
+                        msg.info('AthenaMT detected from "threads" setting with {0} threads for substep {1}'.format(athenaMTThreads,substep))
+
+                    concurrentEventsArg = [opt.replace("--concurrent-events=", "") for opt in argdict['athenaopts'].value[substep] if '--concurrent-events' in opt]
+                    if len(concurrentEventsArg) == 1:
+                        athenaConcurrentEvents = int(concurrentEventsArg[0])
+                        if athenaConcurrentEvents < -1:
+                            raise ValueError("--concurrent-events was set to a value less than -1")
+                        msg.info('Custom concurrent event setting read from "concurrent-events" with {0} events for substep {1}'.format(athenaConcurrentEvents,substep))
+                    else:
+                        athenaConcurrentEvents = athenaMTThreads
         if (athenaMTThreads == 0 and
             'ATHENA_CORE_NUMBER' in os.environ and
             'multithreaded' in argdict):
-- 
GitLab