
In our cluster we have Kafka 0.10.1 and Spark 2.1.0. The Spark Streaming application works fine with the checkpoint mechanism (checkpointing on HDFS). However, we have noticed that once the code changes, a streaming application that uses checkpoints does not restart.
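For reference, the application follows the standard checkpoint-recovery pattern. A minimal sketch (the checkpoint path and batch interval below are placeholders, not our real configuration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint" // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // The Kafka stream and all transformations are defined in here.
  ssc
}

// Recover from the checkpoint if one exists, otherwise build a fresh context.
// This recovery step is what fails once the application code has changed.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()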

Exploring the Spark Streaming documentation on storing offsets in Kafka itself:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself, which says:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
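For completeness, a self-contained version of that snippet, as we understand it, would look something like this (the broker address, group id, and topic name are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",              // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",                     // placeholder
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed manually
)

val ssc = new StreamingContext(new SparkConf().setAppName("OffsetCommit"), Seconds(10))
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaParams))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... write the batch out here ...
  // Commit the offsets back to Kafka once the output has completed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}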

Following this, I modified our code as shown below:

val kafkaMap: Map[String, Object] = KafkaConfigs

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](Array("topicName"), kafkaMap))

stream.foreachRDD { rdd =>
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Filter out the records with empty values and map each record to a tuple of type
    // (topicName, stringValue_read_from_kafka_topic).
    stream.map(x => ("topicName", x.value)).filter(x => !x._2.trim.isEmpty).foreachRDD(processRDD _)

    // Some time later, after outputs have completed.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


def processRDD(rdd: RDD[(String, String)]) {
  // Process further to HDFS.
}
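For context, "process further to HDFS" amounts to writing each batch out, roughly as follows (the output path is a placeholder, not our real sink):

// inside processRDD: keep the values and persist them to HDFS
if (!rdd.isEmpty()) {
  rdd.map { case (_, value) => value }
     .saveAsTextFile("hdfs:///tmp/topic-output/" + System.currentTimeMillis())
}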

Now, when I try to start the streaming application, it does not start, and when we look at the logs, this is what we see:

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:65)
    at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:29)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:546)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:545)

Can anyone suggest what we might be missing?
