我知道如何将 kafka 配置为从最早或最新消息中读取。如果我需要从以前的偏移量中读取,我们如何包含一个附加选项?我需要这样做的原因是由于之前处理逻辑中的一些错误,需要再次处理之前读取的消息。
问问题
388 次
2 回答
0
我正在尝试回答一个类似但不完全相同的问题,所以让我们看看我的信息是否可以帮助您。
简而言之,您想要提交您的偏移量,最常见的解决方案是 ZooKeeper。因此,如果您的消费者遇到错误或需要关闭,它可以从中断的地方恢复。
我自己我正在处理一个非常大的高容量流,我的消费者(用于测试)每次都需要从尾部开始。文档表明我必须使用KafkaConsumer seek来声明我的起点。
一旦它们成功且可靠,我将尝试在此处更新我的发现。可以肯定的是,这是一个已解决的问题。
于 2017-08-31T02:48:50.313 回答
0
在 java kafka 客户端中,有一些关于 kafka 消费者的方法,可用于指定下一个消费位置。
public void seek(TopicPartition 分区,长偏移)
覆盖消费者将在下一次轮询(超时)时使用的获取偏移量。如果多次为同一个分区调用此 API,则在下一次 poll() 中将使用最新的偏移量。请注意,如果在消费过程中任意使用此 API 来重置获取偏移量,您可能会丢失数据
这样就够了,还有seekToBeginning和seekToEnd。
于 2017-08-03T08:39:34.943 回答