0

我已经编写了一个 Spark Streaming 应用程序,它需要对具有底层转换的各种 Dstream 进行一些检查,如该线程中所建议的(启动 Spark 流式传输上下文时出错),我已经在定义的函数中完成了所有转换以创建上下文,

object StreamingEngine2 {

  val filterF = { (x: Path) => true } 

  // === Configuration to control the flow of the application ===
  val stopActiveContext = true

  val batchIntervalSeconds = 10
  val eventsPerSecond = 10 // For the dummy source



  var newContextCreated = false // Flag to detect whether new context was created or not

  // Function to create a new StreamingContext and set it up
  val creatingFunc = { () =>

    //add winutil to avoid: ERROR org.apache.spark.streaming.scheduler.JobScheduler - Error generating jobs for time 1471237080000 ms java.lang.NullPointerException
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    println("Creating function called to create new StreamingContext")

    val conf = new SparkConf().setMaster("local[10]").setAppName("FileStreaming").set("spark.streaming.fileStream.minRememberDuration", "2000000h") /*.set("SPARK_CONF_DIR","src/main/resources")*/.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable],
      classOf[org.apache.hadoop.io.Text], classOf[GGSN]))


    // Verify that the attached Spark cluster is 1.4.0+
    val sc = new SparkContext(conf)
    require(sc.version.replace(".", "").substring(0, 3).toInt >= 160, "Spark 1.6.0+ is required to run this code. Please attach it to a Spark 1.6.0+ cluster.")



    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))
    ssc.checkpoint("c:\\checkpoints")


    val ggsnFileLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\GGSN\\Files", filterF, false)
    val ccnFIleLines = ssc.fileStream[LongWritable, Text, TextInputFormat]("C:\\Users\\Mbazarganigilani\\Documents\\RA\\CCN\\Files1", filterF, false)



    //some mapping and transfomration

    probeFileLines.checkpoint(Duration(batchIntervalSeconds*1000*5))
    ggsnFileLines.checkpoint(Duration(batchIntervalSeconds*1000*5))


    //check GGSSN...
    probeFileLines.foreachRDD(s=>
    {

        println(s.count())


   }



    )


    ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively



    newContextCreated = true
    ssc
  }

  def main(args: Array[String]): Unit = {



    System.setProperty("hadoop.home.dir", "C:\\hadoop")

    if (stopActiveContext) {
      StreamingContext.getActive.foreach {
        _.stop(stopSparkContext = false)
      }
    }

    val ssc=StreamingContext.getOrCreate("c:\\checkpoints", creatingFunc)


    if (newContextCreated) {
      println("New context created from currently defined creating function")
    } else {
      println("Existing context running or recovered from checkpoint, may not be running currently defined creating function")
    }

    // Start the streaming context in the background.
    ssc.start()

    // This is to ensure that we wait for some time before the background streaming job starts. This will put this cell on hold for 5 times the batchIntervalSeconds.
    ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2 * 1000*1000)


  }


}

但是,当我的上下文从检查点目录加载时,我仍然会遇到异常

16/08/19 22:28:01 ERROR Utils: Exception encountered
java.lang.NullPointerException
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply$mcV$sp(DStreamCheckpointData.scala:126)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$writeObject$1.apply(DStreamCheckpointData.scala:124)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.writeObject(DStreamCheckpointData.scala:124)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:516)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:511)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:511)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
    at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:511)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:182)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:177)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply(DStreamGraph.scala:177)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
    at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:177)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:143)
    at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:143)
    at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:143)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
    at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:144)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:525)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at StreamingEngine2$.main(StreamEngine2.scala:694)
    at StreamingEngine2.main(StreamEngine2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

谁能帮我在我的代码中解决这个问题,我在 Spark 1.6 和 2.0 中都尝试过我的代码,但得到了同样的异常。

4

1 回答 1

1

好的,问题是由于多个流引起的,我每次都应该对它们调用 count ..仍然不确定我应该在哪里调用 count 以及应该在哪里为每个流放置检查点..

于 2016-08-20T06:38:13.290 回答