diff --git a/Trigger/TrigValidation/TrigAnalysisTest/python/TrigAnalysisSteps.py b/Trigger/TrigValidation/TrigAnalysisTest/python/TrigAnalysisSteps.py index 3c483eb12015eccf556e745a38dfcb3716d48bd0..247d9d51b9d9deeb8c1aa757418735a8cd186080 100644 --- a/Trigger/TrigValidation/TrigAnalysisTest/python/TrigAnalysisSteps.py +++ b/Trigger/TrigValidation/TrigAnalysisTest/python/TrigAnalysisSteps.py @@ -29,6 +29,7 @@ class AthenaCheckerStep(ExecStep, InputDependentStep): self.input = '' self.explicit_input = True self.perfmon = False + self.prmon = False def configure(self, test): self.args += ' -c \'fileList=["{:s}"]\''.format(self.input_file) diff --git a/Trigger/TrigValidation/TrigValTools/bin/trig-test-json.py b/Trigger/TrigValidation/TrigValTools/bin/trig-test-json.py index d9152030597cc0b7a21952bad9d5bd647a5146c9..071286f549358e6401c9672364ebf597ad4c6668 100755 --- a/Trigger/TrigValidation/TrigValTools/bin/trig-test-json.py +++ b/Trigger/TrigValidation/TrigValTools/bin/trig-test-json.py @@ -11,9 +11,17 @@ import json import re import sys import logging -import os.path -from collections import OrderedDict +import os import six +import numpy as np +from collections import OrderedDict +from pandas import read_csv +from scipy.optimize import curve_fit +from TrigValTools.TrigARTUtils import first_existing_file, newest_file + +import matplotlib +matplotlib.use('PDF') +import matplotlib.pyplot as plt class LastUpdatedOrderedDict(OrderedDict): @@ -71,7 +79,7 @@ def convert_to_megabytes(number, unit): return None -def extract_mem(line): +def extract_mem_perfmon(line): words = line[0].split() mem_end = words[5:7] logging.debug("mem_end = {}".format(mem_end)) @@ -111,8 +119,8 @@ def analyse_perfmon(filename): logging.warning("Cannot extract VMem information from {}".format(filename)) if len(rss_line) == 0: logging.warning("Cannot extract RSS information from {}".format(filename)) - vmem, dvmem = extract_mem(vmem_line) - rss, drss = extract_mem(rss_line) + vmem, dvmem = 
def mem_func(x_arr, x_trans, init_slope, exec_slope):
    """
    Piecewise-linear memory model used for fitting prmon time series.

    For ``x < x_trans`` the model is a line through the origin with slope
    ``init_slope``; from ``x_trans`` onwards it is a line with slope
    ``exec_slope``. Both segments give the same value at ``x == x_trans``.

    Returns a list with one model value per element of ``x_arr``.
    """
    # Constant term which makes the second segment meet the first at x_trans
    exec_offset = (init_slope - exec_slope) * x_trans
    return [init_slope * x if x < x_trans else exec_slope * x + exec_offset
            for x in x_arr]


def find_dmem_prmon(xdata, ydata, label, filename):
    """
    Fit mem_func to (xdata, ydata) and return the model increase between
    the fitted transition point and the last x value.

    The fit is also plotted via plot_prmon_fit, with ``label`` and
    ``filename`` used for the plot texts. ``xdata`` has to provide ``.iloc``
    (e.g. a pandas Series). Exceptions raised by curve_fit are not handled
    here and propagate to the caller.
    """
    # The transition point is constrained to the first 90% of the x range,
    # both slopes are only constrained to be non-negative
    upper_bounds = [0.9 * max(xdata), np.inf, np.inf]
    popt, _ = curve_fit(mem_func, xdata, ydata, bounds=(0, upper_bounds))
    logging.debug("Fit result: %s", str(popt))
    plot_prmon_fit(xdata, ydata, popt, label, filename)
    x_trans = popt[0]
    x_last = xdata.iloc[-1]
    mem_last, mem_trans = mem_func([x_last, x_trans], *popt)
    return mem_last - mem_trans


def plot_prmon_fit(xdata, ydata, params, name, filename):
    """
    Plot the data and the fitted mem_func curve and save the result to
    ``prmon_memfit_<name>.pdf`` in the current directory.

    ``params`` are the three mem_func parameters (x_trans, init_slope,
    exec_slope). The current pyplot figure is used for drawing and is
    cleared afterwards.
    """
    fit_label = '{:s} fit, exec slope={:.2f} kB/s'.format(name, params[2])
    plt.plot(xdata, ydata, 'b-', label=name)
    plt.plot(xdata, mem_func(xdata, *params), 'r-', label=fit_label)
    plt.xlabel('wtime [s]')
    plt.ylabel(name+' [kB]')
    plt.legend()
    plt.title('{:s} from {:s}'.format(name, filename))
    plt.savefig('prmon_memfit_{:s}.pdf'.format(name), bbox_inches='tight')
    plt.clf()


