The unpredictable competition between communication overhead and computation speedup is definitely the issue here. What you are observing is perfectly fine. Whether you get a net speedup depends on many factors and is something that has to be quantified properly (as you did).
So why is multiprocessing so "unexpectedly slow" in your case? multiprocessing's map and map_async functions actually pickle Python objects back and forth through pipes that connect the parent process with the child processes. This may take a considerable amount of time. During that time, the child processes have almost nothing to do, which is what you see in htop. Between different systems there may be considerable differences in pipe transmission performance, which is also why for some people your pool code is faster than your single-CPU code, even though for you it is not (other factors may come into play here; this point is only mentioned to illustrate the effect).
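To get a feeling for how large this serialization overhead is in isolation, you can simply time a pickle round trip for an object about the size of one of your inputs. The following is only a rough sketch: the 1000x1000 array is an arbitrary stand-in for one of your matrices, the pickle protocol is my choice, and the pipe transfer itself is not measured at all.

import time
import pickle
import numpy as np

# Stand-in payload, roughly the size of one large input matrix.
payload = np.random.rand(1000, 1000)

t0 = time.time()
blob = pickle.dumps(payload, protocol=pickle.HIGHEST_PROTOCOL)
restored = pickle.loads(blob)
duration = time.time() - t0

# Every input object handed to map()/map_async() gets serialized like this
# and pushed through a pipe to a worker (results travel back the same way).
print("pickle round trip for %.1f MB took %.3f s" % (len(blob) / 1e6, duration))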
What can you do to make it faster?
1. Do not pickle the input on POSIX-compliant systems.
If you are on Unix, you can work around the parent->child communication overhead by taking advantage of POSIX's process fork behavior (copy-on-write memory):

Create your job input (e.g. a list of large matrices) in the parent process, in a globally accessible variable. Then create the worker processes yourself by calling multiprocessing.Process(). In the children, grab the job input from the global variable. Simply put, this makes the child access the parent's memory without any communication overhead (*, explanation below). Send the result back to the parent through e.g. a multiprocessing.Queue. This saves a lot of communication overhead, especially if the output is small compared to the input. This method will not work on e.g. Windows, because multiprocessing.Process() there creates an entirely new Python process that does not inherit the state of the parent.
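A minimal sketch of this pattern is shown below. It assumes a POSIX system where children are created via fork() (the default on Linux); the names DATA and crunch() are made up for illustration. The full benchmark further down follows the same structure.

import multiprocessing
import numpy as np

# Job input lives in a module-level variable of the parent. After fork(),
# the children see it without any pickling through pipes.
DATA = [np.random.rand(500, 500) for _ in range(4)]

def crunch(queue, index):
    # Read the input from the inherited global; only the small result
    # travels back through the queue.
    queue.put((index, float(DATA[index].sum())))

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=crunch, args=(queue, i))
               for i in range(len(DATA))]
    for w in workers:
        w.start()
    results = dict(queue.get() for _ in workers)
    for w in workers:
        w.join()
    print(results)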
2. Make use of numpy multithreading.

Depending on your actual calculation task, it may happen that multiprocessing does not help at all. If you compile numpy yourself and enable OpenMP directives, operations on large(ish) matrices may become very efficiently multithreaded on their own (and distributed over many CPU cores; the GIL is no limiting factor here). Basically, this is the most efficient use of multiple CPU cores you can get in the context of numpy/scipy.
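Whether your numpy build multithreads on its own is easy to check empirically: time a large matrix operation while varying OMP_NUM_THREADS and watch the CPU load in htop. The sketch below is only illustrative; whether the environment variable has any effect depends on the BLAS/LAPACK library your numpy is linked against, and the matrix size is arbitrary.

# Save as e.g. bench_blas.py and run it twice:
#   OMP_NUM_THREADS=1 python bench_blas.py
#   OMP_NUM_THREADS=4 python bench_blas.py
# The variable must be set before numpy gets imported.
import time
import numpy as np

a = np.random.rand(3000, 3000)
b = np.random.rand(3000, 3000)

t0 = time.time()
c = np.dot(a, b)   # delegated to the underlying BLAS/LAPACK library
print("matrix product took %.2f s" % (time.time() - t0))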
* In general, the child cannot directly access the parent's memory. However, right after fork(), parent and child are in an equivalent state. It would be stupid to copy the parent's entire memory to another place in RAM, which is where the copy-on-write principle comes in: as long as the child does not change its memory state, it actually accesses the parent's memory. Only upon modification are the corresponding bits and pieces copied into the child's memory space.
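This behavior can be observed with a few lines of POSIX-only code that call os.fork() directly (just a small illustration, unrelated to the benchmark below):

import os
import sys
import numpy as np

# Data owned by the parent (tiny here, but the mechanism is the same
# for gigabytes of data).
DATA = np.arange(5)

pid = os.fork()
if pid == 0:
    # Child: reads the inherited pages; nothing has been copied or pickled.
    print("child sees:        %s" % (DATA,))
    # Writing triggers copy-on-write: the touched pages get duplicated
    # into the child's address space.
    DATA[:] = -1
    print("child after write: %s" % (DATA,))
    sys.stdout.flush()
    os._exit(0)
else:
    os.waitpid(pid, 0)
    # The parent's data is unaffected by the child's modification.
    print("parent still sees: %s" % (DATA,))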
Major edit:
Let me add a piece of code that crunches a large amount of input data with multiple worker processes and follows the advice "1. Do not pickle the input on POSIX-compliant systems." Furthermore, the amount of information transferred back to the worker manager (the parent process) is quite small. The heavy computation part of this example is a singular value decomposition, which can make heavy use of OpenMP. I have executed the example multiple times:

- Once with 1, 2, or 4 worker processes and OMP_NUM_THREADS=1, so that each worker process creates a maximum load of 100 %. There, the scaling of compute time with the number of workers is almost linear, and the net speedup factor corresponds to the number of workers involved.
- Once with 1, 2, or 4 worker processes and OMP_NUM_THREADS=4, so that each process creates a maximum load of 400 % (by spawning 4 OpenMP threads). My machine has 16 real cores, so 4 processes with a maximum load of 400 % each almost get the maximum performance out of the machine. The scaling is no longer perfectly linear and the speedup factor is not the number of workers involved, but the absolute computation time is significantly reduced compared to OMP_NUM_THREADS=1 and still decreases significantly with the number of worker processes.
- Once with larger input data, 4 worker processes, and OMP_NUM_THREADS=4. It results in an average system load of 1253 %.
- Once with the same setup as the last run, but with OMP_NUM_THREADS=5. It results in an average system load of 1598 %, which indicates that we get everything out of that 16-core machine. However, the actual computation wall time does not improve compared to the previous case.
Code:
import os
import time
import math
import numpy as np
from numpy.linalg import svd as svd
import multiprocessing
# If numpy is compiled for OpenMP, then make sure to control
# the number of OpenMP threads via the OMP_NUM_THREADS environment
# variable before running this benchmark.
MATRIX_SIZE = 1000
MATRIX_COUNT = 16
def rnd_matrix():
    offset = np.random.randint(1,10)
    stretch = 2*np.random.rand()+0.1
    return offset + stretch * np.random.rand(MATRIX_SIZE, MATRIX_SIZE)
print "Creating input matrices in parent process."
# Create input in memory. Children access this input.
INPUT = [rnd_matrix() for _ in xrange(MATRIX_COUNT)]
def worker_function(result_queue, worker_index, chunk_boundary):
    """Work on a certain chunk of the globally defined `INPUT` list.
    """
    result_chunk = []
    for m in INPUT[chunk_boundary[0]:chunk_boundary[1]]:
        # Perform singular value decomposition (CPU intense).
        u, s, v = svd(m)
        # Build single numeric value as output.
        output = int(np.sum(s))
        result_chunk.append(output)
    result_queue.put((worker_index, result_chunk))
def work(n_workers=1):
    def calc_chunksize(l, n):
        """Rudimentary function to calculate the size of chunks for equal
        distribution of a list `l` among `n` workers.
        """
        return int(math.ceil(len(l)/float(n)))

    # Build boundaries (indices for slicing) for chunks of `INPUT` list.
    chunk_size = calc_chunksize(INPUT, n_workers)
    chunk_boundaries = [
        (i, i+chunk_size) for i in xrange(0, len(INPUT), chunk_size)]

    # When n_workers and input list size are of same order of magnitude,
    # the above method might have created less chunks than workers available.
    if n_workers != len(chunk_boundaries):
        return None

    result_queue = multiprocessing.Queue()
    # Prepare child processes.
    children = []
    for worker_index in xrange(n_workers):
        children.append(
            multiprocessing.Process(
                target=worker_function,
                args=(
                    result_queue,
                    worker_index,
                    chunk_boundaries[worker_index],
                    )
                )
            )

    # Run child processes.
    for c in children:
        c.start()

    # Create result list of length of `INPUT`. Assign results upon arrival.
    results = [None] * len(INPUT)

    # Wait for all results to arrive.
    for _ in xrange(n_workers):
        worker_index, result_chunk = result_queue.get(block=True)
        chunk_boundary = chunk_boundaries[worker_index]
        # Store the chunk of results just received to the overall result list.
        results[chunk_boundary[0]:chunk_boundary[1]] = result_chunk

    # Join child processes (clean up zombies).
    for c in children:
        c.join()
    return results
def main():
    durations = []
    n_children = [1, 2, 4]
    for n in n_children:
        print "Crunching input with %s child(ren)." % n
        t0 = time.time()
        result = work(n)
        if result is None:
            continue
        duration = time.time() - t0
        print "Result computed by %s child process(es): %s" % (n, result)
        print "Duration: %.2f s" % duration
        durations.append(duration)
    normalized_durations = [durations[0]/d for d in durations]
    for n, normdur in zip(n_children, normalized_durations):
        print "%s-children speedup: %.2f" % (n, normdur)

if __name__ == '__main__':
    main()
Output:
$ export OMP_NUM_THREADS=1
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 16.66 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 8.27 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [5587, 8576, 11566, 12315, 7453, 23245, 6136, 12387, 20634, 10661, 15091, 14090, 11997, 20597, 21991, 7972]
Duration: 4.37 s
1-children speedup: 1.00
2-children speedup: 2.02
4-children speedup: 3.81
48.75user 1.75system 0:30.00elapsed 168%CPU (0avgtext+0avgdata 1007936maxresident)k
0inputs+8outputs (1major+809308minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=4
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 1 child(ren).
Result computed by 1 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 8.62 s
Crunching input with 2 child(ren).
Result computed by 2 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 4.92 s
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [22735, 5932, 15692, 14129, 6953, 12383, 17178, 14896, 16270, 5591, 4174, 5843, 11740, 17430, 15861, 12137]
Duration: 2.95 s
1-children speedup: 1.00
2-children speedup: 1.75
4-children speedup: 2.92
106.72user 3.07system 0:17.19elapsed 638%CPU (0avgtext+0avgdata 1022240maxresident)k
0inputs+8outputs (1major+841915minor)pagefaults 0swaps
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [21762, 26806, 10148, 22947, 20900, 8161, 20168, 17439, 23497, 26360, 6789, 11216, 12769, 23022, 26221, 20480, 19140, 13757, 23692, 19541, 24644, 21251, 21000, 21687, 32187, 5639, 23314, 14678, 18289, 12493, 29766, 14987, 12580, 17988, 20853, 4572, 16538, 13284, 18612, 28617, 19017, 23145, 11183, 21018, 10922, 11709, 27895, 8981]
Duration: 12.69 s
4-children speedup: 1.00
174.03user 4.40system 0:14.23elapsed 1253%CPU (0avgtext+0avgdata 2887456maxresident)k
0inputs+8outputs (1major+1211632minor)pagefaults 0swaps
$ export OMP_NUM_THREADS=5
$ /usr/bin/time python test2.py
Creating input matrices in parent process.
Crunching input with 4 child(ren).
Result computed by 4 child process(es): [19528, 17575, 21792, 24303, 6352, 22422, 25338, 18183, 15895, 19644, 20161, 22556, 24657, 30571, 13940, 18891, 10866, 21363, 20585, 15289, 6732, 10851, 11492, 29146, 12611, 15022, 18967, 25171, 10759, 27283, 30413, 14519, 25456, 18934, 28445, 12768, 28152, 24055, 9285, 26834, 27731, 33398, 10172, 22364, 12117, 14967, 18498, 8111]
Duration: 13.08 s
4-children speedup: 1.00
230.16user 5.98system 0:14.77elapsed 1598%CPU (0avgtext+0avgdata 2898640maxresident)k
0inputs+8outputs (1major+1219611minor)pagefaults 0swaps