语境:
- 使用 a
concurrent.futures.process.ProcessPool
执行代码的 Python 应用程序服务器 - 我们有时想在不重启整个服务器进程的情况下热重载导入的代码
(是的,我知道importlib.reload
有警告)
为了让它工作,我想我必须在进程池管理的importlib.reload
每个multiprocessing
进程中执行。
有没有办法向进程池中的所有进程提交一些东西?
语境:
concurrent.futures.process.ProcessPool
执行代码的 Python 应用程序服务器(是的,我知道importlib.reload
有警告)
为了让它工作,我想我必须在进程池管理的importlib.reload
每个multiprocessing
进程中执行。
有没有办法向进程池中的所有进程提交一些东西?
我不知道这将如何与您提到的热重装尝试一起发挥作用,但是您真正提出的一般问题是可以回答的。
有没有办法向进程池中的所有进程提交一些东西?
这里的挑战在于确保所有进程都得到something
一次且仅一次,并且在每个进程得到它之前不会进一步执行。
您可以借助multiprocessing.Barrier(parties[, action[, timeout]])
. 屏障将阻止各方调用barrier.wait()
,直到每一方都这样做,然后立即释放它们。
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
def foo(x):
for _ in range(int(42e4)):
pass
return x
def reload(something):
print(f"{mp.current_process().name} --- reloading {something} and waiting.")
barrier.wait()
print(f"{mp.current_process().name} --- released.")
def init_barrier(barrier):
globals()['barrier'] = barrier
if __name__ == '__main__':
MAX_WORKERS = 4
barrier = mp.Barrier(MAX_WORKERS)
with ProcessPoolExecutor(
MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
) as executor:
print(list(executor.map(foo, range(10))))
# then something for all processes
futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
for f in futures:
f.result()
print(list(executor.map(foo, range(10))))
示例输出:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Process finished with exit code 0
如果您可以保留全局变量barrier
和multiprocessing.get_context()._name
返回值"fork"
,则不需要使用全局变量,initializer
因为全局变量将被继承并通过分叉访问。