3

我在我的 spark DF 上使用全局排序,当我启用 AQE 和 post-shuffle 合并时,排序操作后的分区变得比以前更糟糕。

    "spark.sql.adaptive.enabled" -> "true",
    "spark.sql.adaptive.coalescePartitions.enabled" -> "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes" -> "256mb",
    "spark.sql.adaptive.coalescePartitions.minPartitionNum" -> "1",
    "spark.sql.adaptive.coalescePartitions.initialPartitionNum" -> "20000"

我的查询,在高层次上,看起来:

.readFromKafka
.deserializeJsonToRow
.cache
.sort(const_part, column which can cause skew, some salt columns)
.writeToS3
  1. 可能导致倾斜的列->是的,我的数据分布不均,这就是我使用盐的原因。
  2. 我从 Kafka 读取数据,所以我使用 Kafka 分区 + 偏移列作为盐。
  3. 为什么在后台使用 reprtitoinByRange 的排序对我没有帮助,我想启用 AQE?-> 现在我看到我的 Kafka 消息的大小差异可能太大。因此,我看到范围重新分区后的分区具有几乎相同数量的记录,但字节数仍然非常不均匀。
  4. 为什么我认为 AQE 必须帮助我?-> 我想创建许多小范围,即使我的数据偏差不会超过~50mb,所以后洗牌合并将能够将它们合并到目标大小(256mb)。在我的情况下,最高 320mb 是可以的。

我的第一个假设是,即使范围很小,峰值也会太大。但我检查并确认按范围重新分区给了我良好的记录分布,但不好的是大小。我有近 200 个分区,记录数量几乎相同,大小差异高达 9 倍,从 ~100Mb 到 ~900mb。但是通过 AEQ 和重新分区到 18000 个小范围,最小的分区是 18mib,最大的分区是 1.8Gib。这种情况比没有 AEQ 的情况要糟糕得多。需要强调的是,我使用 Spark UI -> Details for Stage 选项卡中的指标来确定分区大小(以字节为单位),并且我有自己的记录日志。

所以我开始调试问题,但是 AQE 的输入和输出没有足够的日志 ShufflePartitionsUtil.coalescePartitions。这就是我将查询重写为 repartitionByRange.sortWithingPartitoins 的原因。并通过额外的日志记录进行物理计划优化。我的日志告诉我,我最初的想法是正确的。

  • map 和 write shuffle 阶段之后的输入分区被拆分为足够小
  • Coalesce 算法将它们收集到一个正确的数量,并且分布在字节分区中。
Input shuffleId:2 partitions:17999
Max partition size :27362117
Min partition size :8758435

Number of shuffle stages to coalesce 1
Reduce number of partitions from 17999 to 188
Output partition  maxsize :312832323
Output partition min size :103832323

最小大小是如此不同,因为最后一个分区的大小,这是预期的。TRACE 日志级别显示 99% 的分区接近 290mib。

  • 但是为什么 spark UI 显示出如此不同的结果呢?->

  • spark UI 可能有问题吗?->

  • 也许吧,但除了任务大小,一个任务的持续时间也太大了,这让我觉得 spark UI 还可以。

  • 所以我的假设是问题出MapOutputStatistics在我的阶段。但它总是坏掉还是只在我的情况下?->

  • 仅在我的情况下?-> 我做了一些检查以确认它。

    • 我从 s3(块大小为 120mb 的镶木地板文件)-> 中读取了相同的数据集,并且 AQE 按预期工作。洗牌后合并返回给我 188,按大小、分区很好地分布。重要的是要注意 s3 上的数据分布不均,但在读取过程中的 spark 将其拆分为 259 个接近 120mb 大小的分区,主要是因为 parquet 块大小为 120mb。
    • 我从 Kafka 中读取了相同的数据集,但从分区函数中排除了具有偏斜的列 -> 并且 AQE 按预期工作。洗牌后合并返回给我 203,按大小、分区很好地分布。
    • 我尝试禁用缓存-> 这没有任何结果。我使用缓存,只是为了避免从 kafka 重复读取。因为按范围重新分区使用采样。
    • 我尝试禁用 AQE 并将 18000 个分区写入 s3 -> 结果是预期的,与我的合并输入日志显示的相同:17999 个文件,最小的接近 8mib,最大的 56mib。
  • 所有这些检查让我认为这MapOutputStatistics仅对我的情况是错误的。可能是如何与 Kafka 源相关联或我的 Kafka 输入数据分布非常不均匀的问题。

问题:

  • 那么有人知道我做错了什么吗?
  • 在我的情况下,我可以用输入数据做些什么来使后洗牌合并工作?
  • 如果你认为我是对的,请发表评论。

PS 我还想提一下,我的输入 Kafka 数据帧是 2160,甚至不是分布式分区 -> 某些分区可以比其他分区大 2 倍。minPartitions从具有 720 个分区和选项 * 3的 Kafka 主题中读取。

4

0 回答 0