I am new to Scala and the RDD concept. I am reading messages from Kafka with the Kafka stream API in Spark and trying to commit the offsets after the business work is done, but I am getting an error.
Note: I am using repartition for parallel processing.
How do I read the offsets from the stream API and commit them back to Kafka? (A sketch of the documented commit pattern follows the error trace below.)
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"
val kafka_stream_version = "1.6.3"
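For context, these values would typically be wired into build.sbt roughly as below. This is a hypothetical sketch: the artifact names are assumptions based on the Spark 2.2 / Kafka 0.10 integration, and since the snippet does not show which artifacts connectorVersion and kafka_stream_version are applied to, they are left out.

// build.sbt -- hypothetical sketch; artifact names assume the
// spark-streaming-kafka-0-10 integration that matches Spark 2.2.0
scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)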
Code
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(spark.sparkContext, Seconds(2))
ssc.checkpoint("C:/Gnana/cp")

val kafkaStream = {
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "ignite3",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  val topics = Array("test")

  // one direct stream per partition of the input topic, then union them
  val numPartitionsOfInputTopic = 2
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    ).map(_.value())
  }

  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1
  unifiedStream.repartition(sparkProcessingParallelism)
}

// finding offsetRanges
var offsetRanges = Array.empty[OffsetRange]
kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

// do the business operation and persist the offsets to Kafka
kafkaStream.foreachRDD { rdd =>
  println("offsetRanges:" + offsetRanges.mkString(","))
  rdd.foreach { conRec =>
    println(conRec)
    // commitAsync is called from inside rdd.foreach, i.e. from executor code
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

println(" Spark parallel reader is ready !!!")
ssc.start()
ssc.awaitTermination()
Error
java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)
at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:
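For reference, the pattern in the Spark Streaming + Kafka 0.10 integration guide reads the offset ranges from the raw RDD and commits them on the driver: inside foreachRDD, but outside any RDD action, and on the stream returned directly by createDirectStream (after union, map and repartition the RDDs no longer implement HasOffsetRanges). A minimal sketch under those assumptions, reusing ssc, topics and kafkaParams from the question, with println standing in for the business logic:

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// keep a reference to the original direct stream: it is the object that
// implements HasOffsetRanges / CanCommitOffsets
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  // read the offset ranges before any transformation of the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // business logic runs on the executors
  rdd.map(_.value()).foreach(record => println(record))

  // commit on the driver once the batch's work has run; this part of the
  // foreachRDD body executes on the driver, so referencing `stream` here
  // never gets serialized into a task closure
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

The NotSerializableException above comes from calling kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync inside rdd.foreach: that closure is shipped to the executors, so Spark has to serialize the DStream it references, which is exactly what the error message forbids. Note also that a single direct stream already creates one Spark partition per Kafka partition, so the per-partition createDirectStream loop is not needed for parallelism.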