13

In Spark Streaming it is possible (and mandatory if you're going to use stateful operations) to set the StreamingContext to perform checkpoints into a reliable data storage (S3, HDFS, ...) of (AND):

  • Metadata
  • DStream lineage

As described here, to set the output data storage you need to call yourSparkStreamingCtx.checkpoint(datastoreURL)

On the other hand, it is possible to set lineage checkpoint intervals for each DataStream by just calling checkpoint(timeInterval) at them. In fact, it is recommended to set lineage checkpoint interval between 5 and 10 times the DataStream's sliding interval:

dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.

My question is:

When the streaming context has been set to perform checkpointing and no ds.checkpoint(interval) is called, is lineage checkpointing enabled for all data streams with a default checkpointInterval equal to batchInterval? Or is, on the contrary, only metadata checkpointing what is enabled?

4

1 回答 1

11

检查Spark代码(v1.5)我发现DStreams'检查点在两种情况下启用:

通过显式调用他们的checkpoint方法(不是StreamContext's):

/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration): DStream[T] = {
    if (isInitialized) {
        throw new UnsupportedOperationException(
            "Cannot change checkpoint interval of an DStream after streaming context has started")
    }
    persist()
    checkpointDuration = interval
    this
}

DStream初始化时,只要具体的“DStream”子类具有覆盖mustCheckpoint属性(将其设置为true):

 private[streaming] def initialize(time: Time) {
  ...
  ...   
   // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
   if (mustCheckpoint && checkpointDuration == null) {
     checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
     logInfo("Checkpoint interval automatically set to " + checkpointDuration)
   }
  ...

第一种情况很明显。对 Spark Streaming 代码进行简单分析:

grep "val mustCheckpoint = true" $(find -type f -name "*.scala")

> ./org/apache/spark/streaming/api/python/PythonDStream.scala:  override     val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala:  override val mustCheckpoint = true
>./org/apache/spark/streaming/dstream/StateDStream.scala:  override val mustCheckpoint = true

我可以发现,一般来说(忽略PythonDStream),检查点只为和实例StreamingContext启用沿袭检查点。这些实例是转换的结果(分别为 AND):StateDStreamReducedWindowedDStream

  • updateStateByKey:即通过多个窗口提供状态的流。
  • reduceByKeyAndWindow
于 2016-01-01T16:55:26.173 回答