0

在我的 Spring Boot 项目中,我有许多 Spring Kafka 消费者,我添加了一些事件侦听器来监控这些消费者的健康状况。这是代码:

@Component
public class ApplicationContextListeningService {

    @EventListener
    public void handleConsumerPausedEvent(ConsumerPausedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERPAUSEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerResumedEvent(ConsumerResumedEvent event) {
        LOGGER_ERROR.warn(WARNING_KAFKA_CONSUMERRESUMEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleConsumerStoppedEvent(ConsumerStoppedEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_CONSUMERSTOPPEDEVENT + event.getSource() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleListenerContainerIdleEvent(ListenerContainerIdleEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_LISTENERCONTAINERIDLEEVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

    @EventListener
    public void handleNonResponsiveConsumerEvent(NonResponsiveConsumerEvent event) {
        LOGGER_ERROR.error(ERROR_KAFKA_NONRESPONSIVECONSUMEREVENT + event.getListenerId() + LOG_MSG_DELIMITER + event.toString());
    }

}

有谁知道在什么情况下会抛出这些事件(也许我如何手动触发这些事件以进行测试)?对于最后三个事件(ConsumerStoppedEvent、ListenerContainerIdleEvent 和 NonResponsiveConsumerEvent),当我得到其中一个时,是否需要人工干预来解决问题(例如重新启动服务器以再次创建消费者)?谢谢!

4

1 回答 1

1

您可以通过将 Mock 消费者工厂注入容器来模拟它们。

  • ConsumerStoppedEventstop()当你容器时发出。
  • ListenerContainerIdleEvent只是意味着没有收到任何记录,idleEventInterval所以通常并不意味着有问题。
  • NonResponsiveConsumerEvent- 很难说;对于较旧的客户端,poll()如果服务器关闭,则会阻塞,因此我们无法发出空闲事件(或做任何事情)。

我不知道您是否仍然可以通过更新的客户获得它们;但是要模拟它,您只需要在模拟使用者poll()方法中阻塞足够长的时间,以便监控任务检测到问题并发出事件。

于 2019-05-06T22:10:58.583 回答