1

我阅读了很多关于使用该multiprocessing模块进行并行化的帖子,但没有一篇能完全回答我的问题。

我有一个很长的生成器给我参数值,并且对于每个我想计算一些函数值。但是,我只想保存最好的n,因为我只对最好的感兴趣,保存所有结果会炸毁 RAM。在我看来,有两种方法可以做到这一点:1)在保存最佳值的进程之间使用公共共享内存,或者 2)为每个核心/进程保留单独的最佳结果列表,然后手动合并这些一起列出。

我认为第二种方法会更好,但是我不确定如何实现。这是我到目前为止得到的:

import numpy as np
import multiprocessing
from functools import partial


def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]


def some_func(x, other_stuff):
    y = np.sum(x)
    return y


def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    
    if val > task.some_dict['min']:
        task.l.append(val)
        task.some_dict['min'] = val
    return


def task_init(l, some_dict):
    task.l = l
    task.some_dict = some_dict
    task.some_dict['min'] = np.NINF

n = 20
generator = get_generator(n)
other_stuff = np.nan

func = partial(task, other_stuff)

l = multiprocessing.Manager().list()
some_dict = multiprocessing.Manager().dict()

p = multiprocessing.Pool(None, task_init, [l, some_dict])

p.imap(func, generator, chunksize=10000)

p.close()
p.join()

这与我想做的有点相似。但我真的很关心性能,在实际代码中,最佳值的比较/保存会更加复杂,所以我认为共享内存方法会非常慢。

我的问题归结为:如果我有例如 8 个内核,我怎么能有 8 个列表,每个列表对应一个将返回的内核的最佳结果,以便内核完全独立且相当快速地工作?

非常感谢!

4

1 回答 1

1

这些是我付诸行动的意见。我希望您的实际任务是更复杂的计算,否则几乎不值得使用多处理。

import numpy as np
import multiprocessing
from functools import partial
from heapq import *

def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]


def some_func(x, other_stuff):
    y = np.sum(x)
    return y


def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    return val



def main():
    n = 20
    generator = get_generator(n)
    other_stuff = np.nan

    func = partial(task, other_stuff)

    cpu_count = multiprocessing.cpu_count() - 1 # leave a processor for the main process
    chunk_size = n // cpu_count
    HEAPSIZE = 8
    with multiprocessing.Pool(cpu_count) as pool:
        heap = []
        for val in pool.imap_unordered(func, generator, chunksize=chunk_size):
            if len(heap) < HEAPSIZE:
                heappush(heap, val)
            elif val > heap[0]:
                heappushpop(heap, val)
        # sort
        values = sorted(heap, reverse=True)
        print(values)


if __name__ == '__main__':
    main()

印刷:

[39, 37, 35, 33, 31, 29, 27, 25]

更新

我发现最好通过以下实验为池分配多个进程,这等于mp.cpu_count() - 1让主进程有一个空闲的处理器来处理工作人员返回的结果。我还试验了这个chunksize参数:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for n in range(10000):
        s += i * i # square the argument
    s /= 10000
    return s

def main():
    cpu_count = mp.cpu_count() - 1 # leave a processor for the main process
    N = 10000
    chunk_size = N // cpu_count # 100 may be good enough
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        #print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)

在我的桌面上(运行其他进程,例如流媒体音乐),上面的代码在分配mp.cpu_count() - 1到时表现更好cpu_count(2.4 秒与 2.5 秒)。以下是其他时间(四舍五入到小数点后一位):

chunksize = 1428 -> 2.4 seconds (N // (mp.cpu_count() - 1)
chunksize = 1000 -> 2.7 seconds
chunksize = 100 -> 2.4 seconds
chunksize = 10 -> 2.4 seconds
chunksize = 1 -> 2.6 seconds

chunksize 值为 1000 的结果有点异常。我建议尝试不同的值,否则N // (mp.cpu_count() - 1)。这是假设您可以计算N, 迭代中的项目数。当您有一个生成器作为可迭代对象时,在一般情况下,您必须首先将其转换为列表,才能获得其长度。在这个特定的基准测试中,即使是chunksize1 的值也不会差太多。但这是我从不同的工作量中学worker_process到的:

您的工作进程完成其任务所需的工作(即 CPU)越多,它对chunksize参数的敏感度就越低。如果它在使用非常少的 CPU 后返回,则传输下一个块的开销会变得很大,并且您希望将块传输的数量保持在一个较小的值(即您想要一个较大的chunksize值)。但是,如果进程运行时间很长,则传输下一个块的开销不会有那么大的影响。

在以下代码中,工作进程的 CPU 需求是微不足道的:

import multiprocessing as mp
import timeit

def worker_process(i):
    return i ** 2

def main():
    cpu_count = mp.cpu_count() - 1
    N = 100000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=10, globals=globals()) / 10)

时间安排:

chunksize = 1428 -> .19 seconds
chunksize = 100 -> .39 seconds
chunksize = 1 -> 11.06 seconds

在下面的代码中,工作进程的 CPU 需求更为可观:

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000


def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = N // cpu_count
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)

时间安排:

chunksize = 142 -> 22.6 seconds (N // (mp.cpu_count() - 1)
chunksize = 10 -> 23.5 seconds
chunksize = 1 -> 23.2 seconds

更新 2

根据Python multiprocessing: Understanding logic behindchunksize,当方法或被调用时,map有一个用于计算 a 的特定算法,我在下面的代码中使用了它。我不知道为什么方法的默认值为1并且不使用相同的算法。也许是因为这不会像这些方法的描述所暗示的那样“懒惰”。在下面的代码中,它重复了之前的基准测试,我使用相同算法的重新定义来计算默认值:starmapmap_asyncchunksize=Nonechunksizeimapimap_unorderedchunksize

import multiprocessing as mp
import timeit

def worker_process(i):
    s = 0
    for _ in range(1000000):
        s += i * i
    return s // 1000000


def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize


def main():
    cpu_count = mp.cpu_count() - 1
    N = 1000
    chunk_size = compute_chunksize(cpu_count, N)
    print('chunk_size =', chunk_size)
    results = []
    with mp.Pool(cpu_count) as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunk_size):
            results.append(result)
        print(results[0:10])

if __name__ == '__main__':
    print(timeit.timeit(stmt='main()', number=3, globals=globals()) / 3)

时间:

chunksize 36 -> 22.2 seconds
于 2020-10-19T23:57:07.763 回答