1

我正在从事一个有大量工人的项目。我没有使用内置的multiprocessing.Pool,而是创建了自己的进程池。

它的工作方式是我创建了两个实例multiprocessing.Queue- 一个用于向工作人员发送工作任务,另一个用于接收返回的结果。

每个工人只是坐在一个永久运行的循环中,如下所示:

while True:
    try:
        request = self.request_queue.get(True, 5)
    except Queue.Empty:
        continue
    else:
        result = request.callable(*request.args, **request.kwargs)
        self.results_queue.put((request, result))

还有一些错误处理代码,但我把它留了下来。每个工作进程都daemon设置为1.

我希望正确关闭主进程和所有子工作进程。到目前为止我的经验(使用 Ctrl+C):

  • 在没有特殊实现的情况下,每个子进程都会因 KeyboardInterrupt 回溯而停止/崩溃,但主进程不存在并且必须被杀死(sudo kill -9)。
  • 如果我为子进程实现一个信号处理程序,设置为忽略 SIGINT,主线程会显示 KeyboardInterrupt tracebok,但无论哪种方式都不会发生任何事情。
  • 如果我为子进程和主进程实现一个信号处理程序,我可以看到在主进程中调用了信号处理程序,但调用sys.exit()似乎没有任何效果。

我正在寻找一种“最佳实践”的方式来处理这个问题。我还在某处读到关闭与Queues 和Pipes 交互的进程可能会导致它们与其他进程死锁(由于信号量和内部使用的其他东西)。

我目前的方法如下: - 找到一种方法向将终止其主循环的每个进程(使用单独的命令队列或类似的)发送内部信号。- 为发送关闭命令的主循环实现信号处理程序。子进程将有一个子处理程序,将它们设置为忽略信号。

这是正确的方法吗?

4

1 回答 1

1

您需要注意的事情是处理在您想要关闭时队列中有消息的可能性,因此您需要一种方法让您的进程干净地耗尽其输入队列。假设您的主进程会识别到是时候关闭了,您可以这样做。

  1. 向每个工作进程发送一个哨兵。这是一条特殊的消息(经常None),看起来永远不会像普通消息。在哨兵之后,刷新并关闭每个工作进程的队列。
  2. 在您的工作进程中使用类似于以下伪代码的代码:

    while True:  # Your main processing loop
        msg = inqueue.dequeue()  # A blocking wait
        if msg is None:
            break
        do_something()
    outqueue.flush()
    outqueue.close()
    

如果有可能有多个进程在上面发送消息,那么inqueue您将需要一种更复杂的方法。此示例取自Python 3.2 或更高版本中monitor方法的源代码,显示了一种可能性。logging.handlers.QueueListener

            """
            Monitor the queue for records, and ask the handler
            to deal with them.

            This method runs on a separate, internal thread.
            The thread will terminate if it sees a sentinel object in the queue.
            """
            q = self.queue
            has_task_done = hasattr(q, 'task_done')
            # self._stop is a multiprocessing.Event object that has been set by the
            # main process as part of the shutdown processing, before sending
            # the sentinel           
            while not self._stop.isSet():
                try:
                    record = self.dequeue(True)
                    if record is self._sentinel:
                        break
                    self.handle(record)
                    if has_task_done:
                        q.task_done()
                except queue.Empty:
                    pass
            # There might still be records in the queue.
            while True:
                try:
                    record = self.dequeue(False)
                    if record is self._sentinel:
                        break
                    self.handle(record)
                    if has_task_done:
                        q.task_done()
                except queue.Empty:
                    break
于 2013-08-05T16:57:50.330 回答