From 8de21ab522d45618edb955f6a5ec1161d5a37c9d Mon Sep 17 00:00:00 2001
From: Mohsen Rezaei Estabragh <mohsen.rezaei.estabragh@cern.ch>
Date: Fri, 18 Sep 2020 09:26:48 +0200
Subject: [PATCH] implement the memory-leak slope cut, add a warning to the exit
 message, and remove some unnecessary code

---
 Tools/PyJobTransforms/python/trfExe.py        |  56 +++--
 Tools/PyJobTransforms/python/trfReports.py    |   7 +-
 Tools/PyJobTransforms/python/trfUtils.py      | 212 +++++++++---------
 Tools/PyJobTransforms/python/trfValidation.py |  11 -
 4 files changed, 153 insertions(+), 133 deletions(-)
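
The analysis added here is exposed through the analytic class in trfUtils. A rough
standalone sketch of the intended flow, assuming an illustrative memory monitor
file name (the real name is whatever the executor stored in self._memFullFile):

    from PyJobTransforms.trfUtils import analytic

    # 'mem.full.execute' is a stand-in for the executor's memory monitor output file
    result = analytic().getFittedData('mem.full.execute')   # {} if the fit is not possible
    if result and result['slope'] > 5000:                   # slope is in KB/s; 5000 KB/s ~ 5 MB/s
        # in the executors below this text is appended to a deferred exception's errMsg
        print("Possible memory leak: 'pss' slope: {0} KB/s".format(result['slope']))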

diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py
index da85d51ee490..58eeaf07c1ae 100755
--- a/Tools/PyJobTransforms/python/trfExe.py
+++ b/Tools/PyJobTransforms/python/trfExe.py
@@ -38,7 +38,7 @@ msg = logging.getLogger(__name__)
 
 from PyJobTransforms.trfJobOptions import JobOptionsTemplate
 from PyJobTransforms.trfUtils import asetupReport, unpackDBRelease, setupDBRelease, cvmfsDBReleaseCheck, forceToAlphaNum
-from PyJobTransforms.trfUtils import ValgrindCommand, isInteractiveEnv, calcCpuTime, calcWallTime, calcMemFull
+from PyJobTransforms.trfUtils import ValgrindCommand, isInteractiveEnv, calcCpuTime, calcWallTime, analytic
 from PyJobTransforms.trfExitCodes import trfExit
 from PyJobTransforms.trfLogger import stdLogLevels
 from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
@@ -183,6 +183,8 @@ class transformExecutor(object):
         self._exeStart = self._exeStop = None
         self._valStart = self._valStop = None
         self._memStats = {}
+        self._memLeakResult = {}
+        self._memFullFile = None
         self._eventCount = None
         self._athenaMP = None
         self._athenaMT = None
@@ -381,11 +383,8 @@ class transformExecutor(object):
         return self._memStats
 
     @property
-    def memFullEval(self):
-        if self._memFullFile:
-            return calcMemFull(self._memFullFile)
-        else:
-            return 'cant read full mem file'
+    def memAnalysis(self):
+        return self._memLeakResult
 
     @property
     def postExeCpuTime(self):
@@ -810,15 +809,6 @@ class scriptExecutor(transformExecutor):
         self._valStop = os.times()
         msg.debug('valStop time is {0}'.format(self._valStop))
 
-        ## check memory monitor results
-        # add information to the exit message if an excess has seen
-        fitResult = trfValidation.memoryMonitorReport().fitToMem(self._memFullFile)
-        if fitResult and self._errMsg:
-            msg.error('Excess in memory monitor parameter(s)')
-            self._errMsg = self._errMsg + "; high slope : {0}".format(fitResult['slope'])
-        #raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
-        #                                                          'Fatal error in memory monitor: "{0}"'.format(exitErrorMessage))
-
 
 class athenaExecutor(scriptExecutor):
     _exitMessageLimit = 200 # Maximum error message length to report in the exitMsg
@@ -1217,6 +1207,25 @@ class athenaExecutor(scriptExecutor):
         self._valStop = os.times()
         msg.debug('valStop time is {0}'.format(self._valStop))
 
+        ## Get results of memory monitor analysis (slope and chi2)
+        # the analysis is a linear fit of 'pss' vs 'Time' (using at least 5 data points)
+        # to obtain a good fit, the tails are excluded from the data
+        # if the slope of 'pss' is high (> 5 MB/s) and an error has already been caught,
+        # a message is appended to the exit message
+        if self._memFullFile:
+            msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
+            self._memLeakResult = analytic().getFittedData(self._memFullFile)
+            if self._memLeakResult:
+                if self._memLeakResult['slope'] > 5000:
+                    msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
+                    if deferredException is not None:
+                        deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
+            else:
+                msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
+        else:
+            msg.info('No memory monitor file to be analysed')
+
+
     ## @brief Prepare the correct command line to be used to invoke athena
     def _prepAthenaCommandLine(self):
         ## Start building up the command line
@@ -1797,6 +1806,23 @@ class DQMergeExecutor(scriptExecutor):
         self._valStop = os.times()
         msg.debug('valStop time is {0}'.format(self._valStop))
 
+        ## Get results of memory monitor analysis (slope and chi2)
+        # the analysis is a linear fit of 'pss' vs 'Time' (using at least 5 data points)
+        # if the slope of 'pss' is high (> 5 MB/s) and an error has already been caught,
+        # a message is appended to the exit message
+        if self._memFullFile:
+            msg.info('Analysing memory monitor output file {0} for possible memory leak'.format(self._memFullFile))
+            self._memLeakResult = analytic().getFittedData(self._memFullFile)
+            if self._memLeakResult:
+                if self._memLeakResult['slope'] > 5000:
+                    msg.warning('Possible memory leak; abnormally high values in memory monitor parameters (ignore this message if the job has finished successfully)')
+                    if deferredException is not None:
+                        deferredException.errMsg = deferredException.errMsg + "; Possible memory leak: 'pss' slope: {0} KB/s".format(self._memLeakResult['slope'])
+            else:
+                msg.warning('Failed to analyse the memory monitor file {0}'.format(self._memFullFile))
+        else:
+            msg.info('No memory monitor file to be analysed')
+
 
 ## @brief Specialist execution class for merging NTUPLE files
 class NTUPMergeExecutor(scriptExecutor):
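
For scale, the 5000 KB/s cut used in both executors above corresponds to roughly
5 MB of 'pss' growth per second; a hypothetical step running for one hour at
exactly that slope would grow by about 18 GB:

    slope_kb_per_s = 5000                          # the cut applied above
    seconds = 3600                                 # an illustrative one-hour step
    growth_gb = slope_kb_per_s * seconds / 1e6     # decimal units, as in formatBytes below
    print(growth_gb)                               # -> 18.0 (GB of pss growth)
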
diff --git a/Tools/PyJobTransforms/python/trfReports.py b/Tools/PyJobTransforms/python/trfReports.py
index a6006d009b47..5f2b4b27d5ee 100644
--- a/Tools/PyJobTransforms/python/trfReports.py
+++ b/Tools/PyJobTransforms/python/trfReports.py
@@ -656,11 +656,8 @@ def exeResourceReport(exe, report):
 
     if exe.memStats:
         exeResource['memory'] = exe.memStats
-    #it failes when it is aligned with the other
-    #if exe.memFullEval:
-    #   exeResource['memoryAnalyse'] = exe.memFullEval
-        if exe.memFullEval:
-            exeResource['memoryAnalyse'] = exe.memFullEval
+    if exe.memAnalysis:
+        exeResource['memoryAnalysis'] = exe.memAnalysis
     if exe.eventCount:
         exeResource['nevents'] = exe.eventCount
     if exe.athenaMP:
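
With this change the fit result travels into the job report under a 'memoryAnalysis'
key next to the existing 'memory' block; a sketch of the resulting fragment, with
made-up numbers (getFittedData rounds both values to the requested precision, 2 by default):

    exeResource = {
        'memoryAnalysis': {'slope': 42.1, 'chi2': 1234.56},  # slope in KB/s, plus the fit chi2
        'nevents': 1000,
    }
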
diff --git a/Tools/PyJobTransforms/python/trfUtils.py b/Tools/PyJobTransforms/python/trfUtils.py
index f641af0642bf..0fc4ca850ffa 100644
--- a/Tools/PyJobTransforms/python/trfUtils.py
+++ b/Tools/PyJobTransforms/python/trfUtils.py
@@ -1212,7 +1212,6 @@ class analytic():
     #  @param x list of input data (list of floats or ints).
     #  @param y: list of input data (list of floats or ints).
     #  @param model: model name (string).
-    #  raises UnknownException: in case Fit() fails.
     def fit(self, x, y, model='linear'):
         try:
             self._fit = Fit(x=x, y=y, model=model)
@@ -1233,66 +1232,73 @@ class analytic():
         return slope
 
     # Return a properly formatted job metrics string with analytics data.
-    # Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.
+    # Currently the function returns a fit for 'pss' vs 'time', whose slope measures memory leaks.
     # @param filename: memory monitor output file (string).
     # @param x_name: optional string, name selector for table column.
     # @param y_name: optional string, name selector for table column.
     # @param precision: optional precision for fitted slope parameter, default 2.
-    # @param tails: should tails (first and last values) be used? (boolean).
-    # @return: {"slope": slope, "chi2": chi2} (float strings with desired precision).
-    def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
-        self._math = math()
-        slope = ""
-        chi2 = ""
-        table = get_table_from_file(filename,header=None, separator="\t", convert_to_float=True)
-
+    # @param tails: should tails be used? (boolean).
+    # @param minPoints: minimum number of data points required for the fit (after removing the tails)
+    # @return: {"slope": slope, "chi2": chi2}
+    def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5):
+        _memFileToTable = memFileToTable()
+        fitResult = {}
+        table = _memFileToTable.getTable(filename, header=None, separator="\t")
         if table:
             # extract data to be fitted
-            x, y = self.extract_from_table(table, x_name, y_name)
-
+            x, y = self.extractFromTable(table, x_name, y_name)
             # remove tails if desired
             # this is useful e.g. for memory monitor data where the first and last values
             # represent allocation and de-allocation, ie not interesting
-            if not tails and len(x) > 7 and len(y) > 7:
-                msg.debug('removing tails from data to be fitted')
-                x = x[5:]
-                x = x[:-2]
-                y = y[5:]
-                y = y[:-2]
-
-            if len(x) > 7 and len(y) > 7:
+            # here the tail is defined as the first and last 20% of the data
+            if not tails:
+                tail = int(len(x)/5)
+                msg.info('removing tails from the memory monitor data; 20% from each side')
+                x = x[tail:]
+                x = x[:-tail]
+                y = y[tail:]
+                y = y[:-tail]
+
+            if len(x) > minPoints and len(y) > minPoints:
                 msg.info('fitting {0} vs {1}'.format(y_name, x_name))
                 try:
                     fit = self.fit(x, y)
                     _slope = self.slope()
-                    print(_slope)
                 except Exception as e:
                     msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))
                 else:
                     if _slope:
-                        slope = round(fit.slope(), 2)
-                        chi2 = fit.chi2()
-                        if slope != "":
-                            msg.info('current memory leak: {0} B/s (using {1} data points, chi2={2})'.format(slope, len(x), chi2))
+                        slope = round(fit.slope(), precision)
+                        chi2 = round(fit.chi2(), precision)
+                        fitResult = {"slope": slope, "chi2": chi2}
+                        if slope:
+                            HRslope, unit = self.formatBytes(slope)
+                            msg.info('current memory leak: {0} {1} (using {2} data points, chi2={3})'.format(HRslope, unit, len(x), chi2))
             else:
-                msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>=4)'.format(x, y))
+                msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>={2})'.format(x, y, minPoints))
 
-        return {"slope": slope, "chi2": chi2}
+        return fitResult
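
The trimming drops 20% of the samples from each end before the minPoints check is
applied; for example, for a monitor file with 50 samples (list contents here are
invented):

    x = list(range(50))            # stand-in for 50 'Time' samples
    tail = int(len(x)/5)           # 10 samples trimmed from each end
    trimmed = x[tail:][:-tail]     # keeps samples 10..39
    print(len(trimmed))            # -> 30, comfortably above minPoints=5
    # note: with fewer than 5 samples, tail is 0 and x[:-0] is empty,
    # so the minPoints check fails and only a warning is emitted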
 
     # Extract wanted columns, e.g. x: Time, y: pss+swap
     # @param x_name: column name to be extracted (string).
     # @param y_name: column name to be extracted (may contain '+'-sign) (string).
     # @return: x (list), y (list).
-    def extract_from_table(self, table, x_name, y_name):
+    def extractFromTable(self, table, x_name, y_name):
+        headerUpperVersion = {'pss':'PSS', 'swap':'Swap', 'rss':'RSS', 'vmem':'VMEM'}
         x = table.get(x_name, [])
         if '+' not in y_name:
             y = table.get(y_name, [])
+            if len(y)==0:
+                y = table.get(headerUpperVersion[y_name], [])
         else:
             try:
                 y1_name = y_name.split('+')[0]
                 y2_name = y_name.split('+')[1]
                 y1_value = table.get(y1_name, [])
                 y2_value = table.get(y2_name, [])
+                if len(y1_value)==0 or len(y2_value)==0:
+                    y1_value = table.get(headerUpperVersion[y1_name], [])
+                    y2_value = table.get(headerUpperVersion[y2_name], [])
             except Exception as e:
                 msg.warning('exception caught: {0}'.format(e))
                 x = []
@@ -1303,6 +1309,19 @@ class analytic():
 
         return x, y
 
+    # Make the slope result human readable (HR)
+    # the input slope is assumed to be in KB/s
+    def formatBytes(self, size):
+        # decimal system
+        power = 1000
+        n = 1
+        power_labels = {1: 'K', 2: 'M', 3: 'G', 4: 'T'}
+        while size > power:
+            size /= power
+            n += 1
+        return round(size, 2), power_labels[n]+'B/s'
+
+
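A quick check of the conversion above, assuming the incoming slope is in KB/s
(decimal prefixes, so 1000 KB/s = 1 MB/s):

    from PyJobTransforms.trfUtils import analytic

    print(analytic().formatBytes(800))      # -> (800, 'KB/s')    below 1000, stays in KB/s
    print(analytic().formatBytes(5000))     # -> (5.0, 'MB/s')    the 5 MB/s warning threshold
    print(analytic().formatBytes(123456))   # -> (123.46, 'MB/s')
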
 ## @brief Low-level fitting class
 class Fit():
     _model = 'linear'  # fitting model
@@ -1321,7 +1340,6 @@ class Fit():
         self._model = kwargs.get('model', 'linear')
         self._x = kwargs.get('x', None)
         self._y = kwargs.get('y', None)
-
         self._math = math()
 
         if not self._x or not self._y:
@@ -1397,7 +1415,6 @@ class math():
         n = len(data)
         if n < 1:
             msg.warning('mean requires at least one data point')
-
         return sum(data)/n
 
     # Return sum of square deviations of sequence data.
@@ -1407,7 +1424,7 @@ class math():
         return sum((x - c) ** 2 for x in data)
 
     # Return sum of deviations of sequence data.
-    # Sum (x - x_mean)**(y - y_mean)
+    # Sum (x - x_mean)*(y - y_mean)
     def sum_dev(self, x, y):
         c1 = self.mean(x)
         c2 = self.mean(y)
@@ -1419,10 +1436,10 @@ class math():
             return 0.0
         return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected))
 
+
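The Fit class internals are not shown in this diff, but with the helpers above a
textbook least-squares line can be written down directly; a minimal sketch with
synthetic, perfectly linear data, assuming chi2 takes (observed, expected) as its
body suggests:

    from PyJobTransforms import trfUtils

    m = trfUtils.math()                          # trfUtils' helper class, not the stdlib module
    x = [0.0, 1.0, 2.0, 3.0, 4.0]                # 'Time'-like samples
    y = [10.0, 12.0, 14.0, 16.0, 18.0]           # 'pss'-like samples, slope exactly 2
    slope = m.sum_dev(x, y) / m.ss(x)            # Sum (x-x_mean)*(y-y_mean) / Sum (x-x_mean)**2
    intercept = m.mean(y) - slope * m.mean(x)
    expected = [intercept + slope * xi for xi in x]
    print(slope, m.chi2(y, expected))            # -> 2.0 0.0
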
 ## @brief  Extract a table of data from a txt file
-#  @details E.g. header="Time VMEM PSS RSS Swap rchar wchar rbytes wbytes"
-#  or the first line in the file is
-#  Time VMEM PSS RSS Swap rchar wchar rbytes wbytes
+#  @details E.g. header="Time    nprocs  nthreads    wtime   stime   utime   pss rss swap    vmem"
+#  or, if no header is given, the first line of the file,
 #  each of which will become keys in the dictionary, whose corresponding values are stored in lists, with the entries
 #  corresponding to the values in the rows of the input file.
 #  The output dictionary will have the format
@@ -1430,73 +1447,68 @@ class math():
 #  @param filename name of input text file, full path (string).
 #  @param header header string.
 #  @param separator separator character (char).
-#  @param convert_to_float boolean, if True, all values will be converted to floats.
 #  @return dictionary.
-def get_table_from_file(filename, header=None, separator="\t", convert_to_float=None):
-    try:
-        f = open(filename, 'r')
-    except Exception as e:
-        msg.warning("failed to open file: {0}, {1}".format(filename, e))
-    else:
-        firstline = True
-        for line in f:
-            fields = line.split(separator)
-            if firstline:
-                firstline = False
-                tabledict, keylist = _define_tabledict_keys(header, fields, separator)
-                if not header:
-                    continue
+class memFileToTable():
 
-            # from now on, fill the dictionary fields with the input data
-            i = 0
-            for field in fields:
-                # get the corresponding dictionary key from the keylist
-                key = keylist[i]
-                # store the field value in the correct list
-                # TODO: do we need converting to float?
-                if convert_to_float:
-                    try:
-                        field = float(field)
-                    except Exception as e:
-                        msg.warning("failed to convert {0} to float: {1} (aborting)".format(field, e))
-                        return None
-                tabledict[key].append(field)
-                i += 1
-        f.close()
-    return tabledict
-
-## @brief Define the keys for the tabledict dictionary.
-# @note should be called by get_table_from_file()
-# @param header header string.
-# @param fields header content string.
-# @param separator separator character (char).
-# @return tabledict (dictionary), keylist (ordered list with dictionary key names).
-def _define_tabledict_keys(header, fields, separator):
-    tabledict = {}
-    keylist = []
-
-    if not header:
-        # get the dictionary keys from the header of the file
-        for key in fields:
-            # first line defines the header, whose elements will be used as dictionary keys
-            if key == '':
-                continue
-            if key.endswith('\n'):
-                key = key[:-1]
-            tabledict[key] = []
-            keylist.append(key)
-    else:
-        # get the dictionary keys from the provided header
-        keys = header.split(separator)
-        for key in keys:
-            if key == '':
-                continue
-            if key.endswith('\n'):
-                key = key[:-1]
-            tabledict[key] = []
-            keylist.append(key)
+    def getTable(self, filename, header=None, separator="\t"):
+        tabledict = {}
+        keylist = []
+        try:
+            f = open(filename, 'r')
+        except Exception as e:
+            msg.warning("failed to open file: {0}, {1}".format(filename, e))
+        else:
+            firstline = True
+            for line in f:
+                fields = line.split(separator)
+                if firstline:
+                    firstline = False
+                    tabledict, keylist = self._defineTableDictKeys(header, fields, separator)
+                    if not header:
+                        continue
+                # from now on, fill the dictionary fields with the input data
+                i = 0
+                for field in fields:
+                    # get the corresponding dictionary key from the keylist
+                    key = keylist[i]
+                    # store the field value in the correct list
+                    tabledict[key].append(float(field))
+                    i += 1
+            f.close()
+
+        return tabledict
+
+    ## @brief Define the keys for the tabledict dictionary.
+    # @param header header string.
+    # @param fields header content string.
+    # @param separator separator character (char).
+    # @return tabledict (dictionary), keylist (ordered list with dictionary key names).
+    def _defineTableDictKeys(self, header, fields, separator):
+        tabledict = {}
+        keylist = []
+
+        if not header:
+            # get the dictionary keys from the header of the file
+            for key in fields:
+                # first line defines the header, whose elements will be used as dictionary keys
+                if key == '':
+                    continue
+                if key.endswith('\n'):
+                    key = key[:-1]
+                tabledict[key] = []
+                keylist.append(key)
+        else:
+            # get the dictionary keys from the provided header
+            keys = header.split(separator)
+            for key in keys:
+                if key == '':
+                    continue
+                if key.endswith('\n'):
+                    key = key[:-1]
+                tabledict[key] = []
+                keylist.append(key)
 
-    return tabledict, keylist
+        return tabledict, keylist
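
A sketch of what getTable returns for a (shortened, tab-separated) monitor file of
the form documented above; the file name and numbers are invented, and the exact
column capitalisation depends on the MemoryMonitor version, which is why
extractFromTable carries the headerUpperVersion fallback:

    from PyJobTransforms.trfUtils import memFileToTable

    with open('mem.sample', 'w') as f:           # hypothetical two-column, two-sample file
        f.write("Time\tpss\n"
                "1000\t2500000\n"
                "1010\t2550000\n")

    table = memFileToTable().getTable('mem.sample', header=None, separator="\t")
    print(table)   # -> {'Time': [1000.0, 1010.0], 'pss': [2500000.0, 2550000.0]}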
 
 
 ### @brief return Valgrind command
@@ -1596,7 +1608,3 @@ def bind_port(host, port):
         ret=1
     s.close()
     return ret
-
-def calcMemFull(memFullStatFile):
-    result = analytic().get_fitted_data(memFullStatFile, x_name='Time', y_name='pss+swap', precision=2, tails=True)
-    return result
diff --git a/Tools/PyJobTransforms/python/trfValidation.py b/Tools/PyJobTransforms/python/trfValidation.py
index 8ee7aeb05309..38abe22d18fb 100644
--- a/Tools/PyJobTransforms/python/trfValidation.py
+++ b/Tools/PyJobTransforms/python/trfValidation.py
@@ -106,17 +106,6 @@ def corruptionTestBS(filename):
     rc = p.returncode
     return rc
 
-class memoryMonitorReport():
-    '''
-    def __init__(self):
-        self._memFile = memFullStatFile
-        self._math = trfUtils.math()
-        self._fit = None
-        self._memTable = trfUtils.memFullFileToTable()
-    '''
-    def fitToMem(self, memFullStatFile):
-        result = trfUtils.analytic().get_fitted_data(memFullStatFile, x_name='Time', y_name='pss+swap', precision=2, tails=True)
-        return result
 
 ## @brief Class of patterns that can be ignored from athena logfiles
 class ignorePatterns(object):
-- 
GitLab