2

我想做这样的事情(1个队列和多个消费者):

import gevent
from gevent import queue

q=queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)

def consumer(qq):
    for i in qq:
        print i

jobs=[gevent.spawn(consumer,i) for i in [q,q]]

gevent.joinall(jobs)

但这是不可能的......队列被job1消耗......所以job2会永远阻塞。它给了我例外gevent.hub.LoopExit: This operation would block forever

我希望每个消费者都能从一开始就使用完整的队列。(应该显示 1,2,3,1,2,3 或 1,1,2,2,3,3 ...没关系)

一个想法应该是在生成之前克隆队列,但是使用复制(浅/深)模块是不可能的;-(

还有另一种方法吗?

[编辑] 你怎么看?

import gevent
from gevent import queue

class MasterQueueClonable(queue.Queue):
    def __init__(self,*a,**k):
        queue.Queue.__init__(self,*a,**k)

        self.__cloned = []
        self.__old=[]

    #override
    def get(self,*a,**k):
        e=queue.Queue.get(self,*a,**k)
        for i in self.__cloned:  i.put(e) # serve to current clones
        self.__old.append(e)              # save old element
        return e

    def clone(self):
        q=queue.Queue()
        for i in self.__old: q.put(i)   # feed a queue with elements which are out
        self.__cloned.append(q)         # stock the queue, to be able to put newer elements too
        return q

q=MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)

def consumer(qq):
    for i in qq:
        print id(qq),i

jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]]
gevent.joinall(jobs)

它基于RyanYe的想法。有一个没有调度程序的“主队列”。我的主队列覆盖了 GET 方法,并且可以分派到按需克隆。而且,可以在 masterqueue 启动后创建一个“克隆”(使用 __old 技巧)。

4

3 回答 3

2

我建议您创建一个 greenlet 将工作分派给消费者。示例代码:

import gevent
from gevent import queue

master_queue=queue.Queue()
master_queue.put(1)
master_queue.put(2)
master_queue.put(3)
master_queue.put(StopIteration)

total_consumers = 10
consumer_queues = [queue.Queue() for i in xrange(total_consumers)]

def dispatcher(master_queue, consumer_queues):
    for i in master_queue:
        [j.put(i) for j in consumer_queues]
    [j.put(StopIteration) for j in consumer_queues]

def consumer(qq):
    for i in qq:
        print i

jobs=[gevent.spawn(dispatcher, q, consumer_queues)] + [gevent.spawn(consumer, i) for i in consumer_queues]
gevent.joinall(jobs)

更新:修复消费者队列缺少的 StopIteration。感谢arilou指出。

于 2012-07-25T07:42:57.707 回答
1

我已将方法添加 copy()到 Queue 类:

>>> import gevent.queue
>>> q = gevent.queue.Queue()
>>> q.put(5)
>>> q.copy().get()
5
>>> q
<Queue at 0x1062760d0 queue=deque([5])>

让我知道它是否有帮助。

于 2012-07-26T10:54:05.903 回答
0

在 Ryan Ye 的回答中,dispatcher() 函数末尾缺少一行: [j.put(StopIteration) for j in consumer_queues] 没有它,我们仍然会得到 'gevent.hub.LoopExit: This operation will block forever'因为 'for i in master_queue' 循环不会将 StopIteration 异常复制到 consumer_queues 中。

(对不起,我还不能发表评论,所以我把它写成一个单独的答案。)

于 2012-07-25T21:39:10.747 回答