Skip to content
Snippets Groups Projects
Commit 90a7a15e authored by Andreas Battaglia's avatar Andreas Battaglia
Browse files

subdirFlag removed, since the copy directory in Castor is fully specified by the keys

parent 90670616
No related branches found
No related tags found
No related merge requests found
......@@ -142,9 +142,6 @@ def main(conf):
copy.copyExit()
delete.deleteExit()
#### If threads are sleeping, wake them up #####
event.set()
##### Wait for the threads to finish and then stop #####
manager.join()
copy.join()
......
......@@ -9,7 +9,7 @@ class Conf:
########## Common parameters ##########
# Log directory
self.LogDir = '/afs/cern.ch/user/b/battagli/public/Python/DAQ/DataFlow/CastorScript/Script'
self.LogDir = '/tmp/battagli/DataTest/Log'
#Lock file
self.lockFile = 'SFO.lock'
......@@ -50,7 +50,7 @@ class Conf:
########## MAIN THREAD ##########
# If Oracle database is down, retry to connect to it every DBTimeout (s)
self.DBTimeout = 2
self.DBTimeout = 2000
# Timeout to refresh the Oracle connection
self.DBReconnectTimeout = 3600
......@@ -72,7 +72,7 @@ class Conf:
self.nDel = 2
# Sleep timeout (s) before updating the list of files to be copied
self.ManagerTimeout = 2
self.ManagerTimeout = 2000
# Delay (s) before to put .PROBLEMATIC file back into the CopyQueue
self.ProblDelay = 3
......@@ -89,9 +89,6 @@ class Conf:
#%(runnumber)s
self.CopyDir = '/castor/cern.ch/user/b/battagli/CopyTest'
# Is a run number subdirectory substructure required in Castor?
self.subdirFlag = True
# Pool in Castor
self.Pool = 'default'
......@@ -102,7 +99,7 @@ class Conf:
self.drivenPool = [
#['Physics','Electron','default','/castor/cern.ch/user/b/battagli/RemoteFarm1','castoratlas'],
#['Calibration','Test','default','/castor/cern.ch/user/b/battagli/RemoteFarm2','castoratlas']
['physics','express','t0atlas','/castor/cern.ch/grid/atlas/DAQ/test/streaming','castoratlas']
#['physics','express','t0atlas','/castor/cern.ch/grid/atlas/DAQ/test/streaming','castoratlas']
]
......@@ -116,10 +113,10 @@ class Conf:
self.maxRetry = 2
# Timeout (s) after which kill and retry an rfcp command in None status
self.NoneTimeout = 300
self.NoneTimeout = 0
# Sleep timeout (s) before checking again the copy process status
self.CopyTimeout = 2
self.CopyTimeout = 2000
########## DELETE THREAD ##########
......@@ -146,7 +143,7 @@ class Conf:
self.lowWaterMark = [1,1,1]
# Sleep timeout (s) before deleting next file in DeleteQueue
self.DeleteTimeout = 1
self.DeleteTimeout = 2000
# Delay before to check if file is migrated to Castor tape (s)
self.MigDelay = 600
......@@ -258,6 +258,6 @@ class Database:
def dbConf(self,conf):
logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.info('Update log level',extra = logInfo)
self.logger.info('Update configuration',extra = logInfo)
self.conf = conf
......@@ -38,7 +38,6 @@ class ManagerThread(threading.Thread):
self.Pool = self.conf.Pool
self.StageHost = self.conf.StageHost
self.drivenPool = self.conf.drivenPool
self.subdirFlag = self.conf.subdirFlag
self.LogDir = self.conf.LogDir
self.LogLevel = self.conf.LogLevel
......@@ -311,7 +310,7 @@ class ManagerThread(threading.Thread):
##### Get directory where to copy files in Castor and Castor environment #####
def getCastorInfo(self,filename):
##### For files from previous run, read CopyDir and CastorEnv from .TOBECOPIED,.COPYING,.PROBLEMATIC, .COPIED files #####
##### For files from previous run, read CopyDir and CastorEnv from .TOBECOPIED,.COPYING,.PROBLEMATIC,.COPIED files #####
infoFile = ''
if len(glob.glob(filename + tobecopied_ext))!= 0: infoFile = filename + tobecopied_ext
elif len(glob.glob(filename + copying_ext))!= 0: infoFile = filename + copying_ext
......@@ -357,13 +356,7 @@ class ManagerThread(threading.Thread):
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Stream driven case',extra = self.logInfo)
CastorDir = element[3]%dirdict
if self.subdirFlag:
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Run subdirectory structure case',extra = self.logInfo)
CastorDir = CastorDir + '/Run_' + filecore_field[2]
# end if
pool = element[2]
pool = element[2]
stagehost = element[4]
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Castor Info: pool = ' + pool + ' , copy directory = ' + CastorDir + ' , stage host = ' + stagehost,extra = self.logInfo)
......@@ -372,18 +365,13 @@ class ManagerThread(threading.Thread):
# end for
# end if
##### Normal case and Subdirectory structure case (run number) #####
##### Normal case #####
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Normal case',extra = self.logInfo)
pool = self.Pool
stagehost = self.StageHost
CastorDir = self.CopyDir%dirdict
if self.subdirFlag:
CastorDir = CastorDir + '/Run_' + filecore_field[2]
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Run subdirectory structure case',extra = self.logInfo)
# end if
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Castor Info: pool = ' + pool + ' , copy directory = ' + CastorDir + ' , stage host = ' + stagehost,extra = self.logInfo)
......@@ -447,7 +435,7 @@ class ManagerThread(threading.Thread):
##### Do not update if an 'ls' on disk is ongoing #####
while(1):
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Do not update DirList,ProblDelay,ProblMaxRetry,CopyDir,Pool,StageHost,drivenPool,subdirFlag if an ls in ongoing',extra = self.logInfo)
self.logger.debug('Do not update DirList,ProblDelay,ProblMaxRetry,CopyDir,Pool,StageHost,drivenPool if an ls in ongoing',extra = self.logInfo)
if self.updateFlag:
self.lockFile = self.conf.lockFile
self.DirList = self.conf.DirList
......@@ -458,9 +446,8 @@ class ManagerThread(threading.Thread):
self.Pool = self.conf.Pool
self.StageHost = self.conf.StageHost
self.drivenPool = self.conf.drivenPool
self.subdirFlag = self.conf.subdirFlag
self.logInfo = {'file':self.logger.findCaller()[0],'line':self.logger.findCaller()[1]}
self.logger.debug('Now update DirList,ProblDelay,ProblMaxRetry,CopyDir,Pool,StageHost,drivenPool,subdirFlag',extra = self.logInfo)
self.logger.debug('Now update DirList,ProblDelay,ProblMaxRetry,CopyDir,Pool,StageHost,drivenPool',extra = self.logInfo)
break
# end if
self.event.wait(self.ManagerTimeout)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment