Skip to content
Snippets Groups Projects
Forked from atlas / athena
120608 commits behind the upstream repository.
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
AlgScheduler.py 5.10 KiB
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration

# Configuration for the Hive Algorithm Scheduler.
#
# Allows easy low level replacement of the specific Scheduler, without
# requiring clients to know which one is in use.
#
# the AlgScheduler will be setup with the same number of threads
# as are specified on the command line with the "--threads=N" parameter
#
# usage:
#  from AthenaCommon.AlgScheduler import AlgScheduler
# clients can then configure runtime printouts, threadpool, etc:
#  AlgScheduler.ShowDataDependencies( True )
#  AlgScheduler.ShowControlFlow( True )
#  AlgScheduler.ShowDataFlow( True )
#  AlgScheduler.setThreadPoolSize( 7 )
#
# if a specific scheduler lacks that option, a warning message is printed
# clients can also replace the default scheduler with another one
#  from GaudiHive.GaudiHiveConf import ForwardSchedulerSvc
#  myScheduler = ForwardSchedulerSvc()
#  AlgScheduler.SetScheduler( myScheduler )
#  AlgScheduler.setThreadPoolSize( 7 )
#
# if this is done, the HiveEventLoopMgr also needs to know about it
#    from AthenaServices.AthenaServicesConf import AthenaHiveEventLoopMgr
#    svcMgr.AthenaHiveEventLoopMgr.SchedulerSvc = AlgScheduler.getScheduler().getName()
#


class AlgScheduler:
    def __init__(self,theSched=None):
        """Setup Algorithm Scheduler"""

        from AppMgr import ServiceMgr as svcMgr
        from Constants import VERBOSE, DEBUG, INFO, ERROR

        from ConcurrencyFlags import jobproperties as jps
        from AthenaCommon.Logging import logging

        self.log = logging.getLogger( 'AlgScheduler' )

        if (theSched == None) :
            from GaudiHive.GaudiHiveConf import AvalancheSchedulerSvc
            svcMgr += AvalancheSchedulerSvc()
            self.SchedulerSvc = svcMgr.AvalancheSchedulerSvc
        else :
            svcMgr += theSched
            self.SchedulerSvc = theSched
            
        self.SchedulerSvc.OutputLevel = INFO
        self.SchedulerSvc.CheckDependencies = True
        self.SchedulerSvc.ThreadPoolSize = jps.ConcurrencyFlags.NumThreads()

        self.log.info("setting up " + self.SchedulerSvc.getFullName() + " with " + str(jps.ConcurrencyFlags.NumThreads()) + " threads")


#
## exchange the current scheduler for another one
    def SetScheduler(self,theSched):
        """setup a different Scheduler"""

        if (self.SchedulerSvc.getFullName() != theSched.getFullName()) :
            self.log.info("replacing " + self.SchedulerSvc.getFullName() 
                          + " with " + theSched.getFullName())
            from AppMgr import ServiceMgr as svcMgr
            svcMgr.remove(self.SchedulerSvc)
            svcMgr += theSched
            self.SchedulerSvc = theSched

#
## change the output level
    def OutputLevel(self,level) :
        self.SchedulerSvc.OutputLevel = level
        
#
## control checking of data deps at beginning of job for unmet input deps
    def CheckDependencies(self,check=True):
        if ( 'CheckDependencies' in self.SchedulerSvc.properties() ):
            self.SchedulerSvc.CheckDependencies = check
        else :
            self.log.warning( self.SchedulerSvc.getFullName() + " has no property \"CheckDependencies\"")

#
## control printout of control flow at beginning of job
    def ShowControlFlow(self,show=True):
        if ( 'ShowControlFlow' in self.SchedulerSvc.properties() ):
            self.SchedulerSvc.ShowControlFlow = show
        else :
            self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"ShowControlFlow\"")

#
## control printout of data flow at beginning of job
    def ShowDataFlow(self,show=True):
        if ( 'ShowDataFlow' in self.SchedulerSvc.properties() ):
            self.SchedulerSvc.ShowDataFlow = show
        else :
            self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"ShowDataFlow\"")

#
## control printout of data dependencies at beginning of job
    def ShowDataDependencies(self,show=True):
        if ( 'ShowDataDependencies' in self.SchedulerSvc.properties() ):
            self.SchedulerSvc.ShowDataDependencies = show
        else :
            self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"ShowDataDependencies\"")


#
## set the DataLoader Algorithm to handle unmet input data deps
    def setDataLoaderAlg(self,dataLoadAlg):
        if ( 'DataLoaderAlg' in self.SchedulerSvc.properties() ):
            self.SchedulerSvc.DataLoaderAlg = dataLoadAlg
        else :
            self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"DataLoaderAlg\"")

#
## enable condition handling
    def EnableConditions(self,enable=True):
        if ( 'EnableConditions' in self.SchedulerSvc.properties() ):
            self.SchedulerSvc.EnableConditions = enable
        else :
            self.log.warning(self.SchedulerSvc.getFullName() + " has no property \"EnableConditions\"")


#
## explicitly set the thread pool size
    def setThreadPoolSize(self,tps) :
        self.SchedulerSvc.ThreadPoolSize = tps

#
## get the currently configured scheduler
    def getScheduler(self):
        """Get the Scheduler"""
        return self.SchedulerSvc

AlgScheduler = AlgScheduler()