2

我试图在我的 Pyramid 应用程序中拥有一个工作进程池,可用于执行我不想让视图陷入困境的 CPU 密集型(或长时间运行)后台任务。我现在有什么工作,但有一个问题:如果女服务员退出终止(就像 --reload 发生的那样),工作人员会继续徘徊,我不知道如何发出信号让他们停止。

编辑:使用 Gunicorn(或仅从某个文件运行它)时,这似乎不是问题。这可能是女服务员的错误吗?

编辑2:嗯。或者 Gunicorn 只是以不同的方式处理它,使它看起来更好。

import multiprocessing as mp
from queue import Empty as QueueEmptyError

class MyPool(object):

    def __init__(self, processes=10, queue=None):
        self.jobqueue = queue if queue is not None else mp.Queue()
        self.procs = []
        self.running = True
        for i in range(processes):
            worker = mp.Process(target=self._worker, daemon=True)
            self.procs.append(worker)
            worker.start()

    def __del__(self):
        self.stopall()

    def stopall(self):
        self.running = False
        for worker in self.procs:
            worker.join()

    def _worker(self):
        while self.running:
            try:
                self._dojob(self.jobqueue.get(True, 1))
            except QueueEmptyError:
                pass

    def _dojob(self, taskdata):
        print(str(taskdata) + ' is happening')

class PoolMaster(object):
    def __init__(self):
        self.pools = []
        self.aqueue = mp.Queue()
        self.apool = MyPool(6, self.aqueue)
        self.pools.append(self.apool)

    def __del__(self):
        for pool in self.pools:
            pool.stopall()

    def do_something(self):
        self.aqueue.put_nowait('Something')

PoolMaster 在我的项目的 main() 函数中实例化一次,并通过将其添加到所有事件来公开给所有视图。

我之前尝试过的是在__del__发生时在队列中添加“毒丸”,但事实证明__del__似乎根本没有被调用。我不想使用多处理自己的池,因为它们似乎是为一次性运行一组工作负载而设计的,而不是一直在队列上工作。那么,在实际应用程序退出后如何阻止它们运行?

4

2 回答 2

0

您可以为您的(例如)Pipe中的每个工人使用 a 。然后,您的工作人员会定期收听一些取消代码(例如)。要在工作进程中使用管道,您可以执行以下操作:MyPoolcancelPipePipeTrue

connections = mp.Pipe()
self.cancelPipes.append(connections[0])
worker = mp.Process(target=self._worker, daemon=True, args=[connections[1]])

然后,当您想取消所有人时,请执行以下操作:

for c in self.cancelPipes:
    c.send(True)

工人会检查他们是否应该停止这样做:

gotSomething = cancelPipe.poll()
if gotSomething:
    cancel = cancelPipe.recv()
    if cancel:
        # Stop your worker
于 2013-09-14T16:52:49.227 回答
0

好吧,我找到了一种通过更改_worker函数来实现它的方法:

def _worker(self):
    while True:
        try:
            self._dojob(self.jobqueue.get(True, 3))
        except QueueEmptyError:
            pass
        if os.getppid() == 1:
            print('Parent process died, stopping worker.')
            sys.exit(0)

当父进程在没有结束的情况下死亡时,守护_worker进程os.getppid()将返回 1 而不是类 UNIX 系统上的父 ID。每 3 秒它停止等待队列中的新对象并检查它。

不过,我觉得这不是正确的方法。

于 2013-09-15T13:45:05.677 回答