在我的 Spring Boot 项目中,我有许多 Spring Kafka 消费者,我添加了一些事件侦听器来监控这些消费者的健康状况。这是代码:
@Component
public class ApplicationContextListeningService {
@EventListener
public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
}
@EventListener
public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
}
}
有谁知道在什么情况下会抛出这些事件(也许我如何手动触发这些事件以进行测试)?对于最后三个事件(ConsumerStoppedEvent、ListenerContainerIdleEvent 和 NonResponsiveConsumerEvent),当我得到其中一个时,是否需要人工干预来解决问题(例如重新启动服务器以再次创建消费者)?谢谢!