1

我按预期使用 map_async - 使用以下方法将可迭代映射到多个处理核心:

cores = mp.cpu_count()
pool = mp.Pool()

r = pool.map_async(func, offsets,callback=mycallback)
r.wait()

func返回一个字典,因此回调使用以下命令“合并”字典:

ddict = defaultdict(set)
def mycallback(w):
    for l in w:
        for key, value in l.items():
            for v in value:
                ddict[key].add(v)  

Offsets 是我用 1,000 - 50,000 个元素测试过的可迭代对象。

如果我删除r.wait(),则无法返回map_async调用的所有输出。

使用r.wait(),我看到处理时间都低于串行实现且无法扩展,即并行实现时间呈指数增长,而串行版本呈线性增长。

我知道这func是足够昂贵的,因为它与我的处理核心挂钩。

我在哪里使用 map_async 引入了开销?它不在回调函数中,因为删除和替换result.append不影响时间。

编辑评论:

  1. 我正在移动大型字典,从 1,000 到 100,000 个元素。该值通常是 3-5 个元素的集合。所以,酸洗肯定是个问题。如果不移动到共享内存中的某些东西,人们会建议哪些替代数据结构?

  2. apply_async使用类似的回调,保存for l in w行,返回大致相同的结果。对于某些问题集,速度略好于 map_async,而对于其他问题集,速度略差。使用托管字典和可连接队列明显更糟。

  3. 一些时间测试。这是使用2个核心。当我添加额外的内核时,我看到了指数级的增长,所以我只能假设这种增长是由进程产生或酸洗以返回数据引起的。

func获取一个数据点并寻找邻居。它在所有情况下都是相同的功能,除了需要传递偏移量来告诉并行代码要搜索哪些数据点。这本质上是一个 KDTree 搜索功能。

均匀分布

1,000 个数据点:序列号 0.098659992218 | apply_async 0.120759010315 | map_async0.080078125

10,000 个数据点 <====== 只有并行改进 | 序列号 0.507845163345 | apply_async 0.446543931961 | map_async0.477811098099

随机分布

10,000 个数据点:序列号 0.584854841232 | apply_async1.03224301338 | map_async0.948460817337

50,000 个数据点:序列号 3.66075992584 | apply_async4.95467185974 | map_async5.37306404114

4

1 回答 1

3

您可以更改func()为返回集合字典而不是列表字典吗?然后你的回调函数可以这样重写:

def mycallback(w):
    for l in w:
        for key, value in l.items():
            ddict[key].update(value)

这应该有助于串行和并行处理时间。

不幸的是,我认为@Dougal 在线程之间传递数据时对所有数据进行酸洗/取消酸洗是正确的。由于酸洗的开销,将二进制数据写入磁盘并再次读取它可能会更快,而不是在内存中传递它。您可以使用如下格式:

key value1 value2 value3 ...
key2 valueA valueB valueC ...
...

这应该很容易写和读。

于 2013-05-03T00:14:06.283 回答