0

我正在寻找一种方法来并行启动两个函数,每个函数都在一组给定的不同参数上执行。我用它pool.map来实现这一点。我创建了两个不同的进程,每个进程启动一个执行map. 这行得通 - 执行顺序有点狂野,但我会把它留到另一个问题上。

现在我在这里也找到了另一种方法(见第一个答案)。他们只使用一个池并map_async连续调用两次。所以我想知道,是否有这样做的首选方法?因为我读过(遗憾的是我不记得在哪里)最好只使用一个池,这意味着第二种方法(只使用一个池)更好。但是当我测量时间时,第一种方法(在不同的进程中使用两个池)实际上要快一点。此外,在第一种方法中,函数实际上是并行运行的,而在第二种方法中,首先执行第一次调用map_async,然后执行第二次调用。

这是我的测试代码:

from multiprocessing import Process, Pool
import time
import os

multiple_pools = True
data = list(range(1, 11))


def func_a(param):
    print(f'running func_a in process {os.getpid()}')
    print(f'passed argument: {param}')
    print('calculating...\n')
    time.sleep(1.5)
    print('done\n')

def func_b(param):
    print(f'running func_b in process {os.getpid()}')
    print(f'passed argument: {param}')
    print('calculating...\n')
    time.sleep(2.5)
    print('done\n')

def execute_func(func, param):
    p = Pool(processes=8)
    with p:
        p.map(func, param)


if __name__ == '__main__':
    if not multiple_pools:
        t0 = time.time()
        p = Pool(processes=8)

        res = p.map_async(func_a, data)
        res = p.map_async(func_b, data)

        p.close()
        p.join()

        t1 = time.time()
        dt = t1 -t0
        print(f'time spent with one pool: {dt} s')

    else:
        t0 = time.time()
        p1 = Process(target=execute_func, args=(func_a, data))
        p2 = Process(target=execute_func, args=(func_b, data))

        p1.start()
        p2.start()

        p1.join()
        p2.join()
        p1.close()
        p2.close()

        t1=time.time()
        dt = t1 -t0
        print(f'time spent with two pools, each inside an own process: {dt} s')

再说一次,我的问题是:有没有一种方法比另一种更受欢迎?或者甚至可能有其他/更好的方法来做到这一点?

4

1 回答 1

1

首先,我假设当您使用两个池时,您将使用非阻塞map_async方法。我会说两个大小为 N 的池,每个池您将 M 个任务提交到所有任务相同的每个池(即,就 CPU、I/O 等而言,需要相同的资源)应该更多或- 在时间上与将相同的 2 * M 任务提交到大小为 2 * N 的单个池中的等效执行时间更少。

下面的程序演示了这两种情况:

from multiprocessing import Pool
import time

QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second(x):
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return x * x

def callback(result):
    global callback_count
    print('Two pools result:', result)
    callback_count += 1
    if callback_count == 2:
        # Both map-async calls have completed:
        print('Two pools time:', time.time() - start_time)

# required for Windows:
if __name__ == '__main__':
    data1 = range(10)
    data2 = range(10, 20)
    # Two pools:
    pool1 = Pool(4)
    pool2 = Pool(4)
    callback_count = 0
    start_time = time.time()
    pool1.map_async(quarter_second, data1, callback=callback)
    pool2.map_async(quarter_second, data2, callback=callback)
    pool1.close()
    pool1.join()
    pool2.close()
    pool2.join()

    # One Pool:
    data = range(20)
    pool = Pool(8)
    start_time = time.time()
    result = pool.map(quarter_second, data)
    print('One pool result:', result)
    print('One pool time:', time.time() - start_time)
    pool.close()
    pool.join()

印刷:

Two pools result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Two pools result: [100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
Two pools time: 1.4994373321533203
One pool result: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
One pool time: 1.4596436023712158

我重新运行了几次,大多数但不是所有情况下单池案例的表现都稍好一些。但是我的桌面上运行了许多其他进程,这些进程会影响结果。我没有将创建处理池的实际时间包括在总时间中。此外,根据池的大小和可迭代参数,map 函数可以计算一个稍微不同的chunksize值,以便在没有明确指定chunksize参数时使用,就像这里的情况一样。但这对性能的影响可以忽略不计。总而言之,考虑到我的假设,我看不出单池和双池方法之间有任何显着的性能差异。

于 2021-10-14T13:55:23.510 回答