14

我正在编写一个具有一个生产者和多个消费者的服务器程序,让我感到困惑的是只有放入队列的第一个任务生产者被消费,之后排队的任务不再被消费,它们永远留在队列中。

from multiprocessing import Process, Queue, cpu_count
from http import httpserv
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        queue.close()

Manager().start()

生产者是一个 HTTP 服务器,一旦收到用户的请求,它就会将任务放入队列中。当队列中有新任务时,消费者进程似乎仍然被阻塞,这很奇怪。

PS另外两个与上述无关的问题,我不确定将HTTP服务器放在主进程以外的自己的进程中是否更好,如果是,我怎样才能让主进程在所有子进程结束之前继续运行。第二个问题,优雅地停止 HTTP 服务器的最佳方法是什么?

编辑:添加生产者代码,它只是一个简单的 python wsgi 服务器:

import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type','text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))

    evwsgi.run()
4

3 回答 3

11

我认为 Web 服务器部分一定有问题,因为它完美地工作:

from multiprocessing import Process, Queue, cpu_count
import random
import time


def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))


def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)


class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()

        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()


Manager().start()

样本输出:

starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
于 2009-05-27T13:08:41.067 回答
4

“第二个问题,优雅停止HTTP服务器的最佳方式是什么?”

这很难。

进程间通信有两种选择:

  • 带外控制。服务器有另一种通信机制。另一个套接字、Unix 信号或其他东西。其他东西可能是服务器本地目录中的“立即停止”文件。看起来很奇怪,但它确实工作得很好,并且比引入一个选择循环来监听多个套接字或一个信号处理程序来捕获 Unis 信号更简单。

    “立即停止”文件很容易实现。循环仅在evwsgi.run()每次请求后检查此文件。为了让服务器停止,你创建文件,执行一个/control请求(这将得到一个 500 错误或其他东西,这并不重要)并且服务器应该停止。请记住删除立即停止文件,否则您的服务器将无法重新启动。

  • 带内控制。服务器有另一个 URL ( /stop) 将停止它。从表面上看,这似乎是一场安全噩梦,但这完全取决于该服务器的使用地点和方式。由于它似乎是内部请求队列的简单包装器,因此这个额外的 URL 运行良好。

    为了完成这项工作,您需要编写自己的版本evwsgi.run(),可以通过设置一些变量以跳出循环的方式终止该版本。

编辑

您可能不想终止您的服务器,因为您不知道它的工作线程的状态。您需要向服务器发出信号,然后您只需等待它正常完成即可。

如果你想强行杀死服务器,那么os.kill()(或multiprocessing.terminate)将起作用。当然,除非您不知道子线程在做什么。

于 2009-05-27T14:16:14.987 回答
1

这可以提供帮助: http ://www.rsdcbabu.com/2011/02/multiprocessing-with-python.html

于 2011-02-27T10:18:38.230 回答