I'm getting the error "offset was changed from X to 0, some data may have been missed" from the KafkaSource in a Spark Structured Streaming application with checkpointing, but it doesn't actually seem to cause any problems. I'm trying to figure out what the error really means.
My setup is as follows.
I have Kafka (0.10.1.0) running in a docker container, with a named volume mounted at /tmp/kafka-logs so the logs are kept across restarts.
I have a Spark Structured Streaming (2.1.1) application in another docker container. The streams consume data from Kafka. They also checkpoint to a location that is likewise mounted on a named volume, so the metadata survives restarts.
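For reference, the source and checkpoint are wired up roughly like the sketch below; the topic name input.clientes comes from the error message further down, while the broker address and checkpoint path are only placeholders for my real values:

    // Rough sketch of the Kafka source + checkpointed query (Spark 2.1.1, Scala).
    // Broker address and checkpoint path are placeholders.
    import spark.implicits._

    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "input.clientes")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    val query = stream.writeStream
      .foreach(new MySink())                                  // custom ForeachWriter sink, sketched below
      .option("checkpointLocation", "/checkpoints/clientes")  // directory on the named volume
      .start()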
I use a custom sink that implements the ForeachWriter interface, which means I had to implement my own log of processed versions so that when everything restarts I can tell Spark Streaming not to reprocess what has already been handled.
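The sink itself follows the usual ForeachWriter pattern, roughly like the sketch below; the String element type, the in-memory processed set, and the println delivery are placeholders for the real persistent version log and downstream writer:

    import org.apache.spark.sql.ForeachWriter
    import scala.collection.mutable

    // Sketch of an idempotent sink: Spark passes open() a (partitionId, version) pair,
    // and returning false tells it to skip data it has already handled for that pair.
    class MySink extends ForeachWriter[String] {
      private var partition: Long = _
      private var version: Long = _

      override def open(partitionId: Long, version: Long): Boolean = {
        partition = partitionId
        this.version = version
        // Consult the processed-version log; skip anything already handled.
        !MySink.processed.contains((partitionId, version))
      }

      override def process(value: String): Unit = {
        // Deliver the record to the downstream system (stubbed out here).
        println(value)
      }

      override def close(errorOrNull: Throwable): Unit = {
        // Record the (partition, version) as done only if the batch succeeded.
        if (errorOrNull == null) MySink.processed.add((partition, version))
      }
    }

    object MySink {
      // Placeholder for a persistent processed-version log.
      val processed = mutable.Set.empty[(Long, Long)]
    }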
All of this works fine: data is consumed from Kafka correctly and my custom sink processes it properly.
Now, if I kill the Spark Streaming application, let data pile up in Kafka, and then restart Spark Streaming, it throws the following error indicating that some data in Kafka is no longer available:
ERROR StreamExecution: Query [id = cd2b69e1-2f24-439a-bebc-89e343df83a8, runId = d4b3ae65-8cfa-4713-912c-404623710048] terminated with error
java.lang.IllegalStateException: Partition input.clientes-0's offset
was changed from 908 to 0, some data may have been missed.
Some data may have been lost because they are not available in Kafka
any more; either the data was aged out by Kafka or the topic may have
been deleted before all the data in the topic was processed. If you
don't want your streaming query to fail on such cases, set the source
option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:452)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:448)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:447)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
However, after throwing the error the stream starts up normally. Spark Streaming correctly pushes the data that accumulated in Kafka to my custom sink, with the expected versions, and the sink then goes on to process new data correctly.
So the error claims that some data is no longer available in Kafka, yet that data is still consumed correctly by Spark Streaming.
If I restart the Spark Streaming application, I get the same error again, even if no new data has been pushed to Kafka. If I then start pushing new data to Kafka, the system keeps processing it correctly.
Does anyone know what is going on here? Am I misinterpreting the error?