我正在从事一个有大量工人的项目。我没有使用内置的multiprocessing.Pool
,而是创建了自己的进程池。
它的工作方式是我创建了两个实例multiprocessing.Queue
- 一个用于向工作人员发送工作任务,另一个用于接收返回的结果。
每个工人只是坐在一个永久运行的循环中,如下所示:
while True:
try:
request = self.request_queue.get(True, 5)
except Queue.Empty:
continue
else:
result = request.callable(*request.args, **request.kwargs)
self.results_queue.put((request, result))
还有一些错误处理代码,但我把它留了下来。每个工作进程都daemon
设置为1
.
我希望正确关闭主进程和所有子工作进程。到目前为止我的经验(使用 Ctrl+C):
- 在没有特殊实现的情况下,每个子进程都会因 KeyboardInterrupt 回溯而停止/崩溃,但主进程不存在并且必须被杀死(
sudo kill -9
)。 - 如果我为子进程实现一个信号处理程序,设置为忽略 SIGINT,主线程会显示 KeyboardInterrupt tracebok,但无论哪种方式都不会发生任何事情。
- 如果我为子进程和主进程实现一个信号处理程序,我可以看到在主进程中调用了信号处理程序,但调用
sys.exit()
似乎没有任何效果。
我正在寻找一种“最佳实践”的方式来处理这个问题。我还在某处读到关闭与Queue
s 和Pipe
s 交互的进程可能会导致它们与其他进程死锁(由于信号量和内部使用的其他东西)。
我目前的方法如下: - 找到一种方法向将终止其主循环的每个进程(使用单独的命令队列或类似的)发送内部信号。- 为发送关闭命令的主循环实现信号处理程序。子进程将有一个子处理程序,将它们设置为忽略信号。
这是正确的方法吗?