def analyse_prmon(filename):
    """
    Extract memory usage information from a tab-separated prmon output file.

    Returns a dictionary with the maximum 'vmem', 'rss' and 'pss' values
    (converted with convert_to_megabytes from kB and formatted as strings).
    If there are at least 10 data points and the fits succeed, it also
    contains 'delta-vmem', 'delta-rss' and 'delta-pss' obtained from
    find_dmem_prmon.

    Returns None if the file cannot be read or parsed, has no data rows,
    or does not contain the required columns.
    """
    try:
        prmon_data = read_csv(filename, sep='\t')
    except IOError:
        logging.warning("Cannot open file {}".format(filename))
        return None
    except ValueError as err:
        # pandas EmptyDataError and ParserError both derive from ValueError
        logging.warning("Cannot parse file %s: %s", filename, err)
        return None

    # A file with only the header line would make max() below fail
    if prmon_data.empty:
        logging.warning("No data points found in %s", filename)
        return None

    try:
        time_v = prmon_data['wtime']
        pss_v = prmon_data['pss']
        rss_v = prmon_data['rss']
        vmem_v = prmon_data['vmem']
    except KeyError as err:
        logging.warning("Missing column %s in %s", err, filename)
        return None

    data = LastUpdatedOrderedDict()
    data['vmem'] = "{0:.3f}".format(convert_to_megabytes(max(vmem_v), 'kB'))
    data['rss'] = "{0:.3f}".format(convert_to_megabytes(max(rss_v), 'kB'))
    data['pss'] = "{0:.3f}".format(convert_to_megabytes(max(pss_v), 'kB'))
    if len(time_v) < 10:
        logging.info('Not enough prmon data points, skipping memory slope fitting')
        return data

    try:
        d_pss = find_dmem_prmon(time_v, pss_v, 'pss', filename)
        d_rss = find_dmem_prmon(time_v, rss_v, 'rss', filename)
        d_vmem = find_dmem_prmon(time_v, vmem_v, 'vmem', filename)
    except (RuntimeError, ValueError) as err:
        # A failed fit should not discard the maximum values collected above
        logging.warning("Memory slope fitting failed for %s: %s", filename, err)
        return data

    data['delta-vmem'] = "{0:.3f}".format(convert_to_megabytes(d_vmem, 'kB'))
    data['delta-rss'] = "{0:.3f}".format(convert_to_megabytes(d_rss, 'kB'))
    data['delta-pss'] = "{0:.3f}".format(convert_to_megabytes(d_pss, 'kB'))
    return data
def first_existing_file(file_list):
    '''
    Returns the first file name from the list which corresponds to an existing file.
    Returns None if none of the files in the list exist.
    '''
    return next((name for name in file_list if os.path.isfile(name)), None)


def newest_file(pattern):
    '''
    Returns the newest file (by modification date) in the current directory
    with a name matching the pattern. Returns None if no file is matched.

    The pattern is a regular expression applied with search(), i.e. it is
    not anchored to the start of the name. Only entries for which
    os.path.isfile is true are considered, so directories and dangling
    symlinks with a matching name are ignored.
    '''
    rx = re.compile(pattern)
    matched_files = [f for f in os.listdir('.') if rx.search(f) and os.path.isfile(f)]
    if not matched_files:
        return None
    # Ascending sort by modification time, the newest entry ends up last
    matched_files.sort(key=os.path.getmtime)
    return matched_files[-1]
def __start_prmon(self):
    '''
    Start prmon monitoring the current process (os.getpid()) with the
    sampling interval self.prmon_interval and return the Popen object.

    The output goes to prmon.<name>.txt and prmon.summary.<name>.json
    where <name> is self.name.
    '''
    self.log.debug('Starting prmon for pid %d', os.getpid())
    prmon_cmd = 'prmon --pid {:d} --interval {:d}'.format(os.getpid(), self.prmon_interval)
    prmon_cmd += ' --filename prmon.{name:s}.txt --json-summary prmon.summary.{name:s}.json'.format(name=self.name)
    return subprocess.Popen(prmon_cmd, shell=True)


def __stop_prmon(self, prmon_proc):
    '''
    Send SIGUSR1 to the process started by __start_prmon and wait up to
    about one second (10 polls, 0.1 s apart) for it to exit.

    NOTE(review): SIGUSR1 is presumably prmon's request to finish and
    write its output - confirm against the prmon documentation.
    An OSError raised while signalling is logged as a warning and not
    propagated.
    '''
    self.log.debug('Stopping prmon')
    try:
        prmon_proc.send_signal(signal.SIGUSR1)
        for _ in range(10):
            # poll() returns None only while the process is still running;
            # an exit code of 0 also means it has finished
            if prmon_proc.poll() is not None:
                break
            time.sleep(0.1)
    except OSError as err:
        self.log.warning('Error while stopping prmon: %s', err)