我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后的分区变得比以前更糟糕。
"spark.sql.adaptive.enabled" -> "true",
"spark.sql.adaptive.coalescePartitions.enabled" -> "true",
"spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
"spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
"spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"
我的查询,在高层次上,看起来:
.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
- 可能导致倾斜的列->是的,我的数据分布不均,这就是我使用盐的原因。
- 我从 Kafka 读取数据,所以我使用 Kafka 分区 + 偏移列作为盐。
- 为什么在后台使用 reprtitoinByRange 的排序对我没有帮助,我想启用 AQE?-> 现在我看到我的 Kafka 消息的大小差异可能太大。因此,我看到范围重新分区后的分区具有几乎相同数量的记录,但字节数仍然非常不均匀。
- 为什么我认为 AQE 必须帮助我?-> 我想创建许多小范围,即使我的数据偏差不会超过~50mb,所以后洗牌合并将能够将它们合并到目标大小(256mb)。在我的情况下,最高 320mb 是可以的。
我的第一个假设是,即使范围很小,峰值也会太大。但我检查并确认按范围重新分区给了我良好的记录分布,但不好的是大小。我有近 200 个分区,记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。但是通过 AEQ 和重新分区到 18000 个小范围,最小的分区是 18mib,最大的分区是 1.8Gib。这种情况比没有 AEQ 的情况要糟糕得多。需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来确定分区大小(以字节为单位),并且我有自己的记录日志。
所以我开始调试问题,但是 AQE 的输入和输出没有足够的日志
ShufflePartitionsUtil.coalescePartitions
。这就是我将查询重写为 repartitionByRange.sortWithingPartitoins 的原因。并通过额外的日志记录进行物理计划优化。我的日志告诉我,我最初的想法是正确的。
- map 和 write shuffle 阶段之后的输入分区被拆分为足够小
- Coalesce 算法将它们收集到一个正确的数量,并且分布在字节分区中。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435
和
Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition maxsize :312832323
Output partition min size :103832323
最小大小是如此不同,因为最后一个分区的大小,这是预期的。TRACE 日志级别显示 99% 的分区接近 290mib。
但是为什么 spark UI 显示出如此不同的结果呢?->
spark UI 可能有问题吗?->
也许吧,但除了任务大小,一个任务的持续时间也太大了,这让我觉得 spark UI 还可以。
所以我的假设是问题出
MapOutputStatistics
在我的阶段。但它总是坏掉还是只在我的情况下?->仅在我的情况下?-> 我做了一些检查以确认它。
-
- 我从 s3(块大小为 120mb 的镶木地板文件)-> 中读取了相同的数据集,并且 AQE 按预期工作。洗牌后合并返回给我 188,按大小、分区很好地分布。重要的是要注意 s3 上的数据分布不均,但在读取过程中的 spark 将其拆分为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
-
- 我从 Kafka 中读取了相同的数据集,但从分区函数中排除了具有偏斜的列 -> 并且 AQE 按预期工作。洗牌后合并返回给我 203,按大小、分区很好地分布。
-
- 我尝试禁用缓存-> 这没有任何结果。我使用缓存,只是为了避免从 kafka 重复读取。因为按范围重新分区使用采样。
-
- 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果是预期的,与我的合并输入日志显示的相同:17999 个文件,最小的接近 8mib,最大的 56mib。
所有这些检查让我认为这
MapOutputStatistics
仅对我的情况是错误的。可能是如何与 Kafka 源相关联或我的 Kafka 输入数据分布非常不均匀的问题。
问题:
- 那么有人知道我做错了什么吗?
- 在我的情况下,我可以用输入数据做些什么来使后洗牌合并工作?
- 如果你认为我是对的,请发表评论。
PS 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 某些分区可以比其他分区大 2 倍。minPartitions
从具有 720 个分区和选项 * 3的 Kafka 主题中读取。