3

Redis 消费者组是 Redis 中相对较新的功能,因此我找不到足够成熟的 npm 包来提供开箱即用的机制来使用消费者组读取消息。在 Spring Data Redis 中,onMessage(V message)接口StreamListener的每条新消息都会被回调。

我正在为 Redis 使用redis npm 包,我必须通过在无限循环中手动触发XREADGROUP命令来管理消息读取。参考
尽管 Spring redis 库不断地轮询服务器以获取新消息并在新消息上调用 onMessage 方法,但它由库本身而不是程序员负责。

来自Spring Data Redis

虽然 Pub/Sub 依赖于临时消息的广播(即,如果你不听,你会错过一条消息),Redis Stream 使用一种持久的、仅附加的数据类型,它会保留消息直到流被修剪。消费的另一个区别是 Pub/Sub 注册了一个服务器端订阅。Redis 将到达的消息推送到客户端,而 Redis 流需要主动轮询。

来自 Spring Data Redis 的代码片段

private void doLoop(K key) {

    do {

        try {

            // allow interruption
            Thread.sleep(0);

            List<V> read = readFunction.apply(key, pollState.getCurrentReadOffset());

            for (V message : read) {

                listener.onMessage(message);
                pollState.updateReadOffset(message.getId().getValue());
            }
        } catch (InterruptedException e) {

            cancel();
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {

            if (cancelSubscriptionOnError.test(e)) {
                cancel();
            }

            errorHandler.handleError(e);
        }
    } while (pollState.isSubscriptionActive());
}

我想知道是否有任何流行的 npm 包,通过它我可以找到一种更好的方法来使用消费者组从流中读取消息。我正在寻找类似redis-streams-broker这个包对我来说似乎不可靠,因为它的下载量较少,不受管理且没有许可证

4

0 回答 0