0

我在kombu有两个队列;一个提交请求(做某事),另一个通过 pub/sub 吐出所述请求的增量状态。因此,在我的过程中,它将发布到请求队列并在响应队列中使用。由于任务可能需要一些时间,我想向用户提供有关后端发生的事情的反馈;这一切都在命令行上运行,因为我的 Kombuconsume回调允许我添加一条logging.info()语句以向我的用户吐回信息:

def callback( msg, env ):
    logging.info( str(msg) )

consumer.register_callback( callback )
consumer.consume()
while continue_consuming:
    connection.drain_events()

但是,我现在希望能够在 django 中提供相同的功能。我知道我可以创建一个generator函数作为HttpResponse对象的输入:

def view( reqeust ):
    HttpResponse( gen() )

def gen():
    yield 'streaming... '

但我无法概念化如何将kombu队列的消息回调实现到生成器中来提供这个......有什么想法吗?

如果可能的话,我想避免使用数据库层来存储进度/结果。

4

1 回答 1

0

最后我决定稍微重构一下代码;因为我有一个围绕 kombu 队列的包装器以使界面multiprocess.Queue更像,所以我为我的get()方法创建了一个生成器。

def get( self, until=None ):
    if until == None:
        until = self.end_marker
    for c in count():
        m = self.consumer.queues[0].get( True )
        if not m == None:
            if m.payload == until:
                raise StopIteration
            yield m.payload

这似乎工作正常 - 但并不是那么干净,因为我需要知道self.end_markerorutil是什么,并且还可能想要遍历所有消费者队列(但我的类无论如何都是每个对象的队列,所以这还不错)

那么在我看来,我所做的就是:

 def view( response ):
     q = Queue()
     return HttpResponse( q.get() )

有很多关于各种中间件的帖子;我只是不费心使用它们,它似乎工作正常。

于 2011-06-13T18:01:38.433 回答