我正在使用 Spring 数据 Redis 从 Redis 流中消费,使用反应式流接收器来侦听消费者组的工作,但观察到 Flux 流有时会过早关闭并且不再收听新消息并且通量终止过早地。
代码
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder()
.build();
StreamReceiver.create(reactiveConnFactory, options)
.receiveAutoAck("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
"CONSUMER_STREAM",
ReadOffset.lastConsumed()))
.doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
.flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
.subscribeOn(streamConsumerExecutor))
.onErrorResume(t -> Flux.empty())
.doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
.doOnComplete(() -> {
LOG.info("Consumer Stream Completed");
})
.doOnTerminate(() -> {
LOG.info("Consumer Stream terminated");
})
.subscribe();
从流中读取消息一段时间后,获取“消费者流终止”的日志
版本:2.2.0.RELEASE
这是一个错误还是我错过了什么,有人可以帮忙吗?
更新
看起来 redis 命令正在超时,因为我收到了 RedisCommandTimeoutException,有没有办法在此类错误上重试流式处理而不是取消它。还发现它发生在 XREADGROUP 操作中,尽管通过 nodejs redis-cli 运行时发出相同的命令工作正常?