0

我目前正在开发一组微服务,这些微服务通过使用 Kafka 进行通信,更具体地说是流。

在大多数情况下和在开发环境中,一切似乎都运行良好,没有任何问题,但是,在暂存环境中,我遇到了我无法理解为什么会发生的行为。

在某些情况下,应用程序会多次处理通过流使用者接收到的单个(或多个)消息。

似乎每当消息的实际处理(即应用程序逻辑)需要一些时间完成时就会发生这种情况——考虑到它涉及 I/O 和其他繁重的操作,它可能会这样做。

我对 Kafka 相当陌生,但据我了解,这与我的消费者没有足够快地提交偏移量有关,这反过来又不会将其标记为正在处理。我试图找出可能会缓解此问题的配置设置,但坦率地说,我不明白需要做什么。

作为参考,这是我的 Stream 的配置:

Properties props = builder.getConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

该应用程序分别使用 Micronaut 和 Java,最新版本和版本 11 开发。

如果有人可以提出解决此问题的方法,那将非常有帮助。

4

1 回答 1

0

afaik你应该看看:

session.timeout.ms

使用 Kafka 的组管理工具时用于检测客户端故障的超时。客户端定期发送心跳以向代理指示其活跃性。如果在此会话超时到期之前代理未收到任何心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在代理配置中 group.min.session.timeout.ms 和 group.max.session.timeout.ms 配置的允许范围内。

请求.timeout.ms

配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败。

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html

如果处理后不需要提交,也可以考虑异步处理,但如果处理失败,则没有自动选项来重新处理消息

于 2021-04-02T19:41:33.240 回答