我有以下代码。
这使用了一个名为 decorator 的 python 模块。
from multiprocessing import Pool
from random import randint
import traceback
import decorator
import time
def test_retry(number_of_retry_attempts=1, **kwargs):
timeout = kwargs.get('timeout', 2.0) # seconds
@decorator.decorator
def tryIt(func, *fargs, **fkwargs):
for _ in xrange(number_of_retry_attempts):
try: return func(*fargs, **fkwargs)
except:
tb = traceback.format_exc()
if timeout is not None:
time.sleep(timeout)
print 'Catching exception %s. Attempting retry: '%(tb)
raise
return tryIt
装饰器模块帮助我装饰我的数据仓库调用函数。所以我不需要处理连接丢失和各种基于连接的问题,并允许我重置连接并在超时后重试。我用这种方法装饰了我所有的数据仓库读取功能,所以我可以免费重试。
我有以下方法。
def process_generator(data):
#Process the generated data
def generator():
data = data_warhouse_fetch_method()#This is the actual method which needs retry
yield data
@test_retry(number_of_retry_attempts=2,timeout=1.0)
def data_warhouse_fetch_method():
#Fetch the data from data-warehouse
pass
我尝试使用这样的多处理模块对我的代码进行多处理。
try:
pool = Pool(processes=2)
result = pool.imap_unordered(process_generator,generator())
except Exception as exception:
print 'Do some post processing stuff'
tb = traceback.format_exc()
print tb
当一切顺利时,一切都是正常的。当它在重试次数内自行修复时,情况也很正常。但是一旦重试次数超过我会在 test_retry 方法中引发异常,该异常不会在主进程中被捕获。进程死亡,主进程分叉的进程被保留为孤儿。可能是我在这里做错了什么。我正在寻找一些帮助来解决以下问题。将异常传播到父进程,以便我可以处理异常并使我的孩子优雅地死去。另外我想知道如何通知子进程优雅地死去。在此先感谢您的帮助 。
编辑:添加了更多代码来解释。
def test_retry(number_of_retry_attempts=1, **kwargs):
timeout = kwargs.get('timeout', 2.0) # seconds
@decorator.decorator
def tryIt(func, *fargs, **fkwargs):
for _ in xrange(number_of_retry_attempts):
try: return func(*fargs, **fkwargs)
except:
tb = traceback.format_exc()
if timeout is not None:
time.sleep(timeout)
print 'Catching exception %s. Attempting retry: '%(tb)
raise
return tryIt
@test_retry(number_of_retry_attempts=2,timeout=1.0)
def bad_method():
sample_list =[]
return sample_list[0] #This will result in an exception
def process_generator(number):
if isinstance(number,int):
return number+1
else:
raise
def generator():
for i in range(20):
if i%10 == 0 :
yield bad_method()
else:
yield i
try:
pool = Pool(processes=2)
result = pool.imap_unordered(process_generator,generator())
pool.close()
#pool.join()
for r in result:
print r
except Exception, e: #Hoping the generator will catch the exception. But not .
print 'got exception: %r, terminating the pool' % (e,)
pool.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
pool.join()
print 'join complete'
print 'the end'
实际问题归结为如果生成器抛出异常,我无法在包含 pool.imap_unordered() 方法的 except 子句中捕获生成器抛出的异常。所以在抛出异常后,主进程被卡住,子进程永远等待。不确定我在这里做错了什么。