trfAMI.py 12.7 KB
Newer Older
Graeme Stewart's avatar
Graeme Stewart committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration

## @package PyJobTransforms.trfAMI
#  @brief Utilities for configuration of transforms via AMI tags
#  @author atlas-comp-transforms-dev@cern.ch
#  @version $Id$



import ast
import os
import traceback

import logging
# Module-level logger, named after this module; the functions in this file log through it.
msg = logging.getLogger(__name__)

from PyJobTransforms.trfExceptions import TransformAMIException
from PyJobTransforms.trfDefaultFiles import getInputFileName, getOutputFileName

from PyJobTransforms.trfExitCodes import trfExit
# Exit code looked up once by name and passed to every TransformAMIException raised in this file.
errCode=trfExit.nameToCode('TRF_AMI_ERROR')


## @brief Stores the configuration of a transform 
class TrfConfig:
    """Plain container describing one transform command.

    Instances start empty and are filled in attribute by attribute by the
    code that reads a tag definition.
    """

    def __init__(self):
        self.name=None      # transform name; printed at the start of the command line
        self.release=None   # release string; printed after 'asetup '
        self.physics={}     # argument name -> string value
        self.inFiles={}     # input file argument name -> file name
        self.outFiles={}    # output file argument name -> file name
        self.inDS=None      # example input dataset, or None if unknown
        self.outfmts=[]     # possible output data types

    def __iter__(self):
        """Yield (argument, value) pairs of all file and physics arguments.

        The three dictionaries are merged into a copy, so the instance is not
        modified. On a name clash physics wins over outFiles, which wins over
        inFiles.
        """
        merged=self.inFiles.copy()
        merged.update(self.outFiles)
        merged.update(self.physics)
        # items() is available on Python 2 and 3; iteritems() is Python 2 only.
        for (k,v) in merged.items():
            yield k,v

    def __str__(self):
        """Return a printable description of the transform command.

        The text holds an 'asetup <release>' line, the transform name followed
        by its physics arguments, and then the input and output file sections.
        name and release must have been set to strings beforehand: the None
        defaults cannot be concatenated and raise TypeError.
        """
        string = 'asetup '+self.release+'\n'+self.name
        string += self._str_to_dict(self.physics) +'\n'

        string +='\nInput file arguments:\n'
        if self.inFiles:
            string += self._str_to_dict(self.inFiles) +'\n'
        if self.inDS:
            string +='\nExample input dataset: '+ self.inDS + '\n'

        string +='\nOutput file arguments:\n'
        if self.outFiles:
            string += self._str_to_dict(self.outFiles) + '\n'
        if self.outfmts:
            string += '\nPossible output data types: '+ str(self.outfmts) + '\n'
        return string

    def _str_to_dict(self,adict):
        """Format a dictionary of string values as " key='value'" pairs.

        Single quotes inside a value are escaped with a backslash so that the
        quoted value stays intact. Values must be strings.
        """
        # items() is available on Python 2 and 3; iteritems() is Python 2 only.
        return ''.join(" "+k+"='"+v.replace("'", "\\'")+"'" for (k,v) in adict.items())

## @brief Stores the information about a given tag.
class TagInfo:
    """Lazily resolved information about one tag.

    Nothing is looked up at construction time. The tag type and the list of
    transform configurations are fetched on first access and then cached.
    """

    def __init__(self,tag):
        self._tag=tag
        # Both caches stay None until the matching property is first read.
        self._trfs=None
        self._isProdSys=None

    @property
    def isProdSys(self):
        """True if the first character of the tag is a ProdSys tag character."""
        if self._isProdSys is not None:
            return self._isProdSys
        prodChars=getProdSysTagsCharacters()
        self._isProdSys = self._tag[0] in prodChars
        return self._isProdSys

    @property
    def trfs(self):
        """Transform configurations of the tag, read from PANDA for ProdSys tags and from AMI otherwise."""
        if self._trfs is None:
            reader = getTrfConfigFromPANDA if self.isProdSys else getTrfConfigFromAMI
            self._trfs=reader(self._tag)
        return self._trfs

    def __str__(self):
        """Return a printable summary of the tag followed by each transform command."""
        pieces = ['\nInformation about tag '+self._tag+':\n']

        if self.isProdSys:
            pieces.append('This is a ProdSys tag. Input and output file arguments are likely to be missing because they are often not part of the tag definition.\n')
        else:
            pieces.append('This is a T0 tag.\n')

        pieces.append('This tag consists of ' + str(len(self.trfs)) + ' transform command(s).\n')
        pieces.append('Transform commands follow below.\n')
        pieces.append('Input and output file names (if present) are only suggestions.\n')

        # One blank-line separated entry per transform command.
        pieces.extend('\n'+str(trf)+'\n' for trf in self.trfs)

        return ''.join(pieces)

    def  dump(self, file):
        """Placeholder: not yet implemented, does nothing and returns None."""
        

## @brief Get AMI client
#  @param useReplica If @c True CERN replica is used instead of primary AMI.
#  @returns pyAMI.client.AMIClient instance
def getAMIClient(useReplica=False):
    """Create an AMI client after selecting the 'replica' or 'main' endpoint type.

    Raises TransformAMIException if any of the pyAMI modules cannot be imported.
    """
    msg.debug('Getting AMI client...')

    # pyAMI is imported here rather than at module level, so a missing
    # installation is reported as a TransformAMIException from this call.
    try:
        from pyAMI.client import AMIClient
        from pyAMI.auth import AMI_CONFIG
        from pyAMI.exceptions import AMI_Error
        from pyAMI import endpoint
        from pyAMI.endpoint import get_endpoint, get_XSL_URL
    except ImportError:
        raise TransformAMIException(errCode, 'Import of pyAMI modules failed.')

    # The endpoint type is a module-level setting of pyAMI.endpoint.
    endpoint.TYPE = 'replica' if useReplica else 'main'
    msg.debug('Using endpoint %s ' % get_endpoint())
    msg.debug('Using xsl_url %s ' % get_XSL_URL())

    return AMIClient()

## @brief Get list of characters of ProdSys tags
#  @returns list of characters
def getProdSysTagsCharacters():
    """Return the ProdSys tag characters known to AMI.

    The primary AMI is queried first and the CERN replica second. If pyAMI
    cannot be imported, or both queries fail, a built-in default list is
    returned instead.
    """
    msg.debug('Getting list of ProdSys tag characters...')

    fallbackTags=['y', 'p', 'e', 's', 'd', 'r', 't', 'a', 'b', 'w']

    query=[
        "SearchQuery",
        "-sql=select productionStep.productionStepTag FROM productionStep WHERE ( ( productionStep.writeStatus LIKE 'valid%') AND productionStep.actor = 'TR')",
        "project=Atlas_Production",
        "processingStep=Atlas_Production",
    ]

    try:
        from pyAMI.exceptions import AMI_Error
    except ImportError:
        msg.warning('Import of pyAMI modules failed (is your release setup correctly?).')
        msg.warning('Returning default list of ProdSys tags.')
        return fallbackTags

    # Errors that trigger the next fallback step.
    amiFailures=(AMI_Error, TransformAMIException)

    try:
        result=getAMIClient(False).execute(query)
    except amiFailures:
        msg.debug('An exception occured: %s' % traceback.format_exc())
        msg.warning('Getting ProdSysTags from primary AMI failed. Trying CERN replica.')

        try:
            result=getAMIClient(True).execute(query)
        except amiFailures:
            msg.debug('An exception occured: %s' % traceback.format_exc())
            msg.warning('Getting ProdSysTags from CERN replica failed (do you have the necessary credentials to access AMI?).')
            msg.warning('Returning default list of ProdSysTags.')
            return fallbackTags

    tagChars=[]
    for row in result.rows():
        tagChars.append(row['productionStepTag'])
    return tagChars


## @brief Get PANDA client
#  @returns cx_Oracle cursor instance
def getPANDAClient():
    """Open a connection to the PANDA database and return a cursor on it.

    Raises TransformAMIException if cx_Oracle cannot be imported or if the
    connection cannot be established.
    """
    msg.debug('Getting PANDA client...')
    try:
        import cx_Oracle
    except ImportError:
        raise TransformAMIException(errCode, 'Import of cx_Oracle failed (is Oracle setup on this machine?).')

    try:
        # NOTE(review): the connection string, including the account password,
        # is hard-coded here; consider moving it to configuration.
        cur = cx_Oracle.connect('atlas_grisli_r/panda_c10@adcr_panda').cursor()
    except Exception:
        # 'except Exception' rather than a bare 'except', so that
        # KeyboardInterrupt and SystemExit are not turned into a transform error.
        msg.debug('An exception occurred while connecting to PANDA database: %s' % traceback.format_exc())
        raise TransformAMIException(errCode, 'Failed to get PANDA client connection (N.B. this does not work from outside CERN).')

    return cur

