我正在使用 Spark Kafka 直接流式传输来自 Kafka 的消息。我想实现零消息丢失,重新启动 spark 后,它必须从 Kafka 读取丢失的消息。我正在使用检查点来保存所有读取偏移量,以便下次 spark 将从存储的偏移量开始读取。这是我的理解。
我使用了下面的代码。我停止了我的火花,向卡夫卡推送了一些信息。重新启动未从 Kafka 读取丢失消息的火花后。Spark 读取来自 kafka 的最新消息。如何读取来自 Kafka 的遗漏消息?
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(6000))
ssc.checkpoint("C:/cp")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val ssc = new StreamingContext(spark.sparkContext, Milliseconds(50))
val msgStream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
注意:应用程序日志显示 auto.offset.reset 为none而不是latest。为什么 ?
WARN KafkaUtils: overriding auto.offset.reset to none for executor
SBT
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"
Windows 7的