问题标签 [redis-streams]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
74 浏览

redis - reddison 客户端中的流监听器

我正在使用 Redisson 客户端并希望实现读取流的解决方案。在 pub/sub 模型中,有一个选项可以添加 MessageListener,它负责订阅主题,我们所要做的就是为其提供一个 impl,每次向主题发布新消息时都会执行该 impl。对于流,我希望有一个类似的侦听器,但似乎我们必须自己处理流的轮询。如果库中有可以做到这一点的东西,那就太好了。正在使用的版本:3.15.6

0 投票
2 回答
76 浏览

redis - Redis 流 NOMKSTREAM

NOMKSTREAMredis流中这是什么?我浏览了文档。太糟糕了,他们甚至没有提供关于它的简单描述。

https://redis.io/commands/xadd

有人可以解释一下吗?

0 投票
1 回答
61 浏览

redis - Redisson RStreams没有收到消息

我正在尝试使用 Reddison lib 将流与 redis 一起使用。我code StreamReadGroupArgs.neverDelivered()在接收消息时使用。但是,如果我不确认它,那么下次调用 readGroup 时它就不会收到这些消息。问题是如果节点崩溃或网络出现故障而我无法处理这些消息怎么办。然后,即使我没有确认它,我也不会再收到这些消息。是否有解决方法或我可以做的其他事情?

0 投票
1 回答
101 浏览

redis - Redis 流分区

我试图了解 redis 流如何进行分区,如果可以将特定消息发送到特定分区(类似于使用 Kafka 的方式)。

我检查了 redis-cli api ,没有什么类似于 partitioning ,在使用StackExchangeRedisredis 库时也没有。

唯一的方法是:IDatabase.StreamAdd(key,streamKey,streamValue,messageId)

我错过了什么吗?分区是否仅以固定方式完成?

PS如果可以分区,可以组成分区键吗?

0 投票
0 回答
33 浏览

spring - Spring data Redis - StreamOperations,配合MapRecord获取reverseRange

我正在尝试读取我在 redis 流中写入的数据,但我正在读取字节。我正在使用spring data redis,并且我已经配置了一个LettuceConnectionFactoryRedisTemplate<String, OutputSpecRest>bean。

写入流:

从流中读取:

如何从 redis 流中读取并放入我的OutputSpecRest自定义对象?

opsForStream为了继续该getRecordsHistory方法,我找不到关于 Redis 的更多文档。

0 投票
0 回答
339 浏览

node.js - 使用 NodeJs 中的消费者组从 Redis Stream 消费消息

Redis 消费者组是 Redis 中相对较新的功能,因此我找不到足够成熟的 npm 包来提供开箱即用的机制来使用消费者组读取消息。在 Spring Data Redis 中,onMessage(V message)接口StreamListener的每条新消息都会被回调。

我正在为 Redis 使用redis npm 包,我必须通过在无限循环中手动触发XREADGROUP命令来管理消息读取。参考
尽管 Spring redis 库不断地轮询服务器以获取新消息并在新消息上调用 onMessage 方法,但它由库本身而不是程序员负责。

来自Spring Data Redis

虽然 Pub/Sub 依赖于临时消息的广播(即,如果你不听,你会错过一条消息),Redis Stream 使用一种持久的、仅附加的数据类型,它会保留消息直到流被修剪。消费的另一个区别是 Pub/Sub 注册了一个服务器端订阅。Redis 将到达的消息推送到客户端,而 Redis 流需要主动轮询。

来自 Spring Data Redis 的代码片段

我想知道是否有任何流行的 npm 包,通过它我可以找到一种更好的方法来使用消费者组从流中读取消息。我正在寻找类似redis-streams-broker这个包对我来说似乎不可靠,因为它的下载量较少,不受管理且没有许可证

0 投票
1 回答
70 浏览

redis - Redis消费者组能否保证并行运行的多个消费者从同一个流中消费不同的独占子集?

目前,我有一个 redis 流,它产生了许多条目。我想并行处理流,但要确保每个条目只处理一次我搜索了关于 redis stream的官方文档。似乎消费者群体是一种解决方案。

所以我尝试创建一个消费者组,并让该组中的多个消费者并行消费相同的流(可能是不同服务器上的多个消费者实例或同一服务器上的多个线程)。

Redis 消费者组能否保证并行运行的多个消费者从同一个流中消费不同的独占子集,以确保每个条目只处理一次?如果可以保证,对于每个消费者,从流中读取时,xreadgroup group mygroup consumer1 [count 1000] streams mystream >就够了吗?

0 投票
1 回答
61 浏览

redis - redis流消费组的数量上限?

我们正在研究使用 redis 流作为集群范围的消息传递总线,其中集群中的每个节点都有一个唯一的 id。这个想法是,每个节点在生成时都会创建一个具有该唯一 ID 的消费者组到中央 redis 流,以保证集群中的每个节点都获得每条消息的副本。在一个编排的环境中,集群节点将在运行中生成和删除,每个节点都有一个唯一的 id。随着时间的推移,我可以看到这导致有 100 个甚至 1000 个旧的/未使用的消费者组都订阅了同一个 redis 流。

我的问题是——redis可以处理的消费者群体的数量是否有上限,大量(未使用的)消费者群体是否有任何实际处理成本?似乎消费者组只是存储在redis中的一个指针,它指向流中的最后一个读取条目,并且仅在该组的消费者执行范围XREADGROUP时才被访问。这将导致我假设(无需深入研究 Redis 代码)消费者组的数量确实无关紧要,除了消费者组指针会占用的少量 RAM。

现在,我知道我们应该更聪明,一个节点应该在它被杀死时删除它自己的消费者组,或者我们应该定期清理它,但是如果一个消费者组只是 redis 中的一条记录,我不确定值得付出努力 - 至少在 MVP 开发阶段。

TL;博士; 我的理解是否正确,给定流的消费者组数量没有实际限制,除非使用它们没有处理成本?

0 投票
1 回答
39 浏览

redis - 如何清除 Redis 中消费者组的 PEL 列表

我想知道如何在不单独确认每条消息的情况下清除给定 Redis 消费者组的 PEL 列表。

上下文 我的一个 Redis 流有一个消费者组。出于测试目的,我需要在不删除流的情况下清除消费者组的 PEL 列表。

0 投票
2 回答
57 浏览

redis - 从 Redis 流中获取来自 XPENDING 的所有剩余未决(PEL)消息

我想从 PEL(待处理条目列表)接收给定流和消费者组的所有待处理消息。

问题是 XPENDING 命令有一个强制计数变量。但我不知道事先有多少待处理的消息。我只想列出所有待处理的消息。那么如何正确处理呢?