我正在尝试使用 ipython 的并行处理并行处理数据。我正在按照@minrk的说明回答有关如何在 ipython 并行处理中获得中间结果的问题?. 由于数据是异构的,因此某些处理任务比其他任务完成得更快,我想在它们可用时立即保存它们。我按以下方式执行此操作:
from IPython.parallel import Client
def specialfunc(param):
import time
if param > 8:
raise IOError
else:
time.sleep( param)
return param
client = Client()
balanced = client.load_balanced_view()
balanced.block = False
param_list = range(10) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)
然后我可以遍历 asyncmap 并在结果准备好时变为可用:
for i in asyncmap:
print i
问题是我的代码有时会抛出我想处理的异常(上面的示例在调用参数超过 8 时强制出现 IOError)。但是,一旦其中一个引擎出现摇晃,整个异步映射“似乎”就完成了。
我实际上注意到,当我询问 asyncmap.metadata 时,可以很好地找出哪个消息给出了错误(asyncmap.metadata[i]['pyerr']),但是我不知道如何等待结果作为他们是这样。
所以我的问题是我应该如何处理从我的引擎异步到达的结果,即使它们有时会抛出异常。如何在不扰乱控制器中等待结果的情况下捕获引擎中的异常?