## @brief Un-escape information from PANDA
#  @detail Provided by Pavel.
def ReadablePANDA(s):
    """Return s with the PANDA escape tokens replaced by the characters they stand for."""
    # Applied in this order, one token after the other.
    escapes = (
        ('%0B', ' '),
        ('%9B', '; '),
        ('%8B', '"'),
        ('%3B', ';'),
        ('%2C', ','),
        ('%2B', '+'),
    )
    decoded = s
    for token, plain in escapes:
        decoded = decoded.replace(token, plain)
    return decoded


## @brief Get information about a ProdSys tag from PANDA
#  @param tag Tag for which information is requested
#  @returns list of PyJobTransforms.trfAMI.TrfConfig instances
def getTrfConfigFromPANDA(tag):
    """Read the definition of a ProdSys tag from the PANDA database.

    The tag is split into its first character and the integer that follows,
    which together select one row of t_trf_config. The row is unpacked into
    one TrfConfig per transform step.

    @param tag Tag for which information is requested
    @returns list of TrfConfig instances, one per transform step
    @throws TransformAMIException if the query fails, the tag is unknown, or
            the columns of the row are inconsistent with each other
    """
    msg.debug('Using PANDA to get info about tag %s' % tag)

    try:
        pandaclient=getPANDAClient()
        # Bind variables keep the tag pieces out of the SQL text.
        pandaclient.execute("select trf,trfv,lparams,vparams,formats,cache from t_trf_config where tag=:tagletter and cid=:tagnumber",
                            {'tagletter': tag[:1], 'tagnumber': int(tag[1:])})
        result=pandaclient.fetchone()
    except Exception:
        # Also covers a malformed tag (int() failure) and connection errors.
        msg.info('An exception occurred: %s' % traceback.format_exc())
        raise TransformAMIException(errCode, 'Getting tag info from PANDA failed.')

    if result is None:
        raise TransformAMIException(errCode, 'Tag %s not found in PANDA database' % tag)

    msg.debug('Raw data returned from panda DB is:' + os.linesep + str(result))

    # Each column packs the values for all transform steps into one string.
    trfn=result[0].split(',')
    msg.debug('List of transforms: %s' % trfn)
    trfv=result[1].split(',')
    msg.debug('List of releases: %s' % trfv)
    lparams=result[2].split(';')
    msg.debug('List of arguments: %s' % lparams)
    vparams=result[3].split(';')
    msg.debug('List of argument values: %s' % vparams)
    formats=result[4].split('.')
    msg.debug('List of formats: %s' % formats)
    cache=result[5].split(',')
    msg.debug('List of caches: %s' % cache)

    if not ( len(trfn) == len(trfv) == len(lparams) == len(vparams) ):
        raise TransformAMIException(errCode, 'Inconsistency in number of trfs.')

    # Cache can be a single value, in which case it needs replicated for other
    # transform steps, or it can be multivalued.
    if len(cache) != len(trfv):
        if len(cache) == 1:
            cache = cache * len(trfv)
        else:
            raise TransformAMIException(errCode, 'Inconsistency in number of caches entries vs. release numbers ({0}; {1}).'.format(cache, trfv))

    listOfTrfs=[]

    # All five lists have the same length at this point.
    for (name, release, swCache, keyString, valueString) in zip(trfn, trfv, cache, lparams, vparams):

        trf = TrfConfig()
        trf.name = name
        trf.release = release + "," + swCache

        keys=keyString.split(',')
        values=valueString.split(',')

        if ( len(keys) != len(values) ):
            raise TransformAMIException(errCode, 'Inconsistency in number of arguments.')

        physics = dict( (k, ReadablePANDA(v) ) for (k,v) in zip(keys, values))

        # Hack to correct trigger keys being stored with spaces in panda.
        # 'triggerConfig' also matches 'triggerConfigByRun'.
        for (k, v) in list(physics.items()):
            if 'triggerConfig' in k and ' ' in v:
                physics[k] = v.replace(' ', ',')
                msg.warning('Attempted to correct illegal trigger configuration string: {0} -> {1}'.format(v, physics[k]))

        msg.debug("Checking for pseudo-argument internal to ProdSys...")
        if 'extraParameter' in physics:
            val=physics.pop('extraParameter')
            msg.debug("Removed extraParamater=%s from arguments." % val)

        msg.debug("Checking for input/output file arguments...")
        # Iterate over a snapshot of the keys because entries are popped in the loop.
        for arg in list(physics):
            if not arg.endswith('File'):
                continue
            bareArg=arg.lstrip('-')
            if bareArg.startswith('input'):
                value=physics.pop(arg)
                msg.debug("Found input file argument %s=%s." % (arg,value) )
                # Input names are looked up by argument name, outputs by format.
                trf.inFiles[arg]=getInputFileName(arg)
            elif bareArg.startswith('output'):
                value=physics.pop(arg)
                msg.debug("Found output file argument %s=%s." % (arg,value) )
                fmt=bareArg.replace('output','').replace('File','')
                trf.outFiles[arg]=getOutputFileName(fmt)

        msg.debug("Checking for not set arguments...")
        # Snapshot again: unset arguments are removed while looping.
        for (arg, value) in list(physics.items()):
            if value in ("NONE", "none"):
                physics.pop(arg)
                msg.debug("Removed %s=%s from arguments." % (arg, value) )

        trf.physics=physics

        listOfTrfs.append(trf)

    listOfTrfs[0].inDS=None # not yet implemented
    # The output formats are attached to the last transform step only.
    listOfTrfs[-1].outfmts=formats

    return listOfTrfs


## @brief Get information about a T0 tag from AMI
#  @param tag Tag for which information is requested
#  @returns list of PyJobTransforms.trfAMI.TrfConfig instances
def getTrfConfigFromAMI(tag):
    """Read the definition of a T0 tag from AMI.

    The primary AMI is queried first. On failure the CERN replica is tried,
    unless the error message says the tag itself is invalid.

    @param tag Tag for which information is requested
    @returns list holding a single TrfConfig instance
    @throws TransformAMIException if pyAMI cannot be imported, the tag is not
            found, both queries fail, or the reply does not match the tag
    """
    msg.debug('Using AMI to get info about tag %s' % tag)

    try:
        from pyAMI.exceptions import AMI_Error
        from pyAMI.query import get_configtags
    except ImportError:
        raise TransformAMIException(errCode, 'Import of pyAMI modules failed.')

    try:
        amiclient=getAMIClient(False)
        result=get_configtags(amiclient, tag)
    except (AMI_Error, TransformAMIException) as e:

        # The first exception argument may be missing or not a string; that
        # must not abort the fallback to the replica.
        try:
            tagNotFound = 'Invalid configTag found' in e.args[0]
        except (IndexError, TypeError):
            tagNotFound = False
        if tagNotFound:
            raise TransformAMIException(errCode, 'Tag %s not found in AMI database.' % tag)

        msg.debug('An exception occured: %s' % traceback.format_exc())
        msg.warning('Getting tag info from primary AMI failed. Trying CERN replica.')

        try:
            amiclient=getAMIClient(True)
            result=get_configtags(amiclient, tag)
        except (AMI_Error, TransformAMIException):
            msg.debug('An exception occured: %s' % traceback.format_exc())
            raise TransformAMIException(errCode, 'Getting tag info from AMI failed.')

    msg.debug('Raw result from AMI is: %s ' % result)

    # The first element of the reply must name the tag that was asked for.
    if ( result[0]!={'amiTag': tag } and result[0]!={'configTag': tag }):
        msg.warning('Got unexpected result from AMI: %s when asking for tag %s' % (result[0],tag))
        raise TransformAMIException(errCode, 'Getting tag info from AMI failed.')

    # The second element holds the tag definition; several of its fields are
    # Python literals stored as strings, parsed with ast.literal_eval.
    tagInfo=result[1]

    trf = TrfConfig()
    trf.name=tagInfo['transformation']
    trf.release=tagInfo['SWReleaseCache'].replace('AtlasProduction-','')
    # items() and plain iteration work on Python 2 and 3 (iteritems/iterkeys do not).
    trf.physics=dict( (k, str(v)) for (k,v) in ast.literal_eval(tagInfo['phconfig']).items() )
    trf.inFiles=dict( (k, getInputFileName(k)) for k in ast.literal_eval(tagInfo['inputs']) )
    outputs=ast.literal_eval(tagInfo['outputs'])
    trf.outFiles=dict( (k, getOutputFileName(outputs[k]['dstype']) ) for k in outputs )
    trf.outfmts=[ outputs[k]['dstype'] for k in outputs ]

    return [ trf ]