5

目标:产生一些greenlet worker处理从redis弹出的数据(从redis弹出然后放入队列)

运行环境:ubuntu 12.04 Python 版本:2.7 GEVENT 版本:1.0 RC2 REDIS 版本:2.6.5 REDIS-PY 版本:2.7.1

from gevent import monkey; monkey.patch_all()
import gevent
from gevent.pool import Group
from gevent.queue import JoinableQueue
import redis

tasks = JoinableQueue()
task_group = Group()

def crawler():
    while True:
        if not tasks.empty():
            print tasks.get()
            gevent.sleep()

task_group.spawn(crawler)
redis_client = redis.Redis()
data = redis_client.lpop('test') #<----------Block here
tasks.put(data)

尝试从 redis 中弹出数据,但它被阻塞了......并且没有引发异常......只需冻结并删除 spawn 方法,它会起作用..我对发生的事情感到困惑,请帮助!谢谢你!

4

1 回答 1

10

gevent 提供协作的轻量级进程(不是线程)。结果是当你在某处有一个无限循环并且调度程序永远不会重新进入时,程序将阻止占用 100% 的 CPU 内核。

在您的示例中,问题在于您定义爬虫循环的方式。显然,当任务为空时,您有一个无限循环。并且因为 gevent.sleep 调用(将执行必要的 yield 操作)仅在任务不为空时调用,这意味着调度程序永远不会重新进入。

它似乎阻止了 lpop 命令,因为连接被 Redis 客户端延迟。事件顺序如下:

  • 任务组产生;但还没有安排greenlet
  • redis_client 已构建,但由于实际连接延迟,它尚未生成 I/O
  • lpop 被调用;这次真的需要连接,因为 Redis 客户端必须等待连接和对 lpop 的回复;因此它让给调度程序
  • 调度程序激活爬虫工作者
  • 无限循环,因为任务队列仍然是空的

如果将 gevent.sleep() 放在循环本身中(在 if 之后),它会更好地工作,但它仍然是实现 dequeuer 的低效方式。这样的事情会好得多:

def crawler():
    while True:
        x = tasks.get()
        try:
            print "Crawler: ",x
        finally:
            tasks.task_done()

get() 调用阻塞了worker,因此它会在队列为空时避免worker 和调度程序之间的乒乓游戏。

于 2012-12-29T10:37:26.677 回答