1

一旦我收到来自 kafka 的消息,我需要运行一个长时间运行的进程(最多需要 20 秒),只有当这个进程完成时,我才需要认为一条消息是成功的。

我还需要确保每条消息至少处理一次。

考虑使用具有以下属性的 KafkaMessageListenerContainer:

  1. listenerTaskExecutor 的 ThreadPoolTask​​Executor

  2. 使用 AcknowledgeingMessageListener 类型的 MessageListener

  3. 将确认模式设置为 MANUL_IMMEDIATE。

但我唯一的问题是,如果首先成功处理偏移量为 15 的特定消息,但仍在处理 14 的消息,会发生什么情况。所以在这种情况下,我的偏移量将更新为 15 ,即使 14 尚未处理

如何处理这类情况?

4

1 回答 1

2

你不能那样做;将提交更高的偏移量。

如果您使用单个分区,则需要在同一线程上处理每个请求或管理应用程序中的状态以避免在存在间隙时提交偏移量。

这就是卡夫卡的工作方式。

一个更简单的解决方案是对数据进行分区;偏移量由分区维护。使用 a ConcurrentMessageListenerContainer,分区将分布在线程中;您不得在侦听器中使用执行程序。这样,容器可以在处理每个分区时提交偏移量(AckMode.RECORD)。

只需创建至少具有满足并发要求的分区数量的主题 - 但通常最好对主题进行过度分区。

如果您使用代理分区分配,您应该确保将会话超时属性设置为安全地大于您预期的最大 20 秒,以避免分区重新平衡。但是,只要您不使用自动提交,如果您的侦听器花费的时间过长,容器就会暂停消费者。

于 2016-08-15T12:52:46.023 回答