diff --git a/Script/ManagerThread.py b/Script/ManagerThread.py index 59e98c030dfd638834b13468ca087434f07dd1f3..b3ff2eac48118b9eb1afa72f5411e8b947733404 100644 --- a/Script/ManagerThread.py +++ b/Script/ManagerThread.py @@ -30,7 +30,7 @@ class ManagerThread(threading.Thread): self.DeleteQueue = DeleteQueue self.ClearQueue = ClearQueue - ##### Get configuration parameters ##### + # Get configuration parameters self.lockFile = self.conf.lockFile self.DirList = [os.path.normpath(k) for k in self.conf.DirList] self.DataFilePattern = self.conf.DataFilePattern @@ -56,7 +56,7 @@ class ManagerThread(threading.Thread): self.Year = '' self.Month = '' - ##### Set Logger for ManagerThread ##### + # Set Logger for ManagerThread self.logFile = os.path.join(self.LogDir,'ManagerLog.out') self.ManagerLog = logging.FileHandler(self.logFile,'w') self.ManagerLog.setFormatter(formatter) @@ -65,7 +65,6 @@ class ManagerThread(threading.Thread): self.logger = logging.getLogger('ManagerThread') self.logger.addHandler(self.ManagerLog) - ##### Add ERS handler ######### if self.ERSenabled: try: import ers @@ -78,10 +77,10 @@ class ManagerThread(threading.Thread): def initialize(self): - #Check current year and month for directory streaming + # Check current year and month for directory streaming self.updateDates() - ##### Check the validity of all the helper from the previous run ##### + # Check the validity of all the helper files from previous runs self.logger.debug('Check helper files from previous run') for folder in self.DirList: @@ -90,7 +89,7 @@ class ManagerThread(threading.Thread): Constants.copying_ext, Constants.problematic_ext))): if len(file(oldfile).readlines()) < 4: os.remove(oldfile) - ##### Handle .COPIED files from previous run ##### + # Handle .COPIED files from previous run self.logger.info('Handle .COPIED files from previous run') for folder in self.DirList: for oldfile in glob.iglob(os.path.join(folder,'*'+Constants.copied_ext)): @@ -102,20 +101,19 @@ class ManagerThread(threading.Thread): self.CopyFileList.append(DataFile) self.DeleteQueue.put([DataFile,CastorPool,CastorDir,StageHost]) - #block until room is available in the queue ????? self.logger.debug('File: '+ DataFile + ' in DeleteQueue') self.logger.info('Size of DeleteQueue: ' + str(self.DeleteQueue.qsize())) - ##### Get the initial sorted list of files to be copied ##### - ##### FileList is a list of (.TOBECOPIED filename, Pool, Castor Directory,StageHost) ##### + # Get the initial sorted list of files to be copied + # CopyFileList is a list of (.TOBECOPIED filename, Pool, Castor Directory,StageHost) self.logger.info('Get the initial list of files to be copied') - ##### Send those files to the copy queue ##### + # Send those files to the copy queue for element in self.getCopyFiles(): + # order is important here: first CopyFileList then send to CopyThread self.CopyFileList.append(element[0]) self.CopyQueue.put(element) - #block until room is available in the queue ????? self.logger.debug('File: ' + element[0] + ' in CopyQueue') self.logger.info('Size of CopyQueue: ' + str(self.CopyQueue.qsize())) @@ -123,8 +121,6 @@ class ManagerThread(threading.Thread): def loop(self): while not self.exitFlag: - - ##### Sleep to safe CPU ##### self.event.wait(self.ManagerTimeout) self.logger.info('Clear processed files. Size of ClearQueue: ' + str(self.ClearQueue.qsize())) @@ -132,51 +128,44 @@ class ManagerThread(threading.Thread): while clearCounter < self.nDel: - ##### Do not stay in this loop for ever! ##### if self.exitFlag: break if self.ClearQueue.qsize() == 0: break self.logger.debug('clearCounter = ' + str(clearCounter) + '; nDel = ' + str(self.nDel)) - ##### Get file to be cleared from ClearQueue ##### + # Get file to be cleared from ClearQueue clearfile = self.ClearQueue.get(0) - ##### Remove clearfile from CopyFileList #### + # Remove clearfile from CopyFileList self.CopyFileList.remove(clearfile[0]) if not glob.glob(clearfile[0] + Constants.problematic_ext): self.ProblDict.pop(clearfile[0], None) self.logger.debug('File: ' + clearfile[0] + ' removed from CopyFileList') - ##### Count the deleted files ##### + # Count the deleted files clearCounter += 1 self.logger.info('Size of ClearQueue: ' + str(self.ClearQueue.qsize())) - ##### After deletion of nDel files, update the list of files to be copied and the copy queue ##### + # After deletion of nDel files, update the list of files to be copied and the copy queue self.logger.info('Update the list of files to be copied') NewList = self.getCopyFiles() for element in NewList: self.CopyFileList.append(element[0]) self.CopyQueue.put(element) - #block until room is available in the queue ????? self.logger.debug('File: ' + element[0] + ' in CopyQueue') - # end for + self.logger.info('Size of CopyQueue: ' + str(self.CopyQueue.qsize())) - #Check current year and month for directory streaming + # Update current year and month for directory streaming self.updateDates() - ## Class run(): inheretided from the threading.Thread run() class - # Defines all the actions the Manager Thread has to take def run(self): - self.logger.info(thread_id_string()) try: - #### Initialize manager #### self.initialize() - self.loop() except IOError, e: @@ -186,23 +175,16 @@ class ManagerThread(threading.Thread): self.logger.info('ManagerThread exited') - ##### Get the (new) files to be copied ##### + # Get the new files to be copied def getCopyFiles(self): date_file_list = [] - ##### Consider only unlocked filesystems ##### for folder in self.DirList: - + # Consider only unlocked filesystems if glob.glob(os.path.join(folder,self.lockFile)): self.logger.debug('Filesystem "' + folder + '" is locked: do not copy files from there') continue - #### Create a sorted list of all files that have to be copied #### - #### .TOBECOPIED, .COPYING, read .PROBLEMATIC files - #### from previous run - #### new .data files from current run - #### .data files from current run, locked when taken by CopyThread - files = sum([glob.glob(os.path.join(folder,p)) for p in self.DataFilePattern],[]) self.logger.debug('%d files found on folder "%s" matching "%s"', len(files), folder, pprint.pformat(self.DataFilePattern)) @@ -243,56 +225,48 @@ class ManagerThread(threading.Thread): continue - ##### Check if .PROBLEMATIC file is ready to be copied ##### + # Check if .PROBLEMATIC file is ready to be copied if glob.glob(filename + Constants.problematic_ext) \ and not self.checkProbl(filename): continue - ##### Get Castor pool, copy directory and stage host ##### + # Get Castor pool, copy directory and stage host CastorInfo = self.getCastorInfo(filename) Pool = CastorInfo[0] CopyDir = CastorInfo[1] StageHost = CastorInfo[2] - ##### Get the last modified date (mtime) for file ##### - ##### stat = (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime) ##### + # Get the last modified date (mtime) for file + # stat = (mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime) timestamp = os.stat(filename)[8] - ##### Get if file is an Express stream ##### + # Get if file is an Express stream express = self.isExpress(filename) - ##### Create a list of the previous lists ##### + # Create a list of the previous lists date_file_list.append((timestamp, filename, express, Pool, CopyDir, StageHost)) else: self.logger.debug('File ' + filename + ' already in the internal copy list') - ##### Sort the files with priority to express streams, then to oldest ones ##### self.logger.debug('Sort new files, with priority to express streams, then to oldest ones') date_file_list.sort(self.Compare) return [([liste[1],liste[3],liste[4],liste[5]]) for liste in date_file_list] - # end def getCopyFiles() - - - ##### Find out if a file is an Express stream ##### + # Find out if a file is an Express stream def isExpress(self,filename): return 'express' in filename.lower() - # end def isExpress() - ##### Compare two (timestamp, filename, is express) ##### + # Compare two (timestamp, filename, is express) def Compare(self,a, b): if (a[2]!= b[2]): return cmp(-a[2],-b[2]) else: return cmp(a[0],b[0]) - # end def Compare() - - ##### Check for .PROBLEMATIC files to be copied ##### - ##### ProblDict is a dictionary of {.PROBLEMATIC file : [timestamp,nretry]} + # ProblDict is a dictionary of {.PROBLEMATIC file : [timestamp,nretry]} def checkProbl(self,ProblFile): self.logger.debug('Problematic file: ' + ProblFile + '. Check if it is time to retry the copying') if ProblFile not in self.ProblDict: @@ -306,15 +280,15 @@ class ManagerThread(threading.Thread): #print (time()-timestamp),self.ProblDelay*math.exp(self.ProblScalingFactor*nretry) #print len(self.ProblDict) - ### Try to copy the file again only if the timeout is expired ### + # Try to copy the file again only if the timeout is expired if (time()-timestamp) > \ self.ProblDelay*math.exp(self.ProblScalingFactor*nretry) : nretry += 1 self.ProblDict[ProblFile] = [time(), nretry] self.logger.debug('Time to retry (' + str(nretry) + ' times up to now)') - ## Inform the user if we cannot copy the file and the retry time - ## limit has been reached + # Inform the user if we cannot copy the file and the retry time + # limit has been reached if self.ProblDelay*math.exp(self.ProblScalingFactor*nretry) > \ self.ProblDelayLimit: self.logger.critical('Problematic file %s reached the delay retry limit of %d seconds. Please have a look!', @@ -323,14 +297,13 @@ class ManagerThread(threading.Thread): else: self.logger.debug('Last retry was too recent: retry later') return False - # end def checkProbl() - - ##### Get directory where to copy files in Castor and Castor environment ##### + # Get directory where to copy files in Castor and Castor environment def getCastorInfo(self,filename): - ##### For files from previous run, read CopyDir and CastorEnv from .TOBECOPIED,.COPYING,.PROBLEMATIC,.COPIED files ##### + # For files from previous run, read CopyDir and CastorEnv from + # .TOBECOPIED,.COPYING,.PROBLEMATIC,.COPIED files infoFile = '' if glob.glob(filename + Constants.tobecopied_ext): infoFile = filename + Constants.tobecopied_ext elif glob.glob(filename + Constants.copying_ext): infoFile = filename + Constants.copying_ext @@ -353,7 +326,6 @@ class ManagerThread(threading.Thread): if not infoFile.endswith(Constants.copied_ext): os.rename(infoFile,filename + Constants.tobecopied_ext) return [pool,CastorDir,stagehost] - # end if try: parsed = self.parser(filename) @@ -379,12 +351,12 @@ class ManagerThread(threading.Thread): 'userdef2':parsed.UserDef2(), 'userdef3':parsed.UserDef3()} - ##### For new files, read CopyDir and CastorEnv from the configuration and write in .TOBECOPIED file - ##### Stream driven case ##### + # For new files, read CopyDir and CastorEnv from the configuration and + # write in .TOBECOPIED file self.logger.debug('New file: ' + filename + '. Read Castor Info from Configuration') + # Stream driven case def match_pool(drivenPool, parsed): - return (drivenPool.projecttag.match(parsed.ProjectTag()) and drivenPool.streamtype.match(parsed.StreamType()) and drivenPool.streamname.match(parsed.StreamName()) and @@ -392,7 +364,6 @@ class ManagerThread(threading.Thread): drivenPool.application.match(parsed.AppId()) and drivenPool.directory.match(parsed.Directory())) - if self.drivenPool: for e in self.drivenPool: if match_pool(e, parsed): @@ -403,10 +374,8 @@ class ManagerThread(threading.Thread): self.logger.debug('Castor Info: pool = ' + pool + ', copy directory = ' + CastorDir + ', stage host = ' + stagehost) self.writeCastorInfo(filename,pool,CastorDir,stagehost) return [pool,CastorDir,stagehost] - # end for - # end if - ##### Normal case ##### + # Normal case self.logger.debug('Normal case') pool = self.Pool stagehost = self.StageHost @@ -417,11 +386,8 @@ class ManagerThread(threading.Thread): self.writeCastorInfo(filename,pool,CastorDir,stagehost) return [pool,CastorDir,stagehost] - # end def getCastorInfo() - - - ##### Write Castor copy directory and environment to .TOBECOPIED file ##### + # Write Castor copy directory and environment to .TOBECOPIED file def writeCastorInfo(self,filename,Pool,CopyDir,stagehost): self.logger.debug('Write Castor Info to ' + filename + Constants.tobecopied_ext) FileName = 'FileName = ' + filename + '\n' @@ -432,31 +398,25 @@ class ManagerThread(threading.Thread): InfoFile.writelines([FileName,CastorPool,CastorDir,StageHost]) InfoFile.close() - # end def writeCastorInfo() - - ##### Exit handler: exit in clean way ##### + # Exit handler: exit in clean way def managerExit(self): self.exitFlag = 1 self.logger.info('Exit signal received') - # end def managerExit() def updateDates(self): self.Year = datetime.date.today().year self.Month = datetime.date.today().month - # end def updateDates(self) def setFileParser(self, parser): self.parser = parser - # end def setFileParser(self, parser) - ##### Function to add / remove ERS handler ##### + # Function to add / remove ERS handler def AddRemoveERS(self, create_ers_handler): if create_ers_handler: self.logger.addHandler( self.manage_ERS_handler ) else: self.logger.removeHandler( self.manage_ERS_handler ) - # end AddRemoveERS()