4

我正在使用 Spring 数据 Redis 从 Redis 流中消费,使用反应式流接收器来侦听消费者组的工作,但观察到 Flux 流有时会过早关闭并且不再收听新消息并且通量终止过早地。

代码

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder() 
            .build();
 StreamReceiver.create(reactiveConnFactory, options)
            .receiveAutoAck("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
                        "CONSUMER_STREAM",
                        ReadOffset.lastConsumed()))
            .doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
            .flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
                  .subscribeOn(streamConsumerExecutor))
            .onErrorResume(t -> Flux.empty())
            .doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
            .doOnComplete(() -> {
               LOG.info("Consumer Stream Completed");
            })
            .doOnTerminate(() -> {
               LOG.info("Consumer Stream terminated");
            }) 
            .subscribe();


从流中读取消息一段时间后,获取“消费者流终止”的日志

版本:2.2.0.RELEASE

这是一个错误还是我错过了什么,有人可以帮忙吗?

更新

看起来 redis 命令正在超时,因为我收到了 RedisCommandTimeoutException,有没有办法在此类错误上重试流式处理而不是取消它。还发现它发生在 XREADGROUP 操作中,尽管通过 nodejs redis-cli 运行时发出相同的命令工作正常?

4

0 回答 0