diff --git a/Script/CopyThread.py b/Script/CopyThread.py index f0ec52d3d732845c464dd030a7eac3f703c87940..044dab1d4f84445f20d7b98b7c744910a81cf406 100755 --- a/Script/CopyThread.py +++ b/Script/CopyThread.py @@ -109,17 +109,46 @@ class CopyThread(threading.Thread): nsmkdir.wait() self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logger.info(nsmkdir.stdout.read(),extra = self.logInfo) + + + #### Fetch information for consistency checks + SFO_filesize = os.stat(copyfile[0])[6] + + dbchecksum = dbfilesize = filehealth = None + + if self.db: + self.dbLock.acquire() + dbchecksum,dbfilesize,filehealth = \ + self.db.FileInfo(copyfile[0]) + self.dbLock.release() + + valid = True + + if filehealth and not filehealth == 'TRUNCATED': + valid &= (dbfilesize==SFO_filesize) + + if valid: - ##### Copy the file in Castor ##### - rfcp = Popen(['rfcp', copyfile[0], copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) + ##### Copy the file in Castor ##### + rfcp = Popen(['rfcp', copyfile[0], copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) - #### Rename .TOBECOPIED into .COPYING file ##### - os.rename(copyfile[0] + tobecopied_ext, - copyfile[0]+ copying_ext) + #### Rename .TOBECOPIED into .COPYING file ##### + os.rename(copyfile[0] + tobecopied_ext, + copyfile[0]+ copying_ext) - ##### Create a list of (copy process, .COPYING file name, n retry = 0, Pool, CopyDir, timestamp, Castor stage hostxem) ##### - self.CopyList.append([rfcp,copyfile[0],0,pool,copyDir,time(),stagehost]) - + ##### Create a list of (copy process, .COPYING file name, n retry = 0, Pool, CopyDir, timestamp, Castor stage hostxem) ##### + self.CopyList.append([rfcp,copyfile[0],0,pool,copyDir,time(),stagehost, SFO_filesize, dbchecksum, dbfilesize, filehealth]) + + else: + self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} + self.logger.warning('File ' + \ + copyfile[0] + ' is invalid for copy', \ + extra = self.logInfo) + os.rename(copyfile[0] + tobecopied_ext, + copyfile[0] + problematic_ext) + self.ClearQueue.put(copyfile) + + # end while ##### Check copy processes status ##### @@ -179,7 +208,7 @@ class CopyThread(threading.Thread): self.logger.debug(element[0].stdout.read(),extra = self.logInfo) ##### Compare size of original (SFO) and copied (Castor) files ##### - SFO_filesize = os.stat(DataFile)[6] + SFO_filesize = element[7] Castor_filesize, checksum = \ castorinfo(CastorFile, castorEnv, \ @@ -188,12 +217,9 @@ class CopyThread(threading.Thread): success = (SFO_filesize == Castor_filesize) ### If DB is available, compare also checksum and db filesize - if self.db: - self.dbLock.acquire() - dbchecksum,dbfilesize,filehealth = \ - self.db.FileInfo(DataFile) - self.dbLock.release() - + if element[10]: + dbchecksum,dbfilesize,filehealth = element[8:] + if not filehealth == 'TRUNCATED': success &= (Castor_filesize == dbfilesize) success &= (dbchecksum.lower() == checksum.lower())