I found that Chords can be used to solve map-reduce-like problems.
# Assumption: `celery` is already defined by the surrounding module;
# the original snippet does not show its import or app setup.
import time


@celery.task(name='ic.mapper')
def mapper():
    # Split your problem into embarrassingly parallel maps.
    maps = [map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s(), map.s()]
    # Put them in a chord that runs them in parallel and calls 'reduce' once they have all finished.
    mapreduce = celery.chord(maps)(reduce.s())
    return "{0} mapper ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)


# Note: naming this task `map` shadows the built-in map() within this module.
@celery.task(name='ic.map')
def map():
    # Do something useful here.
    time.sleep(10.0)
    return "{0} map ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)


@celery.task(name='ic.reduce')
def reduce(results):
    # `results` is the list of return values from the maps; combine them here.
    return "{0} reduce ran on {1}".format(celery.current_task.request.id, celery.current_task.request.hostname)
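When the mapper is executed on a cluster of three workers/servers, it first runs the mapper itself, which splits your problem and creates new subtasks that are submitted to the broker again. These run in parallel, because the queue is consumed by all workers. A chord task is also created that polls all the maps to see whether they have finished. Once they are done, the reduce task is executed, where you can put the results back together.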
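For illustration only, here is the same fan-out/fan-in shape without Celery or a broker, using just the standard library. The names `map_part`, `reduce_parts` and `run_mapreduce` are made up for this sketch, and a thread pool stands in for the workers; it is not how Celery implements chords, just a way to see that reduce runs once, after every map has finished, and receives the list of their results.

```python
from concurrent.futures import ThreadPoolExecutor


def map_part(chunk):
    # Hypothetical map step: sum one slice of the input.
    return sum(chunk)


def reduce_parts(results):
    # Hypothetical reduce step: receives the list of all map results.
    return sum(results)


def run_mapreduce(data, parts=4):
    # Split the input into roughly equal, independent chunks.
    size = max(1, -(-len(data) // parts))  # ceiling division, at least 1
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    # Fan out: run the maps in parallel; Executor.map keeps input order.
    with ThreadPoolExecutor(max_workers=parts) as pool:
        results = list(pool.map(map_part, chunks))
    # Fan in: call reduce once, only after every map has finished.
    return reduce_parts(results)


if __name__ == "__main__":
    print(run_mapreduce(list(range(100))))
```

In the Celery version above, the list of `map.s()` signatures plays the role of `chunks`, and `reduce.s()` is the callback that gets `results`.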
In short: yes, it is possible. Thanks to the vegetable guys!