Skip to content
Snippets Groups Projects
Commit 6efaab8b authored by Vakhtang Tsulaia
Browse files

Merge branch 'dev/master/job-prop_num-conc-evts' into 'master'

Add command line option "--concurrent-events" for AthenaMT

See merge request !4718

Former-commit-id: cf09d359
parents 825eaad5 3d3a8380
No related branches found
No related tags found
No related merge requests found
...@@ -34,7 +34,7 @@ _userlongopts = [ ...@@ -34,7 +34,7 @@ _userlongopts = [
"debugWorker", "debugWorker",
"pycintex_minvmem=", "cppyy_minvmem", "pycintex_minvmem=", "cppyy_minvmem",
"minimal", # private, undocumented "minimal", # private, undocumented
"threads=", "threads=", "concurrent-events=",
"evtMax=", #will set theApp.EvtMax just before theApp.run() in runbatch.py "evtMax=", #will set theApp.EvtMax just before theApp.run() in runbatch.py
"skipEvents=",#will set svcMgr.EventSelector.SkipEvents just before theApp.run() in runbatch.py "skipEvents=",#will set svcMgr.EventSelector.SkipEvents just before theApp.run() in runbatch.py
"filesInput=" #will set the AthenaCommonFlags.FilesInput job option and lock it "filesInput=" #will set the AthenaCommonFlags.FilesInput job option and lock it
...@@ -96,8 +96,9 @@ Accepted command line options: ...@@ -96,8 +96,9 @@ Accepted command line options:
--stdcmalloc ... use libc malloc for memory allocation --stdcmalloc ... use libc malloc for memory allocation
--preloadlib=<lib> ... localized preload of library <lib> --preloadlib=<lib> ... localized preload of library <lib>
--nprocs=n ... enable AthenaMP if n>=1 or n==-1 --nprocs=n ... enable AthenaMP if n>=1 or n==-1
--threads=n ... number of threads for Hive. --threads=n ... number of threads for AthenaMT
With AthenaMP, number of threads per worker With AthenaMP, number of threads per worker
--concurrent-events ... number of concurrent events for AthenaMT
--debugWorker ... pause AthenaMP workers at bootstrap until SIGUSR1 signal received --debugWorker ... pause AthenaMP workers at bootstrap until SIGUSR1 signal received
[<file1>.py [<file2>.py [...]]] ... scripts to run [<file1>.py [<file2>.py [...]]] ... scripts to run
""" """
...@@ -152,7 +153,8 @@ def parse(chk_tcmalloc=True): ...@@ -152,7 +153,8 @@ def parse(chk_tcmalloc=True):
opts.do_heap_mon = False # default is not to do any heap monitoring opts.do_heap_mon = False # default is not to do any heap monitoring
opts.profile_python = None # set to file name to collect and dump python profile opts.profile_python = None # set to file name to collect and dump python profile
opts.nprocs = 0 # enable AthenaMP if >= 1 or == -1 opts.nprocs = 0 # enable AthenaMP if >= 1 or == -1
opts.threads = 0 # enable Hive if >= 1 opts.threads = 0 # enable AthenaMT if >= 1
opts.concurrent_events = 0 # enable AthenaMT if >= 1
opts.debug_worker = False # pause AthenaMP worker after bootstrap until SIGUSR1 received opts.debug_worker = False # pause AthenaMP worker after bootstrap until SIGUSR1 received
opts.cppyy_minvmem = None # artificial vmem bump around cppyy's import opts.cppyy_minvmem = None # artificial vmem bump around cppyy's import
opts.minimal = False # private, undocumented opts.minimal = False # private, undocumented
...@@ -398,6 +400,15 @@ def parse(chk_tcmalloc=True): ...@@ -398,6 +400,15 @@ def parse(chk_tcmalloc=True):
_help_and_exit() _help_and_exit()
opts.threads = arg opts.threads = arg
elif opt in ("--concurrent-events",):
if not arg:
arg = 0
try: arg = int(arg)
except Exception,err:
print "ERROR:",err
_help_and_exit()
opts.concurrent_events = arg
elif opt in ("--debugWorker",): elif opt in ("--debugWorker",):
opts.debug_worker = True opts.debug_worker = True
......
...@@ -29,7 +29,7 @@ def _setupAtlasThreadedJob(): ...@@ -29,7 +29,7 @@ def _setupAtlasThreadedJob():
svcMgr.StatusCodeSvc.AbortOnError = False svcMgr.StatusCodeSvc.AbortOnError = False
nThreads = jps.ConcurrencyFlags.NumThreads() nThreads = jps.ConcurrencyFlags.NumThreads()
numStores = nThreads numStores = jps.ConcurrencyFlags.NumConcurrentEvents()
numAlgsInFlight = nThreads numAlgsInFlight = nThreads
numThreads = nThreads numThreads = nThreads
......
...@@ -66,6 +66,32 @@ class NumThreads(JobProperty): ...@@ -66,6 +66,32 @@ class NumThreads(JobProperty):
return return
class NumConcurrentEvents(JobProperty):
    """Job property holding the number of concurrent events (possibly per
    worker).

    The value must be a non-negative int; it defaults to 0.  Setting it is
    validated by ``_do_action``, which terminates the job if GaudiHive cannot
    be imported or if the value is negative.
    """
    statusOn = True
    allowedTypes = ['int']
    StoredValue = 0

    def _do_action(self):
        """Validate the property; exit the process on an unusable setup."""
        # The import is only a probe for GaudiHive being part of the release;
        # the module itself is not used here.
        try:
            import GaudiHive
        except ImportError:
            from Logging import log
            log.fatal("GaudiHive not in release - can't use --concurrent-events parameter")
            import sys
            import ExitCodes
            sys.exit(ExitCodes.IMPORT_ERROR)

        requested = self.get_Value()
        if not requested < 0:
            # Zero or positive: nothing further to check.
            return

        # A negative event count is a configuration error: report and abort.
        from Logging import log
        log.fatal("Number of concurrent events [%s] cannot be negative", requested)
        import sys
        import ExitCodes
        sys.exit(ExitCodes.CONFIGURATION_ERROR)
class DebugWorkers(JobProperty): class DebugWorkers(JobProperty):
""" stops the worker in bootstratp until a SIGUSR1 is recieved. Used as debug hook """ stops the worker in bootstratp until a SIGUSR1 is recieved. Used as debug hook
""" """
...@@ -86,6 +112,7 @@ jobproperties.add_Container(ConcurrencyFlags) ...@@ -86,6 +112,7 @@ jobproperties.add_Container(ConcurrencyFlags)
list_jobproperties = [ list_jobproperties = [
NumProcs, NumProcs,
NumThreads, NumThreads,
NumConcurrentEvents,
DebugWorkers, DebugWorkers,
] ]
......
...@@ -195,13 +195,26 @@ if not opts.minimal: ...@@ -195,13 +195,26 @@ if not opts.minimal:
jps.ConcurrencyFlags.DebugWorkers = True jps.ConcurrencyFlags.DebugWorkers = True
_msg.info (" Workers will pause after fork until SIGUSR1 signal received") _msg.info (" Workers will pause after fork until SIGUSR1 signal received")
if opts.threads and (opts.threads != 0): if ( (opts.threads and (opts.threads != 0)) or (opts.concurrent_events and (opts.concurrent_events != 0)) ):
jps.ConcurrencyFlags.NumThreads = opts.threads
if (opts.threads == 0 and opts.concurrent_events > 0) :
## num threads = num concurrent evts
jps.ConcurrencyFlags.NumThreads = opts.concurrent_events
jps.ConcurrencyFlags.NumConcurrentEvents = opts.concurrent_events
elif ( opts.threads > 0 and opts.concurrent_events == 0) :
## num concurrent evts = num threads
jps.ConcurrencyFlags.NumThreads = opts.threads
jps.ConcurrencyFlags.NumConcurrentEvents = opts.threads
else :
## both concurrent evts and threads set individually
jps.ConcurrencyFlags.NumThreads = opts.threads
jps.ConcurrencyFlags.NumConcurrentEvents = opts.concurrent_events
if (jps.ConcurrencyFlags.NumProcs() > 0) : if (jps.ConcurrencyFlags.NumProcs() > 0) :
_msg.info ("configuring hybrid AthenaMP/Hive with [%s] concurrent threads per AthenaMP worker", opts.threads) _msg.info ("configuring hybrid AthenaMP/AthenaMT with [%s] concurrent threads and [%s] concurrent events per AthenaMP worker", jps.ConcurrencyFlags.NumThreads, jps.ConcurrencyFlags.NumConcurrentEvents)
elif (jps.ConcurrencyFlags.NumProcs() == 0) : elif (jps.ConcurrencyFlags.NumProcs() == 0) :
_msg.info ("configuring AthenaHive with [%s] concurrent threads", opts.threads) _msg.info ("configuring AthenaHive with [%s] concurrent threads and [%s] concurrent events", jps.ConcurrencyFlags.NumThreads(), jps.ConcurrencyFlags.NumConcurrentEvents())
else: else:
# we should never get here # we should never get here
_msg.error ("ConcurrencyFlags.NumProcs() cannot == -1 !!") _msg.error ("ConcurrencyFlags.NumProcs() cannot == -1 !!")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment