9

multiprocessing我想从这个例子开始尝试不同的使用方式:

$ cat multi_bad.py 
import multiprocessing as mp
from time import sleep
from random import randint

def f(l, t):
#   sleep(30)
    return sum(x < t for x in l)

if __name__ == '__main__':
    l = [randint(1, 1000) for _ in range(25000)]
    t = [randint(1, 1000) for _ in range(4)]
#   sleep(15)
    pool = mp.Pool(processes=4)
    result = pool.starmap_async(f, [(l, x) for x in t])
    print(result.get())

l是一个列表,当产生 4 个进程时,该列表被复制 4 次。为避免这种情况,文档页面提供了使用队列、共享数组或使用multiprocessing.Manager. 对于最后一个,我更改了以下定义l

$ diff multi_bad.py multi_good.py 
10c10,11
<     l = [randint(1, 1000) for _ in range(25000)]
---
>     man = mp.Manager()
>     l = man.list([randint(1, 1000) for _ in range(25000)])

结果看起来仍然正确,但是执行时间急剧增加,以至于我认为我做错了什么:

$ time python multi_bad.py 
[17867, 11103, 2021, 17918]

real    0m0.247s
user    0m0.183s
sys 0m0.010s

$ time python multi_good.py 
[3609, 20277, 7799, 24262]

real    0m15.108s
user    0m28.092s
sys 0m6.320s

文档确实说这种方式比共享数组慢,但这感觉不对。我也不确定如何对此进行分析以获取有关正在发生的事情的更多信息。我错过了什么吗?

PS 使用共享阵列,我得到的时间低于 0.25 秒。

PPS 这是在 Linux 和 Python 3.3 上。

4

2 回答 2

9

当子进程被编辑时, Linux 使用写时复制。os.fork展示:

import multiprocessing as mp
import numpy as np
import logging
import os

logger = mp.log_to_stderr(logging.WARNING)

def free_memory():
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u = unit))
                total += amount
    return total

def worker(i):
    x = data[i,:].sum()    # Exercise access to data
    logger.warn('Free memory: {m}'.format(m = free_memory()))

def main():
    procs = [mp.Process(target = worker, args = (i, )) for i in range(4)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

logger.warn('Initial free: {m}'.format(m = free_memory()))
N = 15000
data = np.ones((N,N))
logger.warn('After allocating data: {m}'.format(m = free_memory()))

if __name__ == '__main__':
    main()

这产生了

[WARNING/MainProcess] Initial free: 2522340
[WARNING/MainProcess] After allocating data: 763248
[WARNING/Process-1] Free memory: 760852
[WARNING/Process-2] Free memory: 757652
[WARNING/Process-3] Free memory: 757264
[WARNING/Process-4] Free memory: 756760

这表明最初大约有 2.5GB 的可用内存。分配 15000x15000 的float64s 数组后,有 763248 KB 可用空间。这大致是有道理的,因为 15000**2*8 字节 = 1.8GB 并且内存下降,2.5GB - 0.763248GB 也大约为 1.8GB。

现在,在每个进程生成后,可用内存再次报告为 ~750MB。可用内存没有显着减少,因此我得出结论,系统必须使用写时复制。

结论:如果您不需要修改数据,在__main__模块的全局级别定义它是在子进程之间共享数据的一种方便且(至少在 Linux 上)内存友好的方式。

于 2012-10-29T19:35:28.143 回答
6

这是意料之中的,因为访问共享对象意味着必须腌制请求,通过某种信号/系统调用发送它,取消腌制请求执行它并以相同的方式返回结果。

基本上你应该尽量避免共享内存。这导致更多可调试代码(因为您的并发性要少得多)并且速度更快。

只有在真正需要时才应该使用共享内存(例如,共享千兆字节的数据,以便复制它需要太多的 RAM,或者如果进程应该能够通过这个共享内存进行交互)。

附带说明一下,使用管理器可能比共享数组慢得多,因为管理器必须能够处理任何 PyObject * 并因此必须腌制/取消腌制等,而数组可以避免大部分这种开销。

从多处理的文档中:

管理器提供了一种方法来创建可以在不同进程之间共享的数据。管理器对象控制管理共享对象的服务器进程。其他进程可以使用代理访问共享对象。

因此,使用 Manager 意味着生成一个仅用于处理共享内存的新进程,这可能就是它需要更多时间的原因。

如果您尝试分析代理的速度,它会比非共享列表慢很多:

>>> import timeit
>>> import multiprocessing as mp
>>> man = mp.Manager()
>>> L = man.list(range(25000))
>>> timeit.timeit('L[0]', 'from __main__ import L')
50.490395069122314
>>> L = list(range(25000))
>>> timeit.timeit('L[0]', 'from __main__ import L')
0.03588080406188965
>>> 50.490395069122314 / _
1407.1701119638526

虽然 anArray并没有那么慢:

>>> L = mp.Array('i', range(25000))
>>> timeit.timeit('L[0]', 'from __main__ import L')
0.6133401393890381
>>> 0.6133401393890381 / 0.03588080406188965
17.09382371507359

由于非常基本的操作很慢,并且认为加快它们的希望不大,这意味着如果您必须共享大量数据并希望快速访问它,那么您应该使用Array.

可能会加快速度的方法是一次访问多个元素(例如,获取切片而不是单个元素),但取决于您想要执行的操作,这可能会也可能不会。

于 2012-10-29T12:43:12.840 回答