我有一个 redis 流“mystream”,我正在尝试使用:
XREAD 计数 5 块 50000 流 mystream $
读取附加到 mystream 的五个新条目,阻塞 50 秒。一旦我执行命令以使用以下命令将数据添加到 mystream:
XADD mystream a 5 b 6
被阻止的命令以单个条目作为输出退出。
有没有什么方法可以使用 XREAD 命令接收从现在到 50 年代附加的多个条目,或者我是否必须多次调用 XREAD 才能达到相同的效果?
我有一个 redis 流“mystream”,我正在尝试使用:
XREAD 计数 5 块 50000 流 mystream $
读取附加到 mystream 的五个新条目,阻塞 50 秒。一旦我执行命令以使用以下命令将数据添加到 mystream:
XADD mystream a 5 b 6
被阻止的命令以单个条目作为输出退出。
有没有什么方法可以使用 XREAD 命令接收从现在到 50 年代附加的多个条目,或者我是否必须多次调用 XREAD 才能达到相同的效果?
您要求的是不适合使用XREAD BLOCK
. 我建议使用XREVRANGE命令:
xrevrange mystream + - COUNT 5
min:“+” - 表示流的最高 id,max:“-” - 表示流的最低 id。除了“-”,您还可以传递一个 TIMESTAMP,它将您的XREVRANGE
结果限制在所需的时间到TIMESTAMP
几秒钟前(包括在内)。
xrevrange mystream + 1596617691455 COUNT 5
上面的建议没有实现相同的blocking
行为。如果您仍然想对传入的数据做出反应,您可以查看Redis Pub/Sub。订阅一个主题,并编写自己的逻辑来处理传入的数据。有一个网络聊天的例子。
您还可以使用XREAD BLOCK
来检测命令的最小值XREVRANGE
:
XREAD COUNT 5 BLOCK 50000 STREAMS mystream $
XREVRANGE mystream + [returned_xread_key] COUNT 5
很抱歉,我无法提供 C 语言的示例,这里是 Python:
import redis
import time
redis_client = redis.StrictRedis("0.0.0.0", decode_responses=True)
response = redis_client.xread({'mystream': "$"}, count=1, block=50000)
if len(response) > 0:
min_value = response[0][1][0][0] # [['mystream', [('1596618943439-0', {'mykey': 'myval'})]]]
time.sleep(50) # wait for the end of the window
entries = redis_client.xrevrange('mystream', max="+", min=min_value, count=5)