Skip to content
Snippets Groups Projects
Commit cf09d359 authored by Vakhtang Tsulaia's avatar Vakhtang Tsulaia
Browse files

Merge branch 'dev/master/job-prop_num-conc-evts' into 'master'

Add command line option "--concurrent-events" for AthenaMT

See merge request !4718
parents b91adc59 f829830e
No related merge requests found
......@@ -34,7 +34,7 @@ _userlongopts = [
"debugWorker",
"pycintex_minvmem=", "cppyy_minvmem",
"minimal", # private, undocumented
"threads=",
"threads=", "concurrent-events=",
"evtMax=", #will set theApp.EvtMax just before theApp.run() in runbatch.py
"skipEvents=",#will set svcMgr.EventSelector.SkipEvents just before theApp.run() in runbatch.py
"filesInput=" #will set the AthenaCommonFlags.FilesInput job option and lock it
......@@ -96,8 +96,9 @@ Accepted command line options:
--stdcmalloc ... use libc malloc for memory allocation
--preloadlib=<lib> ... localized preload of library <lib>
--nprocs=n ... enable AthenaMP if n>=1 or n==-1
--threads=n ... number of threads for Hive.
--threads=n ... number of threads for AthenaMT
With AthenaMP, number of threads per worker
--concurrent-events ... number of concurrent events for AthenaMT
--debugWorker ... pause AthenaMP workers at bootstrap until SIGUSR1 signal received
[<file1>.py [<file2>.py [...]]] ... scripts to run
"""
......@@ -152,7 +153,8 @@ def parse(chk_tcmalloc=True):
opts.do_heap_mon = False # default is not to do any heap monitoring
opts.profile_python = None # set to file name to collect and dump python profile
opts.nprocs = 0 # enable AthenaMP if >= 1 or == -1
opts.threads = 0 # enable Hive if >= 1
opts.threads = 0 # enable AthenaMT if >= 1
opts.concurrent_events = 0 # enable AthenaMT if >= 1
opts.debug_worker = False # pause AthenaMP worker after bootstrap until SIGUSR1 received
opts.cppyy_minvmem = None # artificial vmem bump around cppyy's import
opts.minimal = False # private, undocumented
......@@ -398,6 +400,15 @@ def parse(chk_tcmalloc=True):
_help_and_exit()
opts.threads = arg
elif opt in ("--concurrent-events",):
if not arg:
arg = 0
try: arg = int(arg)
except Exception,err:
print "ERROR:",err
_help_and_exit()
opts.concurrent_events = arg
elif opt in ("--debugWorker",):
opts.debug_worker = True
......
......@@ -29,7 +29,7 @@ def _setupAtlasThreadedJob():
svcMgr.StatusCodeSvc.AbortOnError = False
nThreads = jps.ConcurrencyFlags.NumThreads()
numStores = nThreads
numStores = jps.ConcurrencyFlags.NumConcurrentEvents()
numAlgsInFlight = nThreads
numThreads = nThreads
......
......@@ -66,6 +66,32 @@ class NumThreads(JobProperty):
return
class NumConcurrentEvents(JobProperty):
""" Flag to indicate number of concurrent events, possibly per worker"
"""
statusOn = True
allowedTypes = ['int']
StoredValue = 0
def _do_action(self):
try:
import GaudiHive
except ImportError:
from Logging import log
log.fatal("GaudiHive not in release - can't use --concurrent-events parameter")
import sys, ExitCodes
sys.exit(ExitCodes.IMPORT_ERROR)
if (self.get_Value() < 0):
from Logging import log
log.fatal("Number of concurrent events [%s] cannot be negative",self.get_Value())
import sys, ExitCodes
sys.exit(ExitCodes.CONFIGURATION_ERROR)
return
class DebugWorkers(JobProperty):
""" stops the worker in bootstratp until a SIGUSR1 is recieved. Used as debug hook
"""
......@@ -86,6 +112,7 @@ jobproperties.add_Container(ConcurrencyFlags)
list_jobproperties = [
NumProcs,
NumThreads,
NumConcurrentEvents,
DebugWorkers,
]
......
......@@ -195,13 +195,26 @@ if not opts.minimal:
jps.ConcurrencyFlags.DebugWorkers = True
_msg.info (" Workers will pause after fork until SIGUSR1 signal received")
if opts.threads and (opts.threads != 0):
jps.ConcurrencyFlags.NumThreads = opts.threads
if ( (opts.threads and (opts.threads != 0)) or (opts.concurrent_events and (opts.concurrent_events != 0)) ):
if (opts.threads == 0 and opts.concurrent_events > 0) :
## num threads = num concurrent evts
jps.ConcurrencyFlags.NumThreads = opts.concurrent_events
jps.ConcurrencyFlags.NumConcurrentEvents = opts.concurrent_events
elif ( opts.threads > 0 and opts.concurrent_events == 0) :
## num concurrent evts = num threads
jps.ConcurrencyFlags.NumThreads = opts.threads
jps.ConcurrencyFlags.NumConcurrentEvents = opts.threads
else :
## both concurrent evts and threads set individually
jps.ConcurrencyFlags.NumThreads = opts.threads
jps.ConcurrencyFlags.NumConcurrentEvents = opts.concurrent_events
if (jps.ConcurrencyFlags.NumProcs() > 0) :
_msg.info ("configuring hybrid AthenaMP/Hive with [%s] concurrent threads per AthenaMP worker", opts.threads)
_msg.info ("configuring hybrid AthenaMP/AthenaMT with [%s] concurrent threads and [%s] concurrent events per AthenaMP worker", jps.ConcurrencyFlags.NumThreads, jps.ConcurrencyFlags.NumConcurrentEvents)
elif (jps.ConcurrencyFlags.NumProcs() == 0) :
_msg.info ("configuring AthenaHive with [%s] concurrent threads", opts.threads)
_msg.info ("configuring AthenaHive with [%s] concurrent threads and [%s] concurrent events", jps.ConcurrencyFlags.NumThreads(), jps.ConcurrencyFlags.NumConcurrentEvents())
else:
# we should never get here
_msg.error ("ConcurrencyFlags.NumProcs() cannot == -1 !!")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment