0

我在使用 consumer.poll() 方法时遇到问题。使用 poll() 方法获取数据后,消费者将没有任何数据要提交,所以请帮我从 kafka 主题中删除特定数量的行。

4

1 回答 1

0

您需要确保在提交数据之前已完全处理数据,以避免在消费者失败的情况下“数据丢失”。

因此,如果启用auto.commit,请确保poll()在发出下一个之前完全处理完 a 之后的所有数据,poll()因为每个poll()隐式提交其上一个中的所有数据poll()

如果这不可能,您应该auto.commit在通过consumer.commit(...). 为此,请记住,您不需要单独提交每条消息,并且带有偏移量的提交会X隐式提交带有偏移量的所有消息< X(例如,在处理偏移量 5 的消息后,您提交偏移量 6 - 提交的偏移量不是上一条成功处理的消息,但您要处理的下一条消息)。并且偏移量 6 的提交,会提交偏移量 0 到 5 的所有消息。因此,在所有具有较小偏移量的消息被完全处理之前,您不应该提交偏移量 6。

于 2016-11-22T08:33:50.937 回答