我有一个在 gevent 中运行的原始生产者/消费者脚本。它启动了一些将事物放入 a 的生产者函数gevent.queue.Queue
,以及一个将它们再次从队列中取出的消费者函数:
from __future__ import print_function
import time
import gevent
import gevent.queue
import gevent.monkey
q = gevent.queue.Queue()
# define and spawn a consumer
def consumer():
while True:
item = q.get(block=True)
print('consumer got {}'.format(item))
consumer_greenlet = gevent.spawn(consumer)
# define and spawn a few producers
def producer(ID):
while True:
print("producer {} about to put".format(ID))
q.put('something from {}'.format(ID))
time.sleep(0.1)
# consumer_greenlet.switch()
producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
gevent.monkey.patch_all()
print("about to join")
consumer_greenlet.join()
如果我让 gevent 隐式处理调度(例如通过调用 time.sleep 或其他一些gevent.monkey.patch()
ed 函数),它工作得很好,但是当我显式切换到消费者(替换time.sleep
为注释掉的switch
调用)时,gevent 会引发 AssertionError:
Traceback (most recent call last):
File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
result = self._run(*self.args, **self.kwargs)
File "switch_test.py", line 14, in consumer
item = q.get(block=True)
File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get
assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
AssertionError: Invalid switch into Queue.get: ()
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError
我想使用显式切换,因为在生产中我有很多生产者,gevent 的调度没有为消费者分配几乎足够的运行时间,并且队列变得越来越长(这很糟糕)。或者,非常感谢您对如何配置或修改 gevent 的调度程序的任何见解。
这是在 Python 2.7.2、gevent 1.0.1 和 greenlet 0.4.5 上。