From cd69c4944c7f149cf0937832561e1c69c9b87d00 Mon Sep 17 00:00:00 2001
From: TJ Khoo <khoo@cern.ch>
Date: Thu, 22 Nov 2018 19:18:22 +0100
Subject: [PATCH] Migrate setup from AtlasThreadedJob to a ComponentAccumulator
   * Adds MainServicesThreadedCfg, which builds on the serial config.   * This
 requires a different value for the EventLoopMgr property of     theApp, which
 could maybe be handled more elegantly.

---
 .../python/AllConfigFlags.py                  | 10 ++-
 .../python/MainServicesConfig.py              | 87 ++++++++++++++++++-
 2 files changed, 92 insertions(+), 5 deletions(-)

diff --git a/Control/AthenaConfiguration/python/AllConfigFlags.py b/Control/AthenaConfiguration/python/AllConfigFlags.py
index fff969b827b..b4ab8c54cb1 100644
--- a/Control/AthenaConfiguration/python/AllConfigFlags.py
+++ b/Control/AthenaConfiguration/python/AllConfigFlags.py
@@ -13,7 +13,15 @@ def _createCfgFlags():
     acf.addFlag('Input.isMC', lambda prevFlags : GetFileMD(prevFlags.Input.Files).get("isMC",None)) # former global.isMC
     acf.addFlag('Input.RunNumber', lambda prevFlags : list(GetFileMD(prevFlags.Input.Files).get("RunNumber",None))) # former global.RunNumber
     acf.addFlag('Input.ProjectName', lambda prevFlags : GetFileMD(prevFlags.Input.Files).get("Project","data17_13TeV") ) # former global.ProjectName
-    
+
+    acf.addFlag('Concurrency.NumProcs', 0)
+    acf.addFlag('Concurrency.NumThreads', 0)
+    acf.addFlag('Concurrency.NumConcurrentEvents', 0)
+
+    acf.addFlag('Scheduler.CheckDependencies', True)
+    acf.addFlag('Scheduler.ShowDataDeps', False)
+    acf.addFlag('Scheduler.ShowDataFlow', False)
+    acf.addFlag('Scheduler.ShowControlFlow', False)
 
     acf.addFlag('Common.isOnline', False ) #  Job runs in an online environment (access only to resources available at P1) # former global.isOnline
 
diff --git a/Control/AthenaConfiguration/python/MainServicesConfig.py b/Control/AthenaConfiguration/python/MainServicesConfig.py
index e34e8275a72..87703e1f0e5 100644
--- a/Control/AthenaConfiguration/python/MainServicesConfig.py
+++ b/Control/AthenaConfiguration/python/MainServicesConfig.py
@@ -1,12 +1,12 @@
 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
 from AthenaCommon.AlgSequence import AthSequencer
 
-def MainServicesMiniCfg():
+def MainServicesMiniCfg(LoopMgr='AthenaEventLoopMgr'):
     #Mininmal basic config, just good enough for HelloWorld and alike
     cfg=ComponentAccumulator()
     cfg.setAppProperty('TopAlg',['AthSequencer/AthAlgSeq']) #Just one sequence, no nesting
     cfg.setAppProperty('MessageSvcType', 'MessageSvc')
-    cfg.setAppProperty('EventLoop', 'AthenaEventLoopMgr') 
+    cfg.setAppProperty('EventLoop', LoopMgr) 
     cfg.setAppProperty('ExtSvcCreates', 'False')
     cfg.setAppProperty('JobOptionsSvcType', 'JobOptionsSvc')
     cfg.setAppProperty('JobOptionsType', 'NONE')
@@ -15,9 +15,9 @@ def MainServicesMiniCfg():
     return cfg
 
 
-def MainServicesSerialCfg():
+def MainServicesSerialCfg(LoopMgr='AthenaEventLoopMgr'):
     cfg=ComponentAccumulator("AthMasterSeq")
-    cfg.merge(MainServicesMiniCfg())
+    cfg.merge(MainServicesMiniCfg(LoopMgr))
     cfg.setAppProperty('TopAlg',['AthSequencer/AthMasterSeq'],overwrite=True)
     cfg.setAppProperty('OutStreamType', 'AthenaOutputStream')    
     
