如果我使用 KafkaUtils.createDirectStream 读取消息,如何在 Kafka 中存储消息偏移量。每次应用程序关闭时,Kafka 都会丢失偏移值。然后它正在读取 auto.offset.reset 中提供的值(这是最新的)并且无法在应用程序的停止-启动间隔内读取消息。
问问题
238 次
1 回答
1
您可以通过手动提交偏移量来避免这种情况。将 enable.auto.commit 设置为 false ,然后在成功操作后使用下面的代码提交 kafka 中的偏移量。
var offsetRanges = Array[OffsetRange]()
val valueStream = stream.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(_.value())
//operation
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
您还可以阅读此文档,这将使您更好地了解偏移管理https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/
于 2019-01-14T07:15:27.573 回答