From ec135bed541ec2505661a7c4b39d3df7aa6173dc Mon Sep 17 00:00:00 2001 From: Graeme Stewart <graemes.cern@gmail.com> Date: Fri, 15 Apr 2016 11:30:32 +0200 Subject: [PATCH] Tag PyJobTransforms-02-03-76-01 - Specifically to test fork after first event for memory savings - Also collects many other minor fixes - EXCLUDES lightweight output file metadata patch (r731518) (PyJobTransforms-02-03-76-01) wwwpackage PyJobTransforms 2016-04-15 Graeme Stewart <graeme.andrew.stewart@cern.ch> ** Merge in trunk changes from r740535 2016-04-15 Graeme Stewart <graeme.andrew.stewart@cern.ch> ** Manually patch in trunk fixes athenaMPMergeTargetSize handling (ATLASJT-298) *** WARNING - Do not attempt to merge r731518 onto this branch *** 2016-04-15 Graeme Stewart <graeme.andrew.stewart@cern.ch> ** Merge in trunk changes, excluding lightweight metadata fixes 2016-04-11 Azzah Alshehri<azzah.aziz.alshehri@cern.ch> ** Change the self merge method to save the merge information * python/trfExe.py - Change the stanalone variable 'myMerger'inside the method to a class variable. - Save the 'merge executor instance' you get back in the athenaExecutor _smartMerge method. - Add an accessor to the base executor class, that returns the self._myMerge value. * python/trfArgClasses.py - Re-set 'myMerger' to the real merger executor in the actual merge. - Return myMerger for each of the specific file mergers method. ... (Long ChangeLog diff - truncated) --- Tools/PyJobTransforms/CMakeLists.txt | 22 +++ Tools/PyJobTransforms/python/trfAMI.py | 2 +- Tools/PyJobTransforms/python/trfArgClasses.py | 36 +++-- Tools/PyJobTransforms/python/trfArgs.py | 49 +++--- Tools/PyJobTransforms/python/trfEnv.py | 8 +- Tools/PyJobTransforms/python/trfExe.py | 147 ++++++++++++------ Tools/PyJobTransforms/python/trfJobOptions.py | 12 +- Tools/PyJobTransforms/python/trfMPTools.py | 35 +++-- Tools/PyJobTransforms/python/trfReports.py | 39 +++-- Tools/PyJobTransforms/python/trfUtils.py | 67 ++++---- Tools/PyJobTransforms/python/trfValidation.py | 11 +- Tools/PyJobTransforms/scripts/GetTfCommand.py | 18 ++- Tools/PyJobTransforms/test/test_trfMPTools.py | 25 +-- Tools/PyJobTransforms/test/test_trfUtils.py | 23 +-- .../test/test_trfUtilsDBRelease.py | 78 +++++----- 15 files changed, 338 insertions(+), 234 deletions(-) create mode 100644 Tools/PyJobTransforms/CMakeLists.txt diff --git a/Tools/PyJobTransforms/CMakeLists.txt b/Tools/PyJobTransforms/CMakeLists.txt new file mode 100644 index 00000000000..df82eae883c --- /dev/null +++ b/Tools/PyJobTransforms/CMakeLists.txt @@ -0,0 +1,22 @@ +################################################################################ +# Package: PyJobTransforms +################################################################################ + +# Declare the package name: +atlas_subdir( PyJobTransforms ) + +# External dependencies: +find_package( PythonLibs ) + +# Install files from the package: +atlas_install_python_modules( python/*.py ) +atlas_install_joboptions( share/*.py ) +atlas_install_runtime( scripts/*_tf.py ) +atlas_install_runtime( test/PyJobTransforms_TestConfiguration.xml ) +atlas_install_generic( share/*.db + DESTINATION share + EXECUTABLE ) +atlas_install_generic( test/test_*.py + DESTINATION share/JobTransforms/test + EXECUTABLE ) + diff --git a/Tools/PyJobTransforms/python/trfAMI.py b/Tools/PyJobTransforms/python/trfAMI.py index b4d6ab6ab1a..9307fe97a67 100644 --- a/Tools/PyJobTransforms/python/trfAMI.py +++ b/Tools/PyJobTransforms/python/trfAMI.py @@ -229,7 +229,7 @@ def getAMIClient(): raise TransformAMIException(AMIerrorCode, 'Import of pyAMI modules failed.') msg.debug("Attempting to get AMI client for endpoint {0}".format(endpoint)) - amiclient = Client(endpoint) + amiclient = Client(endpoint, ignore_proxy = True) return amiclient ## @brief Get list of characters of ProdSys tags diff --git a/Tools/PyJobTransforms/python/trfArgClasses.py b/Tools/PyJobTransforms/python/trfArgClasses.py index fc02f12a22b..83cddeab46b 100644 --- a/Tools/PyJobTransforms/python/trfArgClasses.py +++ b/Tools/PyJobTransforms/python/trfArgClasses.py @@ -3,7 +3,7 @@ ## @package PyJobTransforms.trfArgClasses # @brief Transform argument class definitions # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfArgClasses.py 696789 2015-09-25 09:18:35Z graemes $ +# @version $Id: trfArgClasses.py 740512 2016-04-15 10:13:14Z graemes $ import argparse import bz2 @@ -641,6 +641,13 @@ class argFile(argList): if prodsysGlob and self._splitter is ',': msg.debug('Detected prodsys glob - normal splitting is disabled') self._value = [value] + elif value.lower().startswith('lfn'): + # Resolve physical filename using pool file catalog. + import PyUtils.AthFile as af + protocol, pfn = af.fname(value) + self._value = [pfn] + self._getDatasetFromFilename(reset = False) + self._resetMetadata() else: self._value = value.split(self._splitter) self._getDatasetFromFilename(reset = False) @@ -1209,12 +1216,13 @@ class argFile(argList): myargdict['checkEventCount'] = argSubstepBool('False', runarg=False) if 'athenaopts' in myargdict: # Need to ensure that "nprocs" is not passed to merger - newopts = [] - for opt in myargdict['athenaopts'].value: - if opt.startswith('--nprocs'): - continue - newopts.append(opt) - myargdict['athenaopts'] = argList(newopts, runarg=False) + for subStep in myargdict['athenaopts'].value: + newopts = [] + for opt in myargdict['athenaopts'].value[subStep]: + if opt.startswith('--nprocs'): + continue + newopts.append(opt) + myargdict['athenaopts'] = argSubstepList(newopts, runarg=False) return myargdict @@ -1244,8 +1252,8 @@ class argAthenaFile(argFile): elif self._type.upper() in ('TAG'): aftype = 'TAG' - # retrieve GUID and nentries without runMiniAthena subprocess for input POOL files - if aftype == 'POOL' and self._io == 'input': + # retrieve GUID and nentries without runMiniAthena subprocess for input POOL files or temporary files + if aftype == 'POOL' and (self._io == 'input' or self._io == 'temporary'): retrieveKeys = inpFileInterestingKeys # get G4Version for HITSFiles @@ -1393,7 +1401,7 @@ class argPOOLFile(argAthenaFile): msg.debug('Post self-merge files are: {0}'.format(self._value)) self._resetMetadata(inputs + [output]) - + return myMerger class argHITSFile(argPOOLFile): @@ -1430,7 +1438,7 @@ class argHITSFile(argPOOLFile): msg.debug('Post self-merge files are: {0}'.format(self._value)) self._resetMetadata(inputs + [output]) - + return myMerger class argRDOFile(argPOOLFile): @@ -1466,7 +1474,7 @@ class argRDOFile(argPOOLFile): msg.debug('Post self-merge files are: {0}'.format(self._value)) self._resetMetadata(inputs + [output]) - + return myMerger @@ -1529,7 +1537,7 @@ class argTAGFile(argPOOLFile): msg.debug('Post self-merge files are: {0}'.format(self._value)) self._resetMetadata(inputs + [output]) - + return myMerger @property def prodsysDescription(self): @@ -1648,7 +1656,7 @@ class argNTUPFile(argFile): msg.debug('Post self-merge files are: {0}'.format(self._value)) self._resetMetadata(inputs + [output]) - + return myMerger @property def prodsysDescription(self): diff --git a/Tools/PyJobTransforms/python/trfArgs.py b/Tools/PyJobTransforms/python/trfArgs.py index 6e19022161e..63468358c25 100644 --- a/Tools/PyJobTransforms/python/trfArgs.py +++ b/Tools/PyJobTransforms/python/trfArgs.py @@ -3,7 +3,7 @@ ## @Package PyJobTransforms.trfArgs # @brief Standard arguments supported by trf infrastructure # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfArgs.py 697822 2015-10-01 11:38:06Z graemes $ +# @version $Id: trfArgs.py 740532 2016-04-15 11:01:50Z graemes $ import logging msg = logging.getLogger(__name__) @@ -58,8 +58,10 @@ def addStandardTrfArgs(parser): # some special transforms). def addAthenaArguments(parser, maxEventsDefaultSubstep='first', addValgrind=True): parser.defineArgGroup('Athena', 'General Athena Options') - parser.add_argument('--athenaopts', group = 'Athena', type=argFactory(trfArgClasses.argList, splitter=' ', runarg=False), metavar='OPT1 OPT2 OPT3', - help='Extra options to pass to athena. Will split on spaces. Options starting with "-" must be given as --athenaopts=\'--opt1 --opt2[=foo] ...\'') + parser.add_argument('--athenaopts', group = 'Athena', type=argFactory(trfArgClasses.argSubstepList, splitter=' ', runarg=False), nargs="+", metavar='substep:ATHENAOPTS', + help='Extra options to pass to athena. Opts will split on spaces. ' + 'Multiple substep options can be given with --athenaopts=\'sutbstep1:--opt1 --opt2[=foo] ...\' \'substep2:--opt3\'' + 'Without substep specified, options will be used for all substeps.') parser.add_argument('--command', '-c', group = 'Athena', type=argFactory(trfArgClasses.argString, runarg=False), metavar='COMMAND', help='Run %(metavar)s before all else') parser.add_argument('--athena', group = 'Athena', type=argFactory(trfArgClasses.argString, runarg=False), metavar='ATHENA', @@ -75,7 +77,8 @@ def addAthenaArguments(parser, maxEventsDefaultSubstep='first', addValgrind=True help='Python code to execute after main job options are included (can be optionally limited to a single substep)') parser.add_argument('--postInclude', group = 'Athena', type=argFactory(trfArgClasses.argSubstepList, splitter=','), nargs='+', metavar='substep:POSTINCLUDE', - help='Python configuration fragment to include after main job options (can be optionally limited to a single substep). Will split on commas: frag1.py,frag2.py is understood.') + help='Python configuration fragment to include after main job options (can be optionally limited ' + 'to a single substep). Will split on commas: frag1.py,frag2.py is understood.') parser.add_argument('--maxEvents', group='Athena', type=argFactory(trfArgClasses.argSubstepInt, defaultSubstep=maxEventsDefaultSubstep), nargs='+', metavar='substep:maxEvents', help='Set maximum events for each processing step (default substep is "{0}")'.format(maxEventsDefaultSubstep)) @@ -89,22 +92,27 @@ def addAthenaArguments(parser, maxEventsDefaultSubstep='first', addValgrind=True metavar='dataType:targetSizeInMegaBytes', nargs='+', group='Athena', help='Set the target merge size for an AthenaMP output file type (give size in MB). ' 'Note that the special value 0 means do not merge this output file; negative values mean ' - 'always merge to a single file. Note that the datatype "ALL" will be used as a default ' - 'for all datatypes not explicitly given their own value.') - parser.add_argument('--athenaMPStrategy', type=trfArgClasses.argFactory(trfArgClasses.argSubstep), nargs='+', - metavar='substep:Strategy', group='Athena', + 'always merge to a single file. Globbing is supported, e.g. "DESD_*:500" is understood. ' + 'Special datatype "ALL" can be used as a default for all datatypes not explicitly ' + 'given their own value or glob matched.') + parser.add_argument('--athenaMPStrategy', type=trfArgClasses.argFactory(trfArgClasses.argSubstep, runarg=False), + nargs='+', metavar='substep:Strategy', group='Athena', help='Set the AthenaMP scheduling strategy for a particular substep. Default is unset, ' 'except when n_inputFiles = n_workers, when it is "FileScheduling" (useful for ' 'ephemeral outputs).') + parser.add_argument('--athenaMPUseEventOrders', type=trfArgClasses.argFactory(trfArgClasses.argBool, runarg=False), + metavar='BOOL', group='Athena', + help='Change AthenaMP setup to read event numbers from event orders files') + parser.add_argument('--athenaMPEventsBeforeFork', type=trfArgClasses.argFactory(trfArgClasses.argInt, runarg=False), + metavar='N', group='Athena', + help='Set AthenaMP to fork after processing N events (default is to fork immediately after ' + 'initialisation') if addValgrind: addValgrindArguments(parser) ## @brief Add Valgrind options def addValgrindArguments(parser): - parser.defineArgGroup( - 'Valgrind', - 'General Valgrind Options' - ) + parser.defineArgGroup('Valgrind', 'General Valgrind Options') parser.add_argument( '--valgrind', group = 'Valgrind', @@ -116,20 +124,17 @@ def addValgrindArguments(parser): help = 'Enable Valgrind' ) parser.add_argument( - '--valgrindbasicopts', + '--valgrindDefaultOpts', group = 'Valgrind', type = argFactory( - trfArgClasses.argList, - splitter = ',', + trfArgClasses.argBool, runarg = False ), - metavar = 'OPT1,OPT2,OPT3', - help = 'Basic options passed to Valgrind when running Athena. ' + - 'Options starting with "-" must be given as ' + - '--valgrindopts=\'--opt1=foo,--opt2=bar,...\'' + metavar = "substep:BOOL", + help = 'Enable default Valgrind options' ) parser.add_argument( - '--valgrindextraopts', + '--valgrindExtraOpts', group = 'Valgrind', type = argFactory( trfArgClasses.argList, @@ -504,9 +509,9 @@ def addValidationArguments(parser): def addTriggerArguments(parser, addTrigFilter=True): parser.defineArgGroup('Trigger', 'Trigger Related Options') parser.add_argument('--triggerConfig', - type=argFactory(trfArgClasses.argSubstep, defaultSubstep="RAWtoESD", separator='='), + type=argFactory(trfArgClasses.argSubstep, defaultSubstep="RDOtoRDOTrigger", separator='='), metavar='substep=triggerConf', - help='Trigger configuration string (substep aware argument - default is to run trigger in RAWtoESD step, ' + help='Trigger configuration string (substep aware argument - default is to run trigger in RDOtoRDOTrigger step, ' 'use syntax SUBSTEP=TRIGCONF if you want to run trigger somewhere else). ' 'N.B. This argument uses EQUALS (=) to separate the substep name from the value.', group='Trigger') diff --git a/Tools/PyJobTransforms/python/trfEnv.py b/Tools/PyJobTransforms/python/trfEnv.py index e8d87585d5a..432cf61b963 100644 --- a/Tools/PyJobTransforms/python/trfEnv.py +++ b/Tools/PyJobTransforms/python/trfEnv.py @@ -3,7 +3,7 @@ ## @Package PyJobTransforms.trfEnv # @brief Support for environemnt variable manipulation in the transforms # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfEnv.py 623865 2014-10-24 12:39:44Z graemes $ +# @version $Id: trfEnv.py 726697 2016-02-28 16:52:54Z uworlika $ import os import os.path as path @@ -39,12 +39,6 @@ class environmentUpdate(object): self._addIMFSettings() return - # OK, try and detect the release - if trfUtils.releaseIsOlderThan(17, 7): - msg.info('No IMF by default for this release') - return - - msg.info('Enabling IMF by default for release') self._addIMFSettings() diff --git a/Tools/PyJobTransforms/python/trfExe.py b/Tools/PyJobTransforms/python/trfExe.py index 9a919654552..2acc5cdb674 100755 --- a/Tools/PyJobTransforms/python/trfExe.py +++ b/Tools/PyJobTransforms/python/trfExe.py @@ -5,7 +5,7 @@ # @brief Transform execution functions # @details Standard transform executors # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfExe.py 697822 2015-10-01 11:38:06Z graemes $ +# @version $Id: trfExe.py 740532 2016-04-15 11:01:50Z graemes $ import copy import json @@ -52,7 +52,7 @@ class executorConfig(object): self._dataDictionary = dataDictionary self._firstExecutor = firstExecutor self._disableMP = disableMP - + @property def argdict(self): return self._argdict @@ -115,8 +115,9 @@ class transformExecutor(object): # @param outData List of outputs this transform can produce (list, tuple or set can be used) def __init__(self, name = 'Dummy', trf = None, conf = None, inData = set(), outData = set()): # Some information to produce helpful log messages + self._name = forceToAlphaNum(name) - + self._myMerger=None # Data this executor can start from and produce # Note we transform NULL to inNULL and outNULL as a convenience self._inData = set(inData) @@ -170,6 +171,9 @@ class transformExecutor(object): ## Now define properties for these data members @property + def myMerger(self): + return self._myMerger + @property def name(self): return self._name @@ -719,7 +723,7 @@ class athenaExecutor(scriptExecutor): # Setup JO templates if self._skeleton is not None: - self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 697822 2015-10-01 11:38:06Z graemes $') + self._jobOptionsTemplate = JobOptionsTemplate(exe = self, version = '$Id: trfExe.py 740532 2016-04-15 11:01:50Z graemes $') else: self._jobOptionsTemplate = None @@ -795,6 +799,11 @@ class athenaExecutor(scriptExecutor): if self._athenaMP: self._athenaMPWorkerTopDir = 'athenaMP-workers-{0}-{1}'.format(self._name, self._substep) self._athenaMPFileReport = 'athenaMP-outputs-{0}-{1}'.format(self._name, self._substep) + self._athenaMPEventOrdersFile = 'athenamp_eventorders.txt.{0}'.format(self._name) + if 'athenaMPUseEventOrders' in self.conf.argdict and self.conf._argdict['athenaMPUseEventOrders'].value is True: + self._athenaMPReadEventOrders = True + else: + self._athenaMPReadEventOrders = False # Decide on scheduling if ('athenaMPStrategy' in self.conf.argdict and (self.conf.argdict['athenaMPStrategy'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) is not None)): @@ -809,14 +818,23 @@ class athenaExecutor(scriptExecutor): else: self._athenaMPStrategy = None # See if we have options for the target output file size - if 'athenaMPMergeTargetSize' in self.conf._argdict: - for dataType, targetSize in self.conf._argdict['athenaMPMergeTargetSize'].value.iteritems(): - if dataType in self.conf._dataDictionary: - self.conf._dataDictionary[dataType].mergeTargetSize = targetSize * 1000000 # Convert from MB to B + if 'athenaMPMergeTargetSize' in self.conf.argdict: + for dataType in self.conf._dataDictionary: + if dataType in self.conf.argdict['athenaMPMergeTargetSize'].value: + self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value[dataType] * 1000000 # Convert from MB to B msg.info('Set target merge size for {0} to {1}'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize)) - elif 'ALL' in self.conf._dataDictionary: - self.conf._dataDictionary['ALL'].mergeTargetSize = targetSize * 1000000 - msg.info('Set target merge size for {0} to {1} (from ALL value)'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize)) + else: + # Use a globbing strategy + matchedViaGlob = False + for mtsType, mtsSize in self.conf.argdict['athenaMPMergeTargetSize'].value.iteritems(): + if fnmatch(dataType, mtsType): + self.conf._dataDictionary[dataType].mergeTargetSize = mtsSize * 1000000 # Convert from MB to B + msg.info('Set target merge size for {0} to {1} from "{2}" glob'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize, mtsType)) + matchedViaGlob = True + break + if not matchedViaGlob and "ALL" in self.conf.argdict['athenaMPMergeTargetSize'].value: + self.conf._dataDictionary[dataType].mergeTargetSize = self.conf.argdict['athenaMPMergeTargetSize'].value["ALL"] * 1000000 # Convert from MB to B + msg.info('Set target merge size for {0} to {1} from "ALL" value'.format(dataType, self.conf._dataDictionary[dataType].mergeTargetSize)) else: self._athenaMPWorkerTopDir = self._athenaMPFileReport = None @@ -904,6 +922,7 @@ class athenaExecutor(scriptExecutor): # If this was an athenaMP run then we need to update output files if self._athenaMP: outputDataDictionary = dict([ (dataType, self.conf.dataDictionary[dataType]) for dataType in self._output ]) + ## @note Update argFile values to have the correct outputs from the MP workers athenaMPOutputHandler(self._athenaMPFileReport, self._athenaMPWorkerTopDir, outputDataDictionary, self._athenaMP) for dataType in self._output: if self.conf.dataDictionary[dataType].io == "output" and len(self.conf.dataDictionary[dataType].value) > 1: @@ -992,34 +1011,59 @@ class athenaExecutor(scriptExecutor): self._exe = self.conf.argdict['athena'].value self._cmd = [self._exe] + # Find options for the current substep. Name is prioritised (e.g. RAWtoESD) over alias (e.g. r2e). Last look for 'all' + currentSubstep = None + if 'athenaopts' in self.conf.argdict: + if self.name in self.conf.argdict['athenaopts'].value: + currentSubstep = self.name + if self.substep in self.conf.argdict['athenaopts'].value: + msg.info('Athenaopts found for {0} and {1}, joining options. ' + 'Consider changing your configuration to use just the name or the alias of the substep.' + .format(currentSubstep, self.substep)) + self.conf.argdict['athenaopts'].value[currentSubstep].extend(self.conf.argdict['athenaopts'].value[self.substep]) + del self.conf.argdict['athenaopts'].value[self.substep] + msg.debug('Athenaopts: {0}'.format(self.conf.argdict['athenaopts'].value)) + elif self.substep in self.conf.argdict['athenaopts'].value: + currentSubstep = self.substep + elif 'all' in self.conf.argdict['athenaopts'].value: + currentSubstep = 'all' + # See if there's a preloadlibs and a request to update LD_PRELOAD for athena + preLoadUpdated = dict() if 'LD_PRELOAD' in self._envUpdate._envdict: - preLoadUpdated = False + preLoadUpdated[currentSubstep] = False if 'athenaopts' in self.conf.argdict: - for athArg in self.conf.argdict['athenaopts'].value: - # This code is pretty ugly as the athenaopts argument contains - # strings which are really key/value pairs - if athArg.startswith('--preloadlib'): - try: - i = self.conf.argdict['athenaopts'].value.index(athArg) - v = athArg.split('=', 1)[1] - msg.info('Updating athena --preloadlib option with: {0}'.format(self._envUpdate.value('LD_PRELOAD'))) - newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":"))) - self.conf.argdict['athenaopts']._value[i] = '--preloadlib={0}'.format(newPreloads) - except Exception, e: - msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e)) - preLoadUpdated = True + if currentSubstep is not None: + for athArg in self.conf.argdict['athenaopts'].value[currentSubstep]: + # This code is pretty ugly as the athenaopts argument contains + # strings which are really key/value pairs + if athArg.startswith('--preloadlib'): + try: + i = self.conf.argdict['athenaopts'].value[currentSubstep].index(athArg) + v = athArg.split('=', 1)[1] + msg.info('Updating athena --preloadlib option for substep {1} with: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name)) + newPreloads = ":".join(set(v.split(":")) | set(self._envUpdate.value('LD_PRELOAD').split(":"))) + self.conf.argdict['athenaopts']._value[currentSubstep][i] = '--preloadlib={0}'.format(newPreloads) + except Exception, e: + msg.warning('Failed to interpret athena option: {0} ({1})'.format(athArg, e)) + preLoadUpdated[currentSubstep] = True break - if not preLoadUpdated: - msg.info('Setting athena preloadlibs to: {0}'.format(self._envUpdate.value('LD_PRELOAD'))) + if not preLoadUpdated[currentSubstep]: + msg.info('Setting athena preloadlibs for substep {1} to: {0}'.format(self._envUpdate.value('LD_PRELOAD'), self.name)) if 'athenaopts' in self.conf.argdict: - self.conf.argdict['athenaopts'].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))) + if currentSubstep is not None: + self.conf.argdict['athenaopts'].value[currentSubstep].append("--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))) + else: + self.conf.argdict['ahtenaopts'].value['all'] = ["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))] else: - self.conf.argdict['athenaopts'] = trfArgClasses.argList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]) + self.conf.argdict['athenaopts'] = trfArgClasses.argSubstepList(["--preloadlib={0}".format(self._envUpdate.value('LD_PRELOAD'))]) # Now update command line with the options we have (including any changes to preload) if 'athenaopts' in self.conf.argdict: - self._cmd.extend(self.conf.argdict['athenaopts'].value) + if currentSubstep is None: + self._cmd.extend(self.conf.argdict['athenaopts'].value['all']) + else: + self._cmd.extend(self.conf.argdict['athenaopts'].value[currentSubstep]) ## Add --drop-and-reload if possible (and allowed!) if self._tryDropAndReload: @@ -1028,11 +1072,17 @@ class athenaExecutor(scriptExecutor): elif 'athenaopts' in self.conf.argdict: athenaConfigRelatedOpts = ['--config-only','--drop-and-reload','--drop-configuration','--keep-configuration'] # Note for athena options we split on '=' so that we properly get the option and not the whole "--option=value" string - conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value])) - if len(conflictOpts) > 0: - msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts))) + if currentSubstep is None: + currentSubstep = 'all' + if currentSubstep in self.conf.argdict['athenaopts'].value: + conflictOpts = set(athenaConfigRelatedOpts).intersection(set([opt.split('=')[0] for opt in self.conf.argdict['athenaopts'].value[currentSubstep]])) + if len(conflictOpts) > 0: + msg.info('Not appending "--drop-and-reload" to athena command line because these options conflict: {0}'.format(list(conflictOpts))) + else: + msg.info('Appending "--drop-and-reload" to athena options') + self._cmd.append('--drop-and-reload') else: - msg.info('Appending "--drop-and-reload" to athena options') + msg.info('No Athenaopts for substep {0}, appending "--drop-and-reload" to athena options'.format(self.name)) self._cmd.append('--drop-and-reload') else: # This is the 'standard' case - so drop and reload should be ok @@ -1110,20 +1160,20 @@ class athenaExecutor(scriptExecutor): # Run Athena for generation of its serialised configuration. print >>wrapper, ' '.join(self._cmd), "--config-only={0}".format(AthenaSerialisedConfigurationFile) print >>wrapper, 'if [ $? != "0" ]; then exit 255; fi' - # Generate a Valgrind command, using default or basic + # Generate a Valgrind command, suppressing or ussing default # options as requested and extra options as requested. - if 'valgrindbasicopts' in self.conf._argdict: - basicOptionsList = self.conf._argdict['valgrindbasicopts'].value + if 'valgrindDefaultOpts' in self.conf._argdict: + defaultOptions = self.conf._argdict['valgrindDefaultOpts'].value else: - basicOptionsList = None - if 'valgrindextraopts' in self.conf._argdict: - extraOptionsList = self.conf._argdict['valgrindextraopts'].value + defaultOptions = True + if 'valgrindExtraOpts' in self.conf._argdict: + extraOptionsList = self.conf._argdict['valgrindExtraOpts'].value else: extraOptionsList = None - msg.debug("requested Valgrind command basic options: {options}".format(options = basicOptionsList)) + msg.debug("requested Valgrind command basic options: {options}".format(options = defaultOptions)) msg.debug("requested Valgrind command extra options: {options}".format(options = extraOptionsList)) command = ValgrindCommand( - basicOptionsList = basicOptionsList, + defaultOptions = defaultOptions, extraOptionsList = extraOptionsList, AthenaSerialisedConfigurationFile = \ AthenaSerialisedConfigurationFile @@ -1191,19 +1241,18 @@ class athenaExecutor(scriptExecutor): counter = 0 for mergeGroup in mergeCandidates: - counter += 1 - # If we only have one merge group, then preserve the original name (important for - # prodsys v1). Otherwise we use the new merged names. - if len(mergeCandidates) == 1: - mergeName = fileArg.originalName - else: + # Note that the individual worker files get numbered with 3 digit padding, + # so these non-padded merges should be fine + mergeName = fileArg.originalName + '_{0}'.format(counter) + while path.exists(mergeName): + counter += 1 mergeName = fileArg.originalName + '_{0}'.format(counter) msg.info('Want to merge files {0} to {1}'.format(mergeGroup, mergeName)) if len(mergeGroup) <= 1: msg.info('Skip merging for single file') else: ## We want to parallelise this part! - fileArg.selfMerge(output=mergeName, inputs=mergeGroup, argdict=self.conf.argdict) + self._myMerger = fileArg.selfMerge(output=mergeName, inputs=mergeGroup, argdict=self.conf.argdict) def _targzipJiveXML(self): diff --git a/Tools/PyJobTransforms/python/trfJobOptions.py b/Tools/PyJobTransforms/python/trfJobOptions.py index e43f20cb270..d0ac8136ca0 100644 --- a/Tools/PyJobTransforms/python/trfJobOptions.py +++ b/Tools/PyJobTransforms/python/trfJobOptions.py @@ -5,7 +5,7 @@ # @brief Contains functions related Athena Job Options files # @details Generates runArgs JobOptions and interfaces with skeleton # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfJobOptions.py 697822 2015-10-01 11:38:06Z graemes $ +# @version $Id: trfJobOptions.py 731249 2016-03-19 22:05:45Z graemes $ # import os @@ -113,6 +113,7 @@ class JobOptionsTemplate(object): # Add the input event count, if we know it if dataArg.isCached(metadataKeys = ['nentries']): print >>runargsFile, '{0}.input{1}FileNentries = {2!r}'.format(self._runArgsName, dataType, dataArg.nentries) + print >>runargsFile, "{0}.{1}FileIO = {2!r}".format(self._runArgsName, dataType, self._exe.conf.dataDictionary[dataType].io) print >>runargsFile, os.linesep, "# Output data" for dataType, dataArg in output.iteritems(): @@ -173,12 +174,21 @@ class JobOptionsTemplate(object): 'from AthenaMP.AthenaMPFlags import jobproperties as AthenaMPJobProps', 'AthenaMPJobProps.AthenaMPFlags.WorkerTopDir="{0}"'.format(self._exe._athenaMPWorkerTopDir), 'AthenaMPJobProps.AthenaMPFlags.OutputReportFile="{0}"'.format(self._exe._athenaMPFileReport), + 'AthenaMPJobProps.AthenaMPFlags.EventOrdersFile="{0}"'.format(self._exe._athenaMPEventOrdersFile), 'AthenaMPJobProps.AthenaMPFlags.CollectSubprocessLogs=True' )) if self._exe._athenaMPStrategy: # Beware of clobbering a non default value (a feature used by EventService) print >>runargsFile, 'if AthenaMPJobProps.AthenaMPFlags.Strategy.isDefault():' print >>runargsFile, '\tAthenaMPJobProps.AthenaMPFlags.Strategy="{0}"'.format(self._exe._athenaMPStrategy) + if self._exe._athenaMPReadEventOrders: + if os.path.isfile(self._exe._athenaMPEventOrdersFile): + print >>runargsFile, 'AthenaMPJobProps.AthenaMPFlags.ReadEventOrders=True' + else: + raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_EXEC_RUNARGS_ERROR"), "Failed to find file: {0} required by athenaMP option: --athenaMPUseEventOrders true".format(self._exe._athenaMPEventOrdersFile)) + if 'athenaMPEventsBeforeFork' in self._exe.conf.argdict: + print >>runargsFile, 'AthenaMPJobProps.AthenaMPFlags.EventsBeforeFork={0}'.format(self._exe.conf.argdict['athenaMPEventsBeforeFork'].value) + msg.info('Successfully wrote runargs file {0}'.format(self._runArgsFile)) except (IOError, OSError) as e: diff --git a/Tools/PyJobTransforms/python/trfMPTools.py b/Tools/PyJobTransforms/python/trfMPTools.py index ca97386eddf..961eb3936ce 100644 --- a/Tools/PyJobTransforms/python/trfMPTools.py +++ b/Tools/PyJobTransforms/python/trfMPTools.py @@ -4,7 +4,7 @@ # # @brief Utilities for handling AthenaMP jobs # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfMPTools.py 677748 2015-06-23 20:29:35Z graemes $ +# @version $Id: trfMPTools.py 731249 2016-03-19 22:05:45Z graemes $ # __version__ = '$Revision' @@ -36,16 +36,17 @@ def detectAthenaMPProcs(argdict = {}): raise ValueError("ATHENA_PROC_NUMBER value was less than zero") msg.info('AthenaMP detected from ATHENA_PROC_NUMBER with {0} workers'.format(athenaMPProcs)) elif 'athenaopts' in argdict: - procArg = [opt.replace("--nprocs=", "") for opt in argdict['athenaopts'].value if '--nprocs' in opt] - if len(procArg) == 0: - athenaMPProcs = 0 - elif len(procArg) == 1: - athenaMPProcs = int(procArg[0]) - if athenaMPProcs < 0: - raise ValueError("--nprocs was set to a value less than zero") - else: - raise ValueError("--nprocs was set more than once in 'athenaopts'") - msg.info('AthenaMP detected from "nprocs" setting with {0} workers'.format(athenaMPProcs)) + for substep in argdict['athenaopts'].value: + procArg = [opt.replace("--nprocs=", "") for opt in argdict['athenaopts'].value[substep] if '--nprocs' in opt] + if len(procArg) == 0: + athenaMPProcs = 0 + elif len(procArg) == 1: + athenaMPProcs = int(procArg[0]) + if athenaMPProcs < 0: + raise ValueError("--nprocs was set to a value less than zero") + else: + raise ValueError("--nprocs was set more than once in 'athenaopts'") + msg.info('AthenaMP detected from "nprocs" setting with {0} workers for substep {1}'.format(athenaMPProcs,substep)) except ValueError, errMsg: myError = 'Problem discovering AthenaMP setup: {0}'.format(errMsg) raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'), myError) @@ -127,15 +128,21 @@ def athenaMPoutputsLinkAndUpdate(newFullFilenames, fileArg): # Do we need to append worker dir suffixes? linkedNameList = [] uniqueSimpleNames = set([path.basename(fname) for fname in newFullFilenames]) + # Check here if MP created it's own unique names - otherwise we need to add suffixes + # so that each output file is unique if len(uniqueSimpleNames) != len(newFullFilenames): + indexesUsed = [] for fname in newFullFilenames: simpleName = path.basename(fname) workerIndexMatch = re.search(r'/worker_(\d+)/', fname) if workerIndexMatch: - workerIndex = workerIndexMatch.group(1) + fileIndex = int(workerIndexMatch.group(1)) + 1 else: - raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_OUTPUT_FILE_ERROR"), "Found output file ({0}) not in an AthenaMP worker directory".format(fname)) - simpleName += "._{0:03d}".format(int(workerIndex)) + fileIndex = 0 + if fileIndex in indexesUsed: + fileIndex = max(indexesUsed)+1 + indexesUsed.append(fileIndex) + simpleName += "_{0:03d}".format(int(fileIndex)) linkedNameList.append(simpleName) else: linkedNameList = [path.basename(fname) for fname in newFullFilenames] diff --git a/Tools/PyJobTransforms/python/trfReports.py b/Tools/PyJobTransforms/python/trfReports.py index 80c9f734b67..2d760dba7fa 100644 --- a/Tools/PyJobTransforms/python/trfReports.py +++ b/Tools/PyJobTransforms/python/trfReports.py @@ -6,10 +6,10 @@ # @details Classes whose instance encapsulates transform reports # at different levels, such as file, executor, transform # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfReports.py 696484 2015-09-23 17:20:28Z graemes $ +# @version $Id: trfReports.py 740537 2016-04-15 11:28:11Z graemes $ # -__version__ = '$Revision: 696484 $' +__version__ = '$Revision: 740537 $' import cPickle as pickle import json @@ -105,7 +105,7 @@ class trfReport(object): class trfJobReport(trfReport): ## @brief This is the version counter for transform job reports # any changes to the format @b must be reflected by incrementing this - _reportVersion = '1.0.6' + _reportVersion = '1.1.0' _metadataKeyMap = {'AMIConfig': 'AMI', } _maxMsgLen = 256 _truncationMsg = " (truncated)" @@ -171,22 +171,12 @@ class trfJobReport(trfReport): myDict['executor'].append(trfExecutorReport(exe).python(fast = fast)) # Executor resources are gathered here to unify where this information is held # and allow T0/PanDA to just store this JSON fragment on its own - exeResource = {'cpuTime': exe.cpuTime, - 'wallTime': exe.wallTime,} - if exe.memStats: - exeResource['memory'] = exe.memStats - if exe.eventCount: - exeResource['nevents'] = exe.eventCount - if exe.athenaMP: - exeResource['mpworkers'] = exe.athenaMP - if exe.dbMonitor: - exeResource['dbData'] = exe.dbMonitor['bytes'] - exeResource['dbTime'] = exe.dbMonitor['time'] - myDict['resource']['executor'][executionStep['name']] = exeResource - + myDict['resource']['executor'][exe.name] = exeResourceReport(exe) + if exe.myMerger: + myDict['resource']['executor'][exe.myMerger.name] = exeResourceReport(exe.myMerger) # Resource consumption reportTime = os.times() - + # Calculate total cpu time we used - myCpuTime = reportTime[0] + reportTime[1] childCpuTime = reportTime[2] + reportTime[3] @@ -605,3 +595,18 @@ def pyJobReportToFileDict(jobReport, io = 'all'): for filedata in jobReport['files'][iotype]: dataDict[filedata['type']] = filedata return dataDict + + +def exeResourceReport(exe): + exeResource = {'cpuTime': exe.cpuTime, + 'wallTime': exe.wallTime,} + if exe.memStats: + exeResource['memory'] = exe.memStats + if exe.eventCount: + exeResource['nevents'] = exe.eventCount + if exe.athenaMP: + exeResource['mpworkers'] = exe.athenaMP + if exe.dbMonitor: + exeResource['dbData'] = exe.dbMonitor['bytes'] + exeResource['dbTime'] = exe.dbMonitor['time'] + return exeResource diff --git a/Tools/PyJobTransforms/python/trfUtils.py b/Tools/PyJobTransforms/python/trfUtils.py index fddfa176d84..c4e1783b454 100644 --- a/Tools/PyJobTransforms/python/trfUtils.py +++ b/Tools/PyJobTransforms/python/trfUtils.py @@ -3,7 +3,7 @@ ## @package PyJobTransforms.trfUtils # @brief Transform utility functions # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfUtils.py 665892 2015-05-08 14:54:36Z graemes $ +# @version $Id: trfUtils.py 712411 2015-12-03 16:46:19Z mavogel $ import os import os.path as path @@ -1167,52 +1167,55 @@ class ParallelJobProcessor(object): # @detail This function returns a Valgrind command for use with Athena. The # command is returned as a string (by default) or a list, as requested using # the argument returnFormat. -# To return a default Valgrind command specification, the function is called -# with no command options specified. To compose a command from scratch, the -# argument optionsList is used. This causes the list of specified command -# options to be appended to the basic executable command. To append options to -# the default command specification, the argument extraOptionsList is used. -# This causes the list of extra specified command options to be appended to -# the default command specification (not simply the basic executable command). +# The function will return a default Valgrind command specification, unless +# the user suppress them through an option. To append additional options to +# the command specification the argument extraOptionsList is used. This +# causes the list of extra specified command options to be appended to +# the command specification, which will contain the default options unless +# these are suppressed. # The Athena serialised configuration file is specified using the argument # AthenaSerialisedConfigurationFile. -# @return command as string or command as list +# @return command as string def ValgrindCommand( - basicOptionsList = None, + defaultOptions = True, extraOptionsList = None, AthenaSerialisedConfigurationFile = "athenaConf.pkl", returnFormat = "string" ): + + # Access Valgrind suppressions files by finding the paths from + # environment variables. Append the files to the Valgrind suppressions + # options. + suppressionFilesAndCorrespondingPathEnvironmentVariables = { + "etc/valgrind-root.supp": "ROOTSYS", + "Gaudi.supp": "DATAPATH", + "oracleDB.supp": "DATAPATH", + "valgrindRTT.supp": "DATAPATH", + "root.supp": "DATAPATH" + } optionsList = ["valgrind"] - # If basic options are not specified, use default options. - if not basicOptionsList: + # If default options are not suppressed, use them. + if defaultOptions: optionsList.append("--num-callers=30") optionsList.append("--tool=memcheck") optionsList.append("--leak-check=full") - # Access Valgrind suppressions files by finding the paths from - # environment variables. Append the files to the Valgrind suppressions - # options. - suppressionFilesAndCorrespondingPathEnvironmentVariables = { - "etc/valgrind-root.supp": "ROOTSYS", - "Gaudi.supp/Gaudi.supp": "DATAPATH", - "oracleDB.supp": "DATAPATH", - "valgrindRTT.supp": "DATAPATH", - "root.supp/root.supp": "DATAPATH" - } - for suppressionFile, pathEnvironmentVariable in suppressionFilesAndCorrespondingPathEnvironmentVariables.iteritems(): - optionsList.append("--suppressions=" + - findFile(os.environ[pathEnvironmentVariable], suppressionFile)) - optionsList.append("$(which python)") - optionsList.append("$(which athena.py)") - optionsList.append(AthenaSerialisedConfigurationFile) - # If basic options are specified, append them to the existing options. - if basicOptionsList: - for option in basicOptionsList: - optionsList.append(option) + optionsList.append("--smc-check=all") # If extra options are specified, append them to the existing options. if extraOptionsList: for option in extraOptionsList: optionsList.append(option) + # Add suppression files and athena commands + for suppressionFile, pathEnvironmentVariable in suppressionFilesAndCorrespondingPathEnvironmentVariables.iteritems(): + suppFile = findFile(os.environ[pathEnvironmentVariable], suppressionFile) + if suppFile: + optionsList.append("--suppressions=" + suppFile) + else: + msg.warning("Bad path to suppression file: {sfile}, {path} not defined".format( + sfile = suppressionFile, path = pathEnvironmentVariable) + ) + optionsList.append("$(which python)") + optionsList.append("$(which athena.py)") + optionsList.append(AthenaSerialisedConfigurationFile) # Return the command in the requested format, string (by default) or list. if returnFormat is None or returnFormat == "string": return(" ".join(optionsList)) diff --git a/Tools/PyJobTransforms/python/trfValidation.py b/Tools/PyJobTransforms/python/trfValidation.py index 7f244845bab..da2abc8d969 100644 --- a/Tools/PyJobTransforms/python/trfValidation.py +++ b/Tools/PyJobTransforms/python/trfValidation.py @@ -6,7 +6,7 @@ # @details Contains validation classes controlling how the transforms # will validate jobs they run. # @author atlas-comp-transforms-dev@cern.ch -# @version $Id: trfValidation.py 679715 2015-07-02 11:28:03Z lerrenst $ +# @version $Id: trfValidation.py 740537 2016-04-15 11:28:11Z graemes $ # @note Old validation dictionary shows usefully different options: # <tt>self.validationOptions = {'testIfEmpty' : True, 'testIfNoEvents' : False, 'testIfExists' : True, # 'testIfCorrupt' : True, 'testCountEvents' : True, 'extraValidation' : False, @@ -557,6 +557,9 @@ def performStandardFileValidation(dictionary, io, parallelMode = False): raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname) elif arg.getSingleMetadata(fname, 'integrity') == 'UNDEFINED': msg.info('No corruption test defined.') + elif arg.getSingleMetadata(fname, 'integrity') is None: + msg.error('Could not check for file integrity') + raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s might be missing' % fname) else: msg.error('Unknown rc from corruption test.') raise trfExceptions.TransformValidationException(trfExit.nameToCode('TRF_EXEC_VALIDATION_FAIL'), 'File %s did not pass corruption test' % fname) @@ -713,9 +716,9 @@ class eventMatch(object): self._eventCountConf['EVNT_COSMICS'] = {'HITS': simEventEff} self._eventCountConf['EVNT_Stopped'] = {'HITS': simEventEff} self._eventCountConf['HITS'] = {'RDO':"match", "HITS_MRG":"match", 'HITS_FILT': simEventEff, "RDO_FILT": "filter"} - self._eventCountConf['BS'] = {'ESD': "match", 'DRAW_*':"filter", 'NTUP_*':"filter", "BS_MRG":"match", 'DESD_*': "filter"} - self._eventCountConf['RDO*'] = {'ESD': "match", 'DRAW_*':"filter", 'NTUP_*':"filter", "RDO_MRG":"match", "RDO_TRIG":"match"} - self._eventCountConf['ESD'] = {'ESD_MRG': "match", 'AOD':"match", 'DESD_*':"filter", 'DAOD_*':"filter", 'NTUP_*':"filter"} + self._eventCountConf['BS'] = {'ESD': "match", 'DRAW_*':"filter", 'NTUP_*':"filter", "BS_MRG":"match", 'DESD*': "filter", 'AOD':"match", 'DAOD*':"filter"} + self._eventCountConf['RDO*'] = {'ESD': "match", 'DRAW_*':"filter", 'NTUP_*':"filter", "RDO_MRG":"match", "RDO_TRIG":"match", 'AOD':"match", 'DAOD*':"filter"} + self._eventCountConf['ESD'] = {'ESD_MRG': "match", 'AOD':"match", 'DESD*':"filter", 'DAOD_*':"filter", 'NTUP_*':"filter"} self._eventCountConf['AOD'] = {'AOD_MRG' : "match", 'TAG':"match", "NTUP_*":"filter", "DAOD_*":"filter", 'NTUP_*':"filter"} self._eventCountConf['AOD_MRG'] = {'TAG':"match"} self._eventCountConf['DAOD_*'] = {'DAOD_*_MRG' : "match"} diff --git a/Tools/PyJobTransforms/scripts/GetTfCommand.py b/Tools/PyJobTransforms/scripts/GetTfCommand.py index 33c0917decc..7943f4481a5 100755 --- a/Tools/PyJobTransforms/scripts/GetTfCommand.py +++ b/Tools/PyJobTransforms/scripts/GetTfCommand.py @@ -9,7 +9,8 @@ import sys import argparse from PyJobTransforms.trfLogger import msg, stdLogLevels -msg.info('logging set in %s' % sys.argv[0]) +if not '--printOnlyCmdLine' in sys.argv: + msg.info('logging set in %s' % sys.argv[0]) from PyJobTransforms.trfAMI import TagInfo from PyJobTransforms.trfExceptions import TransformAMIException @@ -19,6 +20,7 @@ def main(): parser.add_argument('--AMI', '--AMIConfig', help = 'Production tag to be interpreted', required = True) parser.add_argument('--verbose', '--debug', action = 'store_true', help = 'set logging level to DEBUG') parser.add_argument('--doNotSuppressNonJobOptions', action = 'store_true', help = 'get full output from AMI') + parser.add_argument('--printOnlyCmdLine', action = 'store_true', help = 'simply put out the TRF command line, nothing else') args = vars(parser.parse_args(sys.argv[1:])) @@ -37,11 +39,19 @@ def main(): print 'Note that you need both suitable credentials to access AMI and access to the panda database (only works from inside CERN) for GetTfCommand.py to work.' sys.exit(1) - print tag + if not 'printOnlyCmdLine' in args: + print tag - if 'argdict' in args: - tag.dump(args['argdict']) + if 'argdict' in args: + tag.dump(args['argdict']) + else: + # only print the command line, allows stuff like + # pathena --trf "`GetTfCommand --AMI q1234 --printOnlyCmdLine` --inputFile bla.input --maxEvents 42" + trfCmdLine = tag.trfs[0].name + " " + tag.trfs[0]._argsToString(tag.trfs[0].physics) + trfCmdLine.replace('"', '\\' + '"') + print trfCmdLine if __name__ == '__main__': main() + diff --git a/Tools/PyJobTransforms/test/test_trfMPTools.py b/Tools/PyJobTransforms/test/test_trfMPTools.py index f842c4b7ad7..df8bc79595b 100755 --- a/Tools/PyJobTransforms/test/test_trfMPTools.py +++ b/Tools/PyJobTransforms/test/test_trfMPTools.py @@ -5,7 +5,7 @@ ## @Package test_trfMPTools.py # @brief Unittests for trfMPTools.py # @author graeme.andrew.stewart@cern.ch -# @version $Id: test_trfMPTools.py 677748 2015-06-23 20:29:35Z graemes $ +# @version $Id: test_trfMPTools.py 725493 2016-02-22 13:07:59Z mavogel $ import os import subprocess @@ -16,7 +16,7 @@ msg = logging.getLogger(__name__) # Allowable to import * from the package for which we are the test suite from PyJobTransforms.trfMPTools import * -from PyJobTransforms.trfArgClasses import argList, argFile +from PyJobTransforms.trfArgClasses import argList, argSubstepList, argFile import PyJobTransforms.trfExceptions as trfExceptions @@ -30,7 +30,7 @@ class AthenaMPProcTests(unittest.TestCase): self.assertEqual(detectAthenaMPProcs(), 0) def test_noMPwithArgdict(self): - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['some', 'random', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['some', 'random', 'values'])} self.assertEqual(detectAthenaMPProcs(argdict), 0) def test_MPfromEnv(self): @@ -48,33 +48,34 @@ class AthenaMPProcTests(unittest.TestCase): self.assertRaises(trfExceptions.TransformExecutionException, detectAthenaMPProcs) def test_MPfromArgdict(self): - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['--nprocs=8', 'random', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['--nprocs=8', 'random', 'values'])} self.assertEqual(detectAthenaMPProcs(argdict), 8) def test_MPfromArgdictEmpty(self): - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['--nprocs=0', 'random', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['--nprocs=0', 'random', 'values'])} self.assertEqual(detectAthenaMPProcs(argdict), 0) def test_MPfromArgdictBad(self): - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['--nprocs=-4', 'random', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['--nprocs=-4', 'random', 'values'])} self.assertRaises(trfExceptions.TransformExecutionException, detectAthenaMPProcs, argdict) - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['--nprocs=notAnInt', 'random', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['--nprocs=notAnInt', 'random', 'values'])} self.assertRaises(trfExceptions.TransformExecutionException, detectAthenaMPProcs, argdict) - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['--nprocs=4', '--nprocs=8', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['--nprocs=4', '--nprocs=8', 'values'])} self.assertRaises(trfExceptions.TransformExecutionException, detectAthenaMPProcs, argdict) def test_MPfromBoth(self): # Env should have priority os.environ["ATHENA_PROC_NUMBER"] = "4" - argdict={'movealong': argList('nothing to see here'), 'athenaopts': argList(['--nprocs=2', 'random', 'values'])} + argdict={'movealong': argList('nothing to see here'), 'athenaopts': argSubstepList(['--nprocs=2', 'random', 'values'])} self.assertEqual(detectAthenaMPProcs(argdict), 4) class AthenaMPOutputParseTests(unittest.TestCase): def setUp(self): - # Gah, this is a pest to setup! - cwd = os.getcwd() - outputStruct = [('.', [], ['data15_13TeV.00267167.physics_Main.merge.RAW._lb0176._SFO-1._0001.1']), ('athenaMP-workers-RAWtoESD-r2e', ['worker_3', 'worker_7', 'worker_4', 'worker_5', 'worker_2', 'worker_6', 'evt_counter', 'worker_1', 'worker_0'], []), ('athenaMP-workers-RAWtoESD-r2e/worker_3', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_7', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_4', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_5', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_2', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_6', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/evt_counter', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_1', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_0', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out'])] + # Gah, this is a pest to setup! Need to creat stub files for the mother outputs + # and the worker outputs + outputStruct = [('.', [], ['data15_13TeV.00267167.physics_Main.merge.RAW._lb0176._SFO-1._0001.1', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'tmp.HIST_ESD_INT', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002']), + ('athenaMP-workers-RAWtoESD-r2e', ['worker_3', 'worker_7', 'worker_4', 'worker_5', 'worker_2', 'worker_6', 'evt_counter', 'worker_1', 'worker_0'], []), ('athenaMP-workers-RAWtoESD-r2e/worker_3', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_7', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_4', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_5', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_2', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_6', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/evt_counter', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_1', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out']), ('athenaMP-workers-RAWtoESD-r2e/worker_0', [], ['tmp.HIST_ESD_INT', 'AthenaMP.log', 'data15_13TeV.00267167.physics_Main.recon.DRAW_TAUMUH.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.ESD.f594._lb0176._SFO-1._0002', 'eventLoopHeartBeat.txt', 'ntuple_RAWtoESD.pmon.gz', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EGZ.f594._lb0176._SFO-1._0002', 'FileManagerLog', 'PoolFileCatalog.xml.BAK', 'PoolFileCatalog.xml', 'data15_13TeV.00267167.physics_Main.recon.DRAW_ZMUMU.f594._lb0176._SFO-1._0002', 'data15_13TeV.00267167.physics_Main.recon.DRAW_EMU.f594._lb0176._SFO-1._0002', 'AtRanluxGenSvc.out'])] for delement in outputStruct: try: os.mkdir(delement[0]) diff --git a/Tools/PyJobTransforms/test/test_trfUtils.py b/Tools/PyJobTransforms/test/test_trfUtils.py index b6f8b30cadc..4be6920d863 100755 --- a/Tools/PyJobTransforms/test/test_trfUtils.py +++ b/Tools/PyJobTransforms/test/test_trfUtils.py @@ -5,7 +5,7 @@ ## @Package test_trfUtils.py # @brief Unittests for trfUtils.py # @author graeme.andrew.stewart@cern.ch -# @version $Id: test_trfUtils.py 594679 2014-04-29 14:15:19Z graemes $ +# @version $Id: test_trfUtils.py 711194 2015-11-27 14:44:03Z mavogel $ import unittest import os @@ -70,23 +70,10 @@ class trfUtilsInfanticide(unittest.TestCase): # print subprocess.check_output(['ps', 'guxw']) self.assertEqual(len(myWeans), 0) - # This is just too hard and too dangerous to test -# def test_orphanKiller(self): -# p = subprocess.Popen(["./{0}".format(self.exitWrapper)]) -# time.sleep(1) -# # print subprocess.check_output(['ps', 'ax', '-o', 'pid,ppid,pgid,args', '-m']) -# p.poll() -# myWeans = listChildren(listOrphans = True) -# self.assertGreaterEqual(len(myWeans), 1) -# infanticide(myWeans) -# p.wait() # This is important to clean up zombies -# myWeans = listChildren(listOrphans = True) -# # print subprocess.check_output(['ps', 'guxw']) -# self.assertGreaterEqual(len(myWeans), 0) - -# @timelimited(timeout=10, sleeptime=1) -# def test_timelimitedKiller(self): +class TestValgrindCommand(unittest.TestCase): + def test_valgrindarguments(self): + vgc=ValgrindCommand() + self.assertTrue(vgc.startswith('valgrind')) - if __name__ == '__main__': unittest.main() diff --git a/Tools/PyJobTransforms/test/test_trfUtilsDBRelease.py b/Tools/PyJobTransforms/test/test_trfUtilsDBRelease.py index c18132f52ae..3f1dabf013b 100755 --- a/Tools/PyJobTransforms/test/test_trfUtilsDBRelease.py +++ b/Tools/PyJobTransforms/test/test_trfUtilsDBRelease.py @@ -3,7 +3,7 @@ # Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration # # Test the various DBRelease scenarios -# $Id: test_trfUtilsDBRelease.py 665892 2015-05-08 14:54:36Z graemes $ +# $Id: test_trfUtilsDBRelease.py 740537 2016-04-15 11:28:11Z graemes $ # import json @@ -67,44 +67,44 @@ class DBReleasetest(unittest.TestCase): sys.stdout.write(line) self.assertEqual(p.returncode, 0) - # Test using a DBRelease file which exists, absolute path (this should _not_ rerun the setup script, of course) - def test_tarballAbsPath(self): - cmd = ['Athena_tf.py', '--DBRelease', '/afs/cern.ch/work/g/graemes/ddm/ddo.000001.Atlas.Ideal.DBRelease.v220701/DBRelease-22.7.1.tar.gz'] - msg.info('Will run this transform: {0}'.format(cmd)) - p = subprocess.Popen(cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1) - while p.poll() is None: - line = p.stdout.readline() - sys.stdout.write(line) - # Hoover up remaining buffered output lines - for line in p.stdout: - sys.stdout.write(line) - self.assertEqual(p.returncode, 0) - - # Test using the next Pcache release DBRelease file, a soft link to a candidate - def test_tarballPcacheCurrent(self): - cmd = ['Athena_tf.py', '--DBRelease', '/afs/cern.ch/atlas/www/GROUPS/DATABASE/pacman4/DBRelease/DBRelease-pcache-current.tar.gz'] - msg.info('Will run this transform: {0}'.format(cmd)) - p = subprocess.Popen(cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1) - while p.poll() is None: - line = p.stdout.readline() - sys.stdout.write(line) - # Hoover up remaining buffered output lines - for line in p.stdout: - sys.stdout.write(line) - self.assertEqual(p.returncode, 0) - - # Test using a DBRelease file which doesn't exist, but should fallback to CVMFS - def test_tarballFallback(self): - cmd = ['Athena_tf.py', '--DBRelease', 'DBRelease-23.3.1.tar.gz'] - msg.info('Will run this transform: {0}'.format(cmd)) - p = subprocess.Popen(cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1) - while p.poll() is None: - line = p.stdout.readline() - sys.stdout.write(line) - # Hoover up remaining buffered output lines - for line in p.stdout: - sys.stdout.write(line) - self.assertEqual(p.returncode, 0) +# # Test using a DBRelease file which exists, absolute path (this should _not_ rerun the setup script, of course) +# def test_tarballAbsPath(self): +# cmd = ['Athena_tf.py', '--DBRelease', '/afs/cern.ch/work/g/graemes/ddm/ddo.000001.Atlas.Ideal.DBRelease.v220701/DBRelease-22.7.1.tar.gz'] +# msg.info('Will run this transform: {0}'.format(cmd)) +# p = subprocess.Popen(cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1) +# while p.poll() is None: +# line = p.stdout.readline() +# sys.stdout.write(line) +# # Hoover up remaining buffered output lines +# for line in p.stdout: +# sys.stdout.write(line) +# self.assertEqual(p.returncode, 0) +# +# # Test using the next Pcache release DBRelease file, a soft link to a candidate +# def test_tarballPcacheCurrent(self): +# cmd = ['Athena_tf.py', '--DBRelease', '/afs/cern.ch/atlas/www/GROUPS/DATABASE/pacman4/DBRelease/DBRelease-pcache-current.tar.gz'] +# msg.info('Will run this transform: {0}'.format(cmd)) +# p = subprocess.Popen(cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1) +# while p.poll() is None: +# line = p.stdout.readline() +# sys.stdout.write(line) +# # Hoover up remaining buffered output lines +# for line in p.stdout: +# sys.stdout.write(line) +# self.assertEqual(p.returncode, 0) +# +# # Test using a DBRelease file which doesn't exist, but should fallback to CVMFS +# def test_tarballFallback(self): +# cmd = ['Athena_tf.py', '--DBRelease', 'DBRelease-23.3.1.tar.gz'] +# msg.info('Will run this transform: {0}'.format(cmd)) +# p = subprocess.Popen(cmd, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = 1) +# while p.poll() is None: +# line = p.stdout.readline() +# sys.stdout.write(line) +# # Hoover up remaining buffered output lines +# for line in p.stdout: +# sys.stdout.write(line) +# self.assertEqual(p.returncode, 0) # Negative test - use an illegal name format def test_illegalName(self): -- GitLab