
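I'm writing a daemon that spawns several child processes. After I run the stop script, the main process keeps running when it should exit, and I can't figure out why.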

import daemon, os, signal, time
from multiprocessing import Process, cpu_count, JoinableQueue
from http import httpserv
from worker import work

class Manager:
    """
    This manager starts the http server processes and worker
    processes, creates the input/output queues that keep the processes
    working together nicely.
    """
    def __init__(self):
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.i_queue = JoinableQueue()
        self.o_queue = JoinableQueue()

        # Create worker processes
        self.workers = [Process(target=work,
                                args=(self.i_queue, self.o_queue))
                        for i in range(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.daemon = True
            w.start()

        # Create the http server process
        self.http = Process(target=httpserv, args=(self.i_queue, self.o_queue))
        self.http.daemon = True
        self.http.start()

        # Keep the current process from returning
        self.running = True
        while self.running:
            time.sleep(1)

    def stop(self):
        print "quitting ..."

        # Stop accepting new requests from users
        os.kill(self.http.pid, signal.SIGINT)

        # Waiting for all requests in output queue to be delivered
        self.o_queue.join()

        # Put sentinel None to input queue to signal worker processes
        # to terminate
        self.i_queue.put(None)
        for w in self.workers:
            w.join()
        self.i_queue.join()

        # Let main process return
        self.running = False


import daemon

manager = Manager()
context = daemon.DaemonContext()
context.signal_map = {
        signal.SIGHUP: lambda signum, frame: manager.stop(),
        }

context.open()
manager.start()

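The stop script is just a one-liner, os.kill(pid, signal.SIGHUP). After it runs, the child processes (the workers and the http server process) shut down cleanly, but the main process just stays there, and I can't tell what is keeping it from returning.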


2 Answers


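You create the http server process but never join() it. What happens if, instead of calling os.kill() to stop the http server process, you send it a stop-processing sentinel (None, just as you do for the workers) and then call self.http.join()?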
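A minimal sketch of that suggestion, assuming the server loop can be made to read from some queue it listens on. fake_httpserv and the control queue are hypothetical stand-ins, since the real httpserv is not shown. Once it reads None the process returns on its own, so join() completes and the parent never has to kill it. Written for Python 3.

```python
from multiprocessing import Process, Queue


def fake_httpserv(control):
    # Hypothetical stand-in for the unshown httpserv(): it keeps serving
    # until it reads the None sentinel from the queue it listens on.
    while True:
        item = control.get()
        if item is None:
            break
        # ... a real server would handle `item` here ...


def stop_http_server():
    control = Queue()
    http = Process(target=fake_httpserv, args=(control,))
    http.daemon = True
    http.start()
    control.put(None)   # ask the server to finish instead of os.kill()
    http.join()         # wait until it has actually exited
    return http.exitcode


if __name__ == "__main__":
    print(stop_http_server())  # exit code 0 means the target returned normally
```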

Update: you also need to put the None sentinel on the input queue once for each worker. You could try:

    for w in self.workers:
        self.i_queue.put(None)
    for w in self.workers:
        w.join()

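Note that you need two loops because if you put the None on the queue in the same loop that calls join(), that None may be picked up by a worker other than w, and joining w will then block the caller.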
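A self-contained sketch of the two-loop shutdown, assuming workers that call task_done() for every item they take, including the sentinel. The squaring work function and the run helper are hypothetical; only the "all sentinels first, then join" ordering comes from the answer. The results are drained from o_queue before joining, because a process that still has unflushed items on a multiprocessing.Queue may not exit, and joining it would block.

```python
from multiprocessing import JoinableQueue, Process, Queue


def work(i_queue, o_queue):
    # Hypothetical worker: square each number until the None sentinel arrives.
    while True:
        item = i_queue.get()
        if item is None:
            i_queue.task_done()   # the sentinel counts as a task too
            return                # exit without get()-ing anything else
        o_queue.put(item * item)
        i_queue.task_done()


def run(n_workers=4, jobs=tuple(range(10))):
    i_queue = JoinableQueue()
    o_queue = Queue()
    workers = [Process(target=work, args=(i_queue, o_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for job in jobs:
        i_queue.put(job)

    # First loop: one sentinel per worker...
    for _ in workers:
        i_queue.put(None)
    # ...drain the output so no worker is stuck flushing o_queue...
    results = sorted(o_queue.get() for _ in jobs)
    # ...second loop: every worker has its own None, so join() cannot hang.
    for w in workers:
        w.join()
    i_queue.join()   # returns: every get() was matched by a task_done()
    return results


if __name__ == "__main__":
    print(run())
```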

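You haven't shown the code for the workers or the http server, so I'm assuming they are well-behaved with respect to calling task_done() and so on, and that each worker exits as soon as it sees a None, without get()-ing anything more from the input queue.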
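To see why a single None is not enough even when the workers behave exactly as assumed above, here is a hypothetical reproduction of the question's stop(): three workers, one sentinel, and join() with a timeout so the demo itself does not hang. The work and count_stuck_workers names are illustrative, not the asker's code. Written for Python 3.

```python
from multiprocessing import JoinableQueue, Process


def work(i_queue):
    # Hypothetical worker that exits as soon as it sees None.
    while True:
        item = i_queue.get()
        i_queue.task_done()
        if item is None:
            return


def count_stuck_workers(n_workers=3):
    i_queue = JoinableQueue()
    workers = [Process(target=work, args=(i_queue,)) for _ in range(n_workers)]
    for w in workers:
        w.daemon = True
        w.start()
    i_queue.put(None)           # what the question's stop() does: ONE sentinel
    for w in workers:
        w.join(timeout=1.0)     # a plain join() here would block forever
    stuck = sum(w.is_alive() for w in workers)
    for w in workers:           # clean up the demo's blocked workers
        w.terminate()
        w.join()
    return stuck
```

With three workers, count_stuck_workers() reports two still alive: only the worker that happened to receive the single None ever returns, which matches the main process hanging inside stop().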

Also, note that there is at least one open, hard-to-reproduce issue with JoinableQueue.task_done() that may be biting you.

answered 2009-06-23T22:49:26.867

I tried a different approach, which seems to work (note that I removed the daemon part of your code, since I don't have that module installed):

import signal
from multiprocessing import cpu_count

class Manager:
    """
    This manager starts the http server processes and worker
    processes, creates the input/output queues that keep the processes
    working together nicely.
    """
    def __init__(self):
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):

        # all your code minus the loop

        print "waiting to die"

        signal.pause()

    def stop(self):
        print "quitting ..."

        # all your code minus self.running


manager = Manager()

signal.signal(signal.SIGHUP, lambda signum, frame: manager.stop())

manager.start()

One caveat is that signal.pause() returns after any signal is handled, not just SIGHUP, so you may need to adapt your code accordingly.
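A minimal sketch of one way to adapt to that caveat: keep a flag that only the SIGHUP handler sets, and call signal.pause() in a loop until it is set. StopWaiter and main are hypothetical names, and the two timers calling signal.pthread_kill only simulate an unrelated SIGUSR1 followed by the real SIGHUP. This assumes Python 3 on Unix, and it must run in the main thread, which is where Python executes signal handlers.

```python
import signal
import threading


class StopWaiter:
    """Hypothetical helper: sleep in signal.pause() until SIGHUP arrives."""

    def __init__(self):
        self.stop_requested = False
        self.wakeups = 0

    def on_sighup(self, signum, frame):
        # Plays the role of manager.stop(); here it only records the request.
        self.stop_requested = True

    def wait_for_stop(self):
        # pause() returns after *any* handled signal, so re-check the flag
        # and go back to sleep unless SIGHUP was what woke us.
        while not self.stop_requested:
            signal.pause()
            self.wakeups += 1


def main():
    waiter = StopWaiter()
    signal.signal(signal.SIGHUP, waiter.on_sighup)
    # SIGUSR1's default action terminates the process, so install a no-op
    # handler for this demo.
    signal.signal(signal.SIGUSR1, lambda signum, frame: None)

    main_id = threading.main_thread().ident
    # Simulate an unrelated signal first, then the real stop request.
    threading.Timer(0.2, signal.pthread_kill, (main_id, signal.SIGUSR1)).start()
    threading.Timer(0.5, signal.pthread_kill, (main_id, signal.SIGHUP)).start()

    waiter.wait_for_stop()
    return waiter.wakeups
```

main() returns 2: the first wake-up comes from SIGUSR1 and the loop goes back to sleep, and the second from SIGHUP ends it. A signal that lands between the flag check and pause() can still leave the loop asleep until the next signal; blocking SIGHUP with signal.pthread_sigmask() and waiting in signal.sigwait() avoids that window.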

Edit:

The following works fine for me:

import daemon
import signal
import time

class Manager:
    """
    This manager starts the http server processes and worker
    processes, creates the input/output queues that keep the processes
    working together nicely.
    """
    def __init__(self):
        self.NUMBER_OF_PROCESSES = 5

    def start(self):

        # all your code minus the loop

        print "waiting to die"
        self.running = 1
        while self.running:
            time.sleep(1)

        print "quit"

    def stop(self):
        print "quitting ..."

        # all your code minus self.running

        self.running = 0


manager = Manager()

context = daemon.DaemonContext()
context.signal_map = {signal.SIGHUP : lambda signum, frame: manager.stop()}

context.open()
manager.start()

Which version of Python are you using?

answered 2009-06-23T19:34:59.787