From eae4b01d6df58191f13eb65267ed3a83936f963e Mon Sep 17 00:00:00 2001
From: Fabrice Le Goff <fabrice.le.goff@cern.ch>
Date: Tue, 14 Nov 2023 14:25:21 +0100
Subject: [PATCH] added: notification when a file has been waiting for migration
 for too long (configurable)

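When the migration check fails for a file, the deletion thread now also
looks at how long the file has been waiting, based on the file's
modification time. If that age exceeds MigrationCheckMaxDurationHour
(in hours), a message is logged at the configurable
MigrationCheckMaxDurationLogLevel (CRITICAL when only the duration is
configured). Both parameters are optional; the check is skipped when
MigrationCheckMaxDurationHour is not set.

Illustrative configuration snippet (example values, not defaults):

    MigrationCheckMaxDurationHour: 24
    MigrationCheckMaxDurationLogLevel: 'WARNING'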
---
 Configs/configparsertest_modifiedvalues.cfg |  2 +
 DeploymentTest/tbed.test.bash               |  8 ++--
 Script/cs/StorageBackends/noop.py           |  2 +-
 Script/cs/Threads/DeleteThread.py           | 43 ++++++++++++++-------
 Script/cs/Tools/ConfigParser.py             | 15 +++++++
 Script/cs/Tools/LogConfig.py                | 10 +++--
 UnitTests/ConfigParser_Test.py              |  5 +++
 7 files changed, 64 insertions(+), 21 deletions(-)

diff --git a/Configs/configparsertest_modifiedvalues.cfg b/Configs/configparsertest_modifiedvalues.cfg
index e0aa7a2..b985a2d 100644
--- a/Configs/configparsertest_modifiedvalues.cfg
+++ b/Configs/configparsertest_modifiedvalues.cfg
@@ -73,6 +73,8 @@ DeleteLowWaterMark: [1]
 DeleteIgnoreLock: True
 DeleteFileSystemUsagePeriod: 1
 DeleteMinFileAge: 1
+MigrationCheckMaxDurationHour: 64
+MigrationCheckMaxDurationLogLevel: 'ERROR'
 MigrationCheck: True
 MigrationCheckDelay: 1
 MigrationModuleName: 'cassette'
diff --git a/DeploymentTest/tbed.test.bash b/DeploymentTest/tbed.test.bash
index e6ecb50..25c3d58 100755
--- a/DeploymentTest/tbed.test.bash
+++ b/DeploymentTest/tbed.test.bash
@@ -82,8 +82,10 @@ else
   LISTING_COMMAND="find $TMP_DIR -maxdepth 1 -name ${BASEFILENAME}* | sort"
 fi
 
+echo "TERM=$TERM"
+
 LISTING_OUTPUT=`eval $LISTING_COMMAND`
-if [ x$TERM == x ]; then
+if [ x$TERM != x ]; then
   NL=`echo "$LISTING_OUTPUT" | wc -l`
   for i in `seq 0 $NL`; do echo; done
   for i in `seq 0 $NL`; do tput cuu1; done
@@ -96,13 +98,13 @@ while [ "x$LISTING_OUTPUT" != x ]; do
     echo "timeout"
     exit 4
   fi
-  if [ x$TERM == x ]; then
+  if [ x$TERM != x ]; then
     echo "$LISTING_OUTPUT"
   fi
 
   sleep 1
 
-  if [ x$TERM == x ]; then
+  if [ x$TERM != x ]; then
     for i in `seq 0 $NL`; do tput cuu1; done
     NCOLS=`tput cols`
     for i in `seq 0 $NL`; do
diff --git a/Script/cs/StorageBackends/noop.py b/Script/cs/StorageBackends/noop.py
index 20d0b4d..2192a7c 100644
--- a/Script/cs/StorageBackends/noop.py
+++ b/Script/cs/StorageBackends/noop.py
@@ -40,7 +40,7 @@ def listdir(directory, stager, logger=None):
 
 def migrated(filename, stager, logger=None):
     del filename, stager, logger
-    return True
+    return False
 
 def mkdir(directory, stager, logger=None):
     del directory, stager, logger
diff --git a/Script/cs/Threads/DeleteThread.py b/Script/cs/Threads/DeleteThread.py
index 8d3a3df..21bd3c1 100644
--- a/Script/cs/Threads/DeleteThread.py
+++ b/Script/cs/Threads/DeleteThread.py
@@ -2,7 +2,7 @@
 import threading
 import os
 import importlib
-from time import time
+import time
 import re
 import cs.Tools.Constants as Constants
 from cs.Tools.utils import thread_id_string
@@ -81,7 +81,7 @@ class DeleteThread(threading.Thread):
 
             # Do not delete file if it's too recent
             if self.conf.DeleteMinFileAge != 0:
-                if (time() - fmd.modification_time) < self.conf.DeleteMinFileAge:
+                if (time.time() - fmd.modification_time) < self.conf.DeleteMinFileAge:
                     self.logger.debug('file too young to be deleted yet: %s', filename)
                     self.deletequeue.put(fmd)
                     continue
@@ -125,10 +125,7 @@ class DeleteThread(threading.Thread):
                 self.deletequeue.put(fmd)
                 continue
 
-            self.logger.debug('checking migration: %s', filename)
-            remote_file = os.path.join(fmd.remote_dir, os.path.basename(filename))
-
-            if self.checkMigration(remote_file, fmd.eos_instance):
+            if self.checkMigration(fmd):
                 self.logger.debug('file migrated: %s', filename)
                 self.delete(fmd)
             else:
@@ -136,26 +133,45 @@ class DeleteThread(threading.Thread):
                 self.deletequeue.put(fmd)
 
 
-    def checkMigration(self, remotefile, eosinstance):
+    def checkMigration(self, fmd):
+        self.logger.debug('checking migration: %s', fmd.file_name)
+        remotefile = os.path.join(fmd.remote_dir, os.path.basename(fmd.file_name))
+
         if self.migration_module is None:
-            return self.conf.backend.migrated(remotefile, eosinstance, self.logger)
+            migrated = self.conf.backend.migrated(remotefile, fmd.eos_instance, self.logger)
         else:
-            return self.migration_module.isMigrated(remotefile)
+            migrated = self.migration_module.isMigrated(remotefile)
+
+        if migrated:
+            return True
+
+        # check if this file has been waiting for too long
+        if self.conf.MigrationCheckMaxDurationHour and \
+                ((time.time() - fmd.modification_time) / 3600) \
+                > self.conf.MigrationCheckMaxDurationHour:
+            self.logger.log(
+                    self.conf.MigrationCheckMaxDurationLogLevel,
+                    'old file still waiting for deletion: %s, mtime: %s',
+                    fmd.file_name,
+                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(fmd.modification_time)))
+
+        return False
 
 
     def checkMigrationDelayExpired(self, filename):
         if self.conf.MigrationCheckDelay == 0:
             return True
+        transfer_completed_time = 0
         if filename not in self.migration_waitroom:
             try:
                 transfer_completed_time = os.path.getctime(filename + Constants.copied_ext)
             except FileNotFoundError as exc:
                 self.logger.warning(exc)
-                transfer_completed_time = time()
+                transfer_completed_time = time.time()
             self.migration_waitroom[filename] = transfer_completed_time
         else:
             transfer_completed_time = self.migration_waitroom.get(filename)
-        if (time() - transfer_completed_time) > self.conf.MigrationCheckDelay:
+        if (time.time() - transfer_completed_time) > self.conf.MigrationCheckDelay:
             return True
         return False
 
@@ -233,11 +249,12 @@ class DeleteThread(threading.Thread):
 
 
     def getFileSystemUsage(self, filename):
-        diff = time() - self.last_fs_usage_check_time
+        now = time.time()
+        diff = now - self.last_fs_usage_check_time
         if diff >= self.conf.DeleteFileSystemUsagePeriod:
             self.logger.debug('getting file system usage after %d seconds', diff)
             self.fs_usages = self._getFileSystemUsage()
-            self.last_fs_usage_check_time = time()
+            self.last_fs_usage_check_time = now
             self.logger.debug('fs usages: %s', str(self.fs_usages))
 
         try:
diff --git a/Script/cs/Tools/ConfigParser.py b/Script/cs/Tools/ConfigParser.py
index 8704c09..6d642ed 100644
--- a/Script/cs/Tools/ConfigParser.py
+++ b/Script/cs/Tools/ConfigParser.py
@@ -1,9 +1,11 @@
 #!/bin/env python
 from cs.Tools.Libraries.config import Config
+from cs.Tools import LogConfig
 import importlib
 from collections import namedtuple
 import re
 import os
+import logging
 
 class REWrap:
 ###
@@ -425,6 +427,8 @@ class ConfigHolder(object):
         self.MigrationCheckDelay = 1800
         self.MigrationModuleName = None
         self.MigrationModuleConf = {}
+        self.MigrationCheckMaxDurationHour = None
+        self.MigrationCheckMaxDurationLogLevel = None
         self.DeleteExcludeFileRegex = None
 
         try: # Enable/disable deletion thread and functionality completely
@@ -481,6 +485,17 @@ class ConfigHolder(object):
                 # of DataFilePattern
                 self.DeleteExcludeFileRegex = cfg.DeleteExcludeFileRegex
             except AttributeError: pass
+
+            try:
+                # alert when a file still waiting for migration (and hence
+                # for deletion) is older than this value, in hours;
+                # the file's age is computed from its modification time
+                self.MigrationCheckMaxDurationHour = cfg.MigrationCheckMaxDurationHour
+                # default log level, used when the previous parameter is set
+                self.MigrationCheckMaxDurationLogLevel = logging.CRITICAL
+                self.MigrationCheckMaxDurationLogLevel = \
+                        LogConfig.str_to_log_level(cfg.MigrationCheckMaxDurationLogLevel)
+            except AttributeError: pass
         # ENDIF DeleteEnabled
 
         # DDM MONITORING
diff --git a/Script/cs/Tools/LogConfig.py b/Script/cs/Tools/LogConfig.py
index 93177db..24aa201 100644
--- a/Script/cs/Tools/LogConfig.py
+++ b/Script/cs/Tools/LogConfig.py
@@ -30,15 +30,17 @@ def enable_mail_logging(config):
     set_log_level(config.EmailLogLevel, mail_handler)
     logging.getLogger().addHandler(mail_handler)
 
-
-def set_log_level(level, logger):
-    level = level.upper()
+def str_to_log_level(level_str):
+    level_str = level_str.upper()
     levelmap = {'DEBUG':logging.DEBUG,
                 'INFO':logging.INFO,
                 'WARNING':logging.WARNING,
                 'ERROR':logging.ERROR,
                 'CRITICAL':logging.CRITICAL}
-    logger.setLevel(levelmap[level])
+    return levelmap[level_str]
+
+def set_log_level(level, logger):
+    logger.setLevel(str_to_log_level(level))
 
 formatter = logging.Formatter('%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s')
 
diff --git a/UnitTests/ConfigParser_Test.py b/UnitTests/ConfigParser_Test.py
index 6666229..15067f7 100644
--- a/UnitTests/ConfigParser_Test.py
+++ b/UnitTests/ConfigParser_Test.py
@@ -1,6 +1,7 @@
 import unittest
 import os
 import re
+import logging
 
 if __name__ == '__main__':
     import sys
@@ -97,6 +98,8 @@ class TestConfigHolder(unittest.TestCase):
         self.assert_equal_del('DdmMonitoringProxy', 'atlasgw-exp:3128')
         self.assert_equal_del('DdmMonitoringEndpoint', 'https://rucio-lb-prod.cern.ch/traces/')
         self.assert_equal_del('DdmMonitoringQueueMaxSize', 200)
+        self.assert_equal_del('MigrationCheckMaxDurationHour', None)
+        self.assert_equal_del('MigrationCheckMaxDurationLogLevel', None)
 
         # test and remove parameters that are valued in the config
         self.assert_equal_del('FileName', 'minimal.cfg')
@@ -214,6 +217,8 @@ class TestConfigHolder(unittest.TestCase):
         self.assert_equal_del('DdmMonitoringProxy', 'ddmproxy')
         self.assert_equal_del('DdmMonitoringEndpoint', 'ddmendpoint')
         self.assert_equal_del('DdmMonitoringQueueMaxSize', 1)
+        self.assert_equal_del('MigrationCheckMaxDurationHour', 64)
+        self.assert_equal_del('MigrationCheckMaxDurationLogLevel', logging.ERROR)
 
         del self.c.backend
         self.assertEqual(len(self.c.__dict__), 0,
-- 
GitLab