# Decorators.py
# Copyright (C) 2002-2019 CERN for the benefit of the ATLAS collaboration

# @author: Sebastien Binet <binet@cern.ch>
# @date: March 2008
# @purpose: a set of decorators. Most of them (if not all) have been stolen
#           from here:
#           http://www.phyast.pitt.edu/~micheles/python/documentation.html
#

from __future__ import with_statement, print_function

__version__ = "$Revision$"
__author__  = "Sebastien Binet <binet@cern.ch>"
__all__ = [
    'memoize',
    'forking',
    'async_decor',
    ]

import sys
import os
import itertools

# only `decorator` itself is needed from the decorator package
from decorator import decorator
@decorator
def memoize(func, *args):
    """This decorator implements the memoize pattern, i.e. it caches the
    result of a function in a dictionary, so that the next time the function
    is called with the same input parameters the result is retrieved from
    the cache and not recomputed.
    """
    try:
        mem_dict = getattr(func, "_mem_dict")
    except AttributeError:
        # look-up failed, so we have to build the cache holder first
        mem_dict = {}
        setattr(func, "_mem_dict", mem_dict)
    try:
        return mem_dict[args]
    except KeyError:
        # look-up failed, so we have to compute the result the first time
        # around; then we cache it
        mem_dict[args] = result = func(*args)
        return result
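
# usage sketch (illustrative addition, not part of the upstream file):
# the positional arguments are used as the dictionary key, so they must
# be hashable.
#
#   @memoize
#   def fib(n):
#       return n if n < 2 else fib(n - 1) + fib(n - 2)
#
#   fib(30)   # computed once, recursively
#   fib(30)   # second call is retrieved from the cache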
# FIXME: does not work... func is an instance of FunctionMaker which cannot
#        be pickled...
@decorator
def mp_forking(func, *args, **kwargs):
    import multiprocessing as mp
    ## pool = mp.Pool(processes=1)
    ## return pool.apply(func, *args, **kwargs)
    # create a local queue to fetch the results back
    def wrapping(func):
        q = mp.Queue()
        def wrap_fct(*args, **kwargs):
            try:
                res = func(*args, **kwargs)
            # catch *everything* and 're-raise' it in the parent
            except BaseException as err:
                #import traceback; traceback.print_exc()
                res = err
            q.put(res)
        wrap_fct.q = q
        return wrap_fct
    func = wrapping(func)
    proc = mp.Process(target=func, args=args, kwargs=kwargs)
    proc.start()
    res = func.q.get()
    proc.join()
    proc.terminate()
    if isinstance(res, BaseException):
        #import traceback; traceback.print_exc()
        raise res
        #reraise_exception(exc, exc_info)
    return res
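
# sketch of the picklability constraint behind the FIXME above (illustrative
# addition, not part of the upstream file): multiprocessing can only ship
# picklable, module-level callables to a worker process, which is why the
# FunctionMaker-made wrapper cannot be pickled while a plain function can:
#
#   import multiprocessing as mp
#
#   def _square(x):       # module-level, hence picklable
#       return x * x
#
#   if __name__ == '__main__':
#       pool = mp.Pool(processes=1)
#       assert pool.apply(_square, (3,)) == 9
#       pool.close(); pool.join()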
def reraise_exception(new_exc, exc_info=None):
    """Re-raise `new_exc`, attaching the traceback carried by `exc_info`
    (by default, the one of the exception currently being handled)."""
    if exc_info is None:
        exc_info = sys.exc_info()
    _exc_class, _exc, tb = exc_info
    raise new_exc.with_traceback(tb)
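
# usage sketch (illustrative addition, not part of the upstream file):
#
#   try:
#       1 / 0
#   except ZeroDivisionError:
#       # raises RuntimeError carrying the ZeroDivisionError's traceback
#       reraise_exception(RuntimeError("demo failed"))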
@decorator
def forking(func, *args, **kwargs):
    """
    This decorator implements the forking pattern, i.e. it runs the function
    in a forked (child) process and ships the pickled result back to the
    parent through a pipe.
    see:
    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
    """
    import os
    try:
        import cPickle as pickle
    except ImportError:
        import pickle

    # create a pipe which will be shared between parent and child
    pread, pwrite = os.pipe()

    # do fork
    pid = os.fork()

    ## parent ##
    if pid > 0:
        os.close(pwrite)
        with os.fdopen(pread, 'rb') as f:
            status, result = pickle.load(f)
        os.waitpid(pid, 0)
        if status == 0:
            return result
        else:
            remote_exc = result[0]
            reraise_exception(remote_exc)

    ## child ##
    else:
        os.close(pread)
        try:
            result = func(*args, **kwargs)
            status = 0
        except (Exception, KeyboardInterrupt) as exc:
            import traceback
            exc_string = traceback.format_exc(limit=10)
            for line in exc_string.splitlines():
                print("[%d]" % os.getpid(), line.rstrip())
            result = exc, exc_string
            status = 1
        with os.fdopen(pwrite, 'wb') as f:
            try:
                pickle.dump((status, result), f, pickle.HIGHEST_PROTOCOL)
            except pickle.PicklingError as exc:
                # keep the (exception, traceback-string) shape the parent
                # expects when unpacking a failure
                pickle.dump((2, (exc, '')), f, pickle.HIGHEST_PROTOCOL)
        os._exit(0)
    pass  # forking
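
# usage sketch (illustrative addition, not part of the upstream file): the
# decorated body runs in a fork()ed child; its (picklable) return value is
# shipped back to the parent through the pipe:
#
#   @forking
#   def child_pid():
#       return os.getpid()
#
#   assert child_pid() != os.getpid()   # the body ran in another process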
### a decorator converting blocking functions into asynchronous functions
#   stolen from http://pypi.python.org/pypi/decorator/3.0.0
def _async_on_success(result):  # default implementation
    "Called on the result of the function"
    return result

def _async_on_failure(exc_info):  # default implementation
    "Called if the function fails"
    _exc_class, _exc, tb = exc_info
    raise _exc.with_traceback(tb)

def _async_on_closing():  # default implementation
    "Called at the end, both in case of success and failure"
    pass
class Async(object):
    """
    A decorator converting blocking functions into asynchronous
    functions, by using threads or processes. Examples:

    async_with_threads   = Async(threading.Thread)
    async_with_processes = Async(multiprocessing.Process)
    """
    def __init__(self, threadfactory):
        self.threadfactory = threadfactory

    def __call__(self, func,
                 on_success=_async_on_success,
                 on_failure=_async_on_failure,
                 on_closing=_async_on_closing):
        # every decorated function has its own independent thread counter
        func.counter = itertools.count(1)
        func.on_success = on_success
        func.on_failure = on_failure
        func.on_closing = on_closing
        return decorator(self.call, func)

    def call(self, func, *args, **kw):
        def func_wrapper():
            try:
                result = func(*args, **kw)
            except BaseException:
                func.on_failure(sys.exc_info())
            else:
                return func.on_success(result)
            finally:
                func.on_closing()
        name = '%s-%s' % (func.__name__, next(func.counter))
        thread = self.threadfactory(None, func_wrapper, name)
        thread.start()
        return thread
# default async decorator: using processes
def async_decor(async_type='mp'):
    if async_type in ("mp", "multiprocessing"):
        from multiprocessing import Process
        factory = Process
    elif async_type in ("th", "threading"):
        from threading import Thread
        factory = Thread
    else:
        raise ValueError("async_type must be either 'multiprocessing' "
                         "or 'threading' (got: %s)" % async_type)
    async_obj = Async(factory)
    return async_obj
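
# usage sketch (illustrative addition, not part of the upstream file): a
# call to the decorated function returns immediately with the started
# thread (or process) handle:
#
#   @async_decor('th')
#   def crunch(n):
#       return sum(range(n))
#
#   t = crunch(10**6)   # returns a started threading.Thread
#   t.join()            # wait; the result is passed to _async_on_success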