0

我看到一个问题,一段时间后,消费者随机停止消费来自主题的消息,并且没有抛出任何错误。该主题中没有太多消息,因为这是一个测试环境。

该主题的消费者滞后将> 0,并且会无限期地停留在该数字上(就像没有消费者消费它一样)。我使用 kafka 工具检查了该主题,我可以看到该分区已分配给消费者。

我正在运行多个消费者线程来消费一个主题,这里是代码:

  public Runnable getRunnable() {
    return () -> {
      final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate = 
          kafkaLib.createKafkaConsumerTemplate();
      kafkaConsumerTemplate.receive()
          .concatMap(record -> {
            // process message and commit offset

          })
          .subscribe();
    };
  }

  @Override
  public void run(ApplicationArguments args) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 2; i++) {
      executorService.execute(getRunnable());
    }

}

我的问题:如果一段时间内没有消息推送到该主题,kafkaConsumperTemplate.receive() 是否有可能自行停止/退出/取消订阅?或者订阅结束后线程会以某种方式退出?

使用反应式 kafka 消费者,没有更多的@KafkaListener注释,所以我使用 ApplicationRunner 并产生消费者线程。是否有另一种启动反应式 kafka 消费者的正确方法?

4

0 回答 0