我想做这样的事情(1个队列和多个消费者):
import gevent
from gevent import queue
q=queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
for i in qq:
print i
jobs=[gevent.spawn(consumer,i) for i in [q,q]]
gevent.joinall(jobs)
但这是不可能的......队列被job1消耗......所以job2会永远阻塞。它给了我例外gevent.hub.LoopExit: This operation would block forever
。
我希望每个消费者都能从一开始就使用完整的队列。(应该显示 1,2,3,1,2,3 或 1,1,2,2,3,3 ...没关系)
一个想法应该是在生成之前克隆队列,但是使用复制(浅/深)模块是不可能的;-(
还有另一种方法吗?
[编辑] 你怎么看?
import gevent
from gevent import queue
class MasterQueueClonable(queue.Queue):
def __init__(self,*a,**k):
queue.Queue.__init__(self,*a,**k)
self.__cloned = []
self.__old=[]
#override
def get(self,*a,**k):
e=queue.Queue.get(self,*a,**k)
for i in self.__cloned: i.put(e) # serve to current clones
self.__old.append(e) # save old element
return e
def clone(self):
q=queue.Queue()
for i in self.__old: q.put(i) # feed a queue with elements which are out
self.__cloned.append(q) # stock the queue, to be able to put newer elements too
return q
q=MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
for i in qq:
print id(qq),i
jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]]
gevent.joinall(jobs)
它基于RyanYe的想法。有一个没有调度程序的“主队列”。我的主队列覆盖了 GET 方法,并且可以分派到按需克隆。而且,可以在 masterqueue 启动后创建一个“克隆”(使用 __old 技巧)。