我是 SPARK 的新手。我正在运行在 hdfs 位置生成 6000 个零件文件的 SPARK-SQL 代码。在这 6000 个部分文件中,大约 1500 个文件的大小约为 100 MB,而其他文件的大小要小得多,小于 100 MB(有些约为 30MB,大多数以 kb 为单位)。我觉得这是不平衡的大小分布使我的代码变慢,并且由于这些执行程序的高负载,一些执行程序正在丢失。有什么办法可以平衡 executor 的负载,避免 executor 丢失?
3 回答
这是由于数据密钥的分布。
据我所知,您只能自己手动对密钥进行负载平衡。
一种可能的解决方案是运行一个示例作业来对您的键的基数进行采样,然后构建一个分区表(通过负载平衡算法,您可以搜索它)并将其传递给您的自定义分区器。
我自己也遇到过很多次这样的问题。我称之为“不公平分工”问题。
您没有包含有关该应用程序的许多详细信息。我遇到的问题之一是 gzip 文本输入文件的大小根本不均匀。Gzip 文件无法拆分,因此每个文件最终都是一个分区。解决这种情况相当简单:只需调用“repartition”以在读取数据后均匀地重新排列数据 ---“sc.textFile(input).repartition(10000)”。
有时问题是某些键比其他键更受欢迎。除了回到绘图板重新考虑另一种键入数据的方法之外,这里没有什么可以做的。
当使用“sortByKey”时,Spark 通过采集数据样本“猜测”一个好的范围分区方案。它最终可能会非常错误,导致分区大小非常不均匀。我在这种情况下使用的解决方案是提出我自己的自定义范围分区方案,并使用“repartitionAndSortWithinPartitions”调用。
所以修复可能就像调用“repartition”一样简单,或者很痛苦——编写自定义且可能很复杂的分区代码并调用“partitionBy”或其表亲。
您可以在不打乱数据的情况下重新分区数据,合并用于将 RDD 中的分区数减少到 numPartitions。
假设您有 6000 个分区从 spark 数据帧中读取您的数据应用合并与您想要的分区数
df.coalesce(5000)
并再次保存
dataFrame.write.mode(SaveMode.Overwrite).parquet(location)