33

我想使用 redis 的 pubsub 来传输一些消息,但不想被阻止使用listen,如下面的代码:

import redis
rc = redis.Redis()

ps = rc.pubsub()
ps.subscribe(['foo', 'bar'])

rc.publish('foo', 'hello world')

for item in ps.listen():
    if item['type'] == 'message':
        print item['channel']
        print item['data']

最后一for节将阻塞。我只想检查给定频道是否有数据,我该如何完成?有没有check类似的方法?

4

11 回答 11

50

如果您正在考虑非阻塞、异步处理,您可能正在使用(或应该使用)异步框架/服务器。

更新:距离最初的答案已经 5 年了,与此同时 Python 获得了原生异步 IO 支持。现在有AIORedis,一个异步 IO Redis 客户端

于 2013-01-08T16:32:43.510 回答
24

接受的答案已过时,因为 redis-py 建议您使用 non-blocking get_message()。但它也提供了一种轻松使用线程的方法。

https://pypi.python.org/pypi/redis

阅读消息有三种不同的策略。

在幕后,get_message() 使用系统的“选择”模块快速轮询连接的套接字。如果有数据可供读取,get_message() 将读取它,格式化消息并将其返回或将其传递给消息处理程序。如果没有要读取的数据,get_message() 将立即返回 None。这使得集成到应用程序内的现有事件循环中变得微不足道。

 while True:
     message = p.get_message()
     if message:
         # do something with the message
     time.sleep(0.001)  # be nice to the system :)

旧版本的 redis-py 只能使用 pubsub.listen() 读取消息。listen() 是一个阻塞直到消息可用的生成器。如果您的应用程序除了接收和处理从 redis 接收到的消息之外不需要做任何其他事情,listen() 是一种启动运行的简单方法。

 for message in p.listen():
     # do something with the message

第三个选项在单独的线程中运行事件循环。pubsub.run_in_thread() 创建一个新线程并启动事件循环。线程对象返回给 run_in_thread() 的调用者。调用者可以使用 thread.stop() 方法来关闭事件循环和线程。在幕后,这只是一个围绕 get_message() 的包装器,它在单独的线程中运行,本质上为您创建了一个微小的非阻塞事件循环。run_in_thread() 采用可选的 sleep_time 参数。如果指定,事件循环将使用循环的每次迭代中的值调用 time.sleep()。

注意:由于我们在单独的线程中运行,因此无法处理注册消息处理程序无法自动处理的消息。因此,如果您订阅了没有附加消息处理程序的模式或通道,redis-py 会阻止您调用 run_in_thread()。

p.subscribe(**{'my-channel': my_handler})
thread = p.run_in_thread(sleep_time=0.001)
# the event loop is now running in the background processing messages
# when it's time to shut it down...
thread.stop()

因此,要回答您的问题,只需在您想知道消息是否到达时检查 get_message。

于 2016-02-24T15:50:15.217 回答
17

新版本的 redis-py 支持异步 pubsub,详情请查看https://github.com/andymccurdy/redis-py。这是文档本身的一个示例:

while True:
    message = p.get_message()
    if message:
        # do something with the message
    time.sleep(0.001)  # be nice to the system :)
于 2014-06-26T12:38:13.947 回答
8

我不认为这是可能的。Channel 没有任何“当前数据”,您订阅了一个频道并开始接收其他客户端在该频道上推送的消息,因此它是一个阻塞 API。此外,如果您查看 pub/sub 的 Redis命令文档,它会更清楚。

于 2011-10-24T05:21:11.640 回答
8

这是一个线程阻塞侦听器的工作示例。

import sys
import cmd
import redis
import threading


def monitor():
    r = redis.Redis(YOURHOST, YOURPORT, YOURPASSWORD, db=0)

    channel = sys.argv[1]
    p = r.pubsub()
    p.subscribe(channel)

    print 'monitoring channel', channel
    for m in p.listen():
        print m['data']


class my_cmd(cmd.Cmd):
    """Simple command processor example."""

    def do_start(self, line):
        my_thread.start()

    def do_EOF(self, line):
        return True


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print "missing argument! please provide the channel name."
    else:
        my_thread = threading.Thread(target=monitor)
        my_thread.setDaemon(True)

        my_cmd().cmdloop()
于 2012-01-10T07:55:24.490 回答
3

这是一个没有线程的非阻塞解决方案:

fd = ps.connection._sock.fileno();
rlist,, = select.select([fd], [], [], 0) # or replace 0 with None to block
if rlist:
    for rfd in rlist:
        if fd == rfd:
            message = ps.get_message()

ps.get_message()本身就足够了,但我使用这种方法,以便我可以等待多个 fd 而不仅仅是 redis 连接。

于 2014-12-06T02:07:31.490 回答
1

要达到无阻塞代码,您必须执行另一种范例代码。这并不难,使用一个新线程来监听所有的变化,让主线程去做其他事情。

此外,您将需要一些机制来在主线程和 redis 订阅者线程之间交换数据。

于 2011-10-24T18:05:25.743 回答
1

最有效的方法是基于 greenlet 而不是基于线程。作为一个基于 greenlet 的并发框架,gevent 在 Python 世界中已经相当成熟。因此,gevent 与 redis-py 的集成会很棒。这正是 github 上本期讨论的内容:

https://github.com/andymccurdy/redis-py/issues/310

于 2013-01-24T16:15:36.953 回答
0

你可以使用 gevent、gevent 猴子补丁来构建一个非阻塞的 redis pubsub 应用程序。

于 2012-11-28T12:44:10.437 回答
0

Redis 的 pub/sub 向频道上订阅(收听)的客户端发送消息。如果您不听,您将错过消息(因此阻塞呼叫)。如果你想让它不阻塞,我建议改用队列(redis 也很擅长)。如果您必须使用 pub/sub,您可以按照建议使用 gevent 来拥有一个异步的阻塞侦听器,将消息推送到队列并使用单独的使用者以非阻塞方式处理来自该队列的消息。

于 2014-03-24T14:00:49.053 回答
0

这很简单。我们检查消息是否存在,并继续进行订阅,直到处理完所有消息。

import redis

r = redis.Redis(decode_responses=True)
subscription = r.pubsub()
subscription.psubscribe('channel')

r.publish('channel', 'foo')
r.publish('channel', 'bar')
r.publish('channel', 'baz')

message = subscription.get_message()
while message is not None:
  if message['data'] != 1:
      # Do something with message
      print(message)
  # Get next message
  message = subscription.get_message()
于 2021-11-01T11:23:56.170 回答