我正在从 HDFS 检查点(例如,ConstantInputDSTream)恢复一个流,但我不断得到SparkException: <X> has not been initialized
.
从检查点恢复时我需要做一些具体的事情吗?
我可以看到它想要DStream.zeroTime
设置,但是当流恢复时zeroTime
是null
. 由于它是私有成员 IDK,它可能无法恢复。我可以看到StreamingContext
恢复的流所引用的确实具有zeroTime
.
initialize
是一个私有方法并且被调用StreamingContext.graph.start
而不是被调用StreamingContext.graph.restart
,大概是因为它期望zeroTime
被持久化。
有人有一个从检查点恢复并具有非空值的 Stream 示例zeroTime
吗?
def createStreamingContext(): StreamingContext = {
val ssc = new StreamingContext(sparkConf, Duration(1000))
ssc.checkpoint(checkpointDir)
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)
val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)