@@ -57,3 +57,82 @@ def MainServicesSerialCfg():
     cfg.setAppProperty('InitializationLoopCheck',False)
     return cfg
     
+
+def MainServicesThreadedCfg(cfgFlags):
+    # Neater ways to set the loop manager? Can't be altered
+    # after setting up the 
+    cfg = MainServicesSerialCfg("AthenaHiveEventLoopMgr")
+
+    # Run a serial job for threads=0
+    if cfgFlags.Concurrency.NumThreads==0:
+        return cfg
+
+    # Migrated code from AtlasThreadedJob.py
+    from AthenaCommon.Constants import VERBOSE, DEBUG, INFO, ERROR
+    from GaudiCoreSvc.GaudiCoreSvcConf import MessageSvc
+    from GaudiSvc.GaudiSvcConf import StatusCodeSvc, AuditorSvc
+
+    msgsvc = MessageSvc()
+    msgsvc.defaultLimit = 0 
+    msgFmt = "% F%40W%S%4W%e%s%7W%R%T %0W%M"
+    msgsvc.Format = msgFmt
+    cfg.addService(msgsvc)
+
+    scsvc = StatusCodeSvc()
+    scsvc.AbortOnError = False
+    cfg.addService(scsvc)
+    cfg.setAppProperty('StatusCodeCheck',False)
+
+    from StoreGate.StoreGateConf import SG__HiveMgrSvc
+    hivesvc = SG__HiveMgrSvc("EventDataSvc")
+    hivesvc.NSlots = cfgFlags.Concurrency.NumConcurrentEvents
+    cfg.addService( hivesvc )
+
+    import StoreGate.StoreGateConf as StoreGateConf
+    cfg.addService( StoreGateConf.StoreGateSvc("ConditionStore") )
+
+    from GaudiHive.GaudiHiveConf import AlgResourcePool
+    arp=AlgResourcePool( OutputLevel = INFO );
+    arp.TopAlg=["AthMasterSeq"] #this should enable control flow
+    cfg.addService( arp )
+
+    from GaudiHive.GaudiHiveConf import AvalancheSchedulerSvc
+    scheduler = AvalancheSchedulerSvc()
+    scheduler.CheckDependencies    = cfgFlags.Scheduler.CheckDependencies
+    scheduler.ShowDataDependencies = cfgFlags.Scheduler.ShowDataDeps
+    scheduler.ShowDataFlow         = cfgFlags.Scheduler.ShowDataFlow
+    scheduler.ShowControlFlow      = cfgFlags.Scheduler.ShowControlFlow
+    scheduler.ThreadPoolSize       = cfgFlags.Concurrency.NumThreads
+    cfg.addService(scheduler)
+
+    from SGComps.SGCompsConf import SGInputLoader
+    # FailIfNoProxy=False makes it a warning, not an error, if unmet data
+    # dependencies are not found in the store.  It should probably be changed
+    # to True eventually.
+    inputloader = SGInputLoader (FailIfNoProxy = False)
+    cfg.addEventAlgo( inputloader )
+    scheduler.DataLoaderAlg = inputloader.getName()
+
+    from AthenaServices.AthenaServicesConf import AthenaHiveEventLoopMgr
+
+    elmgr = AthenaHiveEventLoopMgr()
+    elmgr.WhiteboardSvc = "EventDataSvc"
+    elmgr.SchedulerSvc = scheduler.getName()
+    cfg.addService( elmgr )
+
+    # enable timeline recording
+    from GaudiHive.GaudiHiveConf import TimelineSvc
+    cfg.addService( TimelineSvc( RecordTimeline = True, Partial = False ) )
+    
+    #
+    ## Setup SGCommitAuditor to sweep new DataObjects at end of Alg execute
+    #
+    
+    auditorsvc = AuditorSvc()
+    from SGComps.SGCompsConf import SGCommitAuditor
+    auditorsvc += SGCommitAuditor()
+    cfg.addService( auditorsvc )
+    cfg.setAppProperty("AuditAlgorithms", True)
+
+    return cfg
+    
-- 
GitLab