一旦我收到来自 kafka 的消息,我需要运行一个长时间运行的进程(最多需要 20 秒),只有当这个进程完成时,我才需要认为一条消息是成功的。
我还需要确保每条消息至少处理一次。
考虑使用具有以下属性的 KafkaMessageListenerContainer:
listenerTaskExecutor 的 ThreadPoolTaskExecutor
使用 AcknowledgeingMessageListener 类型的 MessageListener
将确认模式设置为 MANUL_IMMEDIATE。
但我唯一的问题是,如果首先成功处理偏移量为 15 的特定消息,但仍在处理 14 的消息,会发生什么情况。所以在这种情况下,我的偏移量将更新为 15 ,即使 14 尚未处理
如何处理这类情况?