2

我有一个文本文件,其中包含大量由空格分隔的随机浮点值。我正在将此文件加载到 scala 中的 RDD 中。这个RDD是如何分区的?

此外,是否有任何方法可以生成自定义分区,以便所有分区具有相同数量的元素以及每个分区的索引?

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
keyval=dRDD.map(x =>process(x.trim().split(' ').map(_.toDouble),query_norm,m,r))

在这里,我从 HDFS 加载多个文本文件,进程是我正在调用的函数。我可以使用 mapPartitonsWithIndex 的解决方案以及如何在流程函数中访问该索引吗?Map 对分区进行洗牌。

4

3 回答 3

3

RDD 是如何分区的?

默认为每个 HDFS 分区创建一个分区,默认为 64MB。在这里阅读更多。

如何跨分区平衡我的数据?

首先,看看可以重新分区他的数据的三种方式:

1) 将第二个参数(RDD 所需的最小分区数)传递给textFile(),但要小心:

In [14]: lines = sc.textFile("data")

In [15]: lines.getNumPartitions()
Out[15]: 1000

In [16]: lines = sc.textFile("data", 500)

In [17]: lines.getNumPartitions()
Out[17]: 1434

In [18]: lines = sc.textFile("data", 5000)

In [19]: lines.getNumPartitions()
Out[19]: 5926

正如你所看到的,[16]它并没有达到预期的效果,因为 RDD 拥有的分区数已经大于我们请求的最小分区数。

2)使用repartition(),如下所示:

In [22]: lines = lines.repartition(10)

In [23]: lines.getNumPartitions()
Out[23]: 10

警告:这将调用 shuffle 并且应该在您想要增加RDD 的分区数量时使用。

文档

shuffle 是 Spark 用于重新分配数据的机制,以便跨分区以不同方式分组。这通常涉及跨执行器和机器复制数据,使洗牌成为一项复杂且成本高昂的操作。

3)使用coalesce(),如下所示:

In [25]: lines = lines.coalesce(2)

In [26]: lines.getNumPartitions()
Out[26]: 2

在这里,Spark 知道您将缩小 RDD 并利用它。阅读有关repartition() 与 coalesce()的更多信息。


但是,这一切能否保证您的数据将在您的分区之间完美平衡?并非如此,正如我在如何平衡分区间的数据?

于 2016-08-19T18:45:59.933 回答
2

加载的 rdd 被默认分区器分区:哈希码。要指定自定义分区器,请使用您自己的分区器提供的 rdd.partitionBy()。

我不认为在这里使用 coalesce() 是可以的,因为根据 api 文档,只有在减少分区数量时才能使用 coalesce(),甚至我们不能使用 coalesce() 指定自定义分区器。

于 2015-09-04T04:30:33.643 回答
1

您可以使用 coalesce 函数生成自定义分区:

coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
于 2014-07-10T09:30:32.170 回答