diff --git a/Script/CastorScript.py b/Script/CastorScript.py index 221901e6a55b4d8c29738959144980967ca0b3e8..083135d524592b04399f47eb8744c6fff597a5b1 100755 --- a/Script/CastorScript.py +++ b/Script/CastorScript.py @@ -1,6 +1,6 @@ #!/bin/env tdaq_python -__version__='$Revision:$' +__version__='$Revision$' # $Source$ @@ -205,6 +205,16 @@ def main(conf): logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} logger.info('Configuration updated',extra = logInfo) # end if + + + #Check worker states + if not (manager.isAlive() and copy.isAlive() and delete.isAlive()): + logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} + logger.warning('Inconsistent worker states. Manager-->%s Copy-->%s Delete-->%s. Exiting!' \ + % tuple(map(str,map(threading.Thread.isAlive,(manager, copy, delete)))), \ + extra = logInfo) + Exit(None,None) + # end while ##### If exit signal, stop threads in clean way ##### diff --git a/Script/Constants.py b/Script/Constants.py index 48dac93b0d0c3cf690f78801a8277e0151973fdb..4bf5cf763e4914a78a9d0c0d856490dd8c1bac05 100644 --- a/Script/Constants.py +++ b/Script/Constants.py @@ -9,8 +9,3 @@ problematic_ext = '.PROBLEMATIC' needed_parser_symbols = filter(lambda x: '__' not in x, dir(BaseFileNameParser)) -#needed_parser_symbols = ['RunNr', 'LBNr', 'FileNr', -# 'StreamType', 'StreamName', -# 'FileTag', 'AppId', 'ProjectTag'] - - diff --git a/Script/ManagerThread.py b/Script/ManagerThread.py index 595e071f3a58b5dafa7b022f85db03d6de32447b..6f3c81a71a6c50a71713349c4860700295680cd3 100755 --- a/Script/ManagerThread.py +++ b/Script/ManagerThread.py @@ -5,7 +5,7 @@ This module describe the Manager Thread. It is in charge of ls on SFO disk. """ -__version__='$Revision:$' +__version__='$Revision$' # $Source$ import threading, os, glob @@ -67,14 +67,9 @@ class ManagerThread(threading.Thread): # end def __init__() - ## Class run(): inheretided from the threading.Thread run() class - # Defines all the actions the Manager Thread has to take - def run(self): - - ##### Set Logger Level ##### - self.setLogLevel() - ##### Do not update configuration parameters while an 'ls' on disk is ongoing ##### + def initialize(self): + ##### Do not update configuration parameters while an 'ls' on disk is ongoing ##### self.updateFlag = False #Check current year and month for directory streaming @@ -84,7 +79,7 @@ class ManagerThread(threading.Thread): self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Check helper files from previous run',extra = self.logInfo) - + for folder in self.DirList: for oldfile in glob.glob( os.path.join(folder, @@ -106,15 +101,15 @@ class ManagerThread(threading.Thread): CastorPool = CastorInfo[0] CastorDir = CastorInfo[1] StageHost = CastorInfo[2] - + self.CopyFileList.append([DataFile,CastorPool,CastorDir,StageHost]) self.DeleteQueue.put([DataFile,CastorPool,CastorDir,StageHost]) #block until room is available in the queue ????? self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('File:'+ DataFile + 'in DeleteQueue',extra = self.logInfo) - #logging.error('Pack my box with %d dozen %s', 5, 'liquor jugs') # end for # end for + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Size of DeleteQueue:' + str(self.DeleteQueue.qsize()),extra = self.logInfo) @@ -139,15 +134,17 @@ class ManagerThread(threading.Thread): clearCounter = 0 - + + def loop(self): + while not self.exitFlag: ##### Sleep to safe CPU ##### self.event.wait(self.ManagerTimeout) - + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Clear processed files. Size of ClearQueue:' + str(self.ClearQueue.qsize()),extra = self.logInfo) - + while clearCounter < self.nDel: ##### Do not stay in this loop for ever! ##### @@ -158,9 +155,7 @@ class ManagerThread(threading.Thread): ##### Get file to be cleared from ClearQueue ##### clearfile = self.ClearQueue.get(0) - #clearfile = self.ClearQueue.get() - #block until room is available in the queue ????? - + ##### Remove clearfile from CopyFileList #### self.CopyFileList.remove(clearfile) if not glob.glob(clearfile[0] + problematic_ext): @@ -171,7 +166,7 @@ class ManagerThread(threading.Thread): ##### Count the deleted files ##### clearCounter += 1 - + # end while self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Size of ClearQueue:' + str(self.ClearQueue.qsize()),extra = self.logInfo) @@ -200,8 +195,28 @@ class ManagerThread(threading.Thread): #Check current year and month for directory streaming self.updateDates() - - # end while + + + + ## Class run(): inheretided from the threading.Thread run() class + # Defines all the actions the Manager Thread has to take + def run(self): + + ##### Set Logger Level ##### + self.setLogLevel() + + try: + #### Initialize manager #### + self.initialize() + + self.loop() + + except IOError, e: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.critical('Cannot write Castor Info to %s because: "%s". ManagerThread is stopping!' \ + % (e.filename, e.strerror), \ + extra = self.logInfo) + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('ManagerThread exited',extra = self.logInfo) # end def run() @@ -444,6 +459,7 @@ class ManagerThread(threading.Thread): InfoFile = open(filename + tobecopied_ext,'w') InfoFile.writelines([FileName,CastorPool,CastorDir,StageHost]) InfoFile.close() + # end def writeCastorInfo()