6

我有一个 Bottle 应用程序,它使用子流程来完成请求的大部分工作。对于返回单个响应的路由,我会执行以下操作。

@route('/index')
def index():
    worker = getWorker()
    return worker.doStuff()

我的一条路线需要是数据流。我想不出一种让工作人员返回响应流的聪明方法。下面的示例与我想做的类似,只是没有工人。

@route('/stream')
def stream():
    yield 'START'
    sleep(3)
    yield 'MIDDLE'
    sleep(5)
    yield 'END'

我希望能够做类似下面的事情。由于我无法产生/返回生成器,因此这种方式是不可能的。

@route('/stream')
def stream():
    worker = getWorker()
    yield worker.doStuff()
class worker:
    # Remember, this is run in a subprocess in real life.
    def doStuff():
        yield 'START'
        sleep(3)
        yield 'MIDDLE'
        sleep(5)
        yield 'END'

这是一个大型项目,我做事的方式没有太大的灵活性。我知道有时最简单的答案是“你的设计是错误的”。但是,在这种情况下,我有一些超出我控制范围的约束(路由必须是数据流,并且工作必须由子进程完成)。

编辑 我也不能有 doStuff() 块。我希望能够创建类似于我返回的 gevent 队列并拥有工作进程的东西。现在的问题是我似乎不能同时使用 gevent.queue 和 Process 。

@route('/stream')
def index():
    body = gevent.queue.Queue()
    worker = multiprocessing.Process(target=do_stuff, args=body)
    worker.start()
    return body()

def do_stuff(body):
    while True:
        gevent.sleep(5)
        body.put("data")
4

2 回答 2

2

经过大量研究和实验,我确定 gevent 队列不能以这种方式与 Python 多处理一起使用。与其以这种方式做事,不如使用 redis 之类的东西来允许进程和 gevent greenlets 通信。

@route('/stream')
def index():
    worker = multiprocessing.Process(target=do_stuff)
    worker.start()
    yield redis_server.lpop()

def do_stuff(body):
    while True:
        gevent.sleep(5)
        redis_server.lpush("data")
于 2013-01-17T18:40:19.940 回答
1

在您的最后一个示例中,worker.doStuff()返回一个可迭代的生成器。您可以将其返回(更改yieldreturn)。Bottle 接受迭代作为返回值,只要它们产生字节或 unicode 字符串。

于 2012-11-10T14:45:19.543 回答