0

我有一个 redis 流“mystream”,我正在尝试使用:

XREAD 计数 5 块 50000 流 mystream $

读取附加到 mystream 的五个新条目,阻塞 50 秒。一旦我执行命令以使用以下命令将数据添加到 mystream:

XADD mystream a 5 b 6

被阻止的命令以单个条目作为输出退出。

有没有什么方法可以使用 XREAD 命令接收从现在到 50 年代附加的多个条目,或者我是否必须多次调用 XREAD 才能达到相同的效果?

4

1 回答 1

0

您要求的是不适合使用XREAD BLOCK. 我建议使用XREVRANGE命令:

xrevrange mystream + - COUNT 5

min:“+” - 表示流的最高 id,max:“-” - 表示流的最低 id。除了“-”,您还可以传递一个 TIMESTAMP,它将您的XREVRANGE结果限制在所需的时间到TIMESTAMP几秒钟前(包括在内)。

xrevrange mystream + 1596617691455 COUNT 5

上面的建议没有实现相同的blocking行为。如果您仍然想对传入的数据做出反应,您可以查看Redis Pub/Sub。订阅一个主题,并编写自己的逻辑来处理传入的数据。有一个网络聊天的例子

您还可以使用XREAD BLOCK来检测命令的最小值XREVRANGE

XREAD COUNT 5 BLOCK 50000 STREAMS mystream $
XREVRANGE mystream + [returned_xread_key] COUNT 5

很抱歉,我无法提供 C 语言的示例,这里是 Python:

import redis
import time
redis_client = redis.StrictRedis("0.0.0.0", decode_responses=True)
response = redis_client.xread({'mystream': "$"}, count=1, block=50000)
if len(response) > 0:
    min_value = response[0][1][0][0] # [['mystream', [('1596618943439-0', {'mykey': 'myval'})]]]
    time.sleep(50) # wait for the end of the window
    entries = redis_client.xrevrange('mystream', max="+", min=min_value, count=5)
于 2020-08-05T09:23:55.713 回答