
I have checkpointing enabled, with the checkpoint logs saved to S3. When there are no files in the checkpoint directory, the Spark Streaming job works fine and I can see log files appearing in the checkpoint directory. Then I kill the streaming job and restart it. This time, I start getting a NullPointerException for the Spark session. In short, if there are no log files in the checkpoint directory, Spark Streaming works normally; but as soon as I restart it with log files already present in the checkpoint directory, I start getting a NullPointerException on the Spark session. Here is the code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object asf {
  val microBatchInterval = 5
  val sparkSession = SparkSession
    .builder()
    .appName("Streaming")
    .getOrCreate()

  val conf = new SparkConf(true)
  //conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val sparkContext = SparkContext.getOrCreate(conf)


  val checkpointDirectory = "s3a://bucketname/streaming-checkpoint"

  println("Spark session: " + sparkSession)

  val ssc = StreamingContext.getOrCreate(checkpointDirectory,
    () => {
      createStreamingContext(sparkContext, microBatchInterval, checkpointDirectory, sparkSession)
    }, s3Config.getConfig())

  ssc.start()
  ssc.awaitTermination()
}

  def createStreamingContext(sparkContext: SparkContext, microBatchInterval: Int, checkpointDirectory: String, spark: SparkSession): StreamingContext = {
    println("Spark session inside: " + spark)
    val ssc: org.apache.spark.streaming.StreamingContext = new StreamingContext(sparkContext, Seconds(microBatchInterval))
    //TODO: StorageLevel.MEMORY_AND_DISK_SER
    // EventHubClient is a custom Receiver[String] defined elsewhere in this project
    val lines = ssc.receiverStream(new EventHubClient(StorageLevel.MEMORY_AND_DISK_SER))
    lines.foreachRDD {
      rdd => {
        val df = spark.read.json(rdd)
        df.show()
      }
    }
    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}  

Again, the first time I run this code (with no log files in the checkpoint directory), I can see the DataFrame being printed. When I rerun it with log files already in the checkpoint directory, I don't even see

println("Spark session inside: " + spark)

printed, even though it does print on the first run. The error:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)

The error occurs at:

val df = spark.read.json(rdd)

Edit: I added this line:

conf.set("spark.streaming.stopGracefullyOnShutdown","true")

It still makes no difference; I still get the NullPointerException.


2 Answers


To answer my own question, this works:

lines.foreachRDD {
  rdd => {
    val sqlContext: SQLContext = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate().sqlContext

    val df = sqlContext.read.json(rdd)
    df.show()
  }
}

Passing a Spark session built from rdd.sparkContext works.
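A minimal sketch of the same idea, assuming the receiver produces a DStream[String] as in the question (the helper name SparkSessionSingleton is illustrative, borrowed from the lazy-singleton pattern shown in the Spark Streaming programming guide):

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper: create or reuse one SparkSession per JVM from the SparkConf
// carried by the RDD, instead of capturing a session that existed before the restart.
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(conf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder.config(conf).getOrCreate()
    }
    instance
  }

  // How a foreachRDD batch would use it (rdd is the RDD[String] from the receiver stream)
  def handleBatch(rdd: RDD[String]): Unit = {
    val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
    val df = spark.read.json(rdd)  // same read as in the question, with a session resolved at batch time
    df.show()
  }
}

Because the session is looked up (or created) from the RDD's own SparkContext when each batch runs, it no longer matters whether the StreamingContext was freshly created or recovered from the S3 checkpoint.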

answered 2017-09-14T00:42:03.847

Just to state this explicitly for the benefit of newcomers: this is an anti-pattern. You are not allowed to create Datasets inside a transformation!

As Michel mentioned, the executors will not have access to the SparkSession.
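To make the distinction concrete, here is a minimal sketch (my own illustration, not code from either answer), assuming lines is a DStream[String]: closures passed to transformations such as map run on the executors, while the body of foreachRDD runs on the driver, which is why the accepted answer's approach is safe there.

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

object DriverVsExecutor {
  def illustrate(lines: DStream[String]): Unit = {
    // Anti-pattern: a map closure is serialized and run on the executors, where a
    // SparkSession captured from the driver cannot be used to create DataFrames/Datasets.
    // lines.map(line => someSparkSession.read.json(Seq(line).toDS()))  // do NOT do this

    // Fine: foreachRDD's body runs on the driver, so obtaining the session from the
    // RDD's SparkContext (as in the accepted answer) works, even after checkpoint recovery.
    lines.foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      spark.read.json(rdd).show()
    }
  }
}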

answered 2018-07-10T17:11:18.363