diff --git a/Control/AthenaCommon/python/AthOptionsParser.py b/Control/AthenaCommon/python/AthOptionsParser.py
index 796884efb5529966da0d0b1c59cbef8b8969665b..12ea16d825a68316dcceada1f15bff129cd0ace0 100644
--- a/Control/AthenaCommon/python/AthOptionsParser.py
+++ b/Control/AthenaCommon/python/AthOptionsParser.py
@@ -4,7 +4,7 @@
 # @purpose the central module to parse command line options of athena.py
 # @date December 2009
 
-__version__ = "$Revision: 596655 $"
+__version__ = "$Revision: 672545 $"
 __doc__ = "a module to parse command line options for athena.py"
 __author__ = "Sebastien Binet"
 
@@ -31,8 +31,10 @@ _userlongopts = [
     "dump-configuration=",
     "tcmalloc", "stdcmalloc", "preloadlib=",
     "nprocs=",
+    "debugWorker",
     "pycintex_minvmem=", "cppyy_minvmem",
     "minimal",  # private, undocumented
+    "threads=",
     ]
 
 _allowed_values = {
@@ -86,6 +88,9 @@ Accepted command line options:
 --stdcmalloc                      ...  use libc malloc for memory allocation
 --preloadlib=<lib>                ...  localized preload of library <lib>
 --nprocs=n                        ...  enable AthenaMP if n>=1 or n==-1
+--threads=n                       ...  number of threads for Hive.
+                                       With AthenaMP, number of threads per worker
+--debugWorker                     ...  pause AthenaMP workers at bootstrap until SIGUSR1 signal received
 [<file1>.py [<file2>.py [...]]]   ...  scripts to run
 """
 
@@ -138,6 +143,8 @@ def parse(chk_tcmalloc=True):
     opts.memchk_mode = ''      # no mode selected by default
     opts.do_heap_mon = False   # default is not to do any heap monitoring
     opts.nprocs = 0            # enable AthenaMP if >= 1 or == -1
+    opts.threads = 0           # enable Hive if >= 1
+    opts.debug_worker = False  # pause AthenaMP worker after bootstrap until SIGUSR1 received
     opts.cppyy_minvmem = None  # artificial vmem bump around cppyy's import
     opts.minimal = False       # private, undocumented
     opts.user_opts = []        # left-over opts after '-'
@@ -365,12 +372,26 @@ def parse(chk_tcmalloc=True):
                 _help_and_exit()
             opts.nprocs = arg
 
+        elif opt in ("--threads",):
+            if not arg:
+                arg = 0
+            try: arg = int(arg)
+            except Exception,err:
+                print "ERROR:",err
+                _help_and_exit()
+            opts.threads = arg
+
+        elif opt in ("--debugWorker",):
+            opts.debug_worker = True
+
     # overwrite nprovs if ATHENA_PROC_NUMBER is set
     envNProcs = os.getenv('ATHENA_PROC_NUMBER')
     if envNProcs :
         envNProcs = int(envNProcs)
         print "ATHENA_PROC_NUMBER set to ", envNProcs, " will run by default with --nprocs=", envNProcs
         opts.nprocs = envNProcs # enable AthenaMP if >= 1 or == -1
+        from ConcurrencyFlags import jobproperties as jps
+        jps.ConcurrencyFlags.NumProcs = envNProcs
 
     # for the benefit of PyROOT
     if not opts.display and not '-b' in sys.argv:
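
For orientation (this paragraph and the snippet are illustrative, not part of the diff): the new switches end up on the options object returned by parse(), next to the existing nprocs field. The command line in the comment and the jobOptions file name are placeholders.

    # hypothetical invocation:  athena.py --nprocs=2 --threads=4 --debugWorker MyJobOptions.py
    from AthenaCommon.AthOptionsParser import parse

    opts = parse()
    print opts.nprocs        # 2    -> enable AthenaMP with two sub-workers
    print opts.threads       # 4    -> enable Hive; threads per worker when nprocs > 0
    print opts.debug_worker  # True -> workers pause at bootstrap until SIGUSR1
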
diff --git a/Control/AthenaCommon/python/AtlasThreadedJob.py b/Control/AthenaCommon/python/AtlasThreadedJob.py
new file mode 100644
index 0000000000000000000000000000000000000000..a09b0ae72351afe414de47a69e92d40e13c76a89
--- /dev/null
+++ b/Control/AthenaCommon/python/AtlasThreadedJob.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+
+## @file AtlasThreadedJob.py
+## @brief py-module to configure the Athena AppMgr for threaded (Hive) jobs
+## @author Charles Leggett
+###############################################################
+
+def _setupAtlasThreadedJob():
+    from AppMgr import theApp
+    from AppMgr import ServiceMgr as svcMgr
+
+    import SystemOfUnits as Units
+    from Constants import VERBOSE, DEBUG, INFO, ERROR
+
+    from ConcurrencyFlags import jobproperties as jps
+
+    if (jps.ConcurrencyFlags.NumProcs() == 0) :
+        theApp.MessageSvcType = "InertMessageSvc"
+    else:
+        # InertMessageSvc doesn't play nice with MP
+        theApp.MessageSvcType = "MessageSvc"
+
+    svcMgr.MessageSvc.defaultLimit = 0
+    msgFmt = "% F%40W%S%4W%e%s%7W%R%T %0W%M"
+    svcMgr.MessageSvc.Format = msgFmt
+
+    theApp.StatusCodeCheck = False
+
+    from AthenaServices.AthenaServicesConf import AthenaHiveEventLoopMgr
+
+    svcMgr += AthenaHiveEventLoopMgr()
+    svcMgr.AthenaHiveEventLoopMgr.WhiteboardSvc = "EventDataSvc"
+#    svcMgr.AthenaHiveEventLoopMgr.OutputLevel = INFO
+
+    theApp.EventLoop = "AthenaHiveEventLoopMgr"
+
+    svcMgr.StatusCodeSvc.AbortOnError = False
+
+    nThreads = jps.ConcurrencyFlags.NumThreads()
+    numStores = nThreads
+    numAlgsInFlight = nThreads
+    numThreads = nThreads
+
+    from StoreGate.StoreGateConf import SG__HiveMgrSvc
+    svcMgr += SG__HiveMgrSvc("EventDataSvc")
+    svcMgr.EventDataSvc.NSlots = numStores
+#    svcMgr.EventDataSvc.OutputLevel = INFO
+
+    from GaudiHive.GaudiHiveConf import AlgResourcePool
+    svcMgr += AlgResourcePool( OutputLevel = INFO );
+
+    from GaudiHive.GaudiHiveConf import ForwardSchedulerSvc
+    svcMgr += ForwardSchedulerSvc()
+    svcMgr.ForwardSchedulerSvc.OutputLevel = INFO
+    svcMgr.ForwardSchedulerSvc.MaxEventsInFlight = numStores
+    svcMgr.ForwardSchedulerSvc.MaxAlgosInFlight = numAlgsInFlight
+    svcMgr.ForwardSchedulerSvc.ThreadPoolSize = numThreads
+
+    # enable timeline recording
+    from GaudiHive.GaudiHiveConf import TimelineSvc
+    svcMgr += TimelineSvc( RecordTimeline = True, Partial = False );
+
+    #
+    ## Setup SGCommitAuditor to sweep new DataObjects at end of Alg execute
+    #
+
+    theAuditorSvc = svcMgr.AuditorSvc
+    theApp.AuditAlgorithms=True
+    from SGComps.SGCompsConf import SGCommitAuditor
+    theAuditorSvc += SGCommitAuditor()
+
+
+## load basic services configuration at module import
+_setupAtlasThreadedJob()
+
+## clean-up: avoid running multiple times this method
+del _setupAtlasThreadedJob
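
The module configures everything as a side effect of being imported (the setup function runs once and is then deleted). A minimal sketch (not part of the diff) of pulling it in by hand, assuming the thread count has already been fixed the way Preparation.py does further down in this diff; the value 4 is illustrative:

    from AthenaCommon.ConcurrencyFlags import jobproperties as jps
    jps.ConcurrencyFlags.NumThreads = 4      # illustrative value

    # importing the module schedules AthenaHiveEventLoopMgr, the event whiteboard
    # and the forward scheduler, all sized from ConcurrencyFlags.NumThreads
    import AthenaCommon.AtlasThreadedJob
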
diff --git a/Control/AthenaCommon/python/ConcurrencyFlags.py b/Control/AthenaCommon/python/ConcurrencyFlags.py
new file mode 100644
index 0000000000000000000000000000000000000000..00942a5ba5a5cc5eac971bc4452eeb51d37a9213
--- /dev/null
+++ b/Control/AthenaCommon/python/ConcurrencyFlags.py
@@ -0,0 +1,97 @@
+# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
+
+#=======================================================================
+# File:   AthenaCommon/python/ConcurrencyFlags.py
+#=======================================================================
+""" Flags for concurrency
+"""
+#
+#
+__author__ = 'Charles Leggett'
+__version__= "$Revision: 1.0 $"
+__doc__    = "concurrency flags"
+
+
+
+
+#=======================================================================
+# imports
+#=======================================================================
+from JobProperties import JobProperty, JobPropertyContainer
+from JobProperties import jobproperties
+
+class NumProcs(JobProperty):
+    """ Flag to indicate the number of parallel workers
+    """
+    statusOn = True
+    allowedTypes = ['int']
+    StoredValue = 0
+    def _do_action(self):
+        import multiprocessing
+        if (self.get_Value() == -1):
+            self.set_Value( multiprocessing.cpu_count() )
+        elif ( self.get_Value() < -1 ) :
+            from Logging import log
+            log.fatal("nprocs cannot be < -1")
+            import sys, ExitCodes
+            sys.exit(ExitCodes.CONFIGURATION_ERROR)
+        elif (self.get_Value() > multiprocessing.cpu_count()):
+            from Logging import log
+            log.warning("nprocs is larger than core count [%s]!",
+                        multiprocessing.cpu_count())
+
+        return
+
+class NumThreads(JobProperty):
+    """ Flag to indicate the number of concurrent threads, possibly per worker
+    """
+    statusOn = True
+    allowedTypes = ['int']
+    StoredValue = 0
+
+    def _do_action(self):
+        try:
+            import GaudiHive
+        except ImportError:
+            from Logging import log
+            log.fatal("GaudiHive not in release - can't use --threads parameter")
+            import sys, ExitCodes
+            sys.exit(ExitCodes.IMPORT_ERROR)
+
+        if (self.get_Value() < 0):
+            from Logging import log
+            log.fatal("Number of threads [%s] cannot be negative",self.get_Value())
+            import sys, ExitCodes
+            sys.exit(ExitCodes.CONFIGURATION_ERROR)
+
+        return
+
+class DebugWorkers(JobProperty):
+    """ stops the worker in bootstrap until a SIGUSR1 is received. Used as debug hook
+    """
+    statusOn = True
+    allowedTypes = ['bool']
+    StoredValue = False
+
+
+class ConcurrencyFlags(JobPropertyContainer):
+    """ The global ConcurrencyFlags job property container.
+    """
+    pass
+
+# add the concurrency flags container to the top container
+jobproperties.add_Container(ConcurrencyFlags)
+
+# we always want the following flags in the Concurrency container
+list_jobproperties = [
+    NumProcs,
+    NumThreads,
+    DebugWorkers,
+]
+
+for i in list_jobproperties:
+    jobproperties.ConcurrencyFlags.add_JobProperty(i)
+
+del list_jobproperties
+
+#=======================================================================
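
A minimal sketch (not part of the diff) of how other configuration code can read these flags back; the call syntax on a JobProperty returns its stored value, and assigning -1 to NumProcs is expanded by _do_action to the machine's core count:

    from AthenaCommon.ConcurrencyFlags import jobproperties as jps

    jps.ConcurrencyFlags.NumProcs = -1            # expanded to multiprocessing.cpu_count()
    print jps.ConcurrencyFlags.NumProcs()         # e.g. 8 on an 8-core node
    print jps.ConcurrencyFlags.NumThreads()       # 0 unless --threads was given
    if jps.ConcurrencyFlags.DebugWorkers():
        print "workers will pause after fork until SIGUSR1"
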
diff --git a/Control/AthenaCommon/python/ConfigurableDb.py b/Control/AthenaCommon/python/ConfigurableDb.py
index fc24d137fc99cdbc7361e64c73ba5e1425e90781..6123eb2a42eb07eaf38d93638213389af8fbc3d5 100755
--- a/Control/AthenaCommon/python/ConfigurableDb.py
+++ b/Control/AthenaCommon/python/ConfigurableDb.py
@@ -173,6 +173,8 @@ def loadConfigurableDb():
         try:
             cfgDb._loadModule( confDb )
         except Exception, err:
+            import traceback
+            traceback.print_exc()
             log.warning( "Could not load file [%s] !", confDb )
             log.warning( "Reason: %s", err )
         nFiles += 1
diff --git a/Control/AthenaCommon/python/ConfiguredFactory.py b/Control/AthenaCommon/python/ConfiguredFactory.py
index 7514f03d2af7d2d3978189f1a9515be3c735fe58..19c61c285caf0081cbf5e8fadb2e0b420c70e0ef 100644
--- a/Control/AthenaCommon/python/ConfiguredFactory.py
+++ b/Control/AthenaCommon/python/ConfiguredFactory.py
@@ -1255,12 +1255,14 @@ class ConfiguredFactory(object):
         self.logger().debug( "importing configDb modules..." )
         nFiles = 0
         startTime = time.time()
-        for path in sys.path:
+
+        paths = [p for p in sys.path if p.endswith ('InstallArea/python')]
+        if not paths: paths = sys.path
+
+        for path in paths:
             if not os.path.exists( path ):
                 continue
-
-            if not path.endswith( "InstallArea/python" ):
-                continue
+            if not os.path.isdir (path): continue
 
             self.logger().verbose( "searching in %s..." % path )
 
diff --git a/Control/AthenaCommon/python/DetFlags.py b/Control/AthenaCommon/python/DetFlags.py
index a98506465ab29bb86217606ee8e736021dfc16a6..711595930d4bd036ef3e599d85cc053dac34ea53 100755
--- a/Control/AthenaCommon/python/DetFlags.py
+++ b/Control/AthenaCommon/python/DetFlags.py
@@ -3,7 +3,7 @@
 # DetFlags : Author Tadashi Maeno
 #   Mods: David Rousseau, Paolo Calafiura, M. Gallas ( more flags and
 #   tasks)
-# detectors : ID = bpipe pixel SCT TRT BCM
+# detectors : ID = bpipe pixel SCT TRT BCM DBM
 #             Forward = Lucid ZDC ALFA AFP FwdRegion
 #             LAr = em HEC FCal
 #             Calo = em HEC FCal Tile
@@ -26,6 +26,7 @@
 #  writeRDOPool : write RDO in pool
 #  readRIOPool  : read RIO from pool
 #  writeRIOPool : write RIO in pool
+#  overlay      : overlay setup
 #
 # Some tasks are OR of other classes. They can only be used to test, not to set.
@@ -38,7 +39,7 @@
 #   tasks can be switched on/off for all detectors:
 #      DetFlags.geometry.all_setOn() (or all_setOff()
 #   a task for a given subdetector can be switched on/off
-#      DetFlags.readRDO.Pixel_setOn() (or setOff() )
+#      DetFlags.readRDO.pixel_setOn() (or setOff() )
 #   setting off/on a group of subdetectors (e.g. LAr or Muon) switch off/on
 #     all the corresponding subdetectors
 #   a subdetector can be switched on/off for all tasks:
@@ -67,6 +68,7 @@ class DetFlags:
             self._flag_SCT   = False
             self._flag_TRT   = False
             self._flag_BCM   = False
+            self._flag_DBM   = False
             # Forward
             self._flag_Lucid = False
             self._flag_ZDC   = False
@@ -137,12 +139,14 @@ class DetFlags:
         self.SCT_setOn()
         self.TRT_setOn()
        self.BCM_setOn()
+        self.DBM_setOn()
    def ID_setOff (self):
        self.bpipe_setOff()
        self.pixel_setOff()
        self.SCT_setOff()
        self.TRT_setOff()
        self.BCM_setOff()
+        self.DBM_setOff()
    def Calo_setOn (self):
        self.em_setOn()
        self.HEC_setOn()
@@ -200,9 +204,9 @@ class DetFlags:
    def Forward_allOn (self):
        return self.ALFA_on() & self.ZDC_on() & self.Lucid_on() & self.AFP_on() & self.FwdRegion_on()
    def ID_on (self):
-        return self.bpipe_on() | self.pixel_on() | self.SCT_on() | self.TRT_on() | self.BCM_on()
+        return self.bpipe_on() | self.pixel_on() | self.SCT_on() | self.TRT_on() | self.BCM_on() | self.DBM_on()
    def ID_allOn (self):
-        return self.bpipe_on() & self.pixel_on() & self.SCT_on() & self.TRT_on() & self.BCM_on()
+        return self.bpipe_on() & self.pixel_on() & self.SCT_on() & self.TRT_on() & self.BCM_on() & self.DBM_on()
    def LAr_on (self):
        return self.em_on() | self.HEC_on() | self.FCal_on()
    def LAr_allOn (self):
@@ -269,6 +273,7 @@ class DetFlags:
     writeRDOPool = SubDetectors()
     readRIOPool  = SubDetectors()
     writeRIOPool = SubDetectors()
+    overlay      = SubDetectors()
 
     # task list
     _taskList = []
@@ -287,6 +292,7 @@ class DetFlags:
     _taskList.append(writeRDOPool)
     _taskList.append(readRIOPool)
     _taskList.append(writeRIOPool)
+    _taskList.append(overlay)
 
     # ORed tasks
     haveRIO = ORedTask(makeRIO)
@@ -327,6 +333,10 @@ class DetFlags:
         cls._setAllTask('BCM','setOn')
     def BCM_setOff (cls):
         cls._setAllTask('BCM','setOff')
+    def DBM_setOn (cls):
+        cls._setAllTask('DBM','setOn')
+    def DBM_setOff (cls):
+        cls._setAllTask('DBM','setOff')
 
     def ALFA_setOn (cls):
         cls._setAllTask('ALFA','setOn')
@@ -452,6 +462,8 @@ class DetFlags:
         return cls._anyTask_on('TRT')
     def BCM_on (cls):
         return cls._anyTask_on('BCM')
+    def DBM_on (cls):
+        return cls._anyTask_on('DBM')
 
     def ALFA_on (cls):
         return cls._anyTask_on('ALFA')
@@ -512,7 +524,7 @@ class DetFlags:
 
     # show flags
     def Print (cls):
-        id      =["bpipe","pixel","SCT","TRT","BCM"]
+        id      =["bpipe","pixel","SCT","TRT","BCM","DBM"]
         forward=["Lucid", "ZDC", "ALFA", "AFP", "FwdRegion"]
         calo=["em","HEC","FCal","Tile"]
         muon=["MDT","CSC","TGC","RPC","sTGC","Micromegas"]
@@ -526,7 +538,7 @@ class DetFlags:
         format = "%13s :"
         alldets=id+forward+calo+muon+truth+l1+bf
         for det in alldets:
-            format += "%6s"
+            format += "%"+str(max(6,len(det)+1))+"s"   #"%10s"
             item.append(det)
         # print detectors row
         print format % tuple(item)
@@ -560,6 +572,8 @@ class DetFlags:
     TRT_setOff   = classmethod(TRT_setOff)
     BCM_setOn    = classmethod(BCM_setOn)
     BCM_setOff   = classmethod(BCM_setOff)
+    DBM_setOn    = classmethod(DBM_setOn)
+    DBM_setOff   = classmethod(DBM_setOff)
     ALFA_setOn   = classmethod(ALFA_setOn)
     ALFA_setOff= classmethod(ALFA_setOff)
     AFP_setOn    = classmethod(AFP_setOn)
@@ -615,6 +629,7 @@ class DetFlags:
     SCT_on       = classmethod(SCT_on)
     TRT_on       = classmethod(TRT_on)
     BCM_on       = classmethod(BCM_on)
+    DBM_on       = classmethod(DBM_on)
     ALFA_on      = classmethod(ALFA_on)
     AFP_on       = classmethod(AFP_on)
     ZDC_on       = classmethod(ZDC_on)
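
Illustrative usage of the new DBM detector flag and overlay task (not part of the diff), following the conventions quoted in the DetFlags header comment above:

    from AthenaCommon.DetFlags import DetFlags

    DetFlags.ID_setOn()             # now also switches on DBM for every task
    DetFlags.overlay.all_setOff()   # the new task has the usual all_/per-detector setters
    DetFlags.overlay.pixel_setOn()

    print DetFlags.DBM_on()         # True: at least one task is enabled for DBM
    DetFlags.Print()                # columns are now sized per detector name, DBM and overlay included
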
diff --git a/Control/AthenaCommon/python/ExitCodes.py b/Control/AthenaCommon/python/ExitCodes.py
index fb207c4f6d8941331c175cb7b8be2e272525147f..99d77cdf5f507bb4f54b26ea6bc589546e5ec8e9 100755
--- a/Control/AthenaCommon/python/ExitCodes.py
+++ b/Control/AthenaCommon/python/ExitCodes.py
@@ -42,6 +42,7 @@ DATA_OUTFILE_ERROR  = 0x05   # not implemented
 HISTO_OUTFILE_ERROR = 0x06   # not implemented
 DATABASE_ERROR      = 0x07   # not implemented
 UNKNOWN_EXCEPTION   = 0x08
+CONFIGURATION_ERROR = 0x09
 
 
 ### helper to get a human-readable string
diff --git a/Control/AthenaCommon/python/GlobalFlags.py b/Control/AthenaCommon/python/GlobalFlags.py
index 304dabb36f00df1fe8da84431439f0db98c575e0..22d8e6066a455c7ed8f96b0626fecffe794840e1 100755
--- a/Control/AthenaCommon/python/GlobalFlags.py
+++ b/Control/AthenaCommon/python/GlobalFlags.py
@@ -58,14 +58,14 @@ class DetDescrVersion(JobProperty):
     #                 'Rome-Initial-v00','Rome-Final','DC1-Initial',\
     #                 'DC1-Final','CTB','Commissioning-Calo','Commissioning-Muon'\
     #                 'ATLAS-DC3-01','ATLAS-DC3-02','Default']
-    StoredValue='ATLAS-GEO-16-00-00'
+    StoredValue='ATLAS-R2-2015-03-01-00'
 
 class ConditionsTag(JobProperty):
     """ See https://twiki.cern.ch/twiki/bin/view/Atlas/CoolProdTags
     """
     statusOn=True
     allowedTypes=['str']
-    StoredValue='OFLCOND-DR-BS7T-ANom-11'
+    StoredValue='OFLCOND-RUN12-SDR-25'
 
 #
 class DatabaseInstance(JobProperty):
diff --git a/Control/AthenaCommon/python/Include.py b/Control/AthenaCommon/python/Include.py
index cb941ef53310fc1c0379daec4b16491bfe4fc133..2c5aac2b12f72474fff89baf426beac4526afae6 100755
--- a/Control/AthenaCommon/python/Include.py
+++ b/Control/AthenaCommon/python/Include.py
@@ -38,6 +38,7 @@ excludeTracePattern = [
     '*/PyUtils/decorator.py',         # very verbose
     '*/PyUtils/Decorators.py',        # ditto
     '*/PyUtils/Helper*.py',           # ditto
+    '*/lib/ROOT.py',                  # ROOT import hook gets executed very many times
     ]
 
 # unless they are explicitly included here:
diff --git a/Control/AthenaCommon/share/Execution.py b/Control/AthenaCommon/share/Execution.py
index cac828197597ce948b7b912f618f7a3081ba7049..edbba8c6f3ef2397a25b34f0fd5890c146acfea1 100644
--- a/Control/AthenaCommon/share/Execution.py
+++ b/Control/AthenaCommon/share/Execution.py
@@ -76,9 +76,8 @@ else:
     if DbgStage.value == "init":
         hookDebugger()
 
-    
+## setup multi-process running and debugging
 if not opts.minimal and opts.nprocs and (opts.nprocs >= 1 or opts.nprocs==-1):
-    _msg.info ("enabling AthenaMP with [%s] sub-workers", opts.nprocs)
     from AthenaCommon.AppMgr import ServiceMgr as svcMgr
     import AthenaMP.PyComps as _amppy
     svcMgr += _amppy.MpEvtLoopMgr(NWorkers=opts.nprocs)
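
The new geometry and conditions defaults in GlobalFlags above can still be overridden per job; a sketch assuming the usual globalflags container exported by AthenaCommon.GlobalFlags (the container name itself is not shown in this diff):

    from AthenaCommon.GlobalFlags import globalflags

    globalflags.DetDescrVersion = 'ATLAS-R2-2015-03-01-00'   # the new default
    globalflags.ConditionsTag   = 'OFLCOND-RUN12-SDR-25'
    print globalflags.DetDescrVersion()
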
SIGUSR1 signal received") + + if opts.threads and (opts.threads != 0): + jps.ConcurrencyFlags.NumThreads = opts.threads + + if (jps.ConcurrencyFlags.NumProcs() > 0) : + _msg.info ("configuring hybrid AthenaMP/Hive with [%s] concurrent threads per AthenaMP worker", opts.threads) + elif (jps.ConcurrencyFlags.NumProcs() == 0) : + _msg.info ("configuring AthenaHive with [%s] concurrent threads", opts.threads) + else: + # we should never get here + _msg.error ("ConcurrencyFlags.NumProcs() cannot == -1 !!") + sys.exit() + + import AthenaCommon.AtlasThreadedJob + + +