diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py index 4852574ab83d0df13cef04cff2baa678469f6234..da85d51ee4907f2d489845ec7d3d65b79d8e8d42 100755 --- a/Tools/PyJobTransforms/python/trfExe.py +++ b/Tools/PyJobTransforms/python/trfExe.py @@ -39,7 +39,6 @@ msg = logging.getLogger(__name__) from PyJobTransforms.trfJobOptions import JobOptionsTemplate from PyJobTransforms.trfUtils import asetupReport, unpackDBRelease, setupDBRelease, cvmfsDBReleaseCheck, forceToAlphaNum from PyJobTransforms.trfUtils import ValgrindCommand, isInteractiveEnv, calcCpuTime, calcWallTime, calcMemFull -from PyJobTransforms.trfUtils import bind_port from PyJobTransforms.trfExitCodes import trfExit from PyJobTransforms.trfLogger import stdLogLevels from PyJobTransforms.trfMPTools import detectAthenaMPProcs, athenaMPOutputHandler @@ -184,7 +183,6 @@ class transformExecutor(object): self._exeStart = self._exeStop = None self._valStart = self._valStop = None self._memStats = {} - self._memFullStats = '' self._eventCount = None self._athenaMP = None self._athenaMT = None @@ -384,8 +382,8 @@ class transformExecutor(object): @property def memFullEval(self): - if self._memFullStats: - return calcMemFull(self._memFullStats) + if self._memFullFile: + return calcMemFull(self._memFullFile) else: return 'cant read full mem file' @@ -773,12 +771,6 @@ class scriptExecutor(transformExecutor): msg.warning('Failed to load JSON memory summmary file {0}: {1}'.format(self._memSummaryFile, e)) self._memMonitor = False self._memStats = {} - try: - self._memFullStats = open(self._memFullFile) - except Exception as e: - msg.warning('Failed to load JSON memory full file {0}: {1}'.format(self._memFullFile, e)) - self._memMonitor = False - self._memFullStats = 'could not open mem.full file!!' def validate(self): if self._valStart is None: @@ -818,6 +810,14 @@ class scriptExecutor(transformExecutor): self._valStop = os.times() msg.debug('valStop time is {0}'.format(self._valStop)) + ## check memory monitor results + # add information to the exit message if an excess has seen + fitResult = trfValidation.memoryMonitorReport().fitToMem(self._memFullFile) + if fitResult and self._errMsg: + msg.error('Excess in memory monitor parameter(s)') + self._errMsg = self._errMsg + "; high slope : {0}".format(fitResult['slope']) + #raise trfExceptions.TransformLogfileErrorException(trfExit.nameToCode('TRF_EXEC_LOGERROR'), + # 'Fatal error in memory monitor: "{0}"'.format(exitErrorMessage)) class athenaExecutor(scriptExecutor): diff --git a/Tools/PyJobTransforms/python/trfReports.py b/Tools/PyJobTransforms/python/trfReports.py index 9d7fdee6c11054f530d0d682d8fd2886bc2977d6..a6006d009b471eadb9de0f811611dffdb6b9e2ca 100644 --- a/Tools/PyJobTransforms/python/trfReports.py +++ b/Tools/PyJobTransforms/python/trfReports.py @@ -656,10 +656,11 @@ def exeResourceReport(exe, report): if exe.memStats: exeResource['memory'] = exe.memStats + #it failes when it is aligned with the other + #if exe.memFullEval: + # exeResource['memoryAnalyse'] = exe.memFullEval if exe.memFullEval: - exeResource['FullMem'] = exe.memFullEval - else: - exeResource['FullMem'] = 'at least report this' + exeResource['memoryAnalyse'] = exe.memFullEval if exe.eventCount: exeResource['nevents'] = exe.eventCount if exe.athenaMP: diff --git a/Tools/PyJobTransforms/python/trfUtils.py b/Tools/PyJobTransforms/python/trfUtils.py index 43918b56ed8b1c760a7b9fe98f1c7ebb3ebb2536..17ff61ff8671d472e753ee7dca2b7e68960ac848 100644 --- a/Tools/PyJobTransforms/python/trfUtils.py +++ b/Tools/PyJobTransforms/python/trfUtils.py @@ -1199,6 +1199,378 @@ class ParallelJobProcessor(object): )) msg.debug(self.statusReport()) + +class analytic(): + """ + Analytics service class. + """ + + _fit = None + + def __init__(self, **kwargs): + """ + Init function. + :param kwargs: + """ + self._fit = None + self._memTable = memFullFileToTable() + + def fit(self, x, y, model='linear'): + """ + Fitting function. + For a linear model: y(x) = slope * x + intersect + :param x: list of input data (list of floats or ints). + :param y: list of input data (list of floats or ints). + :param model: model name (string). + :raises UnknownException: in case Fit() fails. + :return: + """ + + try: + self._fit = Fit(x=x, y=y, model=model) + except Exception as e: + msg.warning('fit failed! {0}'.format(e)) + + return self._fit + + def slope(self): + """ + Return the slope of a linear fit, y(x) = slope * x + intersect. + :return: slope (float). + """ + + slope = None + + if self._fit: + slope = self._fit.slope() + else: + msg.warning('Fit has not been defined') + + return slope + + def get_fitted_data(self, filename, x_name='Time', y_name='pss+swap', precision=2, tails=True): + """ + Return a properly formatted job metrics string with analytics data. + Currently the function returns a fit for PSS+Swap vs time, whose slope measures memory leaks. + :param filename: full path to memory monitor output (string). + :param x_name: optional string, name selector for table column. + :param y_name: optional string, name selector for table column. + :param precision: optional precision for fitted slope parameter, default 2. + :param tails: should tails (first and last values) be used? (boolean). + :return: {"slope": slope, "chi2": chi2} (float strings with desired precision). + """ + self._math = math() + slope = "" + chi2 = "" + table = self._memTable.get_table_from_file(filename,header=None, separator="\t", convert_to_float=True) + + if table: + # extract data to be fitted + x, y = self.extract_from_table(table, x_name, y_name) + + # remove tails if desired + # this is useful e.g. for memory monitor data where the first and last values + # represent allocation and de-allocation, ie not interesting + if not tails and len(x) > 7 and len(y) > 7: + msg.debug('removing tails from data to be fitted') + x = x[5:] + x = x[:-2] + y = y[5:] + y = y[:-2] + + if len(x) > 7 and len(y) > 7: + msg.info('fitting {0} vs {1}'.format(y_name, x_name)) + try: + fit = self.fit(x, y) + _slope = self.slope() + print(_slope) + except Exception as e: + msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e)) + else: + if _slope: + slope = round(fit.slope(), 2) + chi2 = fit.chi2() + if slope != "": + msg.info('current memory leak: {0} B/s (using {1} data points, chi2={2})'.format(slope, len(x), chi2)) + else: + msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>=4)'.format(x, y)) + + return {"slope": slope, "chi2": chi2} + + def extract_from_table(self, table, x_name, y_name): + """ + :param table: dictionary with columns. + :param x_name: column name to be extracted (string). + :param y_name: column name to be extracted (may contain '+'-sign) (string). + :return: x (list), y (list). + """ + + x = table.get(x_name, []) + if '+' not in y_name: + y = table.get(y_name, []) + else: + try: + y1_name = y_name.split('+')[0] + y2_name = y_name.split('+')[1] + y1_value = table.get(y1_name, []) + y2_value = table.get(y2_name, []) + except Exception as e: + msg.warning('exception caught: {0}'.format(e)) + x = [] + y = [] + else: + # create new list with added values (1,2,3) + (4,5,6) = (5,7,9) + y = [x0 + y0 for x0, y0 in zip(y1_value, y2_value)] + + return x, y + + +class Fit(): + """ + Low-level fitting class. + """ + _model = 'linear' # fitting model + _x = None # x values + _y = None # y values + _xm = None # x mean + _ym = None # y mean + _ss = None # sum of square deviations + _ss2 = None # sum of deviations + _slope = None # slope + _intersect = None # intersect + _chi2 = None # chi2 + + def __init__(self, **kwargs): + """ + Init function. + :param kwargs: + """ + self._math = math() + # extract parameters + self._model = kwargs.get('model', 'linear') + self._x = kwargs.get('x', None) + self._y = kwargs.get('y', None) + + if not self._x or not self._y: + msg.warning('input data not defined') + + if len(self._x) != len(self._y): + msg.warning('input data (lists) have different lengths') + + # base calculations + if self._model == 'linear': + self._ss = self._math.sum_square_dev(self._x) + self._ss2 = self._math.sum_dev(self._x, self._y) + self.set_slope() + self._xm = self._math.mean(self._x) + self._ym = self._math.mean(self._y) + self.set_intersect() + self.set_chi2() + + else: + msg.warning("\'{0}\' model is not implemented".format(self._model)) + + def fit(self): + """ + Return fitting object. + :return: fitting object. + """ + + return self + + def value(self, t): + """ + Return the value y(x=t) of a linear fit y(x) = slope * x + intersect. + :return: intersect (float). + """ + + return self._slope * t + self._intersect + + def set_chi2(self): + """ + Calculate and set the chi2 value. + :return: + """ + + y_observed = self._y + y_expected = [] + #i = 0 + for x in self._x: + #y_expected.append(self.value(x) - y_observed[i]) + y_expected.append(self.value(x)) + #i += 1 + if y_observed and y_observed != [] and y_expected and y_expected != []: + self._chi2 = self._math.chi2(y_observed, y_expected) + else: + self._chi2 = None + + def chi2(self): + """ + Return the chi2 value. + :return: chi2 (float). + """ + + return self._chi2 + + def set_slope(self): + """ + Calculate and set the slope of the linear fit. + :return: + """ + + if self._ss2 and self._ss and self._ss != 0: + self._slope = self._ss2 / self._ss + else: + self._slope = None + + def slope(self): + """ + Return the slope value. + :return: slope (float). + """ + + return self._slope + + def set_intersect(self): + """ + Calculate and set the intersect of the linear fit. + :return: + """ + + if self._ym and self._slope and self._xm: + self._intersect = self._ym - self._slope * self._xm + else: + self._intersect = None + + def intersect(self): + """ + Return the intersect value. + :return: intersect (float). + """ + + return self._intersect + + +class math(): + + def mean(self, data): + n = len(data) + if n < 1: + msg.warning('mean requires at least one data point') + + return sum(data)/n + + def sum_square_dev(self, data): + c = self.mean(data) + return sum((x - c) ** 2 for x in data) + + def sum_dev(self, x, y): + c1 = self.mean(x) + c2 = self.mean(y) + return sum((_x - c1) * (_y - c2) for _x, _y in zip(x, y)) + + def chi2(self, observed, expected): + """ + Return the chi2 sum of the provided observed and expected values. + :param observed: list of floats. + :param expected: list of floats. + :return: chi2 (float). + """ + if 0 in expected: + return 0.0 + return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected)) + + +class memFullFileToTable(): + + def get_table_from_file(self, filename, header=None, separator="\t", convert_to_float=None): + """ + Extract a table of data from a txt file. + E.g. + header="Time VMEM PSS RSS Swap rchar wchar rbytes wbytes" + or the first line in the file is + Time VMEM PSS RSS Swap rchar wchar rbytes wbytes + each of which will become keys in the dictionary, whose corresponding values are stored in lists, with the entries + corresponding to the values in the rows of the input file. + The output dictionary will have the format + {'Time': [ .. data from first row .. ], 'VMEM': [.. data from second row], ..} + :param filename: name of input text file, full path (string). + :param header: header string. + :param separator: separator character (char). + :param convert_to_float: boolean, if True, all values will be converted to floats. + :return: dictionary. + """ + tabledict = {} + keylist = [] # ordered list of dictionary key names + try: + f = open(filename, 'r') + except Exception as e: + msg.warning("failed to open file: {0}, {1}".format(filename, e)) + else: + firstline = True + for line in f: + fields = line.split(separator) + if firstline: + firstline = False + tabledict, keylist = self._define_tabledict_keys(header, fields, separator) + if not header: + continue + + # from now on, fill the dictionary fields with the input data + i = 0 + for field in fields: + # get the corresponding dictionary key from the keylist + key = keylist[i] + # store the field value in the correct list + # TODO: do we need converting to float? + if convert_to_float: + try: + field = float(field) + except Exception as e: + msg.warning("failed to convert {0} to float: {1} (aborting)".format(field, e)) + return None + tabledict[key].append(field) + i += 1 + f.close() + return tabledict + + def _define_tabledict_keys(self, header, fields, separator): + """ + Define the keys for the tabledict dictionary. + Note: this function is only used by parse_table_from_file(). + :param header: header string. + :param fields: header content string. + :param separator: separator character (char). + :return: tabledict (dictionary), keylist (ordered list with dictionary key names). + """ + + tabledict = {} + keylist = [] + + if not header: + # get the dictionary keys from the header of the file + for key in fields: + # first line defines the header, whose elements will be used as dictionary keys + if key == '': + continue + if key.endswith('\n'): + key = key[:-1] + tabledict[key] = [] + keylist.append(key) + else: + # get the dictionary keys from the provided header + keys = header.split(separator) + for key in keys: + if key == '': + continue + if key.endswith('\n'): + key = key[:-1] + tabledict[key] = [] + keylist.append(key) + + return tabledict, keylist + + ### @brief return Valgrind command # @detail This function returns a Valgrind command for use with Athena. The # command is returned as a string (by default) or a list, as requested using @@ -1297,9 +1669,6 @@ def bind_port(host, port): s.close() return ret -def calcMemFull(memFullStats): - reportFull = 'in utiles: can not read lines!!' - for line in memFullStats: - reportFull = line - break - return reportFull +def calcMemFull(memFullStatFile): + result = analytic().get_fitted_data(memFullStatFile, x_name='Time', y_name='pss+swap', precision=2, tails=True) + return result diff --git a/Tools/PyJobTransforms/python/trfValidation.py b/Tools/PyJobTransforms/python/trfValidation.py index 38abe22d18fbef9f01c1684f78fd9d5e558c4009..8ee7aeb053094c31e62d60e2fd7781b792ccacce 100644 --- a/Tools/PyJobTransforms/python/trfValidation.py +++ b/Tools/PyJobTransforms/python/trfValidation.py @@ -106,6 +106,17 @@ def corruptionTestBS(filename): rc = p.returncode return rc +class memoryMonitorReport(): + ''' + def __init__(self): + self._memFile = memFullStatFile + self._math = trfUtils.math() + self._fit = None + self._memTable = trfUtils.memFullFileToTable() + ''' + def fitToMem(self, memFullStatFile): + result = trfUtils.analytic().get_fitted_data(memFullStatFile, x_name='Time', y_name='pss+swap', precision=2, tails=True) + return result ## @brief Class of patterns that can be ignored from athena logfiles class ignorePatterns(object):