我按预期使用 map_async - 使用以下方法将可迭代映射到多个处理核心:
cores = mp.cpu_count()
pool = mp.Pool()
r = pool.map_async(func, offsets,callback=mycallback)
r.wait()
func
返回一个字典,因此回调使用以下命令“合并”字典:
ddict = defaultdict(set)
def mycallback(w):
for l in w:
for key, value in l.items():
for v in value:
ddict[key].add(v)
Offsets 是我用 1,000 - 50,000 个元素测试过的可迭代对象。
如果我删除r.wait()
,则无法返回map_async
调用的所有输出。
使用r.wait()
,我看到处理时间都低于串行实现且无法扩展,即并行实现时间呈指数增长,而串行版本呈线性增长。
我知道这func
是足够昂贵的,因为它与我的处理核心挂钩。
我在哪里使用 map_async 引入了开销?它不在回调函数中,因为删除和替换result.append
不影响时间。
编辑评论:
我正在移动大型字典,从 1,000 到 100,000 个元素。该值通常是 3-5 个元素的集合。所以,酸洗肯定是个问题。如果不移动到共享内存中的某些东西,人们会建议哪些替代数据结构?
apply_async
使用类似的回调,保存for l in w
行,返回大致相同的结果。对于某些问题集,速度略好于 map_async,而对于其他问题集,速度略差。使用托管字典和可连接队列明显更糟。一些时间测试。这是使用2个核心。当我添加额外的内核时,我看到了指数级的增长,所以我只能假设这种增长是由进程产生或酸洗以返回数据引起的。
func
获取一个数据点并寻找邻居。它在所有情况下都是相同的功能,除了需要传递偏移量来告诉并行代码要搜索哪些数据点。这本质上是一个 KDTree 搜索功能。
均匀分布
1,000 个数据点:序列号 0.098659992218 | apply_async
0.120759010315 | map_async
0.080078125
10,000 个数据点 <====== 只有并行改进 | 序列号 0.507845163345 | apply_async
0.446543931961 | map_async
0.477811098099
随机分布
10,000 个数据点:序列号 0.584854841232 | apply_async
1.03224301338 | map_async
0.948460817337
50,000 个数据点:序列号 3.66075992584 | apply_async
4.95467185974 | map_async
5.37306404114