Skip to content
Snippets Groups Projects
Commit ec276a01 authored by Fabrice Le Goff's avatar Fabrice Le Goff
Browse files

add option to disable delete thread

parent 6ab93f68
No related branches found
No related tags found
No related merge requests found
Pipeline #592427 passed
......@@ -64,7 +64,10 @@ DdmMonitoringQueue = Queue(config.DdmMonitoringQueueMaxSize) # created even if
manager = ManagerThread(config, event, CopyQueue, DeleteQueue, ClearQueue)
copy = CopyThread(config, event, dbLock, CopyQueue, DeleteQueue, ClearQueue,
DdmMonitoringQueue)
delete = DeleteThread(config, event, dbLock, DeleteQueue, ClearQueue)
delete = None
if config.DeleteEnabled:
delete = DeleteThread(config, event, dbLock, DeleteQueue, ClearQueue)
ddm_publisher = None
if config.DdmMonitoringEnabled:
......@@ -207,7 +210,8 @@ def main(conf):
##### Start the threads #####
manager.start()
copy.start()
delete.start()
if delete is not None:
delete.start()
if conf.ERSenabled:
check.start()
logger.info('Manager, Copy, Delete and Check Threads started')
......@@ -248,7 +252,7 @@ def main(conf):
#Check worker states
if not (manager.isAlive() and copy.isAlive() and delete.isAlive()):
if not (manager.isAlive() and copy.isAlive() and (delete is None or delete.isAlive())):
logger.warning('Inconsistent worker states. Manager-->%s Copy-->%s '\
'Delete-->%s. Exiting!', manager.isAlive(),
copy.isAlive(), delete.isAlive())
......@@ -260,7 +264,8 @@ def main(conf):
logger.info('Exit signal (12) received: exit in clean way')
manager.managerExit()
copy.copyExit()
delete.deleteExit()
if delete:
delete.deleteExit()
if conf.ERSenabled:
check.checkExit()
......@@ -271,7 +276,8 @@ def main(conf):
##### Wait for the threads to finish and then stop #####
manager.join()
copy.join()
delete.join()
if delete:
delete.join()
if conf.ERSenabled:
check.join()
logger.info('Manager,Copy, Delete and Check Threads joined the main thread')
......@@ -290,7 +296,7 @@ def Exit(signal_number, stackframe):
def checkDB(db, logger, dblogger, parser, conf):
dbFlag = copy.getDBFlag() and delete.getDBFlag()
dbFlag = copy.getDBFlag() and (delete is None or delete.getDBFlag())
if not dbFlag:
db = None
if not db and conf.connection:
......@@ -305,7 +311,8 @@ def checkDB(db, logger, dblogger, parser, conf):
# end try, except
copy.setDB(db)
delete.setDB(db)
if delete:
delete.setDB(db)
# end if not
return db
......
......@@ -100,14 +100,16 @@ class CheckThread(threading.Thread):
self.check_logger.addHandler(self.check_ERS_handler)
self.copy.AddRemoveERS(True)
self.manager.AddRemoveERS(True)
self.delete.AddRemoveERS(True)
if self.delete:
self.delete.AddRemoveERS(True)
else:
##### Remove ERS from all loggers #####
self.logger.removeHandler(self.ERS_handler)
self.check_logger.removeHandler(self.check_ERS_handler)
self.copy.AddRemoveERS(False)
self.manager.AddRemoveERS(False)
self.delete.AddRemoveERS(False)
if self.delete:
self.delete.AddRemoveERS(False)
# end if,else
# end def change_state_ers()
......
......@@ -192,6 +192,8 @@ bwDevice: None
#[Delete]
########## DELETE THREAD ##########
# Disable to completely disable the feature (thread not started)
DeleteEnabled: True
# Is simultaneous writing and deleting allowed?
ignoreLock: True
......
......@@ -303,6 +303,10 @@ class Conf:
########## DELETE THREAD ##########
try:
self.DeleteEnabled = cfg.DeleteEnabled
except AttributeError:
self.DeleteEnabled = True
# Is simultaneous writing and deleting allowed?
self.ignoreLock = cfg.ignoreLock
......
......@@ -378,7 +378,8 @@ class CopyThread(threading.Thread):
)
##### Copy successfull: put .COPIED file in DeleteQueue for deletion #####
self.DeleteQueue.put([DataFile,pool,copyDir,stagehost])
if self.conf.DeleteEnabled:
self.DeleteQueue.put([DataFile,pool,copyDir,stagehost])
# block until room is available in the queue ?????
##### Remove the copy process from CopyList #####
......
......@@ -99,7 +99,8 @@ class ManagerThread(threading.Thread):
StageHost = CastorInfo[2]
self.CopyFileList.append(DataFile)
self.DeleteQueue.put([DataFile,CastorPool,CastorDir,StageHost])
if self.conf.DeleteEnabled:
self.DeleteQueue.put([DataFile,CastorPool,CastorDir,StageHost])
self.logger.debug('File: %s in DeleteQueue', DataFile)
self.logger.info('Size of DeleteQueue: %d', self.DeleteQueue.qsize())
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment