Skip to content
Snippets Groups Projects
Commit 68f9ffe5 authored by Atlas-Software Librarian's avatar Atlas-Software Librarian Committed by Graeme Stewart
Browse files

'CMakeLists.txt' (TestPolicy-00-02-05)

	* Tagging TestPolicy-00-02-05
	* For some reason, the colon and hyphen chars were removed
	from the list of legal chars for userJobIds. Put them back.

2015-12-07  Atlas-Validation Runtimetester  <rtt@lxplus0068.cern.ch>
	* Tagging TestPolicy-00-02-04
	* Ran pep8 on the validateXML.py and test suite files.

2015-09-04  Atlas-Validation Runtimetester  <rtt@lxplus0009.cern.ch>
	* Refactored validateXML.py

	* Added a test suite.
	* Warning issued if atn/TEST/{author, mailto} deprecated tags found.
parent f35a1376
No related branches found
No related tags found
No related merge requests found
################################################################################
# Package: TestPolicy
################################################################################
# Declare the package name:
atlas_subdir( TestPolicy )
# Install files from the package:
atlas_install_headers( TestPolicy )
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
import os
import unittest
import validateXML
from functools import partial
class DTDTests(unittest.TestCase):
    """Exercise DTD retrieval and fallback handling in validateXML.

    NOTE: these tests open the DTD URLs for real, so the outcome depends
    on network access to the hosts named in setUp.
    """

    def setUp(self):
        """Build the primary, alternate and deliberately bad DTD URLs."""
        dtdname = 'unifiedTestConfiguration.dtd'
        urlbase = 'http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/'
        urlaltbase = 'https://atlas-rtt.cern.ch/prod/alternateDTD/'
        # URLs are not filesystem paths: both bases already end in '/',
        # so plain concatenation is used instead of os.path.join (whose
        # separator handling is platform specific).
        self.dtd = urlbase + dtdname
        self.altdtd = urlaltbase + dtdname
        self.baddtd = urlbase + 'inexistant.dtd'
        self.dtds = [self.dtd, self.altdtd]

    def test_dtd_unreachable(self):
        """A zero timeout must flag the DTD as unreachable."""
        with validateXML.DTD(self.dtd, timeout=0) as dtd:  # guarantee timeout
            self.assertTrue(dtd.unreachable)

    def test_dtd_inexistant(self):
        """A URL that does not exist must flag the DTD as inexistant."""
        with validateXML.DTD(self.baddtd, timeout=3) as dtd:
            self.assertTrue(dtd.inexistant)

    def test_retrieve_dtd_contents(self):
        """The primary DTD yields non-empty contents."""
        with validateXML.DTD(self.dtd, timeout=15) as dtd:
            self.assertTrue(dtd.contents)

    def test_use_fallback_dtd(self):
        """When the first DTD is bad, the alternate one is used."""
        dtds = [self.baddtd, self.altdtd]
        contents, url = validateXML.getFirstAvailableDTDContents(*dtds)
        self.assertTrue(contents)
        self.assertEqual(url, self.altdtd)

    def test_no_dtds_available(self):
        """With only bad DTDs, both contents and URL come back as None."""
        bad = [self.baddtd, self.baddtd]
        contents, url = validateXML.getFirstAvailableDTDContents(*bad)
        # assertIsNone reports the offending value on failure.
        self.assertIsNone(contents)
        self.assertIsNone(url)

    def test_contents_empty_for_bad_dtd(self):
        """Fetching a bad DTD directly returns None."""
        contents = validateXML.getDTDContent(self.baddtd)
        self.assertIsNone(contents)
class XMLValidationTests(unittest.TestCase):
    """Check the XML helpers and validation rules of validateXML.

    Every test writes its XML snippet to a scratch file (self.xmlfile),
    which tearDown removes again.
    """

    def setUp(self):
        """Name the scratch XML file used by the tests."""
        self.xmlfile = 'test.xml'

    def tearDown(self):
        """Remove the scratch file; it may not exist if a test failed early."""
        try:
            os.remove(self.xmlfile)
        except OSError:
            pass

    def _write(self, xmlcontent):
        """Write @xmlcontent to the scratch XML file."""
        with open(self.xmlfile, 'w') as f:
            f.write(xmlcontent)

    def test_xml_path_exists(self):
        """xmlPathExists finds an existing path and rejects a missing one."""
        xmlsnippet = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd">
<unifiedTestConfiguration>
<atn>
<TEST type="script">
<author>_deprecated_</author>
<mailto>_deprecated_</mailto>
</TEST>
</atn>
</unifiedTestConfiguration>
"""
        self._write(xmlsnippet)
        with validateXML.XMLFile(self.xmlfile) as xmlfile:
            el = xmlfile.docEl
            pathExists = partial(validateXML.xmlPathExists, el)
            self.assertTrue(pathExists('atn/TEST/mailto'))
            self.assertFalse(pathExists('atn/TEST/inexistant'))

    def test_get_tag(self):
        """getTag returns the first matching element, or None."""
        xmlsnippet = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd">
<unifiedTestConfiguration>
<atn>
<TEST type="script">
<author>first</author>
<author>second</author>
</TEST>
</atn>
</unifiedTestConfiguration>
"""
        self._write(xmlsnippet)
        with validateXML.XMLFile(self.xmlfile) as xmlfile:
            el = xmlfile.docEl
            tag = validateXML.getTag(el, 'author')
            badtag = validateXML.getTag(el, 'inexistant')
            # assertEqual/assertIsNone show the actual values on failure,
            # unlike assertTrue(a == b).
            self.assertEqual(tag.firstChild.nodeValue, 'first')
            self.assertIsNone(badtag)

    def test_deprecated_tags(self):
        """A file with deprecated atn tags is valid but flagged."""
        deprecatedATNtags = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd">
<unifiedTestConfiguration>
<atn>
<TEST type="script">
<author>_deprecated_</author>
<mailto>_deprecated_</mailto>
</TEST>
</atn>
</unifiedTestConfiguration>
"""
        self._write(deprecatedATNtags)
        with validateXML.XMLFile(self.xmlfile) as xmlfile:
            self.assertTrue(xmlfile.isValid())
            self.assertTrue(xmlfile.checkForDeprecatedTags())

    def test_non_unique_jobids(self):
        """Two jobs sharing one userJobId make the ids illegal."""
        repeatedUserJobIds = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd">
<unifiedTestConfiguration>
<rtt xmlns="http://www.hep.ucl.ac.uk/atlas/AtlasTesting/rtt">
<rttContactPerson>contact_person</rttContactPerson>
<mailto>contact@example.com</mailto>
<jobList>
<jobTransform userJobId="theJobID">
<jobTransformJobName>name</jobTransformJobName>
<jobTransformCmd>ls</jobTransformCmd>
<group>TheJobGroup</group>
</jobTransform>
<jobTransform userJobId="theJobID">
<jobTransformJobName>name</jobTransformJobName>
<jobTransformCmd>ls</jobTransformCmd>
<group>TheJobGroup</group>
</jobTransform>
</jobList>
<jobGroups>
<jobGroup name="TheJobGroup" parent="Transform">
<keepFilePattern>*.out</keepFilePattern>
<auxFilePattern>*.py</auxFilePattern>
</jobGroup>
</jobGroups>
</rtt>
</unifiedTestConfiguration>
"""
        self._write(repeatedUserJobIds)
        with validateXML.XMLFile(self.xmlfile) as xmlfile:
            self.assertFalse(validateXML.hasLegalUserJobIds(xmlfile.docEl))

    def test_bad_chars_in_jobid(self):
        """Ids containing a space or slashes are reported as illegal."""
        badUserJobIds = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd">
<unifiedTestConfiguration>
<rtt xmlns="http://www.hep.ucl.ac.uk/atlas/AtlasTesting/rtt">
<rttContactPerson>contact_person</rttContactPerson>
<mailto>contact@example.com</mailto>
<jobList>
<jobTransform userJobId="the JobID">
<jobTransformJobName>name</jobTransformJobName>
<jobTransformCmd>ls</jobTransformCmd>
<group>TheJobGroup</group>
</jobTransform>
<jobTransform userJobId="theJobID">
<jobTransformJobName>name</jobTransformJobName>
<jobTransformCmd>ls</jobTransformCmd>
<group>TheJobGroup</group>
</jobTransform>
<jobTransform userJobId="//theJobID">
<jobTransformJobName>name</jobTransformJobName>
<jobTransformCmd>ls</jobTransformCmd>
<group>TheJobGroup</group>
</jobTransform>
</jobList>
<jobGroups>
<jobGroup name="TheJobGroup" parent="Transform">
<keepFilePattern>*.out</keepFilePattern>
<auxFilePattern>*.py</auxFilePattern>
</jobGroup>
</jobGroups>
</rtt>
</unifiedTestConfiguration>
"""
        self._write(badUserJobIds)
        with validateXML.XMLFile(self.xmlfile) as xmlfile:
            self.assertFalse(validateXML.hasLegalUserJobIds(xmlfile.docEl))
            ids = validateXML.getJobIds(xmlfile.docEl)
            bad = [jobid for jobid in ids
                   if validateXML._idHasIllegalChars(jobid)]
            # Two of the three ids above contain illegal characters.
            self.assertEqual(len(bad), 2)

    def test_valid_xml(self):
        """A well-formed file with one legal job id validates."""
        validXML = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd">
<unifiedTestConfiguration>
<rtt xmlns="http://www.hep.ucl.ac.uk/atlas/AtlasTesting/rtt">
<rttContactPerson>contact_person</rttContactPerson>
<mailto>contact@example.com</mailto>
<jobList>
<jobTransform userJobId="theJobID">
<jobTransformJobName>name</jobTransformJobName>
<jobTransformCmd>ls</jobTransformCmd>
<group>TheJobGroup</group>
</jobTransform>
</jobList>
<jobGroups>
<jobGroup name="TheJobGroup" parent="Transform">
<keepFilePattern>*.out</keepFilePattern>
<auxFilePattern>*.py</auxFilePattern>
</jobGroup>
</jobGroups>
</rtt>
</unifiedTestConfiguration>
"""
        self._write(validXML)
        with validateXML.XMLFile(self.xmlfile) as xmlfile:
            self.assertTrue(xmlfile.isValid())
if __name__ == '__main__':
    # Executed as a script: let unittest discover and run the TestCases
    # defined in this module.
    unittest.main()
......@@ -14,239 +14,268 @@ If the referenced DTD is unavailable (site down) or
nonexistent (404), a list of alternatives is tried.
"""
import os.path
import os
import re
import sys
import string
import traceback
import StringIO
import urllib2 as URL
import socket
from collections import defaultdict
from functools import partial
from xml.parsers.xmlproc import xmlproc, xmlval, xmldtd
from xml.dom.minidom import parse
def exc2string():
    """Describe the exception currently being handled as one string.

    The result is the exception type, the exception value and the
    formatted traceback entries (joined by single spaces), concatenated
    in that order without any separator between the three parts.
    """
    etype, evalue, tb = sys.exc_info()
    frames = ' '.join(traceback.format_tb(tb))
    return ''.join([str(etype), str(evalue), frames])
altDTDs = [('https://atlas-rtt.cern.ch/prod/'
'alternateDTD/unifiedTestConfiguration.dtd')]
class Logger(object):
def __init__(self):
self.levels = ['info', 'error', 'fatal']
self.log = []
# -------------------------------------------------------------------
# --- LOGGING ---
# -------------------------------------------------------------------
def log(level, msg):
    """Print @msg prefixed with the upper-cased @level in square brackets.

    Messages that are empty or contain only whitespace are not printed.
    """
    if not msg.strip():
        return
    tag = level.upper()
    print('[{0}] {1}'.format(tag, msg))
def __getattr__(self, n):
return lambda x: self.log.append((n, x)) if n in self.levels else None
def __str__(self):
o = [line for level, line in self.log]
return '' if not o else '%s\n' % ('\n'.join(o))
def devnull(msg):
    """Ignore @msg; a do-nothing replacement for a logging function."""
    return None
class Validator(object):
error = partial(log, 'ERROR')
warning = partial(log, 'WARNING')
info = partial(log, 'INFO')
debug = devnull
dtdname = 'unifiedTestConfiguration.dtd'
possibleURLS = ['https://atlas-rtt.cern.ch/prod/alternateDTD/']
alternateDTDs = [os.path.join(pu, dtdname) for pu in possibleURLS]
__altDTDs = {}
[__altDTDs.setdefault(dtdname, []).append(aDTD) for aDTD in alternateDTDs]
clargs = sys.argv[1:]
if '--verbose' in clargs:
debug = partial(log, 'DEBUG')
def __init__(self, pathToXMLfile):
self.file = pathToXMLfile
self.log = Logger()
# -------------------------------------------------------------------
# --- HELPERS ---
# -------------------------------------------------------------------
def validate(self):
isValid = self._do_validation()
self.cleanup()
return isValid
def _parseFile(self):
try:
self._dom = parse(self.file)
return True
except Exception, inst:
msg = 'Cannot parse:\n%s' % str(inst)
self.log.fatal(msg)
return False
except:
msg = 'Cannot parse:\n%s' % exc2string()
self.log.fatal(msg)
return False
def _hasExternalDTD(self):
return self._getDTDFromXMLFile() is not None
def _do_validation(self):
if self._parseFile():
if not self._hasExternalDTD():
m = 'No external DTD reference found in XML file, '
m += 'consider file valid.'
self.log.info(m)
return True
dtdContent = self._getDTDContent()
if dtdContent:
isXMLValid = self._validate(dtdContent)
if isXMLValid:
if self.ensureLegalUserJobIds():
return True
return False
def parseXML(xmlfile):
"""Return None if @xmlfile is unparseable,
else the dom representation."""
try:
return parse(xmlfile)
except Exception, inst:
msg = 'Unparseable:\n{0}'.format(str(inst))
except:
tb, tbType, stacktrace = sys.exc_info()
pt = traceback.format_tb(stacktrace)
msg = 'Unparseable:\n{0}\n{1}\n{2}'.format(tb, tbType, pt)
def cleanup(self):
try:
self._dom.unlink()
except:
pass
error(msg)
try:
del self._dom
except:
pass
def grabDTD(self, dtdURLs):
# Test if we can get DTD, if so read it
nDTDs = len(dtdURLs)
for i, dtdURL in enumerate(dtdURLs, start=1):
dtd = DTD(dtdURL)
contents = dtd.contents
if contents:
if i > 1: # we're using a fallback dtd
m = '%s: using fallback DTD' % dtdURL
self.log.info(m)
return contents
m = dtdURL
if dtd.inexistant:
m += ': inexistant'
self.log.error(m)
elif dtd.unreachable:
m += ': unreachable, timeout'
self.log.error(m)
self.log.fatal('No DTDs available/reachable, will stop')
return ''
def ensureLegalUserJobIds(self):
"""Check the package userJobIds.
These must:
1. be unique within a package XML file.
2. match the regex for acceptable names.
"""
def getAllUserJobIds(el):
ids = []
for job in ['athena', 'jobTransform']:
els = docEl.getElementsByTagName(job)
ids.extend([el.getAttribute('userJobId') for el in els])
return ids
def reportDuplicates(jids, messages):
dups = {}
[dups.setdefault(jid, []).append(jid) for jid in jids]
msgs = []
[msgs.append('userJobId "%s" used %d times' % (k, len(v))) for
k, v in dups.items() if len(v) > 1]
messages.extend(msgs)
return not msgs
def reportIllegal(userJobIds, messages):
acceptableFileName = r'^[a-zA-Z0-9_\.]+'
c_acceptableFileName = re.compile(acceptableFileName)
badIds = [jid for jid in userJobIds if
not c_acceptableFileName.search(jid)]
m = 'userjobID "%s" '
m += 'does not start with a letter, digit, underscore or dot'
messages.extend([m % jid for jid in badIds])
return not badIds
# Start of the method
rc = True
docEl = self._dom.documentElement
messages = []
jobids = getAllUserJobIds(docEl)
rc = rc and reportDuplicates(jobids, messages)
rc = rc and reportIllegal(jobids, messages)
if messages:
messages = '\n'.join(messages)
self.log.error(messages)
return bool(rc)
def _getDTDFromXMLFile(self):
doctype = self._dom.doctype
return doctype.systemId if doctype else None
def _getDTDContent(self):
"""Read DTD into a string. DTD location is given in
the configuration file. Use this, or backup DTDs if
unable to use the user-suggested DTD."""
# get system DTD path from XML file
xmlFileDTD = self._getDTDFromXMLFile()
dtdname = os.path.basename(xmlFileDTD)
dtds = [xmlFileDTD]
altDTDs = Validator.__altDTDs.get(dtdname, [])
dtds.extend(altDTDs)
return self.grabDTD(dtds) # grab from 1st available
def _validate(self, dtdContent):
"""Use read-in DTD to validate config file."""
# now load this string dtd
dtd = xmldtd.load_dtd_string(dtdContent)
parser = xmlproc.XMLProcessor()
parser.set_application(xmlval.ValidatingApp(dtd, parser))
parser.dtd = dtd
parser.ent = dtd
def swap(o1, o2):
return (o2, o1)
newOut = StringIO.StringIO()
newOut, sys.stdout = swap(newOut, sys.stdout)
errmsg = ''
def getTag(el, tagname):
    """Return the first @tagname element found below @el.

    Returns None when no such element exists.
    """
    matches = el.getElementsByTagName(tagname)
    if not matches:
        return None
    return matches[0]
try:
parser.parse_resource(self.file)
except Exception, e:
newOut, sys.stdout = swap(newOut, sys.stdout)
errmsg = '%s\n%s' % (newOut.getvalue().strip(), str(e))
except: # Exceptions not deriving from Exception.....
newOut, sys.stdout = swap(newOut, sys.stdout)
errmsg = newOut.getvalue().strip()
else:
newOut, sys.stdout = swap(newOut, sys.stdout)
if errmsg:
self.log.error(errmsg)
def xmlPathExists(el, path):
    """Tell whether the '/'-separated @path can be followed from @el.

    Each tag of the path is looked up (via getTag) below the element
    matched for the previous tag, starting at @el. Returns True only if
    every tag is found.
    """
    node = el
    for tagname in path.split('/'):
        node = getTag(node, tagname)
        if not node:
            # This step of the path is missing: stop right here.
            return False
    return True
return not errmsg
def isValidAgainstDTD(xmlfile, dtdContent):
"""Validate the XML file at @xmfile against @dtdContent."""
if not dtdContent:
return False
# now load this string dtd
dtd = xmldtd.load_dtd_string(dtdContent)
parser = xmlproc.XMLProcessor()
parser.set_application(xmlval.ValidatingApp(dtd, parser))
parser.dtd = dtd
parser.ent = dtd
class DTD:
def swap(o1, o2):
return (o2, o1)
TIMEOUT = 15 # give up trying to reach the URL after this many seconds
newOut = StringIO.StringIO()
newOut, sys.stdout = swap(newOut, sys.stdout)
errmsg = ''
try:
parser.parse_resource(xmlfile)
except Exception, e:
newOut, sys.stdout = swap(newOut, sys.stdout)
errmsg = '%s\n%s' % (newOut.getvalue().strip(), str(e))
except:
newOut, sys.stdout = swap(newOut, sys.stdout)
errmsg = newOut.getvalue().strip()
else:
newOut, sys.stdout = swap(newOut, sys.stdout)
def __init__(self, dtdURL):
error(errmsg)
return not errmsg
def getFirstAvailableDTDContents(*dtdURLs):
    """Try each of @dtdURLs in turn and return (contents, url) for the
    first one whose contents could be fetched.

    Returns (None, None), after logging an error, when none of the URLs
    yields any contents (including when no URL was given).
    """
    for candidate in dtdURLs:
        fetched = getDTDContent(candidate)
        if fetched:
            debug('Validating against {0}'.format(candidate))
            return (fetched, candidate)
    error('No DTDs available, cannot validate')
    return (None, None)
def getDTDContent(dtdURL, timeout=1):
    """Fetch the contents of the DTD at @dtdURL.

    @timeout is the number of seconds handed to DTD() before giving up,
    so that an unreachable URL cannot block; it defaults to the 1 second
    that used to be hard-coded here.

    Returns the DTD contents, or None when nothing could be read. A
    warning is logged when the DTD was flagged inexistant or unreachable.
    """
    with DTD(dtdURL, timeout=timeout) as dtd:
        if dtd.inexistant:
            warning('{0}: inexistant'.format(dtdURL))
        elif dtd.unreachable:
            warning('{0}: unreachable, timeout'.format(dtdURL))
    return dtd.contents
def getUserJobIds(jobEls):
    """Return the 'userJobId' attribute of every element in @jobEls,
    in the same order as the elements."""
    ids = []
    for jobEl in jobEls:
        ids.append(jobEl.getAttribute('userJobId'))
    return ids
def _idOccursMoreThanOnce(jobID, count):
    """Return True, after logging an error, when @count shows that
    @jobID was used more than once; otherwise return False."""
    duplicated = count > 1
    if duplicated:
        error('JobID {0} occurs {1} times'.format(jobID, count))
    return duplicated
def _idHasIllegalChars(jobID):
    """Return True if @jobID contains a character that is not allowed.

    Allowed characters are ASCII letters, digits, '.', '_', ':' and '-'.
    The first offending character is reported through error() and the
    scan stops there. An empty @jobID has no illegal characters.
    """
    # string.ascii_letters is used instead of string.letters: the latter
    # varies with the locale and does not exist on Python 3.
    allowed = string.digits + string.ascii_letters + '._:-'
    # Both cases are in `allowed`, so the id is scanned as written; that
    # way the message quotes the character actually present in the id.
    for ch in jobID:
        if ch not in allowed:
            m = 'JobID "{0}" '.format(jobID)
            m += 'contains the illegal character ("{0}"). '.format(ch)
            m += 'Legal chars are letters, digits, '
            m += ':(colon), -(hyphen), .(dot) and _(underscore).'
            error(m)
            return True
    return False
def getJobIds(docEl):
    """Return the userJobIds of all 'athena' elements below @docEl,
    followed by those of all 'jobTransform' elements."""
    collected = []
    for tagname in ('athena', 'jobTransform'):
        jobEls = docEl.getElementsByTagName(tagname)
        collected.extend(getUserJobIds(jobEls))
    return collected
def hasLegalUserJobIds(docEl):
    """Check the userJobIds found below @docEl.

    Every id must
    1. occur only once, and
    2. pass the illegal-character check.
    Returns True when all ids satisfy both conditions. All ids are
    examined, so each offending id gets reported.
    """
    # Tally the occurrences of each job id.
    occurrences = defaultdict(int)
    for jid in getJobIds(docEl):
        occurrences[jid] += 1

    offenders = [jid for jid, seen in occurrences.items()
                 if _idOccursMoreThanOnce(jid, seen) or
                 _idHasIllegalChars(jid)]
    return not offenders
# -------------------------------------------------------------------
class XMLFile(object):
    """Context manager around one XML file that is to be validated.

    Entering the context parses the file. On success ``parsedOK`` is
    set, ``docEl`` holds the document element and ``dtdURL`` the system
    id of the doctype (None if the file has no doctype).
    """

    def __init__(self, xmlfile):
        self.xmlfile = xmlfile  # path of the XML file
        self.dtdURL = None      # external DTD referenced by the file
        self.docEl = None       # document element, set once parsed
        self.parsedOK = False

    def isValid(self):
        """Return True if the file was parsed and passes validation."""
        return self.parsedOK and self._do_validate()

    def __enter__(self):
        dom = parseXML(self.xmlfile)
        if dom:
            self.parsedOK = True
            self.docEl = dom.documentElement
            self.checkForDeprecatedTags()
            doctype = dom.doctype
            self.dtdURL = doctype.systemId if doctype else None
        return self

    def __exit__(self, etype, evalue, stacktrace):
        if etype is not None:
            error('XMLFile raised a {0} exception'.format(etype))
        return False  # will cause re-raise

    def checkForDeprecatedTags(self):
        """Warn about each deprecated tag; return True if any was found.

        Returns False when there is no parsed document to inspect.
        """
        if self.docEl is None:
            # Parsing failed or the context was never entered; there is
            # nothing to search and xmlPathExists would choke on None.
            return False
        found = False
        for path in ('atn/author', 'atn/mailto'):
            if xmlPathExists(self.docEl, path):
                warning('{0} tag is deprecated'.format(path))
                found = True
        return found

    def _do_validate(self):
        """Validate against the first available DTD and check job ids."""
        if not self.dtdURL:
            debug('No external DTD reference found, considering file valid.')
            return True
        # Only the contents matter here; the URL that supplied them is
        # not needed.
        content, _ = getFirstAvailableDTDContents(self.dtdURL, *altDTDs)
        # Run both checks unconditionally so that each one gets to
        # report its own problems.
        isValid = isValidAgainstDTD(self.xmlfile, content)
        hasLegalIds = hasLegalUserJobIds(self.docEl)
        return isValid and hasLegalIds
# -------------------------------------------------------------------
class DTD(object):
def __init__(self, dtdURL, timeout):
self.url = dtdURL
self.timeout = timeout # seconds before we should give up
self.inexistant = False
self.unreachable = False
self.contents = None
def __enter__(self):
resource = self._openDTD()
if resource:
self.contents = resource.read()
resource.close()
return self
def __exit__(self, etype, evalue, stacktrace):
if etype is not None:
error('DTD() raised a {0} exception'.format(etype))
return False # will cause re-raise
def exists(self):
return not self.inexistant
......@@ -258,7 +287,7 @@ class DTD:
# prevents unreachable URLs from hanging the request
resource = None
try:
resource = URL.urlopen(self.url, timeout=DTD.TIMEOUT)
resource = URL.urlopen(self.url, timeout=self.timeout)
except URL.HTTPError as e:
if e.code == 404:
self.inexistant = True
......@@ -267,25 +296,51 @@ class DTD:
return resource
# -------------------------------------------------------------------
if __name__ == '__main__':
import getopt
import os
xmlfiles = sys.argv[1:]
def validate(*xmlfiles):
if not xmlfiles:
print 'Please provide paths to one or more XML files.'
error('Please provide paths to one or more XML files.')
sys.exit(1)
exitcode = 0
for xmlfile in xmlfiles:
if not os.path.exists(xmlfile):
print 'Inexistant XML file: %s' % xmlfile
warning('{0}: inexistant'.format(xmlfile))
continue
v = Validator(xmlfile)
isValid = v.validate()
print '%s[%s] %s' % (str(v.log),
'OK' if isValid else 'FAIL',
xmlfile)
with XMLFile(xmlfile) as xf:
if xf.isValid():
txt = 'OK -'
else:
txt = 'FAIL -'
exitcode = 1
info('{0} {1}'.format(txt, xmlfile))
sys.exit(0)
return exitcode
def _getargs():
    """Return the command-line arguments without the program name and
    with the first '--verbose' flag, if present, taken out."""
    remaining = sys.argv[1:]
    if '--verbose' in remaining:
        remaining.remove('--verbose')
    return remaining
def main():
    """Validate the files named on the command line and exit the
    interpreter with the status returned by validate()."""
    exitcode = validate(*_getargs())
    sys.exit(exitcode)
if __name__ != '__main__':
    # Merely imported (e.g. by the test suite): switch off logging by
    # pointing every logging function at the no-op.
    error = warning = info = debug = devnull
else:
    main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment