My application consumes with max.poll.records: 1, and the average processing time for a single message is between 5 and 15 minutes.
When there are no messages there is 1 pod (K8s); for each incoming message I scale up, from 1 pod to a maximum of 50 (I have 50 partitions).
Now I have a couple of problems:
- When a new pod comes up, it takes a long time before I see partitions assigned to it. I can see a pod start up and get terminated minutes before it receives even 1 message.
- When many messages are inserted into the topic (more than 10, which is a lot for this application), it starts getting a CommitFailedException and, as a result, the same message is processed twice on different pods.
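My rough understanding of the timing (a back-of-the-envelope sketch; the rebalance stall duration is an illustrative guess, not a measurement): a single record can hold the poll loop for up to 15 minutes, and every pod joining or leaving triggers a rebalance, so the effective time between two polls can exceed max.poll.interval.ms (20 minutes in my config), at which point the broker kicks the consumer out of the group and the commit fails:

```java
// Back-of-the-envelope check: does processing plus rebalance stalls
// risk exceeding max.poll.interval.ms? Numbers are illustrative.
public class PollIntervalCheck {
    static boolean riskOfKickOut(long processingMs, long rebalanceStallMs, long maxPollIntervalMs) {
        // Time between two polls is roughly the processing time of the
        // single record (max.poll.records = 1) plus any stall caused by
        // rebalances while new pods join the group.
        return processingMs + rebalanceStallMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 1_200_000L;   // 20 min, from my config
        long worstProcessingMs = 15 * 60_000L; // 15 min per message
        long rebalanceStallMs = 6 * 60_000L;   // hypothetical: pods scaling 1 -> 50
        // 15 min + 6 min = 21 min > 20 min -> consumer is kicked out
        System.out.println(riskOfKickOut(worstProcessingMs, rebalanceStallMs, maxPollIntervalMs));
    }
}
```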
The error:
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1361) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1063) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_252]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_252]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_252]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1109) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:976) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1511) ~[kafka-clients-2.5.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2311) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2306) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2292) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2106) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1097) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-2.5.6.RELEASE.jar!/:2.5.6.RELEASE]
... 3 common frames omitted
My Kafka configuration is:
spring.kafka:
  bootstrap-servers: {{ .Values.kafka.service }}:9092
  consumer:
    client-id: XXX
    group-id: YYY
    auto-offset-reset: earliest
    enable-auto-commit: false
    fetch-min-size: 1
    fetch-max-wait: 1
    properties:
      heartbeat.interval.ms: 3000
      max.poll.interval.ms: 1200000
      max.poll.records: 1
  producer:
    client-id: XXX
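For reference, this is my understanding of how Spring Boot maps the keys above to the raw kafka-clients property names (a sketch using plain java.util.Properties, so nothing from the client jar is needed; note that fetch-min-size maps to fetch.min.bytes and fetch-max-wait to fetch.max.wait.ms):

```java
import java.util.Properties;

// The spring.kafka.consumer.* keys expressed as the underlying
// kafka-clients property names that Spring Boot maps them to.
public class ConsumerProps {
    static Properties consumerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("client.id", "XXX");
        p.setProperty("group.id", "YYY");
        p.setProperty("auto.offset.reset", "earliest");   // auto-offset-reset
        p.setProperty("enable.auto.commit", "false");     // enable-auto-commit
        p.setProperty("fetch.min.bytes", "1");            // fetch-min-size
        p.setProperty("fetch.max.wait.ms", "1");          // fetch-max-wait
        // Passed through verbatim from the `properties:` block:
        p.setProperty("heartbeat.interval.ms", "3000");
        p.setProperty("max.poll.interval.ms", "1200000"); // 20 minutes
        p.setProperty("max.poll.records", "1");           // one record per poll
        return p;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("kafka:9092").getProperty("max.poll.records"));
    }
}
```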