好吧,我不会running()
太相信这种方法。似乎并不能真正反映实际的运行状态。
确保进程状态的最好方法是让它们打印/更新一些东西。我选择使用multiprocessing.Manager().dict()
对象创建共享字典。
可以从任何进程安全地查阅/更新此进程同步对象,并具有共享状态,即使在多处理环境中也是如此。
每次启动进程时,使用 PID 作为键和True
值来更新共享字典。设置False
退出。
import concurrent.futures
import multiprocessing
import time,os
def call(shared_dict):
shared_dict[os.getpid()] = True
print("start",shared_dict)
time.sleep(10)
shared_dict[os.getpid()] = False
print("end",shared_dict)
if __name__ == "__main__":
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
shared_dict = multiprocessing.Manager().dict()
futures = [executor.submit(call,shared_dict) for _ in range(8)]
time.sleep(5)
for future in futures:
print(future.running())
这是我得到的输出:
start {3076: True}
start {9968: True, 3076: True}
True
True
True
True
True
False
False
False
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
start {9968: True, 3076: True}
end {9968: False, 3076: True}
start {9968: True, 3076: True}
end {9968: True, 3076: False}
end {9968: False, 3076: False}
如您所见,我有 5 个正在运行的进程。而我的字典清楚地表明
- 不超过 2 个进程同时运行
- 进程在开始时只创建一次,然后重新用于执行进一步的调用(毕竟它是一个池)
让我们检查一下非常简约的文档:
running() 如果调用当前正在执行且无法取消,则返回 True。
它似乎反映了与取消Future
对象未来执行的可能性相关的状态(因为它尚未正确初始化/连接到通信队列并且仍然是时候取消它)而不是进程的实际“运行”状态本身。
这可能就是源代码中这条注释在下面set_running_or_notify_cancel
定义中的含义:
将未来标记为正在运行或处理任何取消通知。
如果未来已被取消(cancel() 被调用并返回 True),那么任何等待未来完成的线程(尽管调用 as_completed() 或 wait())都会收到通知并返回 False。
如果未取消未来,则将其置于运行状态(未来对 running() 的调用将返回 True)并返回 True。
再一次,我们了解到最好要求子流程进行协作,发布它们的状态,而不是试图使用不明确的记录方法来敲诈它。