trfDecorators.py 9.3 KB
Newer Older
Graeme Stewart's avatar
Graeme Stewart committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Copyright (C) 2002-2017 CERN for the benefit of the ATLAS collaboration

## @Package PyJobTransforms.trfDecorators
#  @brief Some useful decorators used by the transforms
#  @author atlas-comp-transforms-dev@cern.ch
#  @version $Id: trfDecorators.py 590263 2014-03-31 13:01:37Z graemes $

import functools
import os
import Queue
import sys
import time
import unittest

import PyJobTransforms.trfUtils as trfUtils
from PyJobTransforms.trfExitCodes import trfExit

from PyJobTransforms.trfLogger import logging


## @brief Redirect stdout/err to /dev/null
# Useful wrapper to get rid of ROOT verbosity...
# The redirection is done on the file descriptors behind sys.stdout and
# sys.stderr (os.dup2), so anything writing straight to those descriptors is
# silenced too, not only Python-level prints.
# The original descriptors are always restored, even when the wrapped function
# raises, and every temporary descriptor is closed again after each call.
# N.B. May be somewhat dangerous in its present form - all errors disappear
# even ones you might want to see :-)
# @param func Function to wrap
# @return Wrapper with the same name/docstring as @c func, returning its result
def silent(func):
    def silent_running(*args, **kwargs):
        # Push pending Python-level output to the real streams before the
        # descriptors underneath them are swapped
        sys.stdout.flush()
        sys.stderr.flush()

        out_fd = sys.stdout.fileno()
        err_fd = sys.stderr.fileno()
        # Duplicates of the real descriptors, used to restore them afterwards
        save_out = os.dup(out_fd)
        save_err = os.dup(err_fd)
        dev_null = os.open(os.devnull, os.O_WRONLY)
        try:
            os.dup2(dev_null, err_fd)
            os.dup2(dev_null, out_fd)
            return func(*args, **kwargs)
        finally:
            try:
                # Output buffered while silenced should end up in /dev/null,
                # not leak out once the real streams are restored
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os.dup2(save_err, err_fd)
                os.dup2(save_out, out_fd)
                for fd in (dev_null, save_err, save_out):
                    os.close(fd)
    # Make the wrapper look like the wrapped function
    functools.update_wrapper(silent_running, func)
    return silent_running


## @brief Decorator to wrap a transform in outer try: ... except: ...
#  @details Any exception escaping @c func is logged as critical (with its
#  stack trace), child processes are cleaned up with trfUtils.infanticide()
#  and the process exits via sys.exit() with:
#   - KeyboardInterrupt: 128 + SIGINT
#   - TransformSetupException: the exception's own errCode (no job report)
#   - any other TransformException: TRF_UNEXPECTED_TRF_EXCEPTION
#   - any other Exception: TRF_UNEXPECTED_OTHER_EXCEPTION
#  SystemExit does not derive from Exception, so ordinary sys.exit() calls
#  made by @c func pass straight through.
#  @param func Function to wrap (typically the transform's entry point)
#  @return Wrapper with the same name/docstring as @c func
def stdTrfExceptionHandler(func):
    def exception_wrapper(*args, **kwargs):
        # Setup imports which the wrapper needs
        import signal
        import traceback
        import logging
        msg = logging.getLogger(__name__)

        import PyJobTransforms.trfExceptions as trfExceptions

        ## Shared tail for exceptions that indicate a transforms bug: dump the
        #  current stack trace, kill children and exit with the named code.
        #  Must be called from inside an except clause (uses format_exc).
        def abortUnexpected(headline, exitCodeName):
            msg.critical(headline)
            msg.critical('Stack trace is...')
            msg.critical(traceback.format_exc(None))
            msg.critical('Job reports are likely to be missing or incomplete - sorry')
            msg.critical('Please report this as a transforms bug!')
            trfUtils.infanticide(message=True)
            sys.exit(trfExit.nameToCode(exitCodeName))

        try:
            return func(*args, **kwargs)

        except KeyboardInterrupt:
            msg.critical('Caught a keyboard interrupt - exiting at your request.')
            trfUtils.infanticide(message=True)
            sys.exit(128 + signal.SIGINT)

        # This subclass is treated as a 'normal' exit condition
        # but it should never happen in production as it's a transform definition error
        except trfExceptions.TransformSetupException as e:
            msg.critical('Transform setup failed: {0}'.format(e.errMsg))
            msg.critical('To help you debug here is the stack trace:')
            msg.critical(traceback.format_exc(None))
            msg.critical('(Early exit - no job report is produced)')
            trfUtils.infanticide(message=True)
            sys.exit(e.errCode)

        except trfExceptions.TransformException as e:
            abortUnexpected('Got a transform exception in the outer exception handler: {0!s}'.format(e),
                            'TRF_UNEXPECTED_TRF_EXCEPTION')

        except Exception as e:
            abortUnexpected('Got a general exception in the outer exception handler: {0!s}'.format(e),
                            'TRF_UNEXPECTED_OTHER_EXCEPTION')

    functools.update_wrapper(exception_wrapper, func)
    return exception_wrapper


## @brief Decorator to dump a stack trace when hit by SIGUSR1
# Note that this decorator has to go inside the stdTrfExceptionHandler
# Or the general exception catcher catches the SigUser exception.
# While the wrapped function runs, SIGUSR1 is converted into an exception: the
# stack trace is logged, child processes are killed with trfUtils.infanticide()
# and the process exits with code 128 + SIGUSR1.
# The SIGUSR1 handler that was active before the call is reinstated when the
# wrapped function finishes (normally or by exception), so a later SIGUSR1
# cannot raise the private exception outside of this wrapper.
# N.B. signal.signal() may only be called from the main thread.
# @param func Function to wrap
# @return Wrapper with the same name/docstring as @c func
def sigUsrStackTrace(func):
    import signal
    import traceback
    import logging
    msg = logging.getLogger(__name__)

    class SigUsr1(Exception):
        pass

    def sigHandler(signum, frame):
        msg.info('Handling signal %d in sigHandler' % signum)
        raise SigUsr1

    def signal_wrapper(*args, **kwargs):
        previousHandler = signal.signal(signal.SIGUSR1, sigHandler)

        try:
            return func(*args, **kwargs)

        except SigUsr1:
            msg.error('Transform received SIGUSR1. Exiting now with stack trace...')
            msg.error('(The important frame is usually the one before this trfDecorators module.)')
            msg.error(traceback.format_exc(None))
            trfUtils.infanticide(message=True)
            sys.exit(128 + signal.SIGUSR1)

        finally:
            # signal.signal() returns None when the previous handler was not
            # installed from Python; it cannot be reinstated in that case
            if previousHandler is not None:
                signal.signal(signal.SIGUSR1, previousHandler)

    functools.update_wrapper(signal_wrapper, func)
    return signal_wrapper



## @brief Decorator factory running a function in a child process with a time limit
#  @details Each call of the decorated function runs in a multiprocessing child
#  and the caller waits at most @c timeout seconds for its result. On timeout the
#  child and its children are killed and the call is retried; after every timeout
#  both the timeout and the sleep between attempts are multiplied by @c timefactor.
#  Every parameter below can also be overridden per call by passing it as a
#  keyword argument; it is removed from @c kwargs before @c func is invoked.
#  @param timeout Time limit (s) of the first attempt; @c None calls @c func
#  directly in the current process without any limit
#  @param retry Number of retries after the first attempt (up to retry+1 attempts)
#  @param timefactor Multiplier applied to timeout and sleeptime after each timeout
#  @param sleeptime Seconds to sleep between attempts
#  @param defaultrc Value returned when @c func raises an exception in the child
#  @return Decorator producing a wrapper with the same name/docstring as @c func
#  @throws TransformTimeoutException (TRF_EXEC_TIMEOUT) if every attempt times out
#  @note Results and exception info travel back through a multiprocessing queue,
#  so they must be picklable.
def timelimited(timeout=None, retry=1, timefactor=1.5, sleeptime=10, defaultrc=None):

    import traceback
    import multiprocessing as mp
    try:
        import queue as Queue  # Python 3
    except ImportError:
        import Queue  # Python 2

    from sys import exc_info

    msg = logging.getLogger(__name__)

    # The process target below is a closure, which cannot be pickled, so the
    # child has to be created by fork. fork is not the default start method on
    # every platform/Python version, hence request it explicitly when possible.
    try:
        mpContext = mp.get_context('fork')
    except (AttributeError, ValueError):
        # Python 2 has no get_context(); platforms without fork keep the default
        mpContext = mp

    def internal(func):

        ## @brief Run our wrapped function on the multiprocess queue
        #  @details Run wrapper function and use the message queue to communicate status and result
        #  @return None. However, on the message queue add a tuple with two members:
        #   - @c key, which is True if function exited properly, False if an exception occurred
        #   - @c result, which is the output of the function or a tuple of exception information
        def funcWithQueue(queue, *args, **kwargs):
            try:
                result = func(*args, **kwargs)
                queue.put((True, result))
            except:
                # Deliberately broad: any failure in the child is reported back
                # to the parent instead of silently killing the child
                excType, excValue = exc_info()[:2]
                excTrace = traceback.format_exc()
                msg.warning('In time limited function %s an exception occurred' % (func.__name__))
                msg.warning('Original traceback:')
                msg.warning(excTrace)
                queue.put((False, (excType, excValue, excTrace)))

        def funcWithTimeout(*args, **kwargs):
            # Per-call overrides of the decorator defaults
            ltimeout = kwargs.pop('timeout', timeout)
            lretry = kwargs.pop('retry', retry)
            ltimefactor = kwargs.pop('timefactor', timefactor)
            lsleeptime = kwargs.pop('sleeptime', sleeptime)
            ldefaultrc = kwargs.pop('defaultrc', defaultrc)

            if ltimeout is None:
                # Run function normally with no timeout wrapper
                msg.debug('Running {0}: {1} {2} without timeout'.format(func, args, kwargs))
                return func(*args, **kwargs)

            n = 0
            while n <= lretry:
                msg.info('Try %i out of %i (time limit %s s) to call %s.' % (n+1, lretry+1, ltimeout, func.__name__))
                starttime = time.time()
                q = mpContext.Queue(maxsize=1)
                nargs = (q,) + args
                proc = mpContext.Process(target=funcWithQueue, args=nargs, kwargs=kwargs)
                proc.start()
                try:
                    # Wait for function to run and return, but with a timeout
                    flag, result = q.get(block=True, timeout=ltimeout)
                except Queue.Empty:
                    # Our function did not run in time - kill it and increase the timeout
                    msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.' % ltimeout)
                    parent = proc.pid
                    pids = [parent]
                    pids.extend(trfUtils.listChildren(parent=parent, listOrphans = False))
                    trfUtils.infanticide(pids)
                    proc.join(60) # Ensure cleanup
                    if n != lretry:
                        msg.info('Going to sleep for %d s.' % lsleeptime)
                        time.sleep(lsleeptime)
                    n += 1
                    ltimeout *= ltimefactor
                    lsleeptime *= ltimefactor
                    continue

                proc.join(60)
                msg.info('Executed call within %d s.' % (time.time()-starttime))
                if flag:
                    return result
                msg.warning('But an exception occurred in function %s.' % (func.__name__))
                msg.warning('Returning default return code %s.' % ldefaultrc)
                return ldefaultrc

            msg.warning('All %i tries failed!' % n)
            # Only needed on this failure path, so imported lazily
            from PyJobTransforms.trfExceptions import TransformTimeoutException
            raise TransformTimeoutException(trfExit.nameToCode('TRF_EXEC_TIMEOUT'), 'Timeout in function %s' % (func.__name__))

        # Make the wrapper look like the wrapped function (must run before return)
        functools.update_wrapper(funcWithTimeout, func)
        return funcWithTimeout

    return internal