RDD 是如何分区的?
默认为每个 HDFS 分区创建一个分区,默认为 64MB。在这里阅读更多。
如何跨分区平衡我的数据?
首先,看看可以重新分区他的数据的三种方式:
1) 将第二个参数(RDD 所需的最小分区数)传递给textFile(),但要小心:
In [14]: lines = sc.textFile("data")
In [15]: lines.getNumPartitions()
Out[15]: 1000
In [16]: lines = sc.textFile("data", 500)
In [17]: lines.getNumPartitions()
Out[17]: 1434
In [18]: lines = sc.textFile("data", 5000)
In [19]: lines.getNumPartitions()
Out[19]: 5926
正如你所看到的,[16]
它并没有达到预期的效果,因为 RDD 拥有的分区数已经大于我们请求的最小分区数。
2)使用repartition(),如下所示:
In [22]: lines = lines.repartition(10)
In [23]: lines.getNumPartitions()
Out[23]: 10
警告:这将调用 shuffle 并且应该在您想要增加RDD 的分区数量时使用。
从文档:
shuffle 是 Spark 用于重新分配数据的机制,以便跨分区以不同方式分组。这通常涉及跨执行器和机器复制数据,使洗牌成为一项复杂且成本高昂的操作。
3)使用coalesce(),如下所示:
In [25]: lines = lines.coalesce(2)
In [26]: lines.getNumPartitions()
Out[26]: 2
在这里,Spark 知道您将缩小 RDD 并利用它。阅读有关repartition() 与 coalesce()的更多信息。
但是,这一切能否保证您的数据将在您的分区之间完美平衡?并非如此,正如我在如何平衡分区间的数据?