0

我有一个在 gevent 中运行的原始生产者/消费者脚本。它启动了一些将事物放入 a 的生产者函数gevent.queue.Queue,以及一个将它们再次从队列中取出的消费者函数:

from __future__ import print_function

import time

import gevent
import gevent.queue
import gevent.monkey

q = gevent.queue.Queue()

# define and spawn a consumer
def consumer():
    while True:
        item = q.get(block=True)
        print('consumer got {}'.format(item))

consumer_greenlet = gevent.spawn(consumer)

# define and spawn a few producers
def producer(ID):
    while True:
        print("producer {} about to put".format(ID))
        q.put('something from {}'.format(ID))
        time.sleep(0.1)
#       consumer_greenlet.switch()      

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]

# wait indefinitely
gevent.monkey.patch_all()
print("about to join")
consumer_greenlet.join()

如果我让 gevent 隐式处理调度(例如通过调用 time.sleep 或其他一些gevent.monkey.patch()ed 函数),它工作得很好,但是当我显式切换到消费者(替换time.sleep为注释掉的switch调用)时,gevent 会引发 AssertionError:

Traceback (most recent call last):
  File "/my/virtualenvs/venv/local/lib/python2.7/site-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "switch_test.py", line 14, in consumer
    item = q.get(block=True)
  File "/my/virtualenvs/venv/lib/python2.7/site-packages/gevent/queue.py", line 201, in get
    assert result is waiter, 'Invalid switch into Queue.get: %r' % (result, )
AssertionError: Invalid switch into Queue.get: ()
<Greenlet at 0x7fde6fa6c870: consumer> failed with AssertionError

我想使用显式切换,因为在生产中我有很多生产者,gevent 的调度没有为消费者分配几乎足够的运行时间,并且队列变得越来越长(这很糟糕)。或者,非常感谢您对如何配置或修改 gevent 的调度程序的任何见解。

这是在 Python 2.7.2、gevent 1.0.1 和 greenlet 0.4.5 上。

4

1 回答 1

0

在我看来,显式开关与隐式开关并不能很好地配合。由于猴子修补 I/O 或因为gevent.queue.Queue().

gevent 文档不鼓励使用原始 greenlet 方法:

作为 greenlet 的子类,Greenlet 也有 switch() 和 throw() 方法。但是,这些不应该在应用程序级别使用,因为它们很容易导致永远没有计划的 greenlet。更喜欢更高级别的安全类,例如 Event 和 Queue。

迭代gevent.queue.Queue()或访问队列的get方法会进行隐式切换,有趣的put是不会。所以你必须自己生成一个隐式线程切换。最简单的是打电话gevent.sleep(0)(您实际上不必等待特定时间)。

总之,只要您的代码没有阻塞 IO 操作,您甚至不必对事物进行修补。

我会像这样重写你的代码:

import gevent
import gevent.queue

q = gevent.queue.Queue()

# define and spawn a consumer
def consumer():
    for item in q:
        print('consumer got {}'.format(item))

consumer_greenlet = gevent.spawn(consumer)

# define and spawn a few producers
def producer(ID):
    print('producer started', ID)
    while True:
        print("producer {} about to put".format(ID))
        q.put('something from {}'.format(ID))
        gevent.sleep(0)

producer_greenlets = [gevent.spawn(producer, i) for i in range(5)]
# wait indefinitely
print("about to join")
consumer_greenlet.join()
于 2015-08-26T12:58:45.980 回答