When a failed job is restored from a checkpoint, the application logic is invoked correctly and the RDDs are re-instantiated, but the call to RDD.map throws a NullPointerException.
lazy val ssc = StreamingContext.getOrCreate(checkpointDir, createStreamingContext _)

private def createStreamingContext: StreamingContext = {
  val ssc = new StreamingContext(spark.sparkContext, batchInterval)
  ssc.checkpoint(checkpointDir)
  consumeStreamingContext(ssc)
  ssc
}
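For completeness, the recovered (or freshly created) context still has to be started; the driver entry point is not shown above, but a minimal sketch of it, assuming the lazy val ssc defined there, would look roughly like this:

def main(args: Array[String]): Unit = {
  // getOrCreate either rebuilds the context and its DStream graph from
  // checkpointDir, or invokes createStreamingContext on a clean start
  ssc.start()
  ssc.awaitTermination()
}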
def consumeStreamingContext(ssc: StreamingContext) = {
  //... create dstreams
  val dstream = KinesisUtil.createStream(....
  ...
  dstream.checkpoint(batchInterval)
  dstream
    .foreachRDD(process)
}
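The arguments to createStream are elided above; for context, the standard KinesisUtils.createStream call in spark-streaming-kinesis-asl looks roughly like the sketch below. The app name, stream name, endpoint, region, and intervals here are placeholders, not my actual values, and my real code presumably decodes the records into Event afterwards.

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

// Illustrative only: names and intervals are placeholders.
val byteStream = KinesisUtils.createStream(
  ssc,
  "my-kinesis-app",                   // Kinesis application (DynamoDB lease table) name
  "my-stream",                        // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",
  "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(10),                        // Kinesis checkpoint interval
  StorageLevel.MEMORY_AND_DISK_SER)   // docs suggest unreplicated storage when the WAL is enabled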
def process(events: RDD[Event]) = {
  if (!events.isEmpty()) {
    logger.info("Transforming events for processing")
    // rdd seems to support some operations?
    logger.info(s"RDD LENGTH: ${events.count}")
    // NullPointerException on the call to .map
    val df = events.map(e => {
      ...
    })
  }
}
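For reference, the Spark Streaming programming guide's pattern for DataFrame work inside foreachRDD is to obtain the SparkSession lazily from the RDD's own SparkContext rather than closing over a driver-side session; I'm not sure it's related to the NPE, but a sketch of that pattern (Event's fields here are placeholders) is:

import org.apache.spark.sql.SparkSession

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Get the session from the RDD's context so the closure does not capture
    // a session created before checkpoint recovery.
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    // Placeholder fields; Event's real schema is not shown above.
    val df = rdd.map(e => (e.id, e.payload)).toDF("id", "payload")
    df.createOrReplaceTempView("events")
  }
}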
EDIT: To update, note that I am using Kinesis and the WAL is enabled. Does S3 support WAL checkpointing? I'm reading elsewhere that it isn't well supported. https://issues.apache.org/jira/browse/SPARK-9215
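For reference, the WAL-related settings involved look roughly like this sketch (the documented Spark configuration keys; the checkpoint path is a placeholder, not my actual bucket):

import org.apache.spark.SparkConf

// Sketch only: keys are the documented Spark Streaming WAL settings.
val conf = new SparkConf()
  .setAppName("kinesis-checkpoint-recovery")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Per the Spark docs, set these when the WAL is written to a file system
  // that does not support flushing (e.g. S3):
  .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
  .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")

val checkpointDir = "s3a://my-bucket/spark/checkpoints"  // placeholder path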
EDIT: I'm seeing similar results on HDFS.