I am trying to process data in parallel using IPython's parallel machinery. I am following @minrk's answer to the question on how to get intermediate results from IPython parallel processing. Since the data is heterogeneous, some processing tasks finish faster than others, and I would like to save the results as soon as they become available. I do this as follows:

from IPython.parallel import Client

def specialfunc(param):
    import time
    if param > 8:
        raise IOError
    else:
        time.sleep(param)
        return param

client = Client()
balanced       = client.load_balanced_view()
balanced.block = False
param_list = range(10)   # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncmap = balanced.map_async(specialfunc, param_list, ordered=False)

I can then iterate over asyncmap, and results become available as soon as they are ready:

for i in asyncmap:
    print i

The problem is that my code sometimes throws exceptions that I want to handle (the example above forces an IOError when the argument exceeds 8). However, as soon as one of the engines hits a problem, the whole asyncmap "appears" to be finished.

I actually noticed that when I query asyncmap.metadata I can figure out quite well which message gave an error (asyncmap.metadata[i]['pyerr']), but I don't know how to wait for the results as they come in.

So my question is: how should I handle results arriving asynchronously from my engines, even if they sometimes throw exceptions? How do I catch the exceptions in the engines without disrupting the wait for results in the controller?
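For comparison, the "consume each result as it arrives, including failures" control flow can be sketched with the standard library's concurrent.futures. This is not IPython.parallel (the thread pool here only stands in for the engines), but it shows per-task exception handling without losing the remaining results; specialfunc mirrors the toy task above with a scaled-down sleep:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def specialfunc(param):
    # same toy task as above: fail for large parameters
    if param > 8:
        raise IOError("bad parameter: %d" % param)
    time.sleep(param * 0.01)  # scaled down so the sketch runs quickly
    return param

results, errors = [], []
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(specialfunc, p): p for p in range(10)}
    for fut in as_completed(futures):    # yields each future as it finishes
        param = futures[fut]
        try:
            results.append(fut.result())  # re-raises the task's exception here
        except IOError:
            errors.append(param)          # handle the failure, keep consuming

print(sorted(results))  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
print(errors)           # [9]
```

The key point is that the exception surfaces only when result() is called for that one task, so a failure on one "engine" does not end the loop.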


2 Answers


I know this sounds a bit silly, but you could return a special value to indicate an error, e.g. -1, None, or a string. To get around map_async, what I did was loop over the parameters and use apply_async, storing the results in a list. Then I iterate over the list, trying to get the results and processing them one at a time. It looks like this:

 n_cores = len(c.ids)
 calls   = []
 for n, p in enumerate(params):
     core = c.ids[n % n_cores]
     calls.append(c[core].apply_async(f, p))

 # then you get the results

 while calls != []:
     for call in calls[:]:   # iterate over a copy so we can remove safely
         try:
             result = call.get(1e-3)
             process(result)
             calls.remove(call)
             # in case your call failed, you can apply_async again
             # and append the new call to calls.
         except parallel.TimeoutError:
             pass

Or use c[core].apply() and check the calls with c.ready(). Basically the same thing without exception handling. The annoying thing is that this hogs a lot of memory, since the results and other dicts for every call are hard to clear.
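The polling loop above can be mimicked with stdlib futures: call result() with a tiny timeout and skip the tasks that are not done yet. A minimal sketch under that assumption (concurrent.futures stands in for IPython.parallel; f and process are hypothetical stand-ins for your task and consumer):

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def f(p):
    # hypothetical task: faster inputs finish first
    time.sleep(p * 0.01)
    return p * 2

processed = []
def process(result):
    # hypothetical consumer of finished results
    processed.append(result)

with ThreadPoolExecutor(max_workers=4) as pool:
    calls = [pool.submit(f, p) for p in range(5)]
    while calls:
        for call in calls[:]:               # copy: list shrinks as calls finish
            try:
                process(call.result(timeout=1e-3))
                calls.remove(call)
            except TimeoutError:            # not done yet; retry on next pass
                pass

print(sorted(processed))  # [0, 2, 4, 6, 8]
```

This busy-waits with a short timeout just like the answer's get(1e-3) loop; the same memory caveat applies, since every finished future is kept alive until you drop the reference.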

I was doing something similar here, where I decided that map_async didn't work for me. This might also be relevant if you decide to go for this approach.

Cheers.

PS:我认为基本上这就是您在上面实现的,但我发现单独处理调用然后将它们堆叠到地图中更自然,特别是如果您以后可能想重新处理其中一些。

Answered 2013-10-23T11:41:55.447

Inspired by ipython/*/examples/parallel/customresults.py, I came up with this solution:

from IPython import parallel  # needed for parallel.TimeoutError below

asyncmap = balanced.map(specialfunc, param_list, ordered=False)

#create original mapping of msg_ids to parameters
# maybe just a quick way to find which parameter gave what result
msg_ids_to_parameters = dict(zip(asyncmap.msg_ids, param_list))

pending = set(asyncmap.msg_ids) # all queued jobs are pending
while pending:   # we'll come back as long as finished jobs haven't been looked at yet
    try:
        client.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeouterrors, since they only mean that at least one isn't done
        pass

    # finished is the set of msg_ids that are complete
    finished = pending.difference(client.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = client.get_result(msg_id)
        # checking whether any exceptions occurred when code ran on the engine
        if ar.metadata['pyerr'] is None:
            print "job id %s finished on engine %i " % (msg_id, ar.engine_id)
            print "and results for parameter %i :" % msg_ids_to_parameters[msg_id]
            # note that each job in a map always returns a list of length chunksize
            # even if chunksize == 1
            for res in ar.result:
                print " item %i \n" % res
        else:
            print('this went wrong for %i (%s)' % (msg_ids_to_parameters[msg_id], ar.metadata['pyerr']))

Essentially, the change from the example code is to look at the metadata and see whether an error was recorded, and only if none was, to go ahead and retrieve the result via ar.result.
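The "check for an error before touching the result" step has a direct stdlib analogue: Future.exception() returns the raised exception (or None) without re-raising it, much like inspecting metadata['pyerr']. A hedged sketch, again using concurrent.futures rather than IPython.parallel itself:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def risky(param):
    # hypothetical task that fails for large parameters
    if param > 2:
        raise IOError("param too big: %d" % param)
    return param * 10

succeeded, failed = {}, {}
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(risky, p): p for p in range(5)}
    for fut in as_completed(futures):
        param = futures[fut]
        err = fut.exception()   # like metadata['pyerr']: None means success
        if err is None:
            succeeded[param] = fut.result()   # safe: cannot raise now
        else:
            failed[param] = err               # keep the exception for reporting

print(succeeded)        # {0: 0, 1: 10, 2: 20}
print(sorted(failed))   # [3, 4]
```

As in the answer above, the error check happens before the result is accessed, so the reporting branch can run without a try/except around it.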

Answered 2013-10-22T19:51:44.027