4

我的 Spark Structured Streaming 应用程序运行了几个小时,然后出现此错误而失败

java.lang.IllegalStateException: Partition [partition-name] offset was changed from 361037 to 355053, some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".

当然,偏移量每次都不同,但第一个总是大于第二个。主题数据不能过期,因为主题的保留期是 5 天,我昨天重新创建了这个主题,但今天又出现错误。从中恢复的唯一方法是删除检查点。

Spark 的 Kafka 集成指南failOnDataLoss在选项下提到:

当数据可能丢失(例如,主题被删除或偏移量超出范围)时是否使查询失败。这可能是一个误报。当它不能按预期工作时,您可以禁用它。如果由于丢失数据而无法从提供的偏移量中读取任何数据,批量查询将始终失败。

但是我找不到任何关于何时可以将其视为误报的更多信息,所以我不知道设置是否安全,failOnDataLoss或者false我的集群是否存在实际问题(在这种情况下,我们实际上会失败数据)。

更新:我调查了 Kafka 日志,在 Spark 失败的所有情况下,Kafka 都记录了几条这样的消息(我假设每个 Spark 消费者都有一条消息):

INFO [GroupCoordinator 1]: Preparing to rebalance group spark-kafka-...-driver-0 with old generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
4

2 回答 2

0

This seems to be a known bug in older versions of Spark and the spark-sql-kafka library.

I find the following JIRA tickets relevant:

  • SPARK-28641: MicroBatchExecution committed offsets greater than available offsets
  • SPARK-26267: Kafka source may reprocess data
  • KAFKA-7703: KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

In short, citing the developers who worked on it:

"this is a know issue in Kafka, please see KAFKA-7703. This is fixed in 2.4.1 and 3.0.0 in SPARK-26267. Please upgrade Spark to higher versions. The other possibility is to upgrade Kafka to 2.3.0 where the Kafka side is fixed."

"KAFKA-7703 only exists in Kafka 1.1.0 and above, so a possible workaround is using an old version that doesn't have this issue. This doesn't impact Spark 2.3.x and below as we use Kafka 0.10.0.1 by default."

In our case, we faced the same issue on our HDP 3.1 platform. There we have Spark 2.3.2 and the spark-sql-kafka library (https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11/2.3.2.3.1.0.0-78), however, uses kafka-clients 2.0.0. Which means we face this bug due to subsequent conditions:

  • Our Spark < 2.4.1
  • 1.1.0 < Our Kafka < 2.3.0

Work-around solution

We were able to solve this issue by removing the checkpoint file in the "offset" sub-folder of the batch number that contains the 0 offset.

When deleting this one file make sure that the batch number in the checkpoint files in sub-folders "commits" and "offset" still match after the deletion.

于 2020-12-22T06:32:32.640 回答
0

我不再有这个问题了。我做了两个改变:

  1. 禁用 YARN 的动态资源分配(这意味着我必须手动计算最佳执行器数量等并将它们传递给spark-submit
  2. 升级到 Spark 2.4.0,同时也将 Kafka 客户端从 0.10.0.1 升级到 2.0.0

禁用动态资源分配意味着执行程序(=消费者)不会在应用程序运行时创建和终止,从而无需重新平衡。所以这很可能是阻止错误发生的原因。

于 2019-06-03T05:12:01.137 回答