In our cluster we have Kafka 0.10.1 and Spark 2.1.0. The Spark Streaming application works fine with the checkpointing mechanism (checkpoints on HDFS). However, we noticed that a checkpointed streaming application will not restart if the code changes.
Exploring the Spark Streaming documentation on storing offsets in Kafka itself:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself, which says:
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
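For context, a fuller, self-contained version of that documented pattern might look like the sketch below. The broker address, group id, batch interval, and topic name are placeholders rather than values from our setup; the key point, per the docs, is that the whole pipeline (including the commit) is wired up before `ssc.start()` is called.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}

object CommitOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CommitOffsetsSketch")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Placeholder connection settings -- substitute your own.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "localhost:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "example-group",
      // Let the application, not the consumer, decide when to commit.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Capture the offset ranges before any shuffle breaks the 1:1
      // mapping between Kafka partitions and RDD partitions.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... write this batch's output here ...

      // Commit only after the outputs for this batch have completed.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```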
After this, I modified our code as follows:
val kafkaMap: Map[String, Object] = KakfaConfigs

val stream: InputDStream[ConsumerRecord[String, String]] =
  KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaMap))

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Filter out the records with empty values and build tuples of type
  // (topicName, stringValue_read_from_kafka_topic)
  stream.map(x => ("topicName", x.value)).filter(x => !x._2.trim.isEmpty).foreachRDD(processRDD _)
  // Sometime later, after outputs have completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

def processRDD(rdd: RDD[(String, String)]) {
  // Process further to HDFS
}
Now, when I try to start the streaming application, it does not start, and this is what we see in the logs:
java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:29)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:545)
Can anyone suggest what we are missing here?