Add a --concurrent-events command line option for AthenaMT.

The option is parsed in AthOptionsParser.py, stored in the new
ConcurrencyFlags.NumConcurrentEvents job property, reconciled with --threads
in Preparation.py (when only one of the two is given as a value > 0, the
other is set to the same value), and used for numStores in
AtlasThreadedJob.py.

NOTE(review): the leading whitespace of every line below was reconstructed
from a flattened copy of this patch; confirm it against the tree or apply
with whitespace tolerance. The "index" blob ids are the original ones and
were not recomputed after the review fixes to the added lines.

diff --git a/Control/AthenaCommon/python/AthOptionsParser.py b/Control/AthenaCommon/python/AthOptionsParser.py
index ddd8a529b1a312a97cae6daa05a40b9f0bb4434e..e9c6d169d94b3d246467340203a535603fd86436 100644
--- a/Control/AthenaCommon/python/AthOptionsParser.py
+++ b/Control/AthenaCommon/python/AthOptionsParser.py
@@ -34,7 +34,7 @@ _userlongopts = [
     "debugWorker",
     "pycintex_minvmem=", "cppyy_minvmem",
     "minimal",                     # private, undocumented
-    "threads=",
+    "threads=", "concurrent-events=",
     "evtMax=",    #will set theApp.EvtMax just before theApp.run() in runbatch.py
     "skipEvents=",#will set svcMgr.EventSelector.SkipEvents just before theApp.run() in runbatch.py
     "filesInput=" #will set the AthenaCommonFlags.FilesInput job option and lock it
@@ -96,8 +96,9 @@ Accepted command line options:
      --stdcmalloc                     ...  use libc malloc for memory allocation
      --preloadlib=<lib>               ...  localized preload of library <lib>
      --nprocs=n                       ...  enable AthenaMP if n>=1 or n==-1
-     --threads=n                      ...  number of threads for Hive.
+     --threads=n                      ...  number of threads for AthenaMT
                                            With AthenaMP, number of threads per worker
+     --concurrent-events=n            ...  number of concurrent events for AthenaMT
      --debugWorker                    ...  pause AthenaMP workers at bootstrap until SIGUSR1 signal received
  [<file1>.py [<file2>.py [...]]]      ...  scripts to run
  """
@@ -152,7 +153,8 @@ def parse(chk_tcmalloc=True):
     opts.do_heap_mon = False     # default is not to do any heap monitoring
     opts.profile_python = None   # set to file name to collect and dump python profile
     opts.nprocs = 0              # enable AthenaMP if >= 1 or == -1
-    opts.threads = 0             # enable Hive if >= 1
+    opts.threads = 0             # enable AthenaMT if >= 1
+    opts.concurrent_events = 0   # enable AthenaMT if >= 1
     opts.debug_worker = False    # pause AthenaMP worker after bootstrap until SIGUSR1 received
     opts.cppyy_minvmem = None    # artificial vmem bump around cppyy's import
     opts.minimal = False         # private, undocumented
@@ -398,6 +400,15 @@ def parse(chk_tcmalloc=True):
                 _help_and_exit()
             opts.threads = arg
 
+        elif opt in ("--concurrent-events",):
+            if not arg:
+                arg = 0
+            try: arg = int(arg)
+            except Exception,err:
+                print "ERROR:",err
+                _help_and_exit()
+            opts.concurrent_events = arg
+
         elif opt in ("--debugWorker",):
             opts.debug_worker = True
 
diff --git a/Control/AthenaCommon/python/AtlasThreadedJob.py b/Control/AthenaCommon/python/AtlasThreadedJob.py
index d56a9314920c14061d85085599b743f79abd07c7..185b0516ef289d7c37d27bffa31a7733a732f99a 100644
--- a/Control/AthenaCommon/python/AtlasThreadedJob.py
+++ b/Control/AthenaCommon/python/AtlasThreadedJob.py
@@ -29,7 +29,7 @@ def _setupAtlasThreadedJob():
     svcMgr.StatusCodeSvc.AbortOnError = False
 
     nThreads = jps.ConcurrencyFlags.NumThreads()
-    numStores = nThreads
+    numStores = jps.ConcurrencyFlags.NumConcurrentEvents()
     numAlgsInFlight = nThreads
     numThreads = nThreads
 
diff --git a/Control/AthenaCommon/python/ConcurrencyFlags.py b/Control/AthenaCommon/python/ConcurrencyFlags.py
index 00942a5ba5a5cc5eac971bc4452eeb51d37a9213..a4ebbf3b3acf2c6f9f105743c14240a4d03a56af 100644
--- a/Control/AthenaCommon/python/ConcurrencyFlags.py
+++ b/Control/AthenaCommon/python/ConcurrencyFlags.py
@@ -66,6 +66,32 @@ class NumThreads(JobProperty):
 
         return
 
+
+class NumConcurrentEvents(JobProperty):
+    """ Flag to indicate number of concurrent events, possibly per worker
+    """
+    statusOn = True
+    allowedTypes = ['int']
+    StoredValue = 0
+
+    def _do_action(self):
+        try:
+            import GaudiHive
+        except ImportError:
+            from Logging import log
+            log.fatal("GaudiHive not in release - can't use --concurrent-events parameter")
+            import sys, ExitCodes
+            sys.exit(ExitCodes.IMPORT_ERROR)
+
+        if (self.get_Value() < 0):
+            from Logging import log
+            log.fatal("Number of concurrent events [%s] cannot be negative",self.get_Value())
+            import sys, ExitCodes
+            sys.exit(ExitCodes.CONFIGURATION_ERROR)
+
+        return
+
+
 class DebugWorkers(JobProperty):
     """ stops the worker in bootstratp until a SIGUSR1 is recieved. Used as debug hook
     """
@@ -86,6 +112,7 @@ jobproperties.add_Container(ConcurrencyFlags)
 list_jobproperties = [
     NumProcs,
     NumThreads,
+    NumConcurrentEvents,
     DebugWorkers,
 ]
 
diff --git a/Control/AthenaCommon/share/Preparation.py b/Control/AthenaCommon/share/Preparation.py
index 8a8f6245b5b08d7365c4000b7e159590aa1a60c7..93bb8875d36f81e72f90f05f45d7ad636299fb85 100644
--- a/Control/AthenaCommon/share/Preparation.py
+++ b/Control/AthenaCommon/share/Preparation.py
@@ -195,13 +195,26 @@ if not opts.minimal:
         jps.ConcurrencyFlags.DebugWorkers = True
         _msg.info ("   Workers will pause after fork until SIGUSR1 signal received")
 
-    if opts.threads and (opts.threads != 0):
-        jps.ConcurrencyFlags.NumThreads = opts.threads
+    if ( (opts.threads and (opts.threads != 0)) or (opts.concurrent_events and (opts.concurrent_events != 0)) ):
+
+        if (opts.threads == 0 and opts.concurrent_events > 0) :
+            ## num threads = num concurrent evts
+            jps.ConcurrencyFlags.NumThreads = opts.concurrent_events
+            jps.ConcurrencyFlags.NumConcurrentEvents = opts.concurrent_events
+        elif ( opts.threads > 0 and opts.concurrent_events == 0) :
+            ## num concurrent evts = num threads
+            jps.ConcurrencyFlags.NumThreads = opts.threads
+            jps.ConcurrencyFlags.NumConcurrentEvents = opts.threads
+        else :
+            ## both concurrent evts and threads set individually
+            jps.ConcurrencyFlags.NumThreads = opts.threads
+            jps.ConcurrencyFlags.NumConcurrentEvents = opts.concurrent_events
+
 
         if (jps.ConcurrencyFlags.NumProcs() > 0) :
-            _msg.info ("configuring hybrid AthenaMP/Hive with [%s] concurrent threads per AthenaMP worker", opts.threads)
+            _msg.info ("configuring hybrid AthenaMP/AthenaMT with [%s] concurrent threads and [%s] concurrent events per AthenaMP worker", jps.ConcurrencyFlags.NumThreads(), jps.ConcurrencyFlags.NumConcurrentEvents())
         elif (jps.ConcurrencyFlags.NumProcs() == 0) :
-            _msg.info ("configuring AthenaHive with [%s] concurrent threads", opts.threads)
+            _msg.info ("configuring AthenaHive with [%s] concurrent threads and [%s] concurrent events", jps.ConcurrencyFlags.NumThreads(), jps.ConcurrencyFlags.NumConcurrentEvents())
         else:
             # we should never get here
             _msg.error ("ConcurrencyFlags.NumProcs() cannot == -1 !!")