7

我正在从 HDFS 检查点(例如,ConstantInputDSTream)恢复一个流,但我不断得到SparkException: <X> has not been initialized.

从检查点恢复时我需要做一些具体的事情吗?

我可以看到它想要DStream.zeroTime设置,但是当流恢复时zeroTimenull. 由于它是私有成员 IDK,它可能无法恢复。我可以看到StreamingContext恢复的流所引用的确实具有zeroTime.

initialize是一个私有方法并且被调用StreamingContext.graph.start而不是被调用StreamingContext.graph.restart,大概是因为它期望zeroTime被持久化。

有人有一个从检查点恢复并具有非空值的 Stream 示例zeroTime吗?

def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sparkConf, Duration(1000))
    ssc.checkpoint(checkpointDir)
    ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir), createStreamingContext)

val socketStream = ssc.socketTextStream(...)
socketStream.checkpoint(Seconds(1))
socketStream.foreachRDD(...)
4

3 回答 3

12

问题是我在从检查点重新创建 StreamingContext 之后创建了 dstream,即在StreamingContext.getOrCreate. 创建 dstream 和所有转换都应该在createStreamingContext.

当从检查点恢复 StreamingContext 并随后创建 dstream 时,该问题被填充为[SPARK-13​​316] "SparkException: DStream has not been initialized" 。

于 2016-02-05T11:27:04.587 回答
1

当您尝试将相同的检查点目录用于 2 个不同的火花流作业时,也可能会发生此异常。在这种情况下,你也会得到这个例外。

尝试为每个 spark 作业使用唯一的检查点目录。

于 2017-11-24T07:33:33.407 回答
0

错误 StreamingContext:错误启动上下文,将其标记为已停止 org.apache.spark.spark.SparkException:org.apache.spark.streaming.dstream.FlatMappedDStream@6c17c0f8 尚未在 org.apache.spark.streaming.dstream.DStream 初始化。 isTimeValid(DStream.scala:313) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334) at org.apache.spark.streaming.dstream.DStream$$anonfun $getOrCompute$1.apply(DStream.scala:334) 在 scala.Option.orElse(Option.scala:289)


上述错误是由于我还有另一个 Spark Job 写入相同的检查点目录。即使另一个 spark 作业没有运行,它已写入 checkpointdir,新的 Spark 作业也无法配置 StreamingContext。

我删除了checkpointdir的内容,重新提交了Spark Job,问题就解决了。

或者,您可以为每个 Spark 作业使用单独的 checkpointdir,以保持简单。

于 2018-05-11T00:32:02.117 回答