3

我在带有检查点的 Spark Structured Streaming 应用程序中使用 KafkaSource 收到错误“偏移量已从 X 更改为 0,某些数据可能已丢失”,但它似乎实际上并没有引起任何问题。我试图弄清楚错误的实际含义。

我的设置如下。

  • 我在 docker 容器中运行了 Kafka(0.10.1.0),并在 /tmp/kafka-logs 上安装了一个命名卷,以便在重新启动之间保留日志。

  • 我在另一个 docker 容器中有一个 Spark Structured Streaming (2.1.1) 应用程序。流使用来自 Kafka 的数据。他们还在重新安装在命名卷中的位置使用检查点,以确保元数据在重新启动之间保持不变。

  • 我使用实现ForeachWriter接口的自定义接收器,这意味着我必须实现自己的已处理版本日志,以便在一切重新启动时,我可以告诉 Spark Streaming 不要重新处理已处理的内容。

所有这一切都运作良好,数据从 Kafka 正确使用,我的自定义接收器正确处理它。

现在如果我杀死Spark Streaming应用程序,让Kafka中的数据堆积然后重新启动Spark Streaming,它会抛出以下错误,表明Kafka中的某些数据不再可用

ERROR StreamExecution: Query [id = cd2b69e1-2f24-439a-bebc-89e343df83a8, runId = d4b3ae65-8cfa-4713-912c-404623710048] terminated with error

Java.lang.IllegalStateException: Partition input.clientes-0's offset
 was changed from 908 to 0, some data may have been missed.

Some data may have been lost because they are not available in Kafka
 any more; either the data was aged out by Kafka or the topic may have 
 been deleted before all the data in the topic was processed. If you 
 don't want your streaming query to fail on such cases, set the source 
 option "failOnDataLoss" to "false".

at org.apache.spark.sql.kafka010.KafkaSource.org$apache$spark$sql$kafka010$KafkaSource$$reportDataLoss(KafkaSource.scala:329)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:283)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$8.apply(KafkaSource.scala:281)
at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:281)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:452)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:448)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:448)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:447)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)

但是在抛出错误之后,我看到我的流正常启动。Spark Streaming 正确地将堆积在 Kafka 中的数据推送到我的自定义接收器,并具有预期的版本。然后我的接收器继续并正确处理新数据。

因此,该错误表明某些数据在 Kafka 中不再可用,但仍设法被 Spark Streaming 正确使用。

如果我重新启动 Spark Streaming 应用程序,即使没有数据被推送到 Kafka,我也会再次收到相同的错误。如果我开始将新数据推送到 Kafka,系统将继续正确处理它。

有人知道这里会发生什么吗?我是否错误地解释了错误?

4

2 回答 2

1

/tmp/kafka-logs是 Kafka 的日志目录,其中存储了所有偏移量、主题信息。如果损坏或删除了某些数据,您需要failOnDataLoss:false在您的 Kafka 选项中设置选项SparkProcessContext并重新启动 Spark Job。

Option  : failOnDataLoss
Value   : true or false
Default : TRUE

含义:当数据可能丢失(例如,主题被删除或偏移超出范围)时是否使查询失败。这可能是一个误报。当它不能按预期工作时,您可以禁用它。

于 2018-04-05T06:40:11.970 回答
0

这似乎是旧版本的 kafka-clients 库中的一个已知错误。

SPARK-26267的描述说

“由于KAFKA-7703- KafkaConsumer.position 可能在调用“seekToEnd”后返回错误的偏移量,当 Kafka 源尝试获取最新的偏移量时,它可能会获得最早的偏移量,然后它会重新处理已处理的消息当它在下一批中获得正确的最新偏移量时。”

总而言之,引用从事它的开发人员:

“这是 Kafka 中的一个已知问题,请参阅 KAFKA-7703。这已在 SPARK-26267 中的 2.4.1 和 3.0.0 中修复。请将 Spark 升级到更高版本。另一种可能性是将 Kafka 升级到 2.3.0,其中卡夫卡方面是固定的。”

“KAFKA-7703 仅存在于 Kafka 1.1.0 及更高版本中,因此可能的解决方法是使用没有此问题的旧版本。这不会影响 Spark 2.3.x 及更低版本,因为我们使用 Kafka 0.10.0.1默认。”

在我们的案例中,我们在 HDP 3.1 平台上遇到了同样的问题。我们有 Spark 2.3.2 和 spark-sql-kafka 库(https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.3.2.3.1.0。 0-78),但是,使用 kafka-clients 2.0.0。这意味着我们由于后续条件而面临这个错误:

  • 我们的火花 < 2.4.1
  • 1.1.0 < 我们的卡夫卡 < 2.3.0

可能的解决方法

我们能够通过删除包含偏移量的批号的“偏移”子文件夹中的检查点文件来“解决”这个问题0

删除此文件时,请确保子文件夹“commits”和“offset”中的检查点文件中的批号在删除后仍然匹配。

到目前为止,这不是一个理想的解决方案,尤其是对于生产环境。但是,如果您不能轻松更新库,它可能会有所帮助。

于 2021-01-13T14:26:36.690 回答