Commit e7c117ca authored by Graeme Stewart

* trfExe.py
    - Change format of DAOD output filename to match
      grid expectations
* Tag PyJobTransforms-02-03-26
parent 9a3aec74
package PyJobTransforms
use AtlasPolicy AtlasPolicy-*
use AtlasPython AtlasPython-* External -no_auto_imports
use AtlasPyFwdBwdPorts * External -no_auto_imports
apply_pattern declare_python_modules files="*.py"
# Default directory in the current package to look for trfs.
macro tfs_dir '../scripts'
apply_pattern declare_scripts files="${expand_files_cmd} find_*.py -s=../python"
apply_pattern generic_declare_for_link kind=runtime files='-s=../share *.db' prefix=share name=trf
# Pattern to declare python job transforms.
# Each job transform normally has 2 components:
# - The python script (*_tf.py), defining the trf
# - The corresponding skeleton job options file (at least for athena transforms)
# The pattern takes 2 arguments:
# tfs = list of job transforms, by default taken from ../scripts
# These scripts will be installed in InstallArea/share/bin
# jo = list of skeleton joboptions files belonging to the job transforms
# By default taken from ../share
# These will be installed in InstallArea/jobOptions/<package>
#
pattern declare_job_transforms \
private ; \
apply_pattern generic_declare_for_link kind=tfs_exe files='-s=${tfs_dir} <tfs>' prefix=share/bin ; \
apply_pattern generic_declare_for_link kind=tfs_pyt files='-s=${tfs_dir} <tfs>' prefix=python/<package> ; \
apply_pattern generic_declare_for_link kind=tfs_jop files='-s=../share <jo>' prefix=jobOptions/<package> ; \
macro <package>_job_transforms "`${expand_files_cmd} -r=$(<PACKAGE>ROOT) -d=<package> -s=${tfs_dir} <tfs>`" ; \
apply_pattern install_python_init ; \
macro_append <package>_python_init_dependencies " install_tfs_pyt " ; \
end_private ; \
macro_append all_job_transforms " ${<package>_job_transforms}"
# For sample/utility tfs we need to do this after the pattern def
apply_pattern declare_job_transforms tfs='*_tf.py' jo='*.py'
# RTT tests
apply_pattern generic_declare_for_link kind=test files='-s=../test test_*.py' prefix=share/JobTransforms/test
macro PyJobTransforms_TestConfiguration "../test/PyJobTransforms_TestConfiguration.xml"
apply_pattern declare_runtime_extras extras="../test/PyJobTransforms_TestConfiguration.xml"
# Now make sure we generate the signature file with transform arguments in it
## DEPRECATED
#apply_pattern generic_declare_for_link kind=json files="../share/$(package)Signatures.json" prefix=share/JobTransforms
#
#private
#action makeTrfSignatures "../scripts/makeTrfSignatures.py --output ../share/$(package)Signatures.json"
#macro_append makeTrfSignatures_dependencies " install_tfs_jop install_python_modules "
#macro_append all_dependencies " makeTrfSignatures "
#macro_append check_install_json_dependencies " makeTrfSignatures "
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
## @package PyJobTransforms.trfAMI
# @brief Utilities for configuration of transforms via AMI tags
# @author atlas-comp-transforms-dev@cern.ch
# @version $Id$
import ast
import os
import traceback
import logging
msg = logging.getLogger(__name__)
from PyJobTransforms.trfExceptions import TransformAMIException
from PyJobTransforms.trfDefaultFiles import getInputFileName, getOutputFileName
from PyJobTransforms.trfExitCodes import trfExit
errCode=trfExit.nameToCode('TRF_AMI_ERROR')
## @brief Stores the configuration of a transform
class TrfConfig:
def __init__(self):
self.name=None
self.release=None
self.physics={}
self.inFiles={}
self.outFiles={}
self.inDS=None
self.outfmts=[]
def __iter__(self):
theDict=self.inFiles.copy()
theDict.update(self.outFiles)
theDict.update(self.physics)
for (k,v) in theDict.iteritems():
yield k,v
def __str__(self):
string = 'asetup '+self.release+'\n'+self.name
string += self._str_to_dict(self.physics) +'\n'
string +='\nInput file arguments:\n'
if self.inFiles:
string += self._str_to_dict(self.inFiles) +'\n'
if self.inDS:
string +='\nExample input dataset: '+ self.inDS + '\n'
string +='\nOutput file arguments:\n'
if self.outFiles:
string += self._str_to_dict(self.outFiles) + '\n'
if self.outfmts:
string += '\nPossible output data types: '+ str(self.outfmts) + '\n'
return string
def _str_to_dict(self,adict):
string=''
for (k,v) in adict.iteritems():
string +=" "+k+"='"+v.replace("'", "\\'")+"'"
return string
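# Illustrative usage sketch (not part of the original module): a TrfConfig is
# normally filled by getTrfConfigFromAMI/getTrfConfigFromPANDA below, but it
# can be built by hand; the values here are invented for demonstration only.
#
#   cfg = TrfConfig()
#   cfg.name = 'Reco_tf.py'
#   cfg.release = '17.2.7.9,AtlasProduction'
#   cfg.physics = {'--autoConfiguration': 'everything'}
#   cfg.inFiles = {'--inputESDFile': getInputFileName('--inputESDFile')}
#   cfg.outFiles = {'--outputAODFile': getOutputFileName('AOD')}
#   print str(cfg)   # asetup line plus the reconstructed argument list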
## @brief Stores the information about a given tag.
class TagInfo:
def __init__(self,tag):
self._tag=tag
self._isProdSys=None
self._trfs=None
@property
def isProdSys(self):
if self._isProdSys is None:
prodtags=getProdSysTagsCharacters()
if self._tag[0] in prodtags:
self._isProdSys=True
else:
self._isProdSys=False
return self._isProdSys
@property
def trfs(self):
if self._trfs is None:
if self.isProdSys:
self._trfs=getTrfConfigFromPANDA(self._tag)
else:
self._trfs=getTrfConfigFromAMI(self._tag)
return self._trfs
def __str__(self):
string = '\nInformation about tag '+self._tag+':\n'
if self.isProdSys:
string +='This is a ProdSys tag. Input and output file arguments are likely to be missing because they are often not part of the tag definition.\n'
else:
string +='This is a T0 tag.\n'
string +='This tag consists of ' + str(len(self.trfs)) + ' transform command(s).\n'
string += 'Transform commands follow below.\n'
string += 'Input and output file names (if present) are only suggestions.\n'
for trf in self.trfs:
string+='\n'+str(trf)+'\n'
return string
def dump(self, file):
pass # not yet implemented
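# Illustrative usage sketch (not part of the original module; the tag value is
# invented): TagInfo decides between the PANDA and AMI back-ends via isProdSys.
#
#   info = TagInfo('q120')      # leading 'q' is not a ProdSys character -> T0 tag
#   for trf in info.trfs:       # lazily triggers the AMI (or PANDA) lookup
#       print trf.name, trf.release
#   print str(info)             # human-readable summary of the whole tag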
## @brief Get AMI client
# @param useReplica If @c True, the CERN replica is used instead of the primary AMI server.
# @returns pyAMI.client.AMIClient instance
def getAMIClient(useReplica=False):
msg.debug('Getting AMI client...')
try:
from pyAMI.client import AMIClient
from pyAMI.auth import AMI_CONFIG
from pyAMI.exceptions import AMI_Error
from pyAMI import endpoint
from pyAMI.endpoint import get_endpoint, get_XSL_URL
except ImportError:
raise TransformAMIException(errCode, 'Import of pyAMI modules failed.')
if useReplica:
endpoint.TYPE = 'replica'
else:
endpoint.TYPE = 'main'
msg.debug('Using endpoint %s ' % get_endpoint())
msg.debug('Using xsl_url %s ' % get_XSL_URL())
amiclient = AMIClient()
return amiclient
## @brief Get the list of leading tag characters used by ProdSys tags
# @returns list of single-character strings
def getProdSysTagsCharacters():
msg.debug('Getting list of ProdSys tag characters...')
defaultList=['y', 'p', 'e', 's', 'd', 'r', 't', 'a', 'b', 'w']
argv=["SearchQuery"]
argv.append("-sql=select productionStep.productionStepTag FROM productionStep WHERE ( ( productionStep.writeStatus LIKE 'valid%') AND productionStep.actor = 'TR')")
argv.append("project=Atlas_Production")
argv.append("processingStep=Atlas_Production")
try:
from pyAMI.exceptions import AMI_Error
except ImportError:
msg.warning('Import of pyAMI modules failed (is your release setup correctly?).')
msg.warning('Returning default list of ProdSys tags.')
return defaultList
try:
amiclient=getAMIClient(False)
result=amiclient.execute(argv)
except (AMI_Error, TransformAMIException):
msg.debug('An exception occurred: %s' % traceback.format_exc())
msg.warning('Getting ProdSysTags from primary AMI failed. Trying CERN replica.')
try:
amiclient=getAMIClient(True)
result=amiclient.execute(argv)
except (AMI_Error, TransformAMIException):
msg.debug('An exception occurred: %s' % traceback.format_exc())
msg.warning('Getting ProdSysTags from CERN replica failed (do you have the necessary credentials to access AMI?).')
msg.warning('Returning default list of ProdSysTags.')
return defaultList
return [ row['productionStepTag'] for row in result.rows() ]
## @brief Get PANDA client
# @returns cx_Oracle cursor instance
def getPANDAClient():
msg.debug('Getting PANDA client...')
try:
import cx_Oracle
except ImportError:
raise TransformAMIException(errCode, 'Import of cx_Oracle failed (is Oracle setup on this machine?).')
try:
cur = cx_Oracle.connect('atlas_grisli_r/panda_c10@adcr_panda').cursor()
except:
msg.debug('An exception occurred while connecting to PANDA database: %s' % traceback.format_exc())
raise TransformAMIException(errCode, 'Failed to get PANDA client connection (N.B. this does not work from outside CERN).')
return cur
## @brief Un-escape information from PANDA
# @detail Provided by Pavel.
def ReadablePANDA(s):
return s.replace('%0B',' ').replace('%9B','; ').replace('%8B','"').replace('%3B',';').replace('%2C',',').replace('%2B','+')
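# For example (illustrative input only),
#   ReadablePANDA('25ns%2Cpileup%0BHITStoRDO%3BRDOtoRDOTrigger')
# returns '25ns,pileup HITStoRDO;RDOtoRDOTrigger'.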
## @brief Get information about a ProdSys tag from PANDA
# @param tag Tag for which information is requested
# @returns list of PyJobTransforms.trfAMI.TrfConfig instances
def getTrfConfigFromPANDA(tag):
msg.debug('Using PANDA to get info about tag %s' % tag)
try:
pandaclient=getPANDAClient()
pandaclient.execute("select trf,trfv,lparams,vparams,formats,cache from t_trf_config where tag='%s' and cid=%d" %(tag[:1],int(tag[1:]) ) )
result=pandaclient.fetchone()
except:
msg.info('An exception occurred: %s' % traceback.format_exc())
raise TransformAMIException(errCode, 'Getting tag info from PANDA failed.')
if result is None:
raise TransformAMIException(errCode, 'Tag %s not found in PANDA database' % tag)
msg.debug('Raw data returned from panda DB is:' + os.linesep + str(result))
trfn=result[0].split(',')
msg.debug('List of transforms: %s' % trfn)
trfv=result[1].split(',')
msg.debug('List of releases: %s' % trfv)
lparams=result[2].split(';')
msg.debug('List of arguments: %s' % lparams)
vparams=result[3].split(';')
msg.debug('List of argument values: %s' % vparams)
formats=result[4].split('.')
msg.debug('List of formats: %s' % formats)
cache=result[5].split(',')
msg.debug('List of caches: %s' % cache)
if not ( len(trfn) == len(trfv) == len(lparams) == len(vparams) ):
raise TransformAMIException(errCode, 'Inconsistency in number of trfs.')
# Cache can be a single value, in which case it needs to be replicated for the other
# transform steps, or it can be multivalued - great schema design guys :-(
if len(cache) != len(trfv):
if len(cache) == 1:
cache = cache * len(trfv)
else:
raise TransformAMIException(errCode, 'Inconsistency in number of caches entries vs. release numbers ({0}; {1}).'.format(cache, trfv))
listOfTrfs=[]
for iTrf in range(len(trfn)):
trf = TrfConfig()
trf.name =trfn[iTrf]
trf.release=trfv[iTrf] + "," + cache[iTrf]
keys=lparams[iTrf].split(',')
values=vparams[iTrf].split(',')
if ( len(keys) != len(values) ):
raise TransformAMIException(errCode, 'Inconsistency in number of arguments.')
physics = dict( (k, ReadablePANDA(v) ) for (k,v) in zip(keys, values))
# Hack to correct trigger keys being stored with spaces in PANDA
for k, v in physics.iteritems():
if 'triggerConfig' in k or 'triggerConfigByRun' in k:
if ' ' in v:
physics[k] = v.replace(' ', ',')
msg.warning('Attempted to correct illegal trigger configuration string: {0} -> {1}'.format(v, physics[k]))
msg.debug("Checking for pseudo-argument internal to ProdSys...")
if 'extraParameter' in physics:
val=physics.pop('extraParameter')
msg.debug("Removed extraParamater=%s from arguments." % val)
msg.debug("Checking for input/output file arguments...")
for arg in physics.keys():
if arg.lstrip('-').startswith('input') and arg.endswith('File'):
value=physics.pop(arg)
msg.debug("Found input file argument %s=%s." % (arg,value) )
fmt=arg.lstrip('-').replace('input','').replace('File','')
trf.inFiles[arg]=getInputFileName(arg)
elif arg.lstrip('-').startswith('output') and arg.endswith('File'):
value=physics.pop(arg)
msg.debug("Found output file argument %s=%s." % (arg,value) )
fmt=arg.lstrip('-').replace('output','').replace('File','')
trf.outFiles[arg]=getOutputFileName(fmt)
msg.debug("Checking for not set arguments...")
for arg,value in physics.items():
if value=="NONE" or value=="none":
val=physics.pop(arg)
msg.debug("Removed %s=%s from arguments." % (arg, val) )
trf.physics=physics
listOfTrfs.append(trf)
listOfTrfs[0].inDS=None # not yet implemented
listOfTrfs[-1].outfmts=formats
return listOfTrfs
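# Illustrative call sketch (tag value invented; as noted above this needs a
# database connection that only works from inside CERN):
#
#   for trf in getTrfConfigFromPANDA('r1234'):
#       print trf    # asetup line, arguments and suggested input/output files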
## @brief Get information about a T0 tag from AMI
# @param tag Tag for which information is requested
# @returns list of PyJobTransforms.trfAMI.TrfConfig instances
def getTrfConfigFromAMI(tag):
msg.debug('Using AMI to get info about tag %s' % tag)
try:
from pyAMI.exceptions import AMI_Error
from pyAMI.query import get_configtags
except ImportError:
raise TransformAMIException(errCode, 'Import of pyAMI modules failed.')
try:
amiclient=getAMIClient(False)
result=get_configtags(amiclient, tag)
except (AMI_Error, TransformAMIException) as e:
if 'Invalid configTag found' in e.args[0]:
raise TransformAMIException(errCode, 'Tag %s not found in AMI database.' % tag)
msg.debug('An exception occurred: %s' % traceback.format_exc())
msg.warning('Getting tag info from primary AMI failed. Trying CERN replica.')
try:
amiclient=getAMIClient(True)
result=get_configtags(amiclient, tag)
except (AMI_Error, TransformAMIException):
msg.debug('An exception occurred: %s' % traceback.format_exc())
raise TransformAMIException(errCode, 'Getting tag info from AMI failed.')
msg.debug('Raw result from AMI is: %s ' % result)
if ( result[0]!={'amiTag': tag } and result[0]!={'configTag': tag }):
msg.warning('Got unexpected result from AMI: %s when asking for tag %s' % (result[0],tag))
raise TransformAMIException(errCode, 'Getting tag info from AMI failed.')
trf = TrfConfig()
trf.name=result[1]['transformation']
trf.release=result[1]['SWReleaseCache'].replace('AtlasProduction-','')
trf.physics=dict( (k, str(v)) for (k,v) in ast.literal_eval(result[1]['phconfig']).iteritems() )
trf.inFiles=dict( (k, getInputFileName(k)) for k in ast.literal_eval(result[1]['inputs']).iterkeys() )
outputs=ast.literal_eval(result[1]['outputs'])
trf.outFiles=dict( (k, getOutputFileName(outputs[k]['dstype']) ) for k in outputs.iterkeys() )
trf.outfmts=[ outputs[k]['dstype'] for k in outputs.iterkeys() ]
return [ trf ]
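# Illustrative call sketch (tag value invented; requires a working pyAMI setup
# and valid credentials):
#
#   trfs = getTrfConfigFromAMI('q120')
#   print trfs[0].name, trfs[0].release
#   print trfs[0].outfmts    # possible output data types for this T0 tag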
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration
## @package PyJobTransforms.trfDecorators
# @brief Some useful decorators used by the transforms
# @author atlas-comp-transforms-dev@cern.ch
# @version $Id: trfDecorators.py 590263 2014-03-31 13:01:37Z graemes $
import functools
import os
import Queue
import sys
import time
import unittest
import PyJobTransforms.trfUtils as trfUtils
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import logging
## @brief Redirect stdout/err to /dev/null
# Useful wrapper to get rid of ROOT verbosity...
# N.B. May be somewhat dangerous in its present form - all errors disappear
# even ones you might want to see :-)
def silent(func):
def silent_running(*args, **kwargs):
# Create some filehandles to save the stdout/err fds to
save_err = open('/dev/null', 'w')
save_out = open('/dev/null', 'w')
os.dup2(sys.stderr.fileno(), save_err.fileno())
os.dup2(sys.stdout.fileno(), save_out.fileno())
# Now open 'quiet' file handles and attach stdout/err
quiet_err = open('/dev/null', 'w')
quiet_out = open('/dev/null', 'w')
os.dup2(quiet_err.fileno(), sys.stderr.fileno())
os.dup2(quiet_out.fileno(), sys.stdout.fileno())
# Execute function
rc = func(*args, **kwargs)
# Restore fds
os.dup2(save_err.fileno(), sys.stderr.fileno())
os.dup2(save_out.fileno(), sys.stdout.fileno())
return rc
# Make the wrapper look like the wrapped function
functools.update_wrapper(silent_running, func)
return silent_running
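# Illustrative usage sketch (the wrapped function is invented): anything the
# call writes to stdout/stderr is swallowed while it runs.
#
#   @silent
#   def loadRootLibraries():
#       import ROOT
#       return ROOT.gROOT.GetVersion()   # any ROOT banner output is suppressed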
## @brief Decorator to wrap a transform in an outer try: ... except: ... block
def stdTrfExceptionHandler(func):
def exception_wrapper(*args, **kwargs):
# Setup imports which the wrapper needs
import signal
import traceback
import logging
msg = logging.getLogger(__name__)
import PyJobTransforms.trfExceptions as trfExceptions
from PyJobTransforms.trfExitCodes import trfExit
try:
return func(*args, **kwargs)
except KeyboardInterrupt:
msg.critical('Caught a keyboard interrupt - exiting at your request.')
trfUtils.infanticide(message=True)
sys.exit(128 + signal.SIGINT)
# This subclass is treated as a 'normal' exit condition
# but it should never happen in production as it's a transform definition error
except trfExceptions.TransformSetupException, e:
msg.critical('Transform setup failed: {0}'.format(e.errMsg))
msg.critical('To help you debug here is the stack trace:')
msg.critical(traceback.format_exc(None))
msg.critical('(Early exit - no job report is produced)')
trfUtils.infanticide(message=True)
sys.exit(e.errCode)
except trfExceptions.TransformException, e:
msg.critical('Got a transform exception in the outer exception handler: {0!s}'.format(e))
msg.critical('Stack trace is...')
msg.critical(traceback.format_exc(None))
msg.critical('Job reports are likely to be missing or incomplete - sorry')
msg.critical('Please report this as a transforms bug!')
trfUtils.infanticide(message=True)
sys.exit(trfExit.nameToCode('TRF_UNEXPECTED_TRF_EXCEPTION'))
except Exception, e:
msg.critical('Got a general exception in the outer exception handler: {0!s}'.format(e))
msg.critical('Stack trace is...')
msg.critical(traceback.format_exc(None))
msg.critical('Job reports are likely to be missing or incomplete - sorry')
msg.critical('Please report this as a transforms bug!')
trfUtils.infanticide(message=True)
sys.exit(trfExit.nameToCode('TRF_UNEXPECTED_OTHER_EXCEPTION'))
functools.update_wrapper(exception_wrapper, func)
return exception_wrapper
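# Illustrative usage sketch (main() and setupTransform() are stand-ins, not a
# real transform): the outer handler converts uncaught exceptions into the
# standard TRF_UNEXPECTED_* exit codes and kills any child processes.
#
#   @stdTrfExceptionHandler
#   def main():
#       trf = setupTransform()   # hypothetical helper building the transform
#       sys.exit(trf.execute())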
## @brief Decorator to dump a stack trace when hit by SIGUSR1
# Note that this decorator has to go inside the stdTrfExceptionHandler,
# or the general exception catcher catches the SigUsr1 exception.
def sigUsrStackTrace(func):
import os
import signal
import traceback
import logging
msg = logging.getLogger(__name__)
class SigUsr1(Exception):
pass
def sigHandler(signum, frame):
msg.info('Handling signal %d in sigHandler' % signum)
raise SigUsr1
def signal_wrapper(*args, **kwargs):
signal.signal(signal.SIGUSR1, sigHandler)
try:
return func(*args, **kwargs)
except SigUsr1:
msg.error('Transform received SIGUSR1. Exiting now with stack trace...')
msg.error('(The important frame is usually the one before this trfDecorators module.)')
msg.error(traceback.format_exc(None))
trfUtils.infanticide(message=True)
sys.exit(128 + signal.SIGUSR1)
functools.update_wrapper(signal_wrapper, func)
return signal_wrapper
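# Illustrative usage sketch: sigUsrStackTrace must sit inside the outer
# exception handler (see the note above); a stack dump can then be requested
# from a shell with 'kill -USR1 <pid>' while the transform is running.
#
#   @stdTrfExceptionHandler
#   @sigUsrStackTrace
#   def main():
#       ...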
def timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None):
import traceback
import Queue
import multiprocessing as mp
from sys import exc_info
from PyJobTransforms.trfExceptions import TransformTimeoutException
msg = logging.getLogger(__name__)
def internal(func):
## @brief Run our wrapped function on the multiprocess queue
# @detail Run wrapper function and use the message queue to communicate status and result
# @return None. However, on the message queue add a tuple with two members:
# - @c key, which is True if function exited properly, False if an exception occurred
# - @c result, which is the output of the function or a tuple of exception information
def funcWithQueue(queue, *args, **kwargs):
try:
result = func(*args, **kwargs)
queue.put((True, result))
except:
exc0=exc_info()[0]
exc1=exc_info()[1]
exc2=traceback.format_exc()
msg.warning('In time limited function %s an exception occurred' % (func.func_name))
msg.warning('Original traceback:')
msg.warning(exc2)
queue.put((False,(exc0, exc1, exc2)))
def funcWithTimeout(*args, **kwargs):
ltimeout=timeout
lretry=retry
ltimefactor=timefactor
lsleeptime=sleeptime
ldefaultrc=defaultrc
if 'timeout' in kwargs:
ltimeout=kwargs.pop('timeout')
if 'retry' in kwargs:
lretry=kwargs.pop('retry')
if 'timefactor' in kwargs:
ltimefactor=kwargs.pop('timefactor')
if 'sleeptime' in kwargs:
lsleeptime=kwargs.pop('sleeptime')