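My application consumes messages that take 5-15 minutes each to process on average, with max.poll.records: 1.

When there are no messages there is 1 pod (K8S). For each incoming message I scale out, from 1 pod up to a maximum of 50. (I have 50 partitions.)

Now I have a couple of problems:

  1. When a new pod comes up, it takes a long time before I see partitions assigned to it. I can see pods start and get terminated within a few minutes, before receiving even 1 message.
  2. When many messages are inserted into the topic (more than 10, which is a lot for this application), the consumers start getting CommitFailedException, and as a result the same message is consumed twice, on different pods.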
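
To make problem 2 concrete, here is a toy model of what I think is happening, in plain Java with no Kafka classes. `RedeliveryModel`, its methods, and the `stillInGroup` flag are all made up for illustration. The only behavior carried over from Kafka is that a consumer taking over a partition resumes from the last committed offset, so this is a sketch of the symptom and not a diagnosis.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of at-least-once delivery for one partition.
// None of these names are Kafka or Spring Kafka APIs.
public class RedeliveryModel {
    // Last offset position committed by the consumer group.
    private long committedOffset = 0;

    // Whoever owns the partition resumes from the committed position.
    public long nextOffsetToRead() {
        return committedOffset;
    }

    // The commit is accepted only while the caller is still a group member.
    // Returning false stands in for CommitFailedException.
    public boolean commit(long processedOffset, boolean stillInGroup) {
        if (!stillInGroup) {
            return false;
        }
        committedOffset = processedOffset + 1;
        return true;
    }

    public static void main(String[] args) {
        RedeliveryModel partition = new RedeliveryModel();
        List<String> log = new ArrayList<>();

        // Pod A reads a record and processes it, but has been removed
        // from the group by the time it tries to commit.
        long readByA = partition.nextOffsetToRead();
        log.add("pod-A processed offset " + readByA);
        boolean committedByA = partition.commit(readByA, false);

        // Pod B now owns the partition and starts from the committed position,
        // which never moved, so it sees the same record again.
        long readByB = partition.nextOffsetToRead();
        log.add("pod-B processed offset " + readByB);
        partition.commit(readByB, true);

        log.forEach(System.out::println);
        System.out.println("commit by pod-A accepted: " + committedByA);
    }
}
```

In this model both pods process the same offset, which matches what I see: the work is done on one pod, its commit is rejected, and another pod picks the message up again.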

The error:

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1361) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1063) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_252]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_252]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_252]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1109) ~[kafka-clients-2.5.0.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:976) ~[kafka-clients-2.5.0.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1511) ~[kafka-clients-2.5.0.jar!/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2311) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2306) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2292) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2106) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1097) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
    ... 3 common frames omitted

My Kafka configuration is:

spring.kafka:
  bootstrap-servers: {{ .Values.kafka.service }}:9092
  consumer:
    client-id: XXX
    group-id: YYY
    auto-offset-reset: earliest
    enable-auto-commit: false
    fetch-min-size: 1
    fetch-max-wait: 1
    properties:
      heartbeat.interval.ms: 3000
      max.poll.interval.ms: 1200000
      max.poll.records: 1

  producer:
    client-id: XXX
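
For reference, this is the timing budget I believe the configuration above gives me, as a small sanity check. `PollBudget` and `headroom` are hypothetical names used only for this arithmetic. The 15-minute figure is the upper end of my average processing time, not a measured worst case.

```java
import java.time.Duration;

// Arithmetic check of the consumer timing settings; not part of the application.
public class PollBudget {
    // max.poll.interval.ms copied from the configuration above.
    static final Duration MAX_POLL_INTERVAL = Duration.ofMillis(1_200_000);

    // Time left before max.poll.interval.ms expires after processing one record.
    static Duration headroom(Duration processingTime) {
        return MAX_POLL_INTERVAL.minus(processingTime);
    }

    public static void main(String[] args) {
        Duration slowMessage = Duration.ofMinutes(15);
        System.out.println("max.poll.interval.ms = " + MAX_POLL_INTERVAL.toMinutes() + " min");
        System.out.println("headroom after a 15 min message = "
                + headroom(slowMessage).toMinutes() + " min");
    }
}
```

So max.poll.interval.ms is 20 minutes, which leaves about 5 minutes of headroom after a 15-minute message. With max.poll.records: 1 that budget applies to a single record per poll. A message that runs past 20 minutes would exceed it.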