问题
我正在使用 Spring 的 Actuator Prometheus 端点输出的指标监控 Kafka 偏移滞后。在某些情况下,我注意到在执行大型回填作业时,我的一些节点(应用程序的 6 个实例)将停止更新指标,直到所有消息都被消耗完。有问题的节点仍在正确处理消息(如日志记录和最终结果所证明的那样)。在绘制我的指标时,这在偏移滞后中显示为一条平线,直到最后,当它们从悬崖边缘掉下时。
还值得注意的是,在指标按预期更新并且我们已经执行应用程序部署的情况下,我们可能会看到指标从按预期运行到一个或多个节点平坦,如上所述.
对我来说,这感觉像是某种不确定的竞争条件,可能通过我配置应用程序的方式来解释?
语境
我有一个使用多个主题的应用程序。除了一个主题之外,所有主题都具有共享消息格式,并且只有一个主题具有不同的格式,因为它是“重试”主题,因此消息是不同的。我还想将过滤器仅应用于常规主题,因此为了方便起见,我创建了两个单独的ConcurrentKafkaListenerContainerFactory
bean,例如:
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageA> messageAContainerFactory(
ConsumerFactory<Long, MessageA> consumerFactory) { ... }
@Bean
public ConcurrentKafkaListenerContainerFactory<Long, MessageB> messageBContainerFactory(
ConsumerFactory<Long, MessageB> consumerFactory) { ... }
然后我在相关@KafkaListener
注释中引用这些 bean:
@KafkaListener(
topics = "message-a-topic",
containerFactory = "messageAContainerFactory")
public void consumeMessageA(MessageA messageA) { ... }
@KafkaListener(
topics = "message-B-topic",
containerFactory = "messageBContainerFactory")
public void consumeMessageB(MessageB messageB) { ... }
除了这些豆子,没有什么其他的。Kafka 的应用程序配置如下所示:
spring:
kafka:
consumer:
auto-offset-reset: latest
max-poll-records: 10
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring:
deserializer:
value:
delegate:
class: org.springframework.kafka.support.serializer.JsonDeserializer
json:
value:
default:
type: com.example.MessageA
我应该强调,我没有注意到该功能有任何问题 - 消息似乎按预期被使用和处理。这似乎只会影响指标的更新方式。
任何意见,将不胜感激!