Commit 37af8d63 authored by Graeme Stewart's avatar Graeme Stewart
Browse files

Improved JSON transform signatures; added "group" field; more robust script.

New way to elevate std::bad_alloc message to "CATASTROPHE" (level = fatal + 10)
Improved GetTfCommand.py
Wider test coverage
* Tag PyJobTransforms-02-03-37 (PyJobTransforms-02-03-37)

	* Tag PyJobTransforms-02-03-37

2014-12-18  Azzah Alshehri <azzah.aziz.alshehri@cern.ch>
	* python/trfArgClasses.py
	- Convert the member variable self._desc in prodsysDescription method to be a local variable.
	- Pass the argument groups into the json signature so that the AMI can use it to layout arguments sensibly when defining 	tags

2014-12-17 Graeme Stewart <graeme.andrew.stewart@cern.ch>
	* python/trfArgs.py
	- Add new argument group 'Metadata'
	- Add task, job, attempt arguments in metadata group
	- Make metadata arguments a part of standard transform arguments
	- Fix missing group for athenaMPMergeTargetSize
	* test/test_trfValidation.py
	- Clean up all test logfiles

2014-12-17 Graeme Stewart <graeme.andrew.stewart@cern.ch>
	* python/transform.py
	- Small change in startup message
	* python/trfExe.py
...
(Long ChangeLog diff - truncated)
parent e712da97
......@@ -5,23 +5,17 @@
# @brief Main package for new style ATLAS job transforms
# @details Core class for ATLAS job transforms
# @author atlas-comp-transforms-dev@cern.ch
# @version $Id: transform.py 609252 2014-07-29 16:20:33Z wbreaden $
# @version $Id: transform.py 636429 2014-12-17 09:48:38Z graemes $
#
__version__ = '$Revision'
__doc__ = 'Core class for transforms'
import argparse
import os
import os.path
import pprint
import os.path as path
import re
import sys
import time
import traceback
import unittest
from xml.etree import ElementTree
import logging
msg = logging.getLogger(__name__)
......@@ -32,10 +26,9 @@ import PyJobTransforms.trfExceptions as trfExceptions
from PyJobTransforms.trfSignal import setTrfSignalHandlers, resetTrfSignalHandlers
from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
from PyJobTransforms.trfLogger import setRootLoggerLevel, stdLogLevels
from PyJobTransforms.trfJobOptions import JobOptionsTemplate
from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfUtils import shQuoteStrings, listChildren, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
from PyJobTransforms.trfUtils import shQuoteStrings, infanticide, pickledDump, JSONDump, cliToKey, convertToStr
from PyJobTransforms.trfReports import trfJobReport, defaultFileReport
from PyJobTransforms.trfExe import transformExecutor
from PyJobTransforms.trfGraph import executorGraph
......@@ -51,10 +44,10 @@ class transform(object):
# @param trfName Name of the transform. Default is executable name with .py rstripped.
# @param executor Executor list
def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
trfName = os.path.basename(sys.argv[0]).rsplit('.py', 1)[0],
trfName = path.basename(sys.argv[0]).rsplit('.py', 1)[0],
executor = set([transformExecutor(),]), exeArgs = None, description = ''):
'''Transform class initialiser'''
msg.debug('Welcome to new transforms')
msg.debug('Welcome to ATLAS job transforms')
## @brief Get starting timestamp as early as possible
self._transformStart = os.times()
......@@ -90,7 +83,7 @@ class transform(object):
# If we were passed executors at construction time then append them to the set:
if executor is not None:
self.appendToExecutorSet(executor)
self.appendToExecutorSet(executor)
## Transform exit code/message holders
self._exitCode = None
......@@ -161,7 +154,8 @@ class transform(object):
executor.trf = self
if executor.name in self._executorDictionary:
raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
'Transform has been initialised with two executors with the same name ({0}) - executor names must be unique'.format(value.name))
'Transform has been initialised with two executors with the same name ({0})'
' - executor names must be unique'.format(executor.name))
self._executors.add(executor)
self._executorDictionary[executor.name] = executor
......@@ -250,6 +244,12 @@ class transform(object):
self._report.fast = True
self.generateReport()
sys.exit(self._exitCode)
except trfExceptions.TransformAMIException, e:
msg.critical('AMI failure: {0!s}'.format(e))
self._exitCode = e.errCode
self._exitMsg = e.errMsg
sys.exit(self._exitCode)
self.setGlobalLogLevel()
......@@ -351,6 +351,13 @@ class transform(object):
self.validateOutFiles()
except trfExceptions.TransformNeedCheckException as e:
msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
self._exitCode = e.errCode
self._exitMsg = e.errMsg
self.generateReport(fast=False)
sys.exit(self._exitCode)
except trfExceptions.TransformException as e:
msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
self._exitCode = e.errCode
......@@ -403,34 +410,7 @@ class transform(object):
# setup the graph
if 'steering' in self._argdict.keys():
msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
for substep, steeringValues in self._argdict['steering'].value.iteritems():
foundSubstep = False
for executor in self._executors:
if executor.name == substep or executor.substep == substep:
foundSubstep = True
msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
# Steering consists of tuples with (in/out, +/-, datatype)
for steeringValue in steeringValues:
if steeringValue[0] == 'in':
startSet = executor.inData
else:
startSet = executor.outData
origLen = len(startSet)
msg.debug('Data values to be modified are: {0}'.format(startSet))
if steeringValue[1] is '+':
startSet.add(steeringValue[2])
if len(startSet) != origLen + 1:
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
else:
startSet.discard(steeringValue[2])
if len(startSet) != origLen - 1:
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
msg.debug('Updated data values to: {0}'.format(startSet))
if not foundSubstep:
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
'This transform has no executor/substep {0}'.format(substep))
self._doSteering()
# Setup the graph and topo sort it
self._executorGraph = executorGraph(self._executors, self._inputData, self._outputData)
......@@ -452,6 +432,42 @@ class transform(object):
# Tell the first executor that they are the first
self._executorDictionary[self._executorPath[0]['name']].conf.firstExecutor = True
## @brief Setup steering, which manipulates the graph before we trace the path
# for this transform
def _doSteering(self):
steeringAliases = {'doRDO_TRIG': {'RAWtoESD' : [('in', '-', 'RDO'), ('in', '+', 'RDO_TRIG')]},
}
for substep, steeringValues in self._argdict['steering'].value.iteritems():
foundSubstep = False
for executor in self._executors:
if executor.name == substep or executor.substep == substep:
foundSubstep = True
msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
# Steering consists of tuples with (in/out, +/-, datatype)
for steeringValue in steeringValues:
if steeringValue[0] == 'in':
startSet = executor.inData
else:
startSet = executor.outData
origLen = len(startSet)
msg.debug('Data values to be modified are: {0}'.format(startSet))
if steeringValue[1] is '+':
startSet.add(steeringValue[2])
if len(startSet) != origLen + 1:
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
else:
startSet.discard(steeringValue[2])
if len(startSet) != origLen - 1:
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
msg.debug('Updated data values to: {0}'.format(startSet))
if not foundSubstep:
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
'This transform has no executor/substep {0}'.format(substep))
## @brief Return the last executor which actually executed
# @return Last executor which has @c _hasExecuted == @c True, or the very first executor if we didn't even start yet
@property
......@@ -618,9 +634,9 @@ class transform(object):
else:
msg.info('Validating input files')
if 'parallelFileValidation' in self._argdict:
trfValidation.performStandardFileValidation(dict=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value )
else:
trfValidation.performStandardFileValidation(dict=self._dataDictionary, io='input')
trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
def validateOutFiles(self):
if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
......@@ -629,6 +645,6 @@ class transform(object):
else:
msg.info('Validating output files')
if 'parallelFileValidation' in self._argdict:
trfValidation.performStandardFileValidation(dict=self._dataDictionary, io='output', parallelMode=self._argdict['parallelFileValidation'].value )
trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=self._argdict['parallelFileValidation'].value )
else:
trfValidation.performStandardFileValidation(dict=self._dataDictionary, io='output')
\ No newline at end of file
trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output')
\ No newline at end of file
......@@ -8,6 +8,7 @@
import ast
import json
import os
import traceback
......@@ -18,7 +19,7 @@ from PyJobTransforms.trfExceptions import TransformAMIException
from PyJobTransforms.trfDefaultFiles import getInputFileName, getOutputFileName
from PyJobTransforms.trfExitCodes import trfExit
errCode=trfExit.nameToCode('TRF_AMI_ERROR')
AMIerrorCode=trfExit.nameToCode('TRF_AMI_ERROR')
## @brief Stores the configuration of a transform
......@@ -29,6 +30,7 @@ class TrfConfig:
self.physics={}
self.inFiles={}
self.outFiles={}
self.outputs={}
self.inDS=None
self.outfmts=[]
self.newTransform=False
......@@ -42,34 +44,77 @@ class TrfConfig:
def __str__(self):
string = 'asetup '+self.release+'\n'+self.name
string += self._str_to_dict(self.physics) +'\n'
string += self._argsToString(self.physics) +'\n'
string +='\nInput file arguments:\n'
if self.inFiles:
string += self._str_to_dict(self.inFiles) +'\n'
string += self._argsToString(self.inFiles) +'\n'
if self.inDS:
string +='\nExample input dataset: '+ self.inDS + '\n'
string +='\nOutput file arguments:\n'
if self.outFiles:
string += self._str_to_dict(self.outFiles) + '\n'
string += self._argsToString(self.outFiles) + '\n'
string +='\nAMI outputs:\n'
if self.outputs != {}:
string += self.outputs + '\n'
if self.outfmts:
string += '\nPossible output data types: '+ str(self.outfmts) + '\n'
return string
def _str_to_dict(self,adict):
def _argsToString(self, adict):
string=''
for (k,v) in adict.iteritems():
if self.newTransform:
# Some keys already have '--' prefixied
if k.startswith('--'):
string +=" "+k+" '"+v.replace("'", "\\'")+"'"
if not k.startswith('--'):
k = "--"+k
# Now the tricky bit: we have to hackishly massage back to a CLI
# value depending on the value type and possibly the key
# type
if isinstance(v, dict):
# Should be a substep argument
if 'Exec' in k: # preExec, postExec
string += " " + k
for vk, vv in v.iteritems():
string += " " + _parseExecDict(vk, vv)
if 'Include' in k: # preInclude, postInclude
string += " " + k
for vk, vv in v.iteritems():
string += " " + _parseIncludeDict(vk, vv)
elif isinstance(v, (list, tuple)):
# athenaopts are special - space separated
if "athenaopts" in k:
string += " " + k + "=" + "'" + " ".join(v).replace("'", "\\'") + "'"
elif "Exec" in k:
# Special intermediate treatment for pre/postExec from prodsys
string += " " + k + " " + " ".join(["'"+element.replace("'", "\\'")+"'" for element in v])
else:
string += " " + k + "=" + "'" + ",".join(v).replace("'", "\\'") + "'"
else:
string +=" --"+k+" '"+v.replace("'", "\\'")+"'"
# Assume some vanilla value
string +=" "+k+" "+"'"+str(v).replace("'", "\\'")+"'"
else:
string +=" "+k+"='"+v.replace("'", "\\'")+"'"
string +=" "+k+"="+"'"+str(v).replace("'", "\\'")+"'"
return string
## @brief Back convert a pre/postExec dictionary into a set of command
# line compatible strings
def _parseExecDict(substep, value):
string = ""
for bit in value:
string += " '" + substep + ":" + str(bit).replace("'", "\\'")+"'"
return string
## @brief Back convert a pre/postInclude dictionary into a set of command
# line compatible strings
# By default use a comma for joining up the list values
def _parseIncludeDict(substep, value, joinWithChar = ","):
string = "'"
string += substep + ":" + joinWithChar.join(value).replace("'", "\\'")+"'"
return string
## @brief Stores the information about a given tag.
class TagInfo:
def __init__(self,tag):
......@@ -113,74 +158,37 @@ class TagInfo:
for trf in self.trfs:
string+='\n'+str(trf)+'\n'
return string
return string
def dump(self, file):
pass # not yet implemented
## @brief Get AMI client
# @param useReplica If @c True CERN replica is used instead of primary AMI.
# @returns pyAMI.client.AMIClient instance
def getAMIClient(useReplica=False):
## @brief Get an AMI client
# @note Always return a client to the primary replica.
# The caller is allowed to update the replica via the
# config.endpoint value.
# @returns pyAMI.client.Client instance
def getAMIClient():
msg.debug('Getting AMI client...')
endpoint = 'atlas'
try:
from pyAMI.client import AMIClient
from pyAMI.auth import AMI_CONFIG
from pyAMI.exceptions import AMI_Error
from pyAMI import endpoint
from pyAMI.endpoint import get_endpoint, get_XSL_URL
from pyAMI.client import Client
except ImportError:
raise TransformAMIException(errCode, 'Import of pyAMI modules failed.')
raise TransformAMIException(AMIerrorCode, 'Import of pyAMI modules failed.')
if useReplica:
endpoint.TYPE = 'replica'
else:
endpoint.TYPE = 'main'
msg.debug('Using endpoint %s ' % get_endpoint())
msg.debug('Using xsl_url %s ' % get_XSL_URL())
amiclient = AMIClient()
msg.debug("Attempting to get AMI client for endpoint {0}".format(endpoint))
amiclient = Client(endpoint)
return amiclient
## @brief Get list of characters of ProdSys tags
# @returns list of characters
def getProdSysTagsCharacters():
# Due to move to uniform tag definition in AMI this list is now frozen
# So just hard code it
msg.debug('Getting list of ProdSys tag characters...')
defaultList=['y', 'p', 'e', 's', 'd', 'r', 't', 'a', 'b', 'w']
argv=["SearchQuery"]
argv.append("-sql=select productionStep.productionStepTag FROM productionStep WHERE ( ( productionStep.writeStatus LIKE 'valid%') AND productionStep.actor = 'TR')")
argv.append("project=Atlas_Production")
argv.append("processingStep=Atlas_Production")
defaultList=['z', 'p', 'e', 's', 'd', 'r', 't', 'a', 'b', 'w']
try:
from pyAMI.exceptions import AMI_Error
except ImportError:
msg.warning('Import of pyAMI modules failed (is your release setup correctly?).')
msg.warning('Returning default list of ProdSys tags.')
return defaultList
try:
amiclient=getAMIClient(False)
result=amiclient.execute(argv)
except (AMI_Error, TransformAMIException):
msg.debug('An exception occured: %s' % traceback.format_exc())
msg.warning('Getting ProdSysTags from primary AMI failed. Trying CERN replica.')
try:
amiclient=getAMIClient(True)
result=amiclient.execute(argv)
except (AMI_Error, TransformAMIException):
msg.debug('An exception occured: %s' % traceback.format_exc())
msg.warning('Getting ProdSysTags from CERN replica failed (do you have the necessary credentials to access AMI?).')
msg.warning('Returning default list of ProdSysTags.')
return defaultList
return [ row['productionStepTag'] for row in result.rows() ]
return defaultList
## @brief Get PANDA client
......@@ -190,13 +198,13 @@ def getPANDAClient():
try:
import cx_Oracle
except ImportError:
raise TransformAMIException(errCode, 'Import of cx_Oracle failed (is Oracle setup on this machine?).')
raise TransformAMIException(AMIerrorCode, 'Import of cx_Oracle failed (is Oracle setup on this machine?).')
try:
cur = cx_Oracle.connect('atlas_grisli_r/panda_c10@adcr_panda').cursor()
except:
msg.debug('An exception occurred while connecting to PANDA database: %s' % traceback.format_exc())
raise TransformAMIException(errCode, 'Failed to get PANDA client connection (N.B. this does not work from outside CERN).')
raise TransformAMIException(AMIerrorCode, 'Failed to get PANDA client connection (N.B. this does not work from outside CERN).')
return cur
......@@ -219,10 +227,10 @@ def getTrfConfigFromPANDA(tag):
result=pandaclient.fetchone()
except:
msg.info('An exception occurred: %s' % traceback.format_exc())
raise TransformAMIException(errCode, 'Getting tag info from PANDA failed.')
raise TransformAMIException(AMIerrorCode, 'Getting tag info from PANDA failed.')
if result is None:
raise TransformAMIException(errCode, 'Tag %s not found in PANDA database' % tag)
raise TransformAMIException(AMIerrorCode, 'Tag %s not found in PANDA database' % tag)
msg.debug('Raw data returned from panda DB is:' + os.linesep + str(result))
......@@ -241,7 +249,7 @@ def getTrfConfigFromPANDA(tag):
if not ( len(trfn) == len(trfv) == len(lparams) == len(vparams) ):
raise TransformAMIException(errCode, 'Inconsistency in number of trfs.')
raise TransformAMIException(AMIerrorCode, 'Inconsistency in number of trfs.')
# Cache can be a single value, in which case it needs replicated for other
# transform steps, or it can be multivalued - great schema design guys :-(
......@@ -249,7 +257,7 @@ def getTrfConfigFromPANDA(tag):
if len(cache) == 1:
cache = cache * len(trfv)
else:
raise TransformAMIException(errCode, 'Inconsistency in number of caches entries vs. release numbers ({0}; {1}).'.format(cache, trfv))
raise TransformAMIException(AMIerrorCode, 'Inconsistency in number of caches entries vs. release numbers ({0}; {1}).'.format(cache, trfv))
listOfTrfs=[]
......@@ -267,7 +275,7 @@ def getTrfConfigFromPANDA(tag):
values=vparams[iTrf].split(',')
if ( len(keys) != len(values) ):
raise TransformAMIException(errCode, 'Inconsistency in number of arguments.')
raise TransformAMIException(AMIerrorCode, 'Inconsistency in number of arguments.')
physics = dict( (k, ReadablePANDA(v) ) for (k,v) in zip(keys, values))
# Hack to correct trigger keys being stored with spaces in panda
......@@ -276,6 +284,9 @@ def getTrfConfigFromPANDA(tag):
if ' ' in v:
physics[k] = v.replace(' ', ',')
msg.warning('Attempted to correct illegal trigger configuration string: {0} -> {1}'.format(v, physics[k]))
if 'Exec' in k:
# Mash up to a list, where %8C is used as the quote delimitation character
physics[k] = [ execElement.replace("%8C", "") for execElement in v.split("%8C %8C") ]
msg.debug("Checking for pseudo-argument internal to ProdSys...")
if 'extraParameter' in physics:
......@@ -297,7 +308,7 @@ def getTrfConfigFromPANDA(tag):
msg.debug("Checking for not set arguments...")
for arg,value in physics.items():
if value=="NONE" or value=="none":
if value=="NONE" or value=="none" or value==["NONE"]:
val=physics.pop(arg)
msg.debug("Removed %s=%s from arguments." % (arg, val) )
......@@ -317,45 +328,46 @@ def getTrfConfigFromPANDA(tag):
# @param tag Tag for which information is requested
# @returns list of PyJoCbTransforms.trfAMI.TRFConfig instances
def getTrfConfigFromAMI(tag):
msg.debug('Using AMI to get info about tag %s' % tag)
try:
from pyAMI.exceptions import AMI_Error
from pyAMI.query import get_configtags
except ImportError:
raise TransformAMIException(errCode, 'Import of pyAMI modules failed.')
import pyAMI.atlas.api
import pyAMI.exception
except ImportError, e:
raise TransformAMIException(AMIerrorCode, 'Import of pyAMI modules failed ({0})'.format(e))
try:
amiclient=getAMIClient(False)
result=get_configtags(amiclient, tag)
except (AMI_Error, TransformAMIException) as e:
if 'Invalid configTag found' in e.args[0]:
raise TransformAMIException(errCode, 'Tag %s not found in AMI database.' % tag)
amiclient=getAMIClient()
result=pyAMI.atlas.api.get_ami_tag(amiclient, tag)
except pyAMI.exception.Error, e:
msg.warning('An exception occured when connecting to primary AMI: {0}'.format(e))
msg.debug('Exception: {0}'.format(e))
if 'please login' in e.message or 'certificate expired' in e.message:
raise TransformAMIException(AMIerrorCode, 'Getting tag info from AMI failed with credential problem. '
'Please check your AMI account status.')
if 'Invalid amiTag' in e.message:
raise TransformAMIException(AMIerrorCode, 'Invalid AMI tag ({0}).'.format(tag))
msg.debug('An exception occured: %s' % traceback.format_exc())
msg.warning('Getting tag info from primary AMI failed. Trying CERN replica.')
msg.debug("Error may not be fatal - will try AMI replica catalog")
try:
amiclient=getAMIClient(True)
result=get_configtags(amiclient, tag)
except (AMI_Error, TransformAMIException):
msg.debug('An exception occured: %s' % traceback.format_exc())
raise TransformAMIException(errCode, 'Getting tag info from AMI failed.')
amiclient.config.endpoint = 'atlas-replica'
result=pyAMI.atlas.api.get_ami_tag(amiclient, tag)
except pyAMI.exception.Error, e:
msg.error('An exception occured when connecting to the AMI replica catalog: {0}'.format(e))
raise TransformAMIException(AMIerrorCode, 'Getting tag info from AMI failed (tried both primary and replica). '
'See logfile for exception details.')
msg.debug('Raw result from AMI is: %s ' % result)
if ( result[0]!={'amiTag': tag } and result[0]!={'configTag': tag }):
msg.warning('Got unexpected result from AMI: %s when asking for tag %s' % (result[0],tag))
raise TransformAMIException(errCode, 'Getting tag info from AMI failed.')
trf = TrfConfig()
trf.name=result[1]['transformation']
trf.release=result[1]['SWReleaseCache'].replace('AtlasProduction-','')
trf.physics=dict( (k, str(v)) for (k,v) in ast.literal_eval(result[1]['phconfig']).iteritems() )
trf.inFiles=dict( (k, getInputFileName(k)) for k in ast.literal_eval(result[1]['inputs']).iterkeys() )
outputs=ast.literal_eval(result[1]['outputs'])
trf.name=result[0]['transformation']
trf.outputs=result[0].get('outputs', {})
trf.release=result[0]['SWReleaseCache'].replace('AtlasProduction-','')
trf.physics=deserialiseFromAMIString(result[0]['phconfig'])
if not isinstance(trf.physics, dict):
raise TransformAMIException(AMIerrorCode, "Bad result for tag's phconfig: {0}".format(trf.physics))
trf.inFiles=dict( (k, getInputFileName(k)) for k in deserialiseFromAMIString(result[0]['inputs']).iterkeys() )
outputs=deserialiseFromAMIString(result[0]['outputs'])
trf.outFiles=dict( (k, getOutputFileName(outputs[k]['dstype']) ) for k in outputs.iterkeys() )
trf.outfmts=[ outputs[k]['dstype'] for k in outputs.iterkeys() ]
......@@ -367,6 +379,19 @@ def getTrfConfigFromAMI(tag):
return [ trf ]
## @brief Convert from a string to a python object
# @details As we don't know how the string is encoded we
# try JSON then a safe eval. This is a bit ugly :-(
def deserialiseFromAMIString(amistring):
try:
result = json.loads(amistring)
except ValueError, e_json:
msg.debug("Failed to decode {0} as JSON: {1}".format(amistring, e_json))
try:
result = ast.literal_eval(amistring)
except SyntaxError, e_ast:
errMsg = "Failed to deserialise AMI string '{0}' using JSON or eval".format(amistring)
msg.error(errMsg)
raise TransformAMIException(AMIerrorCode, errMsg)
return result
......@@ -3,21 +3,15 @@
## @Package PyJobTransforms.trfArgs
# @brief Standard arguments supported by trf infrastructure
# @author atlas-comp-transforms-dev@cern.ch
# @version $Id: trfArgs.py 613109 2014-08-22 16:55:12Z graemes $
# @version $Id: trfArgs.py 636448 2014-12-17 11:40:15Z graemes $
import argparse
import logging
msg = logging.getLogger(__name__)
import unittest
import pickle
import os
import PyJobTransforms.trfArgClasses as trfArgClasses
from PyJobTransforms.trfArgClasses import argFactory
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfDecorators import silent
from PyJobTransforms.trfExitCodes import trfExit