From bb3211949bb193603b59eea411e30767fb825594 Mon Sep 17 00:00:00 2001 From: Wainer Vandelli <Wainer.Vandelli@cern.ch> Date: Fri, 4 Sep 2009 11:06:32 +0000 Subject: [PATCH] Trap errors in creating service files and exit --- Script/CastorScript.py | 12 ++++++++- Script/Constants.py | 5 ---- Script/ManagerThread.py | 56 ++++++++++++++++++++++++++--------------- 3 files changed, 47 insertions(+), 26 deletions(-) diff --git a/Script/CastorScript.py b/Script/CastorScript.py index 221901e..083135d 100755 --- a/Script/CastorScript.py +++ b/Script/CastorScript.py @@ -1,6 +1,6 @@ #!/bin/env tdaq_python -__version__='$Revision:$' +__version__='$Revision$' # $Source$ @@ -205,6 +205,16 @@ def main(conf): logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} logger.info('Configuration updated',extra = logInfo) # end if + + + #Check worker states + if not (manager.isAlive() and copy.isAlive() and delete.isAlive()): + logInfo = {'file':logger.findCaller()[0],'line':logger.findCaller()[1]} + logger.warning('Inconsistent worker states. Manager-->%s Copy-->%s Delete-->%s. Exiting!' \ + % tuple(map(str,map(threading.Thread.isAlive,(manager, copy, delete)))), \ + extra = logInfo) + Exit(None,None) + # end while ##### If exit signal, stop threads in clean way ##### diff --git a/Script/Constants.py b/Script/Constants.py index 48dac93..4bf5cf7 100644 --- a/Script/Constants.py +++ b/Script/Constants.py @@ -9,8 +9,3 @@ problematic_ext = '.PROBLEMATIC' needed_parser_symbols = filter(lambda x: '__' not in x, dir(BaseFileNameParser)) -#needed_parser_symbols = ['RunNr', 'LBNr', 'FileNr', -# 'StreamType', 'StreamName', -# 'FileTag', 'AppId', 'ProjectTag'] - - diff --git a/Script/ManagerThread.py b/Script/ManagerThread.py index 595e071..6f3c81a 100755 --- a/Script/ManagerThread.py +++ b/Script/ManagerThread.py @@ -5,7 +5,7 @@ This module describe the Manager Thread. It is in charge of ls on SFO disk. """ -__version__='$Revision:$' +__version__='$Revision$' # $Source$ import threading, os, glob @@ -67,14 +67,9 @@ class ManagerThread(threading.Thread): # end def __init__() - ## Class run(): inheretided from the threading.Thread run() class - # Defines all the actions the Manager Thread has to take - def run(self): - - ##### Set Logger Level ##### - self.setLogLevel() - ##### Do not update configuration parameters while an 'ls' on disk is ongoing ##### + def initialize(self): + ##### Do not update configuration parameters while an 'ls' on disk is ongoing ##### self.updateFlag = False #Check current year and month for directory streaming @@ -84,7 +79,7 @@ class ManagerThread(threading.Thread): self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('Check helper files from previous run',extra = self.logInfo) - + for folder in self.DirList: for oldfile in glob.glob( os.path.join(folder, @@ -106,15 +101,15 @@ class ManagerThread(threading.Thread): CastorPool = CastorInfo[0] CastorDir = CastorInfo[1] StageHost = CastorInfo[2] - + self.CopyFileList.append([DataFile,CastorPool,CastorDir,StageHost]) self.DeleteQueue.put([DataFile,CastorPool,CastorDir,StageHost]) #block until room is available in the queue ????? self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.debug('File:'+ DataFile + 'in DeleteQueue',extra = self.logInfo) - #logging.error('Pack my box with %d dozen %s', 5, 'liquor jugs') # end for # end for + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Size of DeleteQueue:' + str(self.DeleteQueue.qsize()),extra = self.logInfo) @@ -139,15 +134,17 @@ class ManagerThread(threading.Thread): clearCounter = 0 - + + def loop(self): + while not self.exitFlag: ##### Sleep to safe CPU ##### self.event.wait(self.ManagerTimeout) - + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Clear processed files. Size of ClearQueue:' + str(self.ClearQueue.qsize()),extra = self.logInfo) - + while clearCounter < self.nDel: ##### Do not stay in this loop for ever! ##### @@ -158,9 +155,7 @@ class ManagerThread(threading.Thread): ##### Get file to be cleared from ClearQueue ##### clearfile = self.ClearQueue.get(0) - #clearfile = self.ClearQueue.get() - #block until room is available in the queue ????? - + ##### Remove clearfile from CopyFileList #### self.CopyFileList.remove(clearfile) if not glob.glob(clearfile[0] + problematic_ext): @@ -171,7 +166,7 @@ class ManagerThread(threading.Thread): ##### Count the deleted files ##### clearCounter += 1 - + # end while self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('Size of ClearQueue:' + str(self.ClearQueue.qsize()),extra = self.logInfo) @@ -200,8 +195,28 @@ class ManagerThread(threading.Thread): #Check current year and month for directory streaming self.updateDates() - - # end while + + + + ## Class run(): inheretided from the threading.Thread run() class + # Defines all the actions the Manager Thread has to take + def run(self): + + ##### Set Logger Level ##### + self.setLogLevel() + + try: + #### Initialize manager #### + self.initialize() + + self.loop() + + except IOError, e: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.critical('Cannot write Castor Info to %s because: "%s". ManagerThread is stopping!' \ + % (e.filename, e.strerror), \ + extra = self.logInfo) + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info('ManagerThread exited',extra = self.logInfo) # end def run() @@ -444,6 +459,7 @@ class ManagerThread(threading.Thread): InfoFile = open(filename + tobecopied_ext,'w') InfoFile.writelines([FileName,CastorPool,CastorDir,StageHost]) InfoFile.close() + # end def writeCastorInfo() -- GitLab