atlas_gen_sherpa-ma: Python-based parser to extract numbers from jobReport.json

Closed · Walter Lampl requested to merge wlampl/hep-workloads:atlas_gen_sherpa_parser into master
5 files changed: +523 −59
# Copyright 2019-2020 CERN. See the COPYRIGHT file at the top-level
# directory of this distribution. For licensing information, see the
# COPYING file at the top-level directory of this distribution.
from __future__ import print_function
import os
import json
import sys
import time
import re
from collections import OrderedDict, defaultdict

if sys.version_info[0] == 2:
    from commands import getstatusoutput
else:  # py3
    from subprocess import getstatusoutput

def get_number(dirname):
    """
    Extract the numeric suffix from a proc_* directory name so that these
    directories can be sorted numerically. This fix is needed to avoid random
    Test-parsers pipeline failures due to a different ordering w.r.t. the
    reference file.
    """
    if dirname.find('_') != -1:
        name, number = os.path.splitext(dirname)[0].split('_')
        return (name, int(number))
    else:
        return dirname
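
# For illustration: get_number("proc_10") returns ("proc", 10), so
# sorted(["proc_2", "proc_10"], key=get_number) yields ["proc_2", "proc_10"]
# instead of the lexicographic ["proc_10", "proc_2"].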

def check(infile, injsonfile):
    """
    Check whether the run was successful. Return True on success.
    """
    print("%s INFO : Checking presence of %s" % (basename, injsonfile))
    if not os.access(injsonfile, os.R_OK):
        # The json file may still be packed inside a tar.gz archive
        if os.access(injsonfile + ".tar.gz", os.R_OK):
            untar_infile(injsonfile + ".tar.gz")
        else:
            print("%s ERROR : File %s not found!" % (basename, injsonfile))
            return False
    print("%s INFO : Checking that the run was successful from %s" % (basename, infile))
    try:
        with open(infile, 'r') as datafile:
            for line in datafile:
                if 'successful run' in line:
                    return True
        return False
    except IOError:
        print("%s ERROR : File %s not found!" % (basename, infile))
        return False

def untar_infile(infile):
    print("%s INFO untar_infile %s" % (basename, infile))
    if infile.endswith('.tar.gz'):
        (rc, outlog) = getstatusoutput("tar -zvxf " + infile + " -C " + infile[:infile.rfind('/')])
        if rc != 0:
            print("%s ERROR Failed to un-tar file %s!" % (basename, infile))
            print(outlog)
            sys.exit(1)  # the script has to exit
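
# For illustration: with infile="/wdir/proc_1/jobReport.json.tar.gz" (a
# hypothetical path), the command run above is
#   tar -zvxf /wdir/proc_1/jobReport.json.tar.gz -C /wdir/proc_1
# i.e. the archive is extracted in place, next to the tarball.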

def read_values(infile):
    """
    Read information about CPU, memory and number of processed events from the PerfMonMT json file.
    """
    print("%s INFO : reading file %s" % (basename, infile))
    metrics_dict = defaultdict(float)
    with open(infile, 'r') as f:
        inDict = json.load(f)
    generate = inDict["resource"]["executor"]["generate"]
    metrics_dict["cputime"] = 1000 * generate["cpuTime"]  # convert to milliseconds
    metrics_dict["vmemPeak"] = generate["memory"]["Max"]['maxVMEM']
    metrics_dict["vmemRSS"] = generate["memory"]["Max"]['maxRSS']
    metrics_dict["walltime"] = 1000 * generate["wallTime"]  # convert to milliseconds
    metrics_dict["swap"] = generate["memory"]["Max"]['maxSwap']
    # Extract the number of events from the command line, matching e.g.
    # "--maxEvents=200" or "--maxEvents 200"; the slice [12:] strips the
    # 11-character "--maxEvents" prefix plus the separator, keeping the digits.
    mg = re.findall("--maxEvents[ ,=][0-9]+", inDict['cmdLine'])
    metrics_dict["nbevents"] = int(mg[-1][12:])
    print(metrics_dict)
    return metrics_dict
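
# For reference, an illustrative sketch (not taken from a real run) of the
# jobReport.json fields accessed in read_values:
# {
#   "cmdLine": "... --maxEvents=200 ...",
#   "resource": {"executor": {"generate": {
#       "cpuTime": 1234.5, "wallTime": 1300.0,
#       "memory": {"Max": {"maxVMEM": 2000000, "maxRSS": 1500000, "maxSwap": 0}}}}}
# }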

def generate_summary(runstatus_, unitvalue, metrics_dict_list_):
    """
    Generate a dictionary with all the information.
    """
    # Compute the per-process score as scale * Nevt / walltime,
    # where walltime is the wall time of the "generate" executor.
    scores = [scale * x[0] / x[1] for x in
              zip(metrics_dict_list_["nbevents"], metrics_dict_list_["walltime"])]
    score_stats = compute_stats(scores)
    summary = OrderedDict()
    summary["wl-scores"] = {"gen": float(score_stats["score"])}
    summary["wl-stats"] = {"avg": float(score_stats["avg"]),
                           "median": float(score_stats["median"]),
                           "min": float(score_stats["min"]),
                           "max": float(score_stats["max"])
                           }
    summary["custom"] = {
        "events_proc_Athena(MT)": [int(val) for val in metrics_dict_list_["nbevents"]],
        "score_unit": unitvalue,
        "score_proc": [float(val) for val in score_stats["scores_formatted"]],
        "vmem": [val for val in metrics_dict_list_["vmemPeak"]],
        "RSS": [val for val in metrics_dict_list_["vmemRSS"]],
        "swap": [val for val in metrics_dict_list_["swap"]],
        "walltime_proc": [float(val) for val in metrics_dict_list_["walltime"]],
        "cputime_proc": [float(val) for val in metrics_dict_list_["cputime"]]
    }
    return summary
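
# Illustrative shape of the summary for a single process with 200 events and
# a walltime of 1300000 ms (score = 1000 * 200 / 1300000 ≈ 0.153846 evt/s):
# {"wl-scores": {"gen": 0.153846},
#  "wl-stats": {"avg": 0.153846, "median": 0.153846, "min": 0.153846, "max": 0.153846},
#  "custom": {"events_proc_Athena(MT)": [200], "score_unit": "evt/s", ...}}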

def compute_stats(scores):
    """
    Compute summary statistics (sum, average, median, min, max) of the per-process scores.
    """
    finalscore = 0.
    avg = 0.
    median = 0.
    minimum = 0.
    maximum = 0.
    scores_sorted = sorted(scores)
    n = len(scores)
    if n > 0:
        minimum = min(scores)
        maximum = max(scores)
        finalscore = sum(scores)
        avg = finalscore / n
        if n % 2 != 0:
            median = scores_sorted[(n + 1) // 2 - 1]
        else:
            median = (scores_sorted[(n // 2) - 1] + scores_sorted[n // 2]) / 2.
    score_stats = {
        "score": '%.6f' % finalscore,
        "avg": '%.6f' % avg,
        "median": '%.6f' % median,
        "min": '%.6f' % minimum,
        "max": '%.6f' % maximum,
        "scores_formatted": ['%.6f' % elem for elem in scores]
    }
    return score_stats
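
# Worked example: compute_stats([2.0, 1.0, 4.0]) yields score "7.000000"
# (the sum over processes), avg "2.333333", median "2.000000" (the middle of
# the sorted [1.0, 2.0, 4.0]), min "1.000000" and max "4.000000".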

def save_output(data, outfile):
    """
    Save the summary to a JSON file.
    """
    with open(outfile, "w") as fjson:
        fjson.write(json.dumps(data))
    print("%s INFO : Summary placed in %s" % (basename, outfile))

def main():
    """
    Main function: parse the results from the logfiles and save them to JSON.
    """
    # Exit code of this python script: 0=success, 1=failure (BMK-129)
    pythonstatus = 0
    # Environment variables (the script fails with a KeyError if these are not set)
    global basename
    basename = os.path.basename(__file__)
    global basewdir
    basewdir = os.environ['baseWDir']
    global bmkdir
    bmkdir = os.environ['BMKDIR']
    global app
    app = os.environ['APP']
    global resJSON
    resJSON = os.path.join(basewdir, "parser_output.json")
    # Global variables
    injsonfile = "jobReport.json"
    logfile = "log.generate"
    global scale
    scale = 1000.0
    if scale == 1:
        unit = "evt/ms"
    elif scale == 1000:
        unit = "evt/s"
    elif scale == 1000000:
        unit = "evt/ks"
    else:
        unit = ""
        print("%s WARNING : Scale %i does not have a predefined unit! Please define it." % (basename, scale))
    # Find the per-process run directories proc_<N>
    dirs = []
    for (dirpath, dirnames, filenames) in os.walk(basewdir):
        dirs.extend([x for x in dirnames if re.match(r"^proc_[0-9]+$", x)])  # BMK-890
        break  # only the top-level directory is scanned
    if len(dirs) == 0:
        print("%s ERROR : no proc_* directories found. Failing" % (basename))
        sys.exit(1)
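    # Expected layout under $baseWDir (names illustrative):
    #   proc_1/log.generate, proc_1/jobReport.json (or jobReport.json.tar.gz)
    #   proc_2/log.generate, proc_2/jobReport.json, ...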
    # Parse the results from each per-process logfile
    metrics_dict_list = defaultdict(list)
    runstatus = []
    for d in sorted(dirs, key=get_number):
        logfilepath = os.path.join(basewdir, d, logfile)
        injsonfilepath = os.path.join(basewdir, d, injsonfile)
        if check(logfilepath, injsonfilepath):
            print("%s INFO : Run was successful." % (basename))
            runstatus.append(1)
        else:
            print("%s ERROR : Run was not successful!" % (basename))
            runstatus.append(0)
            pythonstatus = 1
            continue
        metrics_dict = defaultdict(float)
        try:
            metrics_dict = read_values(injsonfilepath + '.tar.gz')
        except Exception:
            # apparently in the CI the file injsonfilepath is not tar.gz
            metrics_dict = read_values(injsonfilepath)
        for k, v in metrics_dict.items():
            metrics_dict_list[k].append(v)
    # Generate the summary and save it to a JSON file
    jsonSummary = generate_summary(runstatus, unit, metrics_dict_list)
    save_output(jsonSummary, resJSON)
    print("%s INFO : json result %s" % (basename, jsonSummary))
    # Check that the JSON file was created
    if not os.path.isfile(resJSON):
        print("%s ERROR : Something went wrong in parsing the CPU score. File path %s does not exist!" % (basename, resJSON))
        pythonstatus = 1
    # Exit code of this python script: 0=success, 1=failure (BMK-129)
    sys.exit(pythonstatus)

if __name__ == '__main__':
    main()
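
# A minimal sketch of how this parser might be invoked (the working-directory
# path is hypothetical; baseWDir, BMKDIR and APP must be exported before the
# script runs):
#   baseWDir=/tmp/atlas-gen-sherpa BMKDIR=/bmk APP=atlas-gen-sherpa python <this script>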