diff --git a/Configs/configparsertest_modifiedvalues.cfg b/Configs/configparsertest_modifiedvalues.cfg index e0aa7a23e86d4f804395cc3edf6d9d2191622ede..b985a2d43eea5df7bd261e0fb6cc93221b51d7ff 100644 --- a/Configs/configparsertest_modifiedvalues.cfg +++ b/Configs/configparsertest_modifiedvalues.cfg @@ -73,6 +73,8 @@ DeleteLowWaterMark: [1] DeleteIgnoreLock: True DeleteFileSystemUsagePeriod: 1 DeleteMinFileAge: 1 +MigrationCheckMaxDurationHour: 64 +MigrationCheckMaxDurationLogLevel: 'ERROR' MigrationCheck: True MigrationCheckDelay: 1 MigrationModuleName: 'cassette' diff --git a/DeploymentTest/tbed.test.bash b/DeploymentTest/tbed.test.bash index e6ecb503b466688b5fb7c20bdbca28b4d99fb1ac..25c3d5852754d0b6a44dafb48b84fca0be9b30cd 100755 --- a/DeploymentTest/tbed.test.bash +++ b/DeploymentTest/tbed.test.bash @@ -82,8 +82,10 @@ else LISTING_COMMAND="find $TMP_DIR -maxdepth 1 -name ${BASEFILENAME}* | sort" fi +echo "TERM=$TERM" + LISTING_OUTPUT=`eval $LISTING_COMMAND` -if [ x$TERM == x ]; then +if [ x$TERM != x ]; then NL=`echo "$LISTING_OUTPUT" | wc -l` for i in `seq 0 $NL`; do echo; done for i in `seq 0 $NL`; do tput cuu1; done @@ -96,13 +98,13 @@ while [ "x$LISTING_OUTPUT" != x ]; do echo "timeout" exit 4 fi - if [ x$TERM == x ]; then + if [ x$TERM != x ]; then echo "$LISTING_OUTPUT" fi sleep 1 - if [ x$TERM == x ]; then + if [ x$TERM != x ]; then for i in `seq 0 $NL`; do tput cuu1; done NCOLS=`tput cols` for i in `seq 0 $NL`; do diff --git a/Script/cs/StorageBackends/noop.py b/Script/cs/StorageBackends/noop.py index 20d0b4dad50036e55b5d6e2c91d80322e838297b..2192a7c8ef96d0943f105a27251696c65fa869c7 100644 --- a/Script/cs/StorageBackends/noop.py +++ b/Script/cs/StorageBackends/noop.py @@ -40,7 +40,7 @@ def listdir(directory, stager, logger=None): def migrated(filename, stager, logger=None): del filename, stager, logger - return True + return False def mkdir(directory, stager, logger=None): del directory, stager, logger diff --git a/Script/cs/Threads/DeleteThread.py b/Script/cs/Threads/DeleteThread.py index 8d3a3df254a7ea358c3797db16e474e9adf3afb1..21bd3c17bc330930c1e3d1f27efd30121b80d9ea 100644 --- a/Script/cs/Threads/DeleteThread.py +++ b/Script/cs/Threads/DeleteThread.py @@ -2,7 +2,7 @@ import threading import os import importlib -from time import time +import time import re import cs.Tools.Constants as Constants from cs.Tools.utils import thread_id_string @@ -81,7 +81,7 @@ class DeleteThread(threading.Thread): # Do not delete file if it's too recent if self.conf.DeleteMinFileAge != 0: - if (time() - fmd.modification_time) < self.conf.DeleteMinFileAge: + if (time.time() - fmd.modification_time) < self.conf.DeleteMinFileAge: self.logger.debug('file too young to be deleted yet: %s', filename) self.deletequeue.put(fmd) continue @@ -125,10 +125,7 @@ class DeleteThread(threading.Thread): self.deletequeue.put(fmd) continue - self.logger.debug('checking migration: %s', filename) - remote_file = os.path.join(fmd.remote_dir, os.path.basename(filename)) - - if self.checkMigration(remote_file, fmd.eos_instance): + if self.checkMigration(fmd): self.logger.debug('file migrated: %s', filename) self.delete(fmd) else: @@ -136,26 +133,45 @@ class DeleteThread(threading.Thread): self.deletequeue.put(fmd) - def checkMigration(self, remotefile, eosinstance): + def checkMigration(self, fmd): + self.logger.debug('checking migration: %s', fmd.file_name) + remotefile = os.path.join(fmd.remote_dir, os.path.basename(fmd.file_name)) + if self.migration_module is None: - return self.conf.backend.migrated(remotefile, eosinstance, self.logger) + migrated = self.conf.backend.migrated(remotefile, fmd.eos_instance, self.logger) else: - return self.migration_module.isMigrated(remotefile) + migrated = self.migration_module.isMigrated(remotefile) + + if migrated: + return True + + # check if this file has been waiting for too long + if self.conf.MigrationCheckMaxDurationHour and \ + ((time.time() - fmd.modification_time) / 3600) \ + > self.conf.MigrationCheckMaxDurationHour: + self.logger.log( + self.conf.MigrationCheckMaxDurationLogLevel, + 'old file still waiting for deletion: %s, mtime: %s', + fmd.file_name, + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fmd.modification_time))) + + return False def checkMigrationDelayExpired(self, filename): if self.conf.MigrationCheckDelay == 0: return True + transfer_completed_time = 0 if filename not in self.migration_waitroom: try: transfer_completed_time = os.path.getctime(filename + Constants.copied_ext) except FileNotFoundError as exc: self.logger.warning(exc) - transfer_completed_time = time() + transfer_completed_time = time.time() self.migration_waitroom[filename] = transfer_completed_time else: transfer_completed_time = self.migration_waitroom.get(filename) - if (time() - transfer_completed_time) > self.conf.MigrationCheckDelay: + if (time.time() - transfer_completed_time) > self.conf.MigrationCheckDelay: return True return False @@ -233,11 +249,12 @@ class DeleteThread(threading.Thread): def getFileSystemUsage(self, filename): - diff = time() - self.last_fs_usage_check_time + now = time.time() + diff = now - self.last_fs_usage_check_time if diff >= self.conf.DeleteFileSystemUsagePeriod: self.logger.debug('getting file system usage after %d seconds', diff) self.fs_usages = self._getFileSystemUsage() - self.last_fs_usage_check_time = time() + self.last_fs_usage_check_time = now self.logger.debug('fs usages: %s', str(self.fs_usages)) try: diff --git a/Script/cs/Tools/ConfigParser.py b/Script/cs/Tools/ConfigParser.py index 8704c09f2bc0ddf8f33329a30d47a2347f8711e4..6d642ed73374e4e19f79863c1ede03a74b44df51 100644 --- a/Script/cs/Tools/ConfigParser.py +++ b/Script/cs/Tools/ConfigParser.py @@ -1,9 +1,11 @@ #!/bin/env python from cs.Tools.Libraries.config import Config +from cs.Tools import LogConfig import importlib from collections import namedtuple import re import os +import logging class REWrap: ### @@ -425,6 +427,8 @@ class ConfigHolder(object): self.MigrationCheckDelay = 1800 self.MigrationModuleName = None self.MigrationModuleConf = {} + self.MigrationCheckMaxDurationHour = None + self.MigrationCheckMaxDurationLogLevel = None self.DeleteExcludeFileRegex = None try: # Enable/disable deletion thread and functionality completely @@ -481,6 +485,17 @@ class ConfigHolder(object): # of DataFilePattern self.DeleteExcludeFileRegex = cfg.DeleteExcludeFileRegex except AttributeError: pass + + try: + # alert when a file waiting for migration delete is older than + # this value + # old is determined from the file's modification time + self.MigrationCheckMaxDurationHour = cfg.MigrationCheckMaxDurationHour + # default value when previous parameter is valued + self.MigrationCheckMaxDurationLogLevel = logging.CRITICAL + self.MigrationCheckMaxDurationLogLevel = \ + LogConfig.str_to_log_level(cfg.MigrationCheckMaxDurationLogLevel) + except AttributeError: pass # ENDIF DeleteEnabled # DDM MONITORING diff --git a/Script/cs/Tools/LogConfig.py b/Script/cs/Tools/LogConfig.py index 93177db4d898a330e3bc2f9ee6d46328eede4603..24aa201507d48c1c1da7967d660eebd235f8e996 100644 --- a/Script/cs/Tools/LogConfig.py +++ b/Script/cs/Tools/LogConfig.py @@ -30,15 +30,17 @@ def enable_mail_logging(config): set_log_level(config.EmailLogLevel, mail_handler) logging.getLogger().addHandler(mail_handler) - -def set_log_level(level, logger): - level = level.upper() +def str_to_log_level(level_str): + level_str = level_str.upper() levelmap = {'DEBUG':logging.DEBUG, 'INFO':logging.INFO, 'WARNING':logging.WARNING, 'ERROR':logging.ERROR, 'CRITICAL':logging.CRITICAL} - logger.setLevel(levelmap[level]) + return levelmap[level_str] + +def set_log_level(level, logger): + logger.setLevel(str_to_log_level(level)) formatter = logging.Formatter('%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s') diff --git a/UnitTests/ConfigParser_Test.py b/UnitTests/ConfigParser_Test.py index 66662297dc997af42fced0e087c3076cd20dbbd6..15067f7ec959baa81abd000168d9c0a284e0f798 100644 --- a/UnitTests/ConfigParser_Test.py +++ b/UnitTests/ConfigParser_Test.py @@ -1,6 +1,7 @@ import unittest import os import re +import logging if __name__ == '__main__': import sys @@ -97,6 +98,8 @@ class TestConfigHolder(unittest.TestCase): self.assert_equal_del('DdmMonitoringProxy', 'atlasgw-exp:3128') self.assert_equal_del('DdmMonitoringEndpoint', 'https://rucio-lb-prod.cern.ch/traces/') self.assert_equal_del('DdmMonitoringQueueMaxSize', 200) + self.assert_equal_del('MigrationCheckMaxDurationHour', None) + self.assert_equal_del('MigrationCheckMaxDurationLogLevel', None) # test and remove parameters that are valued in the config self.assert_equal_del('FileName', 'minimal.cfg') @@ -214,6 +217,8 @@ class TestConfigHolder(unittest.TestCase): self.assert_equal_del('DdmMonitoringProxy', 'ddmproxy') self.assert_equal_del('DdmMonitoringEndpoint', 'ddmendpoint') self.assert_equal_del('DdmMonitoringQueueMaxSize', 1) + self.assert_equal_del('MigrationCheckMaxDurationHour', 64) + self.assert_equal_del('MigrationCheckMaxDurationLogLevel', logging.ERROR) del self.c.backend self.assertEqual(len(self.c.__dict__), 0,