diff --git a/Script/CastorScript.py b/Script/CastorScript.py
index 71567c6d11eae9aebf16c1b3e10e0e8ca054e613..c27cc700f442209ead43a3b5b20f05255f7baa4b 100644
--- a/Script/CastorScript.py
+++ b/Script/CastorScript.py
@@ -22,9 +22,8 @@ import cs.Tools.Libraries.Database as Database
 
 def main_exit(signal_number, stackframe):
     del signal_number, stackframe
-    global exitflag
-    exitflag = True
-    global_event.set()
+    logger.debug('exit signal received')
+    exit_event.set()
 
 
 def checkDB(db_, logger_, dblogger_, parser_, conf_):
@@ -55,8 +54,6 @@
     return db_
 
 
-exitflag = 0
-
 if len(sys.argv) >= 2:
     try:
         config = ConfigParser.ConfigHolder(sys.argv[1])
@@ -76,7 +73,7 @@ if not config.CopyEnabled and not config.DeleteEnabled:
 os.umask(0o002)
 
 # Main objects instantiation
-global_event = threading.Event()
+exit_event = threading.Event()
 db_lock = threading.Lock()
 
 copyqueue = Queue()
@@ -87,28 +84,28 @@ ddmqueue = None
 if config.DdmMonitoringEnabled:
     ddmqueue = Queue(config.DdmMonitoringQueueMaxSize)
 
-manager_thread = ManagerThread(config, global_event, db_lock, copyqueue,
+manager_thread = ManagerThread(config, exit_event, db_lock, copyqueue,
                                deletequeue, clearqueue)
 
 copy_thread = None
 if config.CopyEnabled:
-    copy_thread = CopyThread(config, global_event, db_lock, copyqueue,
+    copy_thread = CopyThread(config, exit_event, db_lock, copyqueue,
                              deletequeue, clearqueue, ddmqueue)
 
 delete_thread = None
 if config.DeleteEnabled:
-    delete_thread = DeleteThread(config, global_event, db_lock, deletequeue,
+    delete_thread = DeleteThread(config, exit_event, db_lock, deletequeue,
                                  clearqueue)
 
 if config.DdmMonitoringEnabled:
-    ddm_publisher = DdmMonitoringThread(config, global_event, ddmqueue)
+    ddm_publisher = DdmMonitoringThread(config, exit_event, ddmqueue)
 
 if config.ISEnabled:
-    info_service_thread = InfoServiceThread(config, global_event,
+    info_service_thread = InfoServiceThread(config, exit_event,
                                             copy_thread, delete_thread, manager_thread)
 
 if config.ERSEnabled:
-    ers_thread = ERSThread(config, global_event)
+    ers_thread = ERSThread(config, exit_event)
 # General configuration
 LogConfig.enable_root_logger()
@@ -196,7 +193,7 @@ if config.DeleteEnabled:
 if config.DdmMonitoringEnabled:
     ddm_publisher.start()
 
-while not exitflag:
+while not exit_event.is_set():
     logger.debug('checking DB connection')
     db = checkDB(db, logger, dblogger, filename_parser, config)
 
@@ -205,7 +202,6 @@ while not exitflag:
     if db and (time() - db_last_connection_time) > config.DBReconnectTimeout:
         logger.info('refreshing database connection')
         with db_lock:
-            db_lock.acquire()
             try:
                 db.Reconnect()
             except Exception as ex:
@@ -238,20 +234,7 @@ while not exitflag:
                      mgr_status, copy_status, del_status)
         main_exit(None, None)
 
-    global_event.wait(config.MainThreadEventTimeout)
-
-logger.debug('exit signal received')
-manager_thread.exit()
-if config.CopyEnabled:
-    copy_thread.exit()
-if config.DeleteEnabled:
-    delete_thread.exit()
-if config.ERSEnabled:
-    ers_thread.exit()
-if config.DdmMonitoringEnabled:
-    ddm_publisher.exit()
-if config.ISEnabled:
-    info_service_thread.exit()
+    exit_event.wait(config.MainThreadEventTimeout)
 
 manager_thread.join()
 logger.info('manager thread stopped')
diff --git a/Script/cs/StorageBackends/ctastorage.py b/Script/cs/StorageBackends/ctastorage.py
index 81e4577af02dedd288c7b89857d00640b81126e8..0a7f6780d3fd6be1cfe59049caf59a1822b81b7c 100644
--- a/Script/cs/StorageBackends/ctastorage.py
+++ b/Script/cs/StorageBackends/ctastorage.py
@@ -2,27 +2,22 @@ import subprocess
 import threading
 from time import time
 import sys
+from cs.Tools.utils import thread_id_string
 
 
 class DirCache(threading.Thread):
-    def __init__(self, validity_s, logger=None):
+    def __init__(self, validity_s, exitevent, logger=None):
         threading.Thread.__init__(self, name="CTADirCacheThread")
         self.cache = {}
         self.lock = threading.Lock()
         self.validity_s = validity_s
-        self.event = threading.Event()
-        self.exit_flag = False
+        self.exitevent = exitevent
         self.granularity_s = 10
         self.logger = logger
 
     def run(self):
-        if self.logger: self.logger.debug('thread started')
-        while True:
-            self.event.wait(self.granularity_s)
-            if self.exit_flag:
-                self.logger.debug('exiting')
-                break
-
+        if self.logger: self.logger.info(thread_id_string())
+        while not self.exitevent.is_set():
             now = time()
             to_delete = []
             with self.lock:
@@ -34,9 +29,10 @@
                     if self.logger: self.logger.debug('deleting cache entry: %s', dirname)
                     del self.cache[dirname]
 
-    def exit(self):
-        self.exit_flag = True
-        self.event.set()
+            self.exitevent.wait(self.granularity_s)
+
+        if self.logger: self.logger.info('ctastorage dircache stopping')
+
 
     def listdir(self, directory, stager):
         del stager
@@ -53,8 +49,8 @@
             self.cache[dirname] = (time(), dir_contents)
 
 
-def start_cache(validity_s, logger):
-    cache = DirCache(validity_s, logger)
+def start_cache(validity_s, logger, exitevent):
+    cache = DirCache(validity_s, exitevent, logger)
     cache.start()
     return cache
 
diff --git a/Script/cs/Threads/CopyThread.py b/Script/cs/Threads/CopyThread.py
index 83922a37be67613188021e15788567a7dce68334..b77a3cd80796c5c1d96ad8e89c26b95eef83e3d1 100644
--- a/Script/cs/Threads/CopyThread.py
+++ b/Script/cs/Threads/CopyThread.py
@@ -25,13 +25,13 @@ class TransferInfo:
 
 
 class CopyThread(threading.Thread):
-    def __init__(self, conf, event, dblock, copyqueue, deletequeue, clearqueue,
+    def __init__(self, conf, exitevent, dblock, copyqueue, deletequeue, clearqueue,
                  ddmmonitoringqueue):
 
         threading.Thread.__init__(self,name="CopyThread")
 
         self.conf = conf
-        self.event = event
+        self.exitevent = exitevent
         self.dblock = dblock
         self.copyqueue = copyqueue
         self.deletequeue = deletequeue
@@ -43,7 +43,6 @@
 
         self.problematic_transfer_failures = 0
         self.files_copied = 0
-        self.exitflag = False
         self.transfers = []
         self.nbtransfers = 0 # no lock needed for multithread access in python
         self.db = None
@@ -66,15 +65,15 @@
     def run(self):
 
         self.logger.info(thread_id_string())
 
-        while True:
-            if not self.exitflag:
-                self.startTransfers()
+        while not self.exitevent.is_set():
+            self.startTransfers()
             self.checkTransfers()
-            self.event.wait(self.conf.CopyThreadEventTimeout)
-            if self.exitflag and self.nbtransfers == 0: break
+            self.exitevent.wait(self.conf.CopyThreadEventTimeout)
 
-        self.logger.info('CopyThread exited')
+        self.logger.info('CopyThread stopping')
+        while self.nbtransfers != 0:
+            self.checkTransfers()
 
 
     def getCurrentBW(self):
@@ -346,7 +345,7 @@
                     short_reason, # copy status
                     )
 
-                if self.exitflag:
+                if self.exitevent.is_set():
                     toremove.append(transfer)
                     return
 
@@ -385,11 +384,6 @@
                     toremove.append(transfer)
 
 
-    def exit(self):
-        self.exitflag = 1
-        self.logger.debug('exit signal received')
-
-
     def setDB(self, db):
         self.logger.debug('setting database')
         self.db = db
diff --git a/Script/cs/Threads/DdmMonitoringThread.py b/Script/cs/Threads/DdmMonitoringThread.py
index e76d767c3fc44c6fb1d9bede8a0bd5ba3322c847..a25ff7a505b4ff30273b78b5c52db864331c9eb0 100644
--- a/Script/cs/Threads/DdmMonitoringThread.py
+++ b/Script/cs/Threads/DdmMonitoringThread.py
@@ -10,14 +10,13 @@ from cs.Tools.LogConfig import enable_file_logging
 
 class DdmMonitoringThread(threading.Thread):
 
-    def __init__(self, conf, event, ddm_monitoring_queue):
+    def __init__(self, conf, exitevent, ddm_monitoring_queue):
 
         threading.Thread.__init__(self, name="DdmMonitoringThread")
 
         self.conf = conf
-        self.event = event
+        self.exitevent = exitevent
         self.DdmMonitoringQueue = ddm_monitoring_queue
-        self.exitFlag = False
 
         self.requestErrorSince = None
         self.requestErrorCriticalLogged = False
@@ -37,64 +36,50 @@
         self.logger.info('DDMMonitoring publication endpoint: %s',
                          self.conf.DdmMonitoringEndpoint)
         while True:
-            if self.exitFlag and self.DdmMonitoringQueue.empty():
-                break
-
-            while not self.DdmMonitoringQueue.empty():
-                try:
-                    data = self.DdmMonitoringQueue.get_nowait()
-                    self.logger.debug('processing entry for %s', data['filename'])
-                except queue.Empty:
-                    break
-
-                try:
-                    req = urllib.request.Request(self.conf.DdmMonitoringEndpoint)
-                    req.add_header('Content-Type', 'application/json')
-                    urllib.request.urlopen(req, json.dumps(data).encode('utf-8'), timeout=self.conf.DdmMonitoringTimeout)
-                    self.logger.debug('published entry for %s: %s',
-                                      data['filename'], json.dumps(data))
-
-                    if self.requestErrorSince is not None:
-                        self.logger.warning('request error stopped: duration= %s',
-                                            time.time() - self.requestErrorSince)
-                        self.requestErrorSince = None
-                        self.requestErrorCriticalLogged = False
-                except Exception as exc:
-                    self.logger.warning('could not publish data for %s: %s',
-                                        data['filename'], str(exc))
-                    if self.exitFlag:
-                        self.logger.warning(
-                            'exit requested: will not retry to publish data for %s',
-                            data['filename'])
-                    else:
-                        self.logger.debug('will retry to publish data for %s',
-                                          data['filename'])
-                        try:
-                            self.DdmMonitoringQueue.put(data, block=True,
-                                                        timeout=self.conf.DdmMonitoringTimeout)
-                        except queue.Full:
-                            self.logger.warning('could not put entry back in the queue for %s: '
-                                                'timeout, queue is full', data['filename'])
-
-                    if self.requestErrorSince is None:
-                        self.requestErrorSince = time.time()
-                    else:
-                        if not self.requestErrorCriticalLogged:
-                            if (time.time() - self.requestErrorSince) > 300:
-                                self.logger.critical('request error for more than 300 seconds')
-                                self.requestErrorCriticalLogged = True
-
-                    # An error has occurred and the non-published entry has been
-                    # re-added to the queue: if we don't break here we will spin
-                    # in this loop with no pause until the issue is fixed. It's
-                    # better to take a little break and retry later.
-                    break
-
-            self.event.wait(self.conf.DdmMonitoringTimeout)
-
-        self.logger.info('DdmMonitoringThread exited')
-
-
-    def exit(self):
-        self.logger.debug('exit signal received')
-        self.exitFlag = True
+            if self.exitevent.is_set(): break
+
+            try:
+                data = self.DdmMonitoringQueue.get_nowait()
+                self.logger.debug('processing entry for %s', data['filename'])
+            except queue.Empty:
+                self.exitevent.wait(self.conf.DdmMonitoringTimeout)
+                continue
+
+            try:
+                req = urllib.request.Request(self.conf.DdmMonitoringEndpoint)
+                req.add_header('Content-Type', 'application/json')
+                urllib.request.urlopen(req, json.dumps(data).encode('utf-8'), timeout=self.conf.DdmMonitoringTimeout)
+                self.logger.debug('published entry for %s: %s',
+                                  data['filename'], json.dumps(data))
+
+                if self.requestErrorSince is not None:
+                    self.logger.warning('request error stopped: duration= %s',
+                                        time.time() - self.requestErrorSince)
+                    self.requestErrorSince = None
+                    self.requestErrorCriticalLogged = False
+            except Exception as exc:
+                self.logger.warning('could not publish data for %s: %s',
+                                    data['filename'], str(exc))
+                if self.exitevent.is_set():
+                    self.logger.warning(
+                        'exit requested: will not retry to publish data for %s',
+                        data['filename'])
+                else:
+                    self.logger.debug('will retry to publish data for %s',
+                                      data['filename'])
+                    try:
+                        self.DdmMonitoringQueue.put(data, block=True,
+                                                    timeout=self.conf.DdmMonitoringTimeout)
+                    except queue.Full:
+                        self.logger.warning('could not put entry back in the queue for %s: '
+                                            'timeout, queue is full', data['filename'])
+
+                if self.requestErrorSince is None:
+                    self.requestErrorSince = time.time()
+                else:
+                    if not self.requestErrorCriticalLogged:
+                        if (time.time() - self.requestErrorSince) > 300:
+                            self.logger.critical('request error for more than 300 seconds')
+                            self.requestErrorCriticalLogged = True
+
+        self.logger.info('DdmMonitoringThread stopping')
diff --git a/Script/cs/Threads/DeleteThread.py b/Script/cs/Threads/DeleteThread.py
index 9d070acc69b136be467cebe9fb170829ba1b2887..fd99cef2d54b5e0565c4f25836993c54b659ecb4 100644
--- a/Script/cs/Threads/DeleteThread.py
+++ b/Script/cs/Threads/DeleteThread.py
@@ -14,18 +14,17 @@ from queue import Empty as QueueEmpty
 
 
 class DeleteThread(threading.Thread):
-    def __init__(self, conf, event, dblock, deletequeue, clearqueue):
+    def __init__(self, conf, exitevent, dblock, deletequeue, clearqueue):
 
         threading.Thread.__init__(self, name="DeleteThread")
 
         self.conf = conf
-        self.event = event
+        self.exitevent = exitevent
         self.dblock = dblock
         self.deletequeue = deletequeue
         self.clearqueue = clearqueue
         self.files_deleted = 0
         self.migration_waitroom = {}
-        self.exitflag = 0
         self.db = None
         self.dbflag = True
         self.deletenow = False
@@ -40,18 +39,19 @@
             pymod = importlib.import_module(
                 f"cs.Tools.MigrationCheckers.{self.conf.MigrationModuleName}")
             self.migration_module = pymod.MigrationChecker(
-                self.conf.MigrationModuleConf, self.logger)
+                self.conf.MigrationModuleConf, self.logger, exitevent)
 
 
     def run(self):
 
         self.logger.info(thread_id_string())
 
-        while True:
+        while not self.exitevent.is_set():
             self.loopIteration()
-            self.event.wait(self.conf.DeleteThreadEventTimeout)
-            if self.exitflag: break
+            self.exitevent.wait(self.conf.DeleteThreadEventTimeout)
 
-        self.logger.info('DeleteThread exited')
+        self.logger.info('DeleteThread stopping')
+        if self.migration_module:
+            self.migration_module.stop()
 
     def loopIteration(self):
@@ -156,13 +156,6 @@
         return False
 
 
-    def exit(self):
-        if self.migration_module:
-            self.migration_module.stop()
-        self.exitflag = 1
-        self.logger.debug('exit signal received')
-
-
     def setDB(self,db):
         self.logger.debug('setting database')
         self.db = db
diff --git a/Script/cs/Threads/ERSThread.py b/Script/cs/Threads/ERSThread.py
index 8696f1324b8ce3dba56fc8ec24c7420d93a3145d..0d7d4cb1438673db78cd87c80994df83fb7a3602 100644
--- a/Script/cs/Threads/ERSThread.py
+++ b/Script/cs/Threads/ERSThread.py
@@ -13,14 +13,13 @@ from cs.Tools.LogConfig import enable_file_logging, set_log_level, make_tdaq_app
 
 
 class ERSThread(threading.Thread):
-    def __init__(self, conf, event):
+    def __init__(self, conf, exitevent):
 
         threading.Thread.__init__(self, name="ERSThread")
 
-        self.event = event
+        self.exitevent = exitevent
         self.ers_timeout = conf.ERSTimeout
         self.log_level = conf.ERSLogLevel
-        self.exit_flag = False
         self.partition_connected = False
         self.ipc_partition = IPCPartition(conf.ERSPartition)
         self.ers_handler = None
@@ -47,7 +46,7 @@
             logging.getLogger().addHandler(self.ers_handler)
             self.partition_connected = True
 
-        while not self.exit_flag:
+        while not self.exitevent.is_set():
             if self.ipc_partition.isValid() and not self.partition_connected:
                 self.partition_connected = True
                 self.logger.warning('partition %s is now valid: logging to ERS enabled', self.ipc_partition.name())
@@ -67,12 +66,9 @@
 
                 logging.getLogger().removeHandler(self.ers_handler)
 
-            self.event.wait(self.ers_timeout)
+            self.exitevent.wait(self.ers_timeout)
 
-
-    def exit(self):
-        self.exit_flag = True
-        self.logger.info('Exit signal received')
+        self.logger.info('ERSThread stopping')
 
 
 def set_environment(config):
diff --git a/Script/cs/Threads/InfoServiceThread.py b/Script/cs/Threads/InfoServiceThread.py
index 8f02800fdca9c4442ec808c7896634b23455d8fb..d08c3e74a5921b60229d418a0c7d4fccfac39ea9 100644
--- a/Script/cs/Threads/InfoServiceThread.py
+++ b/Script/cs/Threads/InfoServiceThread.py
@@ -17,16 +17,15 @@ from cs.Tools.LogConfig import enable_file_logging, make_tdaq_app_name, make_fs_
 
 
 class InfoServiceThread(threading.Thread):
-    def __init__(self, conf, event, copy_thread, delete_thread, manager_thread):
+    def __init__(self, conf, exitevent, copy_thread, delete_thread, manager_thread):
         threading.Thread.__init__(self, name="InfoServiceThread")
 
-        self.event = event
+        self.exitevent = exitevent
         self.conf = conf
         self.time_created = datetime.utcnow()
         self._copy_thread = copy_thread
         self._delete_thread = delete_thread
         self._manager_thread = manager_thread
-        self.exit_flag = False
 
         # get partition to publish to
         self.ipc_partition = ispy.IPCPartition(conf.ISPartition) #pylint: disable=no-member
@@ -54,7 +53,7 @@
         # partition_valid_at + wait_after_partition_valid
         partition_valid_at = None
 
-        while not self.exit_flag:
+        while not self.exitevent.is_set():
             if not self.partition_connected and self.ipc_partition.isValid():
                 self.partition_connected = True
                 self.logger.warning('partition %s is now valid: IS publications enabled'
@@ -77,17 +76,13 @@
                 if partition_valid_at is None:
                     self.send_update()
 
-            self.event.wait(self.conf.ISPublicationPeriod)
+            self.exitevent.wait(self.conf.ISPublicationPeriod)
 
         # to reset the uptime just before normal exit
+        self.logger.info('InfoServiceThread stopping')
         self.update_cs_info()
-        self.logger.info('InfoServiceThread exited')
 
-    def exit(self):
-        self.exit_flag = True
-        self.logger.info('Exit signal received')
-
     def send_update(self):
         self.update_cs_info()
 
         if self.conf.ISFileSystemInfoEnabled:
@@ -165,7 +160,7 @@
 
 
     def get_process_uptime_in_seconds(self):
-        if self.exit_flag:
+        if self.exitevent.is_set():
             # to reset the uptime just before normal exit
             return 0
         else:
diff --git a/Script/cs/Threads/ManagerThread.py b/Script/cs/Threads/ManagerThread.py
index 20067a5887518634be1a187d7d3d0b05bc7379f6..f5b0d096b723627f9b21e137df8152192808c179 100644
--- a/Script/cs/Threads/ManagerThread.py
+++ b/Script/cs/Threads/ManagerThread.py
@@ -17,13 +17,13 @@ import cs.Threads.FileMetaData as FileMetaData
 
 
 class ManagerThread(threading.Thread):
-    def __init__(self, conf, event, dblock, copyqueue, deletequeue, clearqueue):
+    def __init__(self, conf, exitevent, dblock, copyqueue, deletequeue, clearqueue):
 
         threading.Thread.__init__(self, name="ManagerThread")
 
         self.parser = None
         self.conf = conf
-        self.event = event
+        self.exitevent = exitevent
         self.copyqueue = copyqueue
         self.deletequeue = deletequeue
         self.clearqueue = clearqueue
@@ -35,7 +35,6 @@
         self.managedfiles = [] # all files currently handled by CastorScript
         self.problematic = {} # files declared problematic with extra info to manage retries
         self.problematic_lock = threading.Lock()
-        self.exitflag = False
 
         self.year = ''
         self.month = ''
@@ -60,15 +59,14 @@
         try:
             self.initialize()
 
-            while True:
+            while not self.exitevent.is_set():
                 self.loopIteration()
-                self.event.wait(self.conf.ManagerThreadEventTimeout)
-                if self.exitflag: break
+                self.exitevent.wait(self.conf.ManagerThreadEventTimeout)
 
         except IOError as e:
             self.logger.critical('I/O error on file %s: %s; '
                                  'stopping ManagerThread', e.filename, e.strerror)
 
-        self.logger.info('ManagerThread exited')
+        self.logger.info('ManagerThread stopping')
 
     def initialize(self):
@@ -118,7 +116,7 @@
                 break
             self.logger.debug('got file %s from queue', fmd.file_name)
 
-            if self.exitflag: break
+            if self.exitevent.is_set(): break
 
             self.managedfiles.remove(fmd.file_name)
             if not os.path.isfile(fmd.file_name + Constants.problematic_ext):
@@ -420,11 +418,6 @@
         return fmd
 
 
-    def exit(self):
-        self.exitflag = 1
-        self.logger.debug('exit signal received')
-
-
    def updateYearMonthCache(self):
         self.year = datetime.date.today().year
         self.month = datetime.date.today().month
diff --git a/Script/cs/Tools/MigrationCheckers/DDMMigrated.py b/Script/cs/Tools/MigrationCheckers/DDMMigrated.py
index d2ae70d6a43a26a9dc64b0491377ac46ff1252f3..3d35056de29fe09f9293f0d1e58f0e3c7641765d 100644
--- a/Script/cs/Tools/MigrationCheckers/DDMMigrated.py
+++ b/Script/cs/Tools/MigrationCheckers/DDMMigrated.py
@@ -6,7 +6,7 @@ from operator import itemgetter
 
 
 class MigrationChecker:
-    def __init__(self, conf, logger):
+    def __init__(self, conf, logger, exitevent):
         """
         conf is a dictionary that contains or may contain the following keys:
         'eosinstance': EOS instance for storage backend
@@ -29,12 +29,12 @@
         cache_validity_s = conf.get('cache_validity_s', None)
         self.backend_cache = None
         if cache_validity_s is not None:
-            self.backend_cache = self.backend.start_cache(cache_validity_s, self.logger)
+            self.backend_cache = self.backend.start_cache(cache_validity_s,
+                                                          self.logger, exitevent)
 
 
     def stop(self):
         if self.backend_cache is not None:
-            self.backend_cache.exit()
             self.backend_cache.join()
             self.backend_cache = None
 
@@ -183,5 +183,6 @@ if __name__ == '__main__':
     handler = logging.StreamHandler(sys.stdout)
     logr.addHandler(handler)
 
-    checker = MigrationChecker(config, logr)
+    # cache_validity_s is not set so the exitevent is not used: None
+    checker = MigrationChecker(config, logr, None)
     print(checker.isMigrated(args.file, verbose=True))
diff --git a/UnitTests/DDMMigrated_Test.py b/UnitTests/DDMMigrated_Test.py
index cd32ac1b8330953db68bc67672d0621b57ec6793..fa02bda85399b3fafe542bf4e14d7fd2b42f5205 100644
--- a/UnitTests/DDMMigrated_Test.py
+++ b/UnitTests/DDMMigrated_Test.py
@@ -28,7 +28,7 @@
         cls.logger.setLevel(logging.ERROR)
         cls.logger.addHandler(logging.StreamHandler(sys.stdout))
 
-        cls.mchecker = DDMMigrated.MigrationChecker(cls.conf, cls.logger)
+        cls.mchecker = DDMMigrated.MigrationChecker(cls.conf, cls.logger, None)
 
 
     @classmethod
diff --git a/UnitTests/ERSThread_test.py b/UnitTests/ERSThread_test.py
index 92f1deb706512dc2d1772e3a5319a6bab6553135..39bb01d9b668bee2de66c8460a03c078482e9f92 100644
--- a/UnitTests/ERSThread_test.py
+++ b/UnitTests/ERSThread_test.py
@@ -60,10 +60,10 @@
             lines = ers_log_file.readlines()
             self.assertGreaterEqual(len(lines), 1)
             self.assertTrue(ext in lines[0])
-            ers_thread.exit()
+            event.set()
             ers_thread.join()
         except:
-            ers_thread.exit()
+            event.set()
             ers_thread.join()
             raise
 
diff --git a/UnitTests/InfoServiceThread_Test.py b/UnitTests/InfoServiceThread_Test.py
index 934378559312b0aa1d896957e563d74771f3878a..8b322c0a21c4c04e09bd5056ab6f75ae462fd467 100644
--- a/UnitTests/InfoServiceThread_Test.py
+++ b/UnitTests/InfoServiceThread_Test.py
@@ -20,9 +20,7 @@
     @classmethod
     def setUpClass(self):
         ## we mock the event so that the thread won't be left hanging
-        self.mock_event = unittest.mock.create_autospec(threading.Event())
-        kwargs = {"wait.return_value": True}
-        self.mock_event.configure_mock(**kwargs)
+        self.event = threading.Event()
 
         ## we mock the copythread with queues
         self.mock_CopyThread = unittest.mock.create_autospec(CopyThread)
@@ -69,14 +67,16 @@
 
         ## we're not interested in any logging for this test
         with unittest.mock.patch("cs.Threads.InfoServiceThread.enable_file_logging"):
-            self.info_thread = InfoServiceThread(self.mock_cfg, self.mock_event, self.mock_CopyThread, self.mock_DeleteThread, self.mock_ManagerThread)
+            self.info_thread = InfoServiceThread(self.mock_cfg, self.event,
+                                                 self.mock_CopyThread, self.mock_DeleteThread,
+                                                 self.mock_ManagerThread)
 
 
 class TestInfoServiceExit(TestInfoServiceThread):
     def test_it_should_stop_thread_execution(self):
         # I run the thread
         self.info_thread.start()
         # and shut it down
-        self.info_thread.exit()
+        type(self).event.set()
         self.info_thread.join(3.0)
         # and I see that the thread has been killed
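
Note on the shutdown pattern: the patch replaces every per-thread exitflag /
exit() pair with a single threading.Event shared by all threads, so one
exit_event.set() call both stops the loops and wakes any thread sleeping in
exit_event.wait(). A minimal sketch of the idiom follows, with illustrative
names only (this is not CastorScript code; the 5.0 second timeouts stand in
for the configured *EventTimeout values):

    import signal
    import threading

    exit_event = threading.Event()

    def handle_signal(signum, frame):
        # CPython runs signal handlers in the main thread between bytecodes,
        # so calling set() here is safe; it wakes all waiting threads.
        exit_event.set()

    class Worker(threading.Thread):
        def __init__(self, exitevent):
            threading.Thread.__init__(self, name="Worker")
            self.exitevent = exitevent

        def run(self):
            while not self.exitevent.is_set():
                # ... one unit of work per iteration ...
                # wait() doubles as the inter-iteration sleep and returns
                # early the moment the event is set, so shutdown is prompt.
                self.exitevent.wait(5.0)

    signal.signal(signal.SIGTERM, handle_signal)
    worker = Worker(exit_event)
    worker.start()
    while not exit_event.is_set():   # main loop, as in CastorScript.py
        exit_event.wait(5.0)
    worker.join()

Because wait() replaces both the exit-flag test and the fixed sleep, the
per-thread exit() methods and the block of exit() calls in CastorScript.py
become unnecessary, which is exactly what the diff removes.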