These are my suggestions put into action. I hope your actual task is a more complex computation; otherwise it is hardly worth using multiprocessing at all.
import numpy as np
import multiprocessing
from functools import partial
from heapq import heappush, heappushpop

def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]

def some_func(x, other_stuff):
    y = np.sum(x)
    return y

def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    return val

def main():
    n = 20
    generator = get_generator(n)
    other_stuff = np.nan
    func = partial(task, other_stuff)
    cpu_count = multiprocessing.cpu_count() - 1  # leave a processor for the main process
    chunk_size = n // cpu_count
    HEAPSIZE = 8
    with multiprocessing.Pool(cpu_count) as pool:
        heap = []
        for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
            if len(heap) < HEAPSIZE:
                heappush(heap, val)
            elif val > heap[0]:
                heappushpop(heap, val)
    # sort the retained values in descending order
    values = sorted(heap, reverse=True)
    print(values)

if __name__ == '__main__':
    main()
Prints:
[39, 37, 35, 33, 31, 29, 27, 25]
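For reference, the heap bookkeeping above keeps the 8 largest results seen so far; heap[0] is always the smallest retained value, so anything larger displaces it. A minimal single-process equivalent using the standard library's heapq.nlargest:

from heapq import nlargest

# Equivalent of the heappush/heappushpop bookkeeping above, without the pool:
# keep the 8 largest sums of the generator's [i, i + 1] items.
values = nlargest(8, (sum(x) for x in ([i, i + 1] for i in range(20))))
print(values)  # [39, 37, 35, 33, 31, 29, 27, 25]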
Update
By experimenting with the code below, I found that it is best to allocate to the pool a number of processes equal to mp.cpu_count() - 1, which leaves the main process a free processor for handling the results returned by the workers. I also experimented with the chunksize parameter:
import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for n in range(10000):
        s += i * i  # square the argument
    s /= 10000
    return s

def main():
    cpu_count = mp.cpu_count() - 1  # leave a processor for the main process
    N = 10000
    chunk_size = N // cpu_count  # 100 may be good enough
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    #print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
On my desktop (which was running other processes, e.g., streaming music), the code above performed better when allocating mp.cpu_count() - 1 processes to the pool than when using cpu_count (2.4 seconds vs. 2.5 seconds). Here are the other timings (rounded to one decimal place):
chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1))
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds
The result for a chunksize value of 1000 is a bit of an anomaly. I would suggest trying different values; otherwise, use N // (mp.cpu_count() - 1). This assumes that you can compute N, the number of items in the iterable. When your iterable is a generator, in the general case you would first have to convert it to a list in order to get its length (see the sketch after the next paragraph). In this particular benchmark, even a chunksize value of 1 did not perform much worse. But here is what I have learned from varying the amount of work done by worker_process:

The more work (i.e., CPU) your worker process requires to complete its task, the less sensitive it is to the chunksize parameter. If it returns after consuming very little CPU, the overhead of transferring the next chunk becomes significant, and you want to keep the number of chunk transfers small (i.e., you want a large chunksize value). But if the process is long-running, the overhead of transferring the next chunk has far less of an impact.
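To make the generator point concrete, here is a minimal sketch, assuming you can afford to materialize the whole generator in memory (the helper chunksize_for is hypothetical, not part of multiprocessing):

import multiprocessing as mp

def chunksize_for(iterable):
    # A generator has no len(); materialize it first to count the items.
    items = list(iterable)
    pool_size = mp.cpu_count() - 1
    return items, max(1, len(items) // pool_size)

items, chunk_size = chunksize_for([i, i + 1] for i in range(20))
# items (now a list) can then be passed to pool.imap_unordered(..., chunksize=chunk_size)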
In the following code, the CPU requirements of the worker process are trivial:
import multiprocessing as mp
import timeit

def worker_process(i):
    return i ** 2

def main():
    cpu_count = mp.cpu_count() - 1
    N = 100000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)
Timings:
chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds
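A back-of-envelope way to read these numbers: the number of chunk transfers is roughly N / chunksize, so a trivial worker is dominated by transfer overhead at small chunksize values (this is a rough sketch of the transfer count, not an exact model of Pool's overhead):

N = 100000
for chunksize in (1428, 100, 1):
    transfers = -(-N // chunksize)  # ceiling division
    print(f'chunksize = {chunksize}: ~{transfers} chunk transfers')
# chunksize = 1428: ~71, chunksize = 100: ~1000, chunksize = 1: ~100000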
In the following code, the CPU requirements of the worker process are more substantial:
import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000

def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
Timings:
chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1))
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds
Update 2
According to Python multiprocessing: Understanding logic behind chunksize, when the map, starmap, or map_async methods are called with chunksize=None, there is a specific algorithm used to compute a chunksize, and I have used it in the code below. I don't know why the default value for the imap and imap_unordered methods is 1 and does not use this same algorithm. Perhaps it is because that would not be "lazy", as the descriptions of these methods imply. In the following code, which repeats the previous benchmark, I use a redefinition of that same algorithm to compute a default chunksize:
import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000

def compute_chunksize(pool_size, iterable_size):
    # Mirrors the algorithm Pool uses for map/starmap when chunksize=None.
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = compute_chunksize(cpu_count, N)
    print('chunk_size =', chunk_size)
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
    print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)
Timing:
chunksize = 36 -> 22.2 seconds
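As a sanity check, assuming a pool size of 7 (consistent with the N // (mp.cpu_count() - 1) = 142 figure above), the algorithm reproduces the printed value:

print(compute_chunksize(7, 1000))  # divmod(1000, 7 * 4) = (35, 20); the remainder bumps it to 36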