1

我有应用程序,它实现了来自 Confluent REST Proxy for Kafka 的GET /topics/(string: topic_name)/partitions/(int: partition_id)/messages?offset=(int)[&count=(int)] 之类的 API。

所以,我有一群消费者。我的 API 处理程序非常简单:

  1. 从池中获取消费者
  2. consumer.assign(util.Arrays.asList(partition))
  3. consumer.seek(partition, startOffset)
  4. consumer.poll(Duration.ofMillis(300L))
  5. consumer.unsubscribe()
  6. 将消费者返回池

所以,我的解决方案在几天内效果很好。然后,发生了一些事情,并且poll()总是返回空的记录列表。

我可以解决此问题以重新启动我的应用程序。另外,我可以启动新的应用程序实例,这个实例可以从 Kakfa 读取记录,所以 Kafka 是活着的。

4

0 回答 0