4

在 Python 的concurrent.futures标准模块中,为什么 a ProcessPoolExecutorismax_workers + 1而不是 a 中的max_workerslike ThreadPoolExecutor?仅当提交的调用数量严格大于池工作进程的数量时才会发生这种情况。

以下 Python 代码片段向 a 中的 2 个工作人员提交了 8 个调用ProcessPoolExecutor

import concurrent.futures
import time


def call():
    while True:
        time.sleep(1)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(call) for _ in range(8)]
        time.sleep(5)

        for future in futures:
            print(future.running())

打印这个(3个正在运行的电话;意外,因为有2个工人):








使用 aThreadPoolExecutor打印此(2 个正在运行的调用;预期):








4

1 回答 1

1

好吧,我不会running()太相信这种方法。似乎并不能真正反映实际的运行状态。

确保进程状态的最好方法是让它们打印/更新一些东西。我选择使用multiprocessing.Manager().dict()对象创建共享字典。

可以从任何进程安全地查阅/更新此进程同步对象,并具有共享状态,即使在多处理环境中也是如此。

每次启动进程时,使用 PID 作为键和True值来更新共享字典。设置False退出。

import concurrent.futures
import multiprocessing
import time,os


def call(shared_dict):
    shared_dict[os.getpid()] = True
    print("start",shared_dict)
    time.sleep(10)
    shared_dict[os.getpid()] = False
    print("end",shared_dict)


if __name__ == "__main__":

    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        shared_dict = multiprocessing.Manager().dict()
        futures = [executor.submit(call,shared_dict) for _ in range(8)]
        time.sleep(5)
        for future in futures:
            print(future.running())

这是我得到的输出:

start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}

如您所见,我有 5 个正在运行的进程。而我的字典清楚地表明

  • 不超过 2 个进程同时运行
  • 进程在开始时只创建一次,然后重新用于执行进一步的调用(毕竟它是一个池)

让我们检查一下非常简约的文档

running() 如果调用当前正在执行且无法取消,则返回 True。

它似乎反映了与取消Future对象未来执行的可能性相关的状态(因为它尚未正确初始化/连接到通信队列并且仍然是时候取消它)而不是进程的实际“运行”状态本身。

这可能就是源代码中这条注释在下面set_running_or_notify_cancel定义中的含义:

将未来标记为正在运行或处理任何取消通知。

如果未来已被取消(cancel() 被调用并返回 True),那么任何等待未来完成的线程(尽管调用 as_completed() 或 wait())都会收到通知并返回 False。

如果未取消未来,则将其置于运行状态(未来对 running() 的调用将返回 True)并返回 True。

再一次,我们了解到最好要求子流程进行协作,发布它们的状态,而不是试图使用不明确的记录方法来敲诈它。

于 2019-06-13T20:06:23.187 回答