From 68f9ffe5b8aa7d9ad9affab10b77018b057d7751 Mon Sep 17 00:00:00 2001 From: Atlas-Software Librarian <Atlas-Software.Librarian@cern.ch> Date: Fri, 8 Apr 2016 16:32:13 +0200 Subject: [PATCH] 'CMakeLists.txt' (TestPolicy-00-02-05) * Tagging TestPolicy-00-02-05 * For some reason, the colon and hyphen chars were removed from the list of legal chars for userJobIds. Put them back. 2015-12-07 Atlas-Validation Runtimetester <rtt@lxplus0068.cern.ch> * Tagging TestPolicy-00-02-04 * Ran pep8 on the validateXML.py and test suite files. 2015-09-04 Atlas-Validation Runtimetester <rtt@lxplus0009.cern.ch> * Refactored validateXML.py * Added a test suite. * Warning issued if atn/TEST/{author, mailto} deprecated tags found. --- TestPolicy/CMakeLists.txt | 10 + TestPolicy/python/test_validateXML.py | 221 ++++++++++++ TestPolicy/python/validateXML.py | 469 ++++++++++++++------------ 3 files changed, 493 insertions(+), 207 deletions(-) create mode 100644 TestPolicy/CMakeLists.txt create mode 100644 TestPolicy/python/test_validateXML.py diff --git a/TestPolicy/CMakeLists.txt b/TestPolicy/CMakeLists.txt new file mode 100644 index 00000000000..336c995f8c5 --- /dev/null +++ b/TestPolicy/CMakeLists.txt @@ -0,0 +1,10 @@ +################################################################################ +# Package: TestPolicy +################################################################################ + +# Declare the package name: +atlas_subdir( TestPolicy ) + +# Install files from the package: +atlas_install_headers( TestPolicy ) + diff --git a/TestPolicy/python/test_validateXML.py b/TestPolicy/python/test_validateXML.py new file mode 100644 index 00000000000..f2cc8ac57bf --- /dev/null +++ b/TestPolicy/python/test_validateXML.py @@ -0,0 +1,221 @@ +# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration + +import os +import unittest +import validateXML +from functools import partial + + +class DTDTests(unittest.TestCase): + def setUp(self): + dtdname = 'unifiedTestConfiguration.dtd' + urlbase = 'http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/' + urlaltbase = 'https://atlas-rtt.cern.ch/prod/alternateDTD/' + self.dtd = os.path.join(urlbase, dtdname) + self.altdtd = os.path.join(urlaltbase, dtdname) + self.baddtd = os.path.join(urlbase, 'inexistant.dtd') + self.dtds = [self.dtd, self.altdtd] + + def test_dtd_unreachable(self): + with validateXML.DTD(self.dtd, timeout=0) as dtd: # guarantee timeout + self.assertTrue(dtd.unreachable) + + def test_dtd_inexistant(self): + with validateXML.DTD(self.baddtd, timeout=3) as dtd: + self.assertTrue(dtd.inexistant) + + def test_retrieve_dtd_contents(self): + with validateXML.DTD(self.dtd, timeout=15) as dtd: + self.assertTrue(dtd.contents) + + def test_use_fallback_dtd(self): + dtds = [self.baddtd, self.altdtd] + contents, url = validateXML.getFirstAvailableDTDContents(*dtds) + self.assertTrue(contents) + self.assertEqual(url, self.dtds[1]) + + def test_no_dtds_available(self): + bad = [self.baddtd, self.baddtd] + contents, url = validateXML.getFirstAvailableDTDContents(*bad) + self.assertEqual(contents, None) + self.assertEqual(url, None) + + def test_contents_empty_for_bad_dtd(self): + contents = validateXML.getDTDContent(self.baddtd) + self.assertEqual(contents, None) + + +class XMLValidationTests(unittest.TestCase): + def setUp(self): + self.xmlfile = 'test.xml' + + def tearDown(self): + try: + os.remove(self.xmlfile) + except OSError: + pass + + def _write(self, xmlcontent): + with open(self.xmlfile, 'w') as f: + f.write(xmlcontent) + + def test_xml_path_exists(self): + xmlsnippet = """<?xml version="1.0" encoding="UTF-8"?> + <!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd"> + <unifiedTestConfiguration> + <atn> + <TEST type="script"> + <author>_deprecated_</author> + <mailto>_deprecated_</mailto> + </TEST> + </atn> + </unifiedTestConfiguration> + """ + self._write(xmlsnippet) + with validateXML.XMLFile(self.xmlfile) as xmlfile: + el = xmlfile.docEl + pathExists = partial(validateXML.xmlPathExists, el) + self.assertTrue(pathExists('atn/TEST/mailto')) + self.assertFalse(pathExists('atn/TEST/inexistant')) + + def test_get_tag(self): + xmlsnippet = """<?xml version="1.0" encoding="UTF-8"?> + <!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd"> + <unifiedTestConfiguration> + <atn> + <TEST type="script"> + <author>first</author> + <author>second</author> + </TEST> + </atn> + </unifiedTestConfiguration> + """ + self._write(xmlsnippet) + with validateXML.XMLFile(self.xmlfile) as xmlfile: + el = xmlfile.docEl + tag = validateXML.getTag(el, 'author') + badtag = validateXML.getTag(el, 'inexistant') + self.assertTrue(tag.firstChild.nodeValue == 'first') + self.assertTrue(badtag is None) + + def test_deprecated_tags(self): + deprecatedATNtags = """<?xml version="1.0" encoding="UTF-8"?> + <!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd"> + <unifiedTestConfiguration> + <atn> + <TEST type="script"> + <author>_deprecated_</author> + <mailto>_deprecated_</mailto> + </TEST> + </atn> + </unifiedTestConfiguration> + """ + self._write(deprecatedATNtags) + with validateXML.XMLFile(self.xmlfile) as xmlfile: + self.assertTrue(xmlfile.isValid()) + self.assertTrue(xmlfile.checkForDeprecatedTags()) + + def test_non_unique_jobids(self): + repeatedUserJobIds = """<?xml version="1.0" encoding="UTF-8"?> + <!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd"> + <unifiedTestConfiguration> + <rtt xmlns="http://www.hep.ucl.ac.uk/atlas/AtlasTesting/rtt"> + <rttContactPerson>contact_person</rttContactPerson> + <mailto>contact@example.com</mailto> + <jobList> + <jobTransform userJobId="theJobID"> + <jobTransformJobName>name</jobTransformJobName> + <jobTransformCmd>ls</jobTransformCmd> + <group>TheJobGroup</group> + </jobTransform> + <jobTransform userJobId="theJobID"> + <jobTransformJobName>name</jobTransformJobName> + <jobTransformCmd>ls</jobTransformCmd> + <group>TheJobGroup</group> + </jobTransform> + </jobList> + <jobGroups> + <jobGroup name="TheJobGroup" parent="Transform"> + <keepFilePattern>*.out</keepFilePattern> + <auxFilePattern>*.py</auxFilePattern> + </jobGroup> + </jobGroups> + </rtt> + </unifiedTestConfiguration> + """ + self._write(repeatedUserJobIds) + with validateXML.XMLFile(self.xmlfile) as xmlfile: + self.assertFalse(validateXML.hasLegalUserJobIds(xmlfile.docEl)) + + def test_bad_chars_in_jobid(self): + badUserJobIds = """<?xml version="1.0" encoding="UTF-8"?> + <!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd"> + <unifiedTestConfiguration> + <rtt xmlns="http://www.hep.ucl.ac.uk/atlas/AtlasTesting/rtt"> + <rttContactPerson>contact_person</rttContactPerson> + <mailto>contact@example.com</mailto> + <jobList> + <jobTransform userJobId="the JobID"> + <jobTransformJobName>name</jobTransformJobName> + <jobTransformCmd>ls</jobTransformCmd> + <group>TheJobGroup</group> + </jobTransform> + <jobTransform userJobId="theJobID"> + <jobTransformJobName>name</jobTransformJobName> + <jobTransformCmd>ls</jobTransformCmd> + <group>TheJobGroup</group> + </jobTransform> + <jobTransform userJobId="//theJobID"> + <jobTransformJobName>name</jobTransformJobName> + <jobTransformCmd>ls</jobTransformCmd> + <group>TheJobGroup</group> + </jobTransform> + </jobList> + <jobGroups> + <jobGroup name="TheJobGroup" parent="Transform"> + <keepFilePattern>*.out</keepFilePattern> + <auxFilePattern>*.py</auxFilePattern> + </jobGroup> + </jobGroups> + </rtt> + </unifiedTestConfiguration> + """ + self._write(badUserJobIds) + with validateXML.XMLFile(self.xmlfile) as xmlfile: + self.assertFalse(validateXML.hasLegalUserJobIds(xmlfile.docEl)) + + ids = validateXML.getJobIds(xmlfile.docEl) + bad = [jobid for jobid in ids + if validateXML._idHasIllegalChars(jobid)] + self.assertTrue(len(bad) == 2) + + def test_valid_xml(self): + validXML = """<?xml version="1.0" encoding="UTF-8"?> + <!DOCTYPE unifiedTestConfiguration SYSTEM "http://www.hep.ucl.ac.uk/atlas/AtlasTesting/DTD/unifiedTestConfiguration.dtd"> + <unifiedTestConfiguration> + <rtt xmlns="http://www.hep.ucl.ac.uk/atlas/AtlasTesting/rtt"> + <rttContactPerson>contact_person</rttContactPerson> + <mailto>contact@example.com</mailto> + <jobList> + <jobTransform userJobId="theJobID"> + <jobTransformJobName>name</jobTransformJobName> + <jobTransformCmd>ls</jobTransformCmd> + <group>TheJobGroup</group> + </jobTransform> + </jobList> + <jobGroups> + <jobGroup name="TheJobGroup" parent="Transform"> + <keepFilePattern>*.out</keepFilePattern> + <auxFilePattern>*.py</auxFilePattern> + </jobGroup> + </jobGroups> + </rtt> + </unifiedTestConfiguration> + """ + self._write(validXML) + with validateXML.XMLFile(self.xmlfile) as xmlfile: + self.assertTrue(xmlfile.isValid()) + + +if __name__ == '__main__': + unittest.main() diff --git a/TestPolicy/python/validateXML.py b/TestPolicy/python/validateXML.py index 5542bb2d537..cd2d80da060 100755 --- a/TestPolicy/python/validateXML.py +++ b/TestPolicy/python/validateXML.py @@ -14,239 +14,268 @@ If the referenced DTD is unavailable (site down) or inexistant (404), a list of alternatives is tried. """ - -import os.path +import os import re import sys +import string import traceback import StringIO import urllib2 as URL import socket +from collections import defaultdict +from functools import partial from xml.parsers.xmlproc import xmlproc, xmlval, xmldtd from xml.dom.minidom import parse -def exc2string(): - llist = sys.exc_info() - errmsg = str(llist[0]) - errmsg += str(llist[1]) - errmsg += ' '.join(traceback.format_tb(llist[2])) - return errmsg +altDTDs = [('https://atlas-rtt.cern.ch/prod/' + 'alternateDTD/unifiedTestConfiguration.dtd')] -class Logger(object): - def __init__(self): - self.levels = ['info', 'error', 'fatal'] - self.log = [] +# ------------------------------------------------------------------- +# --- LOGGING --- +# ------------------------------------------------------------------- +def log(level, msg): + if msg.strip(): + print('[{0}] {1}'.format(level.upper(), msg)) - def __getattr__(self, n): - return lambda x: self.log.append((n, x)) if n in self.levels else None - def __str__(self): - o = [line for level, line in self.log] - return '' if not o else '%s\n' % ('\n'.join(o)) +def devnull(msg): + pass -class Validator(object): +error = partial(log, 'ERROR') +warning = partial(log, 'WARNING') +info = partial(log, 'INFO') +debug = devnull - dtdname = 'unifiedTestConfiguration.dtd' - possibleURLS = ['https://atlas-rtt.cern.ch/prod/alternateDTD/'] - alternateDTDs = [os.path.join(pu, dtdname) for pu in possibleURLS] - __altDTDs = {} - [__altDTDs.setdefault(dtdname, []).append(aDTD) for aDTD in alternateDTDs] +clargs = sys.argv[1:] +if '--verbose' in clargs: + debug = partial(log, 'DEBUG') - def __init__(self, pathToXMLfile): - self.file = pathToXMLfile - self.log = Logger() +# ------------------------------------------------------------------- +# --- HELPERS --- +# ------------------------------------------------------------------- - def validate(self): - isValid = self._do_validation() - self.cleanup() - return isValid - def _parseFile(self): - try: - self._dom = parse(self.file) - return True - except Exception, inst: - msg = 'Cannot parse:\n%s' % str(inst) - self.log.fatal(msg) - return False - except: - msg = 'Cannot parse:\n%s' % exc2string() - self.log.fatal(msg) - return False - - def _hasExternalDTD(self): - return self._getDTDFromXMLFile() is not None - - def _do_validation(self): - if self._parseFile(): - if not self._hasExternalDTD(): - m = 'No external DTD reference found in XML file, ' - m += 'consider file valid.' - self.log.info(m) - return True - - dtdContent = self._getDTDContent() - if dtdContent: - isXMLValid = self._validate(dtdContent) - if isXMLValid: - if self.ensureLegalUserJobIds(): - return True - return False +def parseXML(xmlfile): + """Return None if @xmlfile is unparseable, + else the dom representation.""" + try: + return parse(xmlfile) + except Exception, inst: + msg = 'Unparseable:\n{0}'.format(str(inst)) + except: + tb, tbType, stacktrace = sys.exc_info() + pt = traceback.format_tb(stacktrace) + msg = 'Unparseable:\n{0}\n{1}\n{2}'.format(tb, tbType, pt) - def cleanup(self): - try: - self._dom.unlink() - except: - pass + error(msg) - try: - del self._dom - except: - pass - - def grabDTD(self, dtdURLs): - # Test if we can get DTD, if so read it - nDTDs = len(dtdURLs) - for i, dtdURL in enumerate(dtdURLs, start=1): - dtd = DTD(dtdURL) - contents = dtd.contents - if contents: - if i > 1: # we're using a fallback dtd - m = '%s: using fallback DTD' % dtdURL - self.log.info(m) - return contents - - m = dtdURL - if dtd.inexistant: - m += ': inexistant' - self.log.error(m) - elif dtd.unreachable: - m += ': unreachable, timeout' - self.log.error(m) - - self.log.fatal('No DTDs available/reachable, will stop') - return '' - - def ensureLegalUserJobIds(self): - """Check the package userJobIds. - These must: - 1. be unique within a package XML file. - 2. match the regex for acceptable names. - """ - - def getAllUserJobIds(el): - ids = [] - for job in ['athena', 'jobTransform']: - els = docEl.getElementsByTagName(job) - ids.extend([el.getAttribute('userJobId') for el in els]) - return ids - - def reportDuplicates(jids, messages): - dups = {} - [dups.setdefault(jid, []).append(jid) for jid in jids] - - msgs = [] - [msgs.append('userJobId "%s" used %d times' % (k, len(v))) for - k, v in dups.items() if len(v) > 1] - messages.extend(msgs) - return not msgs - - def reportIllegal(userJobIds, messages): - acceptableFileName = r'^[a-zA-Z0-9_\.]+' - c_acceptableFileName = re.compile(acceptableFileName) - - badIds = [jid for jid in userJobIds if - not c_acceptableFileName.search(jid)] - - m = 'userjobID "%s" ' - m += 'does not start with a letter, digit, underscore or dot' - messages.extend([m % jid for jid in badIds]) - - return not badIds - - # Start of the method - rc = True - docEl = self._dom.documentElement - messages = [] - jobids = getAllUserJobIds(docEl) - rc = rc and reportDuplicates(jobids, messages) - rc = rc and reportIllegal(jobids, messages) - - if messages: - messages = '\n'.join(messages) - self.log.error(messages) - - return bool(rc) - - def _getDTDFromXMLFile(self): - doctype = self._dom.doctype - return doctype.systemId if doctype else None - - def _getDTDContent(self): - """Read DTD into a string. DTD location is given in - the configuration file. Use this, or backup DTDs if - unable to use the user-suggested DTD.""" - - # get system DTD path from XML file - xmlFileDTD = self._getDTDFromXMLFile() - dtdname = os.path.basename(xmlFileDTD) - dtds = [xmlFileDTD] - altDTDs = Validator.__altDTDs.get(dtdname, []) - dtds.extend(altDTDs) - return self.grabDTD(dtds) # grab from 1st available - - def _validate(self, dtdContent): - """Use read-in DTD to validate config file.""" - - # now load this string dtd - dtd = xmldtd.load_dtd_string(dtdContent) - parser = xmlproc.XMLProcessor() - parser.set_application(xmlval.ValidatingApp(dtd, parser)) - parser.dtd = dtd - parser.ent = dtd - - def swap(o1, o2): - return (o2, o1) - - newOut = StringIO.StringIO() - newOut, sys.stdout = swap(newOut, sys.stdout) - errmsg = '' +def getTag(el, tagname): + """If available, return the first element @tagname + below parent @el.""" + tags = el.getElementsByTagName(tagname) + return tags[0] if tags else None - try: - parser.parse_resource(self.file) - except Exception, e: - newOut, sys.stdout = swap(newOut, sys.stdout) - errmsg = '%s\n%s' % (newOut.getvalue().strip(), str(e)) - except: # Exceptions not deriving from Exception..... - newOut, sys.stdout = swap(newOut, sys.stdout) - errmsg = newOut.getvalue().strip() - else: - newOut, sys.stdout = swap(newOut, sys.stdout) - if errmsg: - self.log.error(errmsg) +def xmlPathExists(el, path): + """Does the xml @path element exist below parent @el?""" + current = el + for tag in path.split('/'): + current = getTag(current, tag) + if not current: + break + else: + return True + return False - return not errmsg +def isValidAgainstDTD(xmlfile, dtdContent): + """Validate the XML file at @xmfile against @dtdContent.""" + if not dtdContent: + return False + + # now load this string dtd + dtd = xmldtd.load_dtd_string(dtdContent) + parser = xmlproc.XMLProcessor() + parser.set_application(xmlval.ValidatingApp(dtd, parser)) + parser.dtd = dtd + parser.ent = dtd -class DTD: + def swap(o1, o2): + return (o2, o1) - TIMEOUT = 15 # give up trying to reach the URL after this many seconds + newOut = StringIO.StringIO() + newOut, sys.stdout = swap(newOut, sys.stdout) + + errmsg = '' + try: + parser.parse_resource(xmlfile) + except Exception, e: + newOut, sys.stdout = swap(newOut, sys.stdout) + errmsg = '%s\n%s' % (newOut.getvalue().strip(), str(e)) + except: + newOut, sys.stdout = swap(newOut, sys.stdout) + errmsg = newOut.getvalue().strip() + else: + newOut, sys.stdout = swap(newOut, sys.stdout) - def __init__(self, dtdURL): + error(errmsg) + return not errmsg + + +def getFirstAvailableDTDContents(*dtdURLs): + """Return the first available dtd contents from + the @dtdURLs passed in. If no DTDs exist or are + reachable, return None.""" + for dtdURL in dtdURLs: + contents = getDTDContent(dtdURL) + if contents: + debug('Validating against {0}'.format(dtdURL)) + break + else: + error('No DTDs available, cannot validate') + contents = dtdURL = None + + return (contents, dtdURL) + + +def getDTDContent(dtdURL): + """Fetch the contents of the DTD at @dtdURL, returning + None if the URL either does not exist or is not available. + Timeout to prevent unreachable URLs from blocking.""" + with DTD(dtdURL, timeout=1) as dtd: + if dtd.inexistant: + warning('{0}: inexistant'.format(dtdURL)) + elif dtd.unreachable: + warning('{0}: unreachable, timeout'.format(dtdURL)) + return dtd.contents + + +def getUserJobIds(jobEls): + return [el.getAttribute('userJobId') for el in jobEls] + + +def _idOccursMoreThanOnce(jobID, count): + if count > 1: + error('JobID {0} occurs {1} times'.format(jobID, count)) + return count > 1 + + +def _idHasIllegalChars(jobID): + allowed = string.digits + string.letters + '._:-' + for ch in jobID.lower(): + if ch not in allowed: + m = 'JobID "{0}" '.format(jobID) + m += 'contains the illegal character ("{0}"). '.format(ch) + m += 'Legal chars are letters, digits, ' + m += ':(colon), -(hyphen), .(dot) and _(underscore).' + error(m) + return True + return False + + +def getJobIds(docEl): + athenaTags = docEl.getElementsByTagName('athena') + athenaIds = getUserJobIds(athenaTags) + transformTags = docEl.getElementsByTagName('jobTransform') + transformIds = getUserJobIds(transformTags) + return athenaIds + transformIds + + +def hasLegalUserJobIds(docEl): + """Check the package userJobIds. + These must: + 1. be unique within a package XML file. + 2. match the regex for acceptable names. + """ + + allIDs = getJobIds(docEl) + + # count how many times each job id occurs + dd = defaultdict(int) + for jobID in allIDs: + dd[jobID] += 1 + + legal = True + for jobID, count in dd.items(): + if _idOccursMoreThanOnce(jobID, count) or _idHasIllegalChars(jobID): + legal = False + + return legal + +# ------------------------------------------------------------------- + + +class XMLFile(object): + def __init__(self, xmlfile): + self.xmlfile = xmlfile + self.dtdURL = None + self.docEl = None + self.parsedOK = False + + def isValid(self): + return self.parsedOK and self._do_validate() + + def __enter__(self): + dom = parseXML(self.xmlfile) + if dom: + self.parsedOK = True + self.docEl = dom.documentElement + self.checkForDeprecatedTags() + doctype = dom.doctype + self.dtdURL = doctype.systemId if doctype else None + + return self + + def __exit__(self, etype, evalue, stacktrace): + if etype is not None: + error('XMLFile raised a {0} exception'.format(etype)) + return False # will cause re-raise + + def checkForDeprecatedTags(self): + found = False + for path in ('atn/author', 'atn/mailto'): + if xmlPathExists(self.docEl, path): + warning('{0} tag is deprecated'.format(path)) + found = True + return found + + def _do_validate(self): + if not self.dtdURL: + debug('No external DTD reference found, considering file valid.') + return True + + content, dtdURL = getFirstAvailableDTDContents(self.dtdURL, *altDTDs) + isValid = isValidAgainstDTD(self.xmlfile, content) + hasLegalIds = hasLegalUserJobIds(self.docEl) + return isValid and hasLegalIds + +# ------------------------------------------------------------------- + + +class DTD(object): + def __init__(self, dtdURL, timeout): self.url = dtdURL + self.timeout = timeout # seconds before we should give up self.inexistant = False self.unreachable = False self.contents = None + + def __enter__(self): resource = self._openDTD() if resource: self.contents = resource.read() resource.close() + return self + + def __exit__(self, etype, evalue, stacktrace): + if etype is not None: + error('DTD() raised a {0} exception'.format(etype)) + return False # will cause re-raise def exists(self): return not self.inexistant @@ -258,7 +287,7 @@ class DTD: # prevents unreachable URLs from hanging the request resource = None try: - resource = URL.urlopen(self.url, timeout=DTD.TIMEOUT) + resource = URL.urlopen(self.url, timeout=self.timeout) except URL.HTTPError as e: if e.code == 404: self.inexistant = True @@ -267,25 +296,51 @@ class DTD: return resource +# ------------------------------------------------------------------- -if __name__ == '__main__': - import getopt - import os - xmlfiles = sys.argv[1:] +def validate(*xmlfiles): if not xmlfiles: - print 'Please provide paths to one or more XML files.' + error('Please provide paths to one or more XML files.') sys.exit(1) + exitcode = 0 for xmlfile in xmlfiles: if not os.path.exists(xmlfile): - print 'Inexistant XML file: %s' % xmlfile + warning('{0}: inexistant'.format(xmlfile)) continue - v = Validator(xmlfile) - isValid = v.validate() - print '%s[%s] %s' % (str(v.log), - 'OK' if isValid else 'FAIL', - xmlfile) + with XMLFile(xmlfile) as xf: + if xf.isValid(): + txt = 'OK -' + else: + txt = 'FAIL -' + exitcode = 1 + + info('{0} {1}'.format(txt, xmlfile)) - sys.exit(0) + return exitcode + + +def _getargs(): + args = sys.argv[1:] + try: + args.remove('--verbose') + except ValueError: + pass + return args + + +def main(): + args = _getargs() + sys.exit(validate(*args)) + + +if __name__ == '__main__': + main() +else: + # just importing from test suite, switch off logging + error = devnull + warning = devnull + info = devnull + debug = devnull -- GitLab