我看到一个问题,一段时间后,消费者随机停止消费来自主题的消息,并且没有抛出任何错误。该主题中没有太多消息,因为这是一个测试环境。
该主题的消费者滞后将> 0,并且会无限期地停留在该数字上(就像没有消费者消费它一样)。我使用 kafka 工具检查了该主题,我可以看到该分区已分配给消费者。
我正在运行多个消费者线程来消费一个主题,这里是代码:
public Runnable getRunnable() {
return () -> {
final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate =
kafkaLib.createKafkaConsumerTemplate();
kafkaConsumerTemplate.receive()
.concatMap(record -> {
// process message and commit offset
})
.subscribe();
};
}
@Override
public void run(ApplicationArguments args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; i++) {
executorService.execute(getRunnable());
}
}
我的问题:如果一段时间内没有消息推送到该主题,kafkaConsumperTemplate.receive() 是否有可能自行停止/退出/取消订阅?或者订阅结束后线程会以某种方式退出?
使用反应式 kafka 消费者,没有更多的@KafkaListener
注释,所以我使用 ApplicationRunner 并产生消费者线程。是否有另一种启动反应式 kafka 消费者的正确方法?