我在使用 consumer.poll() 方法时遇到问题。使用 poll() 方法获取数据后,消费者将没有任何数据要提交,所以请帮我从 kafka 主题中删除特定数量的行。
问问题
436 次
1 回答
0
您需要确保在提交数据之前已完全处理数据,以避免在消费者失败的情况下“数据丢失”。
因此,如果启用auto.commit
,请确保poll()
在发出下一个之前完全处理完 a 之后的所有数据,poll()
因为每个poll()
隐式提交其上一个中的所有数据poll()
。
如果这不可能,您应该auto.commit
在通过consumer.commit(...)
. 为此,请记住,您不需要单独提交每条消息,并且带有偏移量的提交会X
隐式提交带有偏移量的所有消息< X
(例如,在处理偏移量 5 的消息后,您提交偏移量 6 - 提交的偏移量不是上一条成功处理的消息,但您要处理的下一条消息)。并且偏移量 6 的提交,会提交偏移量 0 到 5 的所有消息。因此,在所有具有较小偏移量的消息被完全处理之前,您不应该提交偏移量 6。
于 2016-11-22T08:33:50.937 回答