我目前正在开发一组微服务,这些微服务通过使用 Kafka 进行通信,更具体地说是流。
在大多数情况下和在开发环境中,一切似乎都运行良好,没有任何问题,但是,在暂存环境中,我遇到了我无法理解为什么会发生的行为。
在某些情况下,应用程序会多次处理通过流使用者接收到的单个(或多个)消息。
似乎每当消息的实际处理(即应用程序逻辑)需要一些时间完成时就会发生这种情况——考虑到它涉及 I/O 和其他繁重的操作,它可能会这样做。
我对 Kafka 相当陌生,但据我了解,这与我的消费者没有足够快地提交偏移量有关,这反过来又不会将其标记为正在处理。我试图找出可能会缓解此问题的配置设置,但坦率地说,我不明白需要做什么。
作为参考,这是我的 Stream 的配置:
Properties props = builder.getConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
该应用程序分别使用 Micronaut 和 Java,最新版本和版本 11 开发。
如果有人可以提出解决此问题的方法,那将非常有帮助。