Commit 81efea8f authored by Andrea Sciaba's avatar Andrea Sciaba

First version

parent 73a934c3
#!/usr/bin/env python
"""
_CatalogParser_
Read an XML Pool Catalog and create a list of file information from it.
Eventually this should go away as this info gets inserted straight into the
job report, but for now we need to extract it from the catalog.
Note that this parser is not designed for large catalogs. If you are dealing
with large catalogs:
1. Consider using the CachedParser in CMSGLIDE
2. Consider who is responsible for creating large XML catalogs and devise a
way to cause them pain.
"""
from xml.sax.handler import ContentHandler
from xml.sax import make_parser
class FileEntry(dict):
"""
_FileEntry_
Dictionary container object for file entry
"""
def __init__(self):
self.setdefault('GUID', None)
self.setdefault('LFN', None)
self.setdefault('PFN', [])
class CatalogHandler(ContentHandler):
"""
_CatalogHandler_
Trip through the catalog and compile a list of file entries from
it.
"""
def __init__(self):
ContentHandler.__init__(self)
self.files = []
self.currentFile = FileEntry()
def startElement(self, name, attrs):
"""
_startElement_
"""
if name == "File":
self.currentFile = FileEntry()
# get ID from attrs
idValue = attrs.get("ID", None)
if idValue != None:
self.currentFile['GUID'] = str(idValue)
return
if name == "pfn":
pfnValue = attrs.get("name", None)
if pfnValue != None:
self.currentFile['PFN'].append(str(pfnValue))
return
if name == "lfn":
lfnValue = attrs.get("name", None)
if lfnValue != None:
self.currentFile['LFN'] = str(lfnValue)
return
def endElement(self, name):
"""
_endElement_
"""
if name == "File":
self.files.append(self.currentFile)
            self.currentFile = None
return
def readCatalog(catalogXMLFile):
"""
_readCatalog_
Read the catalog and extract the list of files from it.
Returns a list of FileEntry (dictionary) objects containing
details of each file
"""
# //
    # // Hack out the goddamned InMemory DTD crap that POOL uses
    # // so we can actually read the XML with the parser without
    # // it blowing up.
    #//
catalogContent = file(catalogXMLFile).read()
catalogContent = catalogContent.replace(
"<!DOCTYPE POOLFILECATALOG SYSTEM \"InMemory\">", "")
handler = CatalogHandler()
parser = make_parser()
parser.setContentHandler(handler)
    parser.feed(catalogContent)
    parser.close()
result = handler.files
del handler, parser, catalogContent
return result
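# //
# // Illustrative usage sketch (not part of the original module). The
# // catalog file name below is hypothetical; readCatalog returns a list
# // of FileEntry dictionaries keyed by GUID, LFN and PFN.
#//
# if __name__ == '__main__':
#     for entry in readCatalog("PoolFileCatalog.xml"):
#         print entry['GUID'], entry['LFN'], entry['PFN']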
#!/usr/bin/env python
"""
_FileInfo_
Container object for file information.
Contains information about a single file as a dictionary
"""
from IMProv.IMProvNode import IMProvNode
class FileInfo(dict):
"""
_FileInfo_
Dictionary based container for holding details about a
file.
    The majority of keys hold a single string value, however a
    few need to be list based
"""
def __init__(self):
dict.__init__(self)
self.setdefault("LFN", None)
self.setdefault("PFN", None)
self.setdefault("GUID", None)
self.setdefault("Size", None)
self.setdefault("TotalEvents", None)
self.setdefault("EventsRead", None)
# //
# // Is this an input or output file?
#//
self.isInput = False
# //
# // open/closed state
#//
self.state = "closed"
# //
        # // inputFiles is a list of the input files that contributed to
        # // this file, each recorded as a PFN/LFN pair
        #//
self.inputFiles = []
# //
# // List of Branch names
#//
self.branches = []
# //
# // List of Runs
#//
self.runs = []
# //
        # // dataset is a list of dictionaries, each with the same key
        # // structure as the MCPayloads.DatasetInfo object
        #//
self.dataset = []
# //
        # // checksums map the name of the checksum algorithm used to the
        # // checksum value
        #//
self.checksums = {}
def addInputFile(self, pfn, lfn):
"""
_addInputFile_
        Add an input file PFN and LFN used as input to produce the
file described by this instance.
NOTE: May need to allow multiple ranges per file later on for skimming
        etc. However care must be taken to ensure we don't end up with event
lists, since these will be potentially huge.
"""
self.inputFiles.append({"PFN" : pfn,
"LFN" : lfn})
return
def newDataset(self):
"""
_newDataset_
Add a new dataset that this file is associated with and return
the dictionary to be populated
"""
newDS = {}
self.dataset.append(newDS)
return newDS
def addChecksum(self, algorithm, value):
"""
_addChecksum_
Add a Checksum to this file. Eg:
"cksum", 12345657
"""
self.checksums[algorithm] = value
return
def save(self):
"""
_save_
Return an improvNode structure containing details
of this object so it can be saved to a file
"""
if self.isInput:
improvNode = IMProvNode("InputFile")
else:
improvNode = IMProvNode("File")
# //
# // General keys
#//
for key, val in self.items():
if val == None:
continue
node = IMProvNode(str(key), str(val))
improvNode.addNode(node)
# //
# // Checksums
#//
for key, val in self.checksums.items():
improvNode.addNode(IMProvNode("Checksum", val, Algorithm = key) )
# //
# // State
#//
improvNode.addNode(IMProvNode("State", None, Value = self.state))
# //
# // Inputs
#//
if not self.isInput:
inputs = IMProvNode("Inputs")
improvNode.addNode(inputs)
for inputFile in self.inputFiles:
inpNode = IMProvNode("Input")
for key, value in inputFile.items():
inpNode.addNode(IMProvNode(key, value))
inputs.addNode(inpNode)
# //
# // Runs
#//
runs = IMProvNode("Runs")
improvNode.addNode(runs)
for run in self.runs:
runs.addNode(IMProvNode("Run", run))
# //
# // Dataset info
#//
if not self.isInput:
for datasetEntry in self.dataset:
dataset = IMProvNode("Dataset")
improvNode.addNode(dataset)
for key, val in datasetEntry.items():
dataset.addNode(IMProvNode(key, str(val)))
# //
# // Branches
#//
branches = IMProvNode("Branches")
improvNode.addNode(branches)
for branch in self.branches:
branches.addNode(IMProvNode("Branch", branch))
return improvNode
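# //
# // Illustrative usage sketch (not part of the original module). All
# // LFN/PFN/checksum values below are hypothetical; save() returns an
# // IMProvNode describing this file.
#//
# if __name__ == '__main__':
#     info = FileInfo()
#     info['LFN'] = "/store/unmerged/example.root"
#     info['PFN'] = "/data/example.root"
#     info['TotalEvents'] = 1000
#     info.addInputFile("/data/input.root", "/store/input.root")
#     info.addChecksum("cksum", 12345657)
#     print info.save()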
#!/usr/bin/env python
"""
_FwkJobReport_
Toplevel object for representing a Framework Job Report and
manipulating the bits and pieces of it.
"""
from FwkJobRep.FileInfo import FileInfo
from IMProv.IMProvNode import IMProvNode
class FwkJobReport:
"""
_FwkJobReport_
Framework Job Report container and interface object
"""
def __init__(self, name = None):
self.name = name
self.status = None
self.jobSpecId = None
self.jobType = None
self.workflowSpecId = None
self.files = []
self.inputFiles = []
self.errors = []
self.skippedEvents = []
self.skippedFiles = []
self.exitCode = 0
self.siteDetails = {}
self.timing = {}
self.storageStatistics = None
self.generatorInfo = {}
def wasSuccess(self):
"""
_wasSuccess_
Generate a boolean expression from this report to indicate if
it comes from a successful job or not.
This method will return True if:
        exitCode == 0 AND status == "Success"
        Otherwise it will return False
"""
return (self.exitCode == 0) and (self.status == "Success")
def newFile(self):
"""
_newFile_
Insert a new file into the Framework Job Report object.
returns a FwkJobRep.FileInfo
object by reference that can be populated with extra details of
the file.
"""
fileInfo = FileInfo()
self.files.append(fileInfo)
return fileInfo
def newInputFile(self):
"""
_newInputFile_
        Insert a new Input File into this job report and return the
corresponding FileInfo instance so that it can be populated
"""
fileInfo = FileInfo()
fileInfo.isInput = True
self.inputFiles.append(fileInfo)
return fileInfo
def addSkippedEvent(self, runNumber, eventNumber):
"""
_addSkippedEvent_
Add a skipped event record run/event number pair
"""
self.skippedEvents.append(
{"Run" : runNumber, "Event" : eventNumber}
)
return
def addSkippedFile(self, pfn, lfn):
"""
_addSkippedFile_
Add a skipped file record to this report
"""
self.skippedFiles.append(
{ "Pfn" : pfn, "Lfn" : lfn}
)
return
def addError(self, status, errType):
"""
_addError_
Add a new Error dictionary to this report, return it to be populated
"""
newError = {"ExitStatus" : status,
"Type" : errType,
"Description": ""}
self.errors.append(newError)
return newError
def save(self):
"""
_save_
Save the Framework Job Report by converting it into
an XML IMProv Object
"""
result = IMProvNode("FrameworkJobReport")
if self.name != None:
result.attrs['Name'] = self.name
if self.status != None:
result.attrs['Status'] = str(self.status)
if self.jobSpecId != None:
result.attrs['JobSpecID'] = self.jobSpecId
if self.workflowSpecId != None:
result.attrs['WorkflowSpecID'] = self.workflowSpecId
if self.jobType != None:
result.attrs['JobType'] = self.jobType
# //
# // Save ExitCode
#//
result.addNode(
IMProvNode("ExitCode",
None,
Value = str(self.exitCode)
)
)
# //
# // Save Site details
#//
for key, value in self.siteDetails.items():
siteDetail = IMProvNode("SiteDetail", None,
Parameter = key,
Value = str(value))
result.addNode(siteDetail)
# //
# // Save Files
#//
for fileInfo in self.files:
result.addNode(fileInfo.save())
# //
# // Save Input Files
#//
for fileInfo in self.inputFiles:
result.addNode(fileInfo.save())
# //
# // Save Skipped Events
#//
for skipped in self.skippedEvents:
result.addNode(IMProvNode("SkippedEvent", None,
Run = skipped['Run'],
Event = skipped['Event']))
# //
# // Save Skipped Files
#//
for skipped in self.skippedFiles:
result.addNode(IMProvNode("SkippedFile", None,
Pfn = skipped['Pfn'],
Lfn = skipped['Lfn']))
# //
# // Save Errors
#//
for error in self.errors:
result.addNode(
IMProvNode("FrameworkError", error['Description'],
ExitStatus = error['ExitStatus'],
Type = error['Type'])
)
# //
# // Save Timing Info
#//
timing = IMProvNode("TimingService")
result.addNode(timing)
for key, value in self.timing.items():
timing.addNode(IMProvNode(key, None, Value=str(value) ))
# //
# // Save Storage Statistics
#//
if self.storageStatistics != None:
result.addNode(
IMProvNode("StorageStatistics", self.storageStatistics))
genInfo = IMProvNode("GeneratorInfo")
result.addNode(genInfo)
for key, val in self.generatorInfo.items():
genInfo.addNode(IMProvNode("Data", None, Name = key,
Value = str(val)))
return result
def write(self, filename):
"""
_write_
Write the job report to an XML file
"""
handle = open(filename, 'w')
handle.write(self.save().makeDOMElement().toprettyxml())
handle.close()
return
def __str__(self):
"""strin representation of instance"""
return str(self.save())
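# //
# // Illustrative usage sketch (not part of the original module). The job
# // spec ID and file names below are hypothetical; write() serialises the
# // report to XML via the IMProv objects.
#//
# if __name__ == '__main__':
#     report = FwkJobReport("cmsRun1")
#     report.status = "Success"
#     report.jobSpecId = "ExampleJobSpec"
#     outFile = report.newFile()
#     outFile['LFN'] = "/store/unmerged/example.root"
#     outFile.addInputFile("/data/input.root", "/store/input.root")
#     print report.wasSuccess()
#     report.write("FrameworkJobReport.xml")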
#!/usr/bin/env python
"""
_MergeReports_
Given two FrameworkJobReport XML files, concatenate them into
a single file.
"""
import os
from IMProv.IMProvDoc import IMProvDoc
from FwkJobRep.ReportParser import readJobReport
def mergeReports(reportFile1, reportFile2):
"""
_mergeReports_
Load job reports from both files, and combine them into a
single file.
The output will be written to the first file provided.
(IE JobReports from reportFile2 will be added to reportFile1)
If reportFile1 does not exist, a new report will be created, containing
the contents of reportFile2.
If reportFile2 does not exist, then a RuntimeError is thrown.
"""
if not os.path.exists(reportFile1):
reports1 = []
else:
reports1 = readJobReport(reportFile1)
if not os.path.exists(reportFile2):
msg = "Report file to be merged does not exist:\n"
msg += reportFile2
raise RuntimeError, msg
reports2 = readJobReport(reportFile2)
reports1.extend(reports2)
output = IMProvDoc("JobReports")
for item in reports1:
output.addNode(item.save())
handle = open(reportFile1, 'w')
handle.write(output.makeDOMDocument().toprettyxml())
handle.close()
return
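# //
# // Illustrative usage sketch (not part of the original module). The report
# // file names below are hypothetical; reports from the second file are
# // appended to the first, which is created if it does not already exist.
#//
# mergeReports("FrameworkJobReport.xml", "FrameworkJobReport.1.xml")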
def updateReport(reportFile, newReportInstance):
"""
_updateReport_
Given a file containing several reports: reportFile,
find the report in there whose name matches the newReportInstance's
name and replace that report with the new Report instance.