Skip to content
Snippets Groups Projects
Commit b33c8838 authored by Wainer Vandelli's avatar Wainer Vandelli
Browse files

No commit message

No commit message
parent a728fb9a
No related branches found
No related tags found
No related merge requests found
...@@ -109,17 +109,46 @@ class CopyThread(threading.Thread): ...@@ -109,17 +109,46 @@ class CopyThread(threading.Thread):
nsmkdir.wait() nsmkdir.wait()
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]} self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.info(nsmkdir.stdout.read(),extra = self.logInfo) self.logger.info(nsmkdir.stdout.read(),extra = self.logInfo)
#### Fetch information for consistency checks
SFO_filesize = os.stat(copyfile[0])[6]
dbchecksum = dbfilesize = filehealth = None
if self.db:
self.dbLock.acquire()
dbchecksum,dbfilesize,filehealth = \
self.db.FileInfo(copyfile[0])
self.dbLock.release()
valid = True
if filehealth and not filehealth == 'TRUNCATED':
valid &= (dbfilesize==SFO_filesize)
if valid:
##### Copy the file in Castor ##### ##### Copy the file in Castor #####
rfcp = Popen(['rfcp', copyfile[0], copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv) rfcp = Popen(['rfcp', copyfile[0], copyDir], stdout = PIPE, stderr = STDOUT, env = castorEnv)
#### Rename .TOBECOPIED into .COPYING file ##### #### Rename .TOBECOPIED into .COPYING file #####
os.rename(copyfile[0] + tobecopied_ext, os.rename(copyfile[0] + tobecopied_ext,
copyfile[0]+ copying_ext) copyfile[0]+ copying_ext)
##### Create a list of (copy process, .COPYING file name, n retry = 0, Pool, CopyDir, timestamp, Castor stage hostxem) ##### ##### Create a list of (copy process, .COPYING file name, n retry = 0, Pool, CopyDir, timestamp, Castor stage hostxem) #####
self.CopyList.append([rfcp,copyfile[0],0,pool,copyDir,time(),stagehost]) self.CopyList.append([rfcp,copyfile[0],0,pool,copyDir,time(),stagehost, SFO_filesize, dbchecksum, dbfilesize, filehealth])
else:
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.warning('File ' + \
copyfile[0] + ' is invalid for copy', \
extra = self.logInfo)
os.rename(copyfile[0] + tobecopied_ext,
copyfile[0] + problematic_ext)
self.ClearQueue.put(copyfile)
# end while # end while
##### Check copy processes status ##### ##### Check copy processes status #####
...@@ -179,7 +208,7 @@ class CopyThread(threading.Thread): ...@@ -179,7 +208,7 @@ class CopyThread(threading.Thread):
self.logger.debug(element[0].stdout.read(),extra = self.logInfo) self.logger.debug(element[0].stdout.read(),extra = self.logInfo)
##### Compare size of original (SFO) and copied (Castor) files ##### ##### Compare size of original (SFO) and copied (Castor) files #####
SFO_filesize = os.stat(DataFile)[6] SFO_filesize = element[7]
Castor_filesize, checksum = \ Castor_filesize, checksum = \
castorinfo(CastorFile, castorEnv, \ castorinfo(CastorFile, castorEnv, \
...@@ -188,12 +217,9 @@ class CopyThread(threading.Thread): ...@@ -188,12 +217,9 @@ class CopyThread(threading.Thread):
success = (SFO_filesize == Castor_filesize) success = (SFO_filesize == Castor_filesize)
### If DB is available, compare also checksum and db filesize ### If DB is available, compare also checksum and db filesize
if self.db: if element[10]:
self.dbLock.acquire() dbchecksum,dbfilesize,filehealth = element[8:]
dbchecksum,dbfilesize,filehealth = \
self.db.FileInfo(DataFile)
self.dbLock.release()
if not filehealth == 'TRUNCATED': if not filehealth == 'TRUNCATED':
success &= (Castor_filesize == dbfilesize) success &= (Castor_filesize == dbfilesize)
success &= (dbchecksum.lower() == checksum.lower()) success &= (dbchecksum.lower() == checksum.lower())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment