Commit 9f8eec9e authored by Mohsen Rezaei Estabragh
codes from Pilot work properly

parent 92ecf89f
......@@ -39,7 +39,6 @@ msg = logging.getLogger(__name__)
from PyJobTransforms.trfJobOptions import JobOptionsTemplate
from PyJobTransforms.trfUtils import asetupReport, unpackDBRelease, setupDBRelease, cvmfsDBReleaseCheck, forceToAlphaNum
from PyJobTransforms.trfUtils import ValgrindCommand, isInteractiveEnv, calcCpuTime, calcWallTime, calcMemFull
from PyJobTransforms.trfUtils import bind_port
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler
......@@ -184,7 +183,6 @@ class transformExecutor(object):
self._exeStart = self._exeStop = None
self._valStart = self._valStop = None
self._memStats = {}
self._memFullStats = ''
self._eventCount = None
self._athenaMP = None
self._athenaMT = None
......@@ -384,8 +382,8 @@ class transformExecutor(object):
def memFullEval(self):
if self._memFullStats:
return calcMemFull(self._memFullStats)
if self._memFullFile:
return calcMemFull(self._memFullFile)
return 'cant read full mem file'
......@@ -773,12 +771,6 @@ class scriptExecutor(transformExecutor):
msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e))
self._memMonitor = False
self._memStats = {}
self._memFullStats = open(self._memFullFile)
except Exception as e:
msg.warning('Failed to load JSON memory full file {0}: {1}'.format(self._memFullFile, e))
self._memMonitor = False
self._memFullStats = 'could not open mem.full file!!'
def validate(self):
if self._valStart is None:
......@@ -818,6 +810,14 @@ class scriptExecutor(transformExecutor):
self._valStop = os.times()
msg.debug('valStop time is {0}'.format(self._valStop))
## check memory monitor results
# add information to the exit message if an excess has seen
fitResult = trfValidation.memoryMonitorReport().fitToMem(self._memFullFile)
if fitResult and self._errMsg:
msg.error('Excess in memory monitor parameter(s)')
self._errMsg = self._errMsg + "; high slope : {0}".format(fitResult['slope'])
#raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'),
# 'Fatal error in memory monitor: "{0}"'.format(exitErrorMessage))
class athenaExecutor(scriptExecutor):
......@@ -656,10 +656,11 @@ def exeResourceReport(exe, report):
if exe.memStats:
exeResource['memory'] = exe.memStats
#it failes when it is aligned with the other
#if exe.memFullEval:
# exeResource['memoryAnalyse'] = exe.memFullEval
if exe.memFullEval:
exeResource['FullMem'] = exe.memFullEval
exeResource['FullMem'] = 'at least report this'
exeResource['memoryAnalyse'] = exe.memFullEval
if exe.eventCount:
exeResource['nevents'] = exe.eventCount
if exe.athenaMP:
......@@ -1199,6 +1199,378 @@ class ParallelJobProcessor(object):
class analytic():
Analytics service class.
_fit = None
def __init__(self, **kwargs):
Init function.
:param kwargs:
self._fit = None
self._memTable = memFullFileToTable()
def fit(self, x, y, model='linear'):
Fitting function.
For a linear model: y(x) = slope * x + intersect
:param x: list of input data (list of floats or ints).
:param y: list of input data (list of floats or ints).
:param model: model name (string).
:raises UnknownException: in case Fit() fails.
self._fit = Fit(x=x, y=y, model=model)
except Exception as e:
msg.warning('fit failed! {0}'.format(e))
return self._fit
def slope(self):
Return the slope of a linear fit, y(x) = slope * x + intersect.
:return: slope (float).
slope = None
if self._fit:
slope = self._fit.slope()
msg.warning('Fit has not been defined')
return slope
def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True):
Return a properly formatted job metrics string with analytics data.
Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks.
:param filename: full path to memory monitor output (string).
:param x_name: optional string, name selector for table column.
:param y_name: optional string, name selector for table column.
:param precision: optional precision for fitted slope parameter, default 2.
:param tails: should tails (first and last values) be used? (boolean).
:return: {"slope": slope, "chi2": chi2} (float strings with desired precision).
self._math = math()
slope = ""
chi2 = ""
table = self._memTable.get_table_from_file(filename,header=None, separator="\t", convert_to_float=True)
if table:
# extract data to be fitted
x, y = self.extract_from_table(table, x_name, y_name)
# remove tails if desired
# this is useful e.g. for memory monitor data where the first and last values
# represent allocation and de-allocation, ie not interesting
if not tails and len(x) > 7 and len(y) > 7:
msg.debug('removing tails from data to be fitted')
x = x[5:]
x = x[:-2]
y = y[5:]
y = y[:-2]
if len(x) > 7 and len(y) > 7:'fitting {0} vs {1}'.format(y_name, x_name))
fit =, y)
_slope = self.slope()
except Exception as e:
msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))
if _slope:
slope = round(fit.slope(), 2)
chi2 = fit.chi2()
if slope != "":'current memory leak: {0} B/s (using {1} data points, chi2={2})'.format(slope, len(x), chi2))
msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>=4)'.format(x, y))
return {"slope": slope, "chi2": chi2}
def extract_from_table(self, table, x_name, y_name):
:param table: dictionary with columns.
:param x_name: column name to be extracted (string).
:param y_name: column name to be extracted (may contain '+'-sign) (string).
:return: x (list), y (list).
x = table.get(x_name, [])
if '+' not in y_name:
y = table.get(y_name, [])
y1_name = y_name.split('+')[0]
y2_name = y_name.split('+')[1]
y1_value = table.get(y1_name, [])
y2_value = table.get(y2_name, [])
except Exception as e:
msg.warning('exception caught: {0}'.format(e))
x = []
y = []
# create new list with added values (1,2,3) + (4,5,6) = (5,7,9)
y = [x0 + y0 for x0, y0 in zip(y1_value, y2_value)]
return x, y
class Fit():
Low-level fitting class.
_model = 'linear' # fitting model
_x = None # x values
_y = None # y values
_xm = None # x mean
_ym = None # y mean
_ss = None # sum of square deviations
_ss2 = None # sum of deviations
_slope = None # slope
_intersect = None # intersect
_chi2 = None # chi2
def __init__(self, **kwargs):
Init function.
:param kwargs:
self._math = math()
# extract parameters
self._model = kwargs.get('model', 'linear')
self._x = kwargs.get('x', None)
self._y = kwargs.get('y', None)
if not self._x or not self._y:
msg.warning('input data not defined')
if len(self._x) != len(self._y):
msg.warning('input data (lists) have different lengths')
# base calculations
if self._model == 'linear':
self._ss = self._math.sum_square_dev(self._x)
self._ss2 = self._math.sum_dev(self._x, self._y)
self._xm = self._math.mean(self._x)
self._ym = self._math.mean(self._y)
msg.warning("\'{0}\' model is not implemented".format(self._model))
def fit(self):
Return fitting object.
:return: fitting object.
return self
def value(self, t):
Return the value y(x=t) of a linear fit y(x) = slope * x + intersect.
:return: intersect (float).
return self._slope * t + self._intersect
def set_chi2(self):
Calculate and set the chi2 value.
y_observed = self._y
y_expected = []
#i = 0
for x in self._x:
#y_expected.append(self.value(x) - y_observed[i])
#i += 1
if y_observed and y_observed != [] and y_expected and y_expected != []:
self._chi2 = self._math.chi2(y_observed, y_expected)
self._chi2 = None
def chi2(self):
Return the chi2 value.
:return: chi2 (float).
return self._chi2
def set_slope(self):
Calculate and set the slope of the linear fit.
if self._ss2 and self._ss and self._ss != 0:
self._slope = self._ss2 / self._ss
self._slope = None
def slope(self):
Return the slope value.
:return: slope (float).
return self._slope
def set_intersect(self):
Calculate and set the intersect of the linear fit.
if self._ym and self._slope and self._xm:
self._intersect = self._ym - self._slope * self._xm
self._intersect = None
def intersect(self):
Return the intersect value.
:return: intersect (float).
return self._intersect
class math():
def mean(self, data):
n = len(data)
if n < 1:
msg.warning('mean requires at least one data point')
return sum(data)/n
def sum_square_dev(self, data):
c = self.mean(data)
return sum((x - c) ** 2 for x in data)
def sum_dev(self, x, y):
c1 = self.mean(x)
c2 = self.mean(y)
return sum((_x - c1) * (_y - c2) for _x, _y in zip(x, y))
def chi2(self, observed, expected):
Return the chi2 sum of the provided observed and expected values.
:param observed: list of floats.
:param expected: list of floats.
:return: chi2 (float).
if 0 in expected:
return 0.0
return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected))
class memFullFileToTable():
def get_table_from_file(self, filename, header=None, separator="\t", convert_to_float=None):
Extract a table of data from a txt file.
header="Time VMEM PSS RSS Swap rchar wchar rbytes wbytes"
or the first line in the file is
Time VMEM PSS RSS Swap rchar wchar rbytes wbytes
each of which will become keys in the dictionary, whose corresponding values are stored in lists, with the entries
corresponding to the values in the rows of the input file.
The output dictionary will have the format
{'Time': [ .. data from first row .. ], 'VMEM': [.. data from second row], ..}
:param filename: name of input text file, full path (string).
:param header: header string.
:param separator: separator character (char).
:param convert_to_float: boolean, if True, all values will be converted to floats.
:return: dictionary.
tabledict = {}
keylist = [] # ordered list of dictionary key names
f = open(filename, 'r')
except Exception as e:
msg.warning("failed to open file: {0}, {1}".format(filename, e))
firstline = True
for line in f:
fields = line.split(separator)
if firstline:
firstline = False
tabledict, keylist = self._define_tabledict_keys(header, fields, separator)
if not header:
# from now on, fill the dictionary fields with the input data
i = 0
for field in fields:
# get the corresponding dictionary key from the keylist
key = keylist[i]
# store the field value in the correct list
# TODO: do we need converting to float?
if convert_to_float:
field = float(field)
except Exception as e:
msg.warning("failed to convert {0} to float: {1} (aborting)".format(field, e))
return None
i += 1
return tabledict
def _define_tabledict_keys(self, header, fields, separator):
Define the keys for the tabledict dictionary.
Note: this function is only used by parse_table_from_file().
:param header: header string.
:param fields: header content string.
:param separator: separator character (char).
:return: tabledict (dictionary), keylist (ordered list with dictionary key names).
tabledict = {}
keylist = []
if not header:
# get the dictionary keys from the header of the file
for key in fields:
# first line defines the header, whose elements will be used as dictionary keys
if key == '':
if key.endswith('\n'):
key = key[:-1]
tabledict[key] = []
# get the dictionary keys from the provided header
keys = header.split(separator)
for key in keys:
if key == '':
if key.endswith('\n'):
key = key[:-1]
tabledict[key] = []
return tabledict, keylist
### @brief return Valgrind command
# @detail This function returns a Valgrind command for use with Athena. The
# command is returned as a string (by default) or a list, as requested using
......@@ -1297,9 +1669,6 @@ def bind_port(host, port):
return ret
def calcMemFull(memFullStats):
reportFull = 'in utiles: can not read lines!!'
for line in memFullStats:
reportFull = line
return reportFull
def calcMemFull(memFullStatFile):
result = analytic().get_fitted_data(memFullStatFile, x_name='Time', y_name='pss+swap', precision=2, tails=True)
return result
......@@ -106,6 +106,17 @@ def corruptionTestBS(filename):
rc = p.returncode
return rc
class memoryMonitorReport():
def __init__(self):
self._memFile = memFullStatFile
self._math = trfUtils.math()
self._fit = None
self._memTable = trfUtils.memFullFileToTable()
def fitToMem(self, memFullStatFile):
result = trfUtils.analytic().get_fitted_data(memFullStatFile, x_name='Time', y_name='pss+swap', precision=2, tails=True)
return result
## @brief Class of patterns that can be ignored from athena logfiles
class ignorePatterns(object):
0% Loading or .
