1

如果我使用 KafkaUtils.createDirectStream 读取消息,如何在 Kafka 中存储消息偏移量。每次应用程序关闭时,Kafka 都会丢失偏移值。然后它正在读取 auto.offset.reset 中提供的值(这是最新的)并且无法在应用程序的停止-启动间隔内读取消息。

4

1 回答 1

1

您可以通过手动提交偏移量来避免这种情况。将 enable.auto.commit 设置为 false ,然后在成功操作后使用下面的代码提交 kafka 中的偏移量。

  var offsetRanges = Array[OffsetRange]()

          val valueStream = stream.transform {
            rdd =>
              offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              rdd
          }.map(_.value())
//operation
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

您还可以阅读此文档,这将使您更好地了解偏移管理https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

于 2019-01-14T07:15:27.573 回答