0

当从检查点恢复失败的作业时,应用程序逻辑被正确调用并且 RDD 被重新实例化,但是对 RDD.map 的调用会导致 NullPointerException。

lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}

def consumeStreamingContext(ssc: StreamingContext) = {
  //... create dstreams
  val dstream = KinesisUtil.createStream(....
  ...

  dstream.checkpoint(batchInterval)

  dstream
    .foreachRDD(process)
}

def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    //rdd seems to support some operations? 
    logger.info(s"RDD LENGTH: ${events.count}")
    //nullpointer exception on call to .map
    val df = events.map(e => {
      ...
    }

  }
}

编辑:更新通知我正在使用 Kinesis 并且 WAL 已启用。S3 是否支持 WAL 检查点?我正在阅读其他没有得到很好支持的地方。 https://issues.apache.org/jira/browse/SPARK-9215

编辑:我在 HDFS 上遇到了类似的结果。

4

2 回答 2

1

我遇到了类似的问题——让我先解释一下我的问题,然后再解释一下我是如何解决它的。

问题陈述:使用火花流处理 Kinesis 数据。当 spark 流在 kinesis 之上工作时,我们仍然会得到一个非结构化流 (DStream),而不是我们在听 Kafka 时得到的结构化流。

问题:将 RDD 转换为 DF 或 DataSet 时出现空指针异常。下面是有问题的代码:

def processData(spark: SparkSession, jobArgs: JobArgs, kinesisStream:ReceiverInputDStream[Array[Byte]]):Unit={
    val filenamesRDD = kinesisStream.map(decodeKinesisMessage)
    // Import spark implicits which add encoders for case classes.
    import spark.implicits._
    val events = filenamesRDD.flatMap(filenameToEvents(new AmazonS3Client))
    events.foreachRDD(rdd => {
      spark.createDataset(rdd)
        .write
        .partitionBy("date") // TODO add hour
        .mode(SaveMode.Append.name())
        .parquet(jobArgs.outputPath)
    })
}

问题是什么:此代码在检查点目录不存在时有效,但在检查点目录存在时因空指针异常而失败。

为什么:我的理论是它试图通过反序列化获取 SQLContext 和其他对象,但它们不可用。

我是如何解决这个问题的:通过在将 rdd 转换为数据集之前再次构建 SQLContext。请参阅以下代码:

def processData(spark: SparkSession, kinesisStream: ReceiverInputDStream[Array[Byte]]): Unit = {
    val filenamesRDD = kinesisStream.map(decodeKinesisMessage)
    // Import spark implicits which add encoders for case classes.
    import spark.implicits._
    val events = filenamesRDD.flatMap(filenameToEvents(new AmazonS3Client))

    events.foreachRDD(rdd => {
      val sqlContext = SparkSession.builder().getOrCreate().sqlContext
      import sqlContext.implicits._

      val outputPath: String = sqlContext.sparkSession.conf.get("output.path")
      sqlContext.createDataset(rdd)
        .write
        .partitionBy("date") // TODO add hour
        .mode(SaveMode.Append.name())
        .parquet(outputPath)
    })
}

让我知道它是否有帮助。

谢谢,侯赛因·博拉

于 2017-07-25T20:10:24.947 回答
0

rdd.checkpoint解决方案是在 foreach 中的每次转换后调用。每个 RDD 转换都必须设置检查点。

于 2017-05-16T18:36:48.570 回答