2

我试图理解 和 之间的coalesce()区别repartition()

如果我正确理解了这个答案coalesce()只能减少数据帧的分区数,如果我们尝试增加分区数,则分区数保持不变。

但是当我尝试执行下面的代码时,我观察到了两件事

  1. 对于具有合并的分区数的 Dataframe 可以增加
  2. 对于 Rdd,如果 shuffle = false 则分区数不能随着合并而增加。

这是否意味着可以增加合并数据框的分区?

将合并应用于数据框

当我执行以下代码时:

val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
println("Original dataframe partitions = " + h1b1Df.rdd.getNumPartitions)

val coalescedDf = h1b1Df.coalesce(2)
println("Coalesced dataframe partitions = " + coalescedDf.rdd.getNumPartitions

val coalescedDf1 = coalescedDf.coalesce(6) 
println("Coalesced dataframe with increased partitions = " + coalescedDf1.rdd.getNumPartitions) 

我得到以下输出

Original dataframe partitions =  8
Coalesced dataframe partitions = 2
Coalesced dataframe with increased partitions = 6

将合并应用于 RDD

当我执行以下代码时:

val inpRdd = h1b1Df.rdd
println("Original rdd partitions = " + inpRdd.getNumPartitions)

val coalescedRdd = inpRdd.coalesce(4)
println("Coalesced rdd partitions = " + coalescedRdd.getNumPartitions)

val coalescedRdd1 = coalescedRdd.coalesce(6, false)
println("Coalesced rdd with increased partitions = " + coalescedRdd1.getNumPartitions)

我得到以下输出:

Original rdd partitions =  8
Coalesced rdd partitions = 4
Coalesced rdd with increased partitions = 4
4

3 回答 3

1

Coalesce 可用于通过设置 shuffle=true 来增加分区,这等于重新分区。当您使用合并 shuffle=false 来增加时,不会发生数据移动。所以一个分区的数据不能移动到另一个分区。而减少它只是合并最近的分区。

谢谢,

于 2019-09-27T08:42:05.030 回答
1

数据帧的合并不能增加大于集群中核心总数的分区。

 val h1b1Df = spark.read.csv("/FileStore/tables/h1b_data.csv")
 h1b1Df.rdd.getNumPartitions        // prints 8

 val cloasedDf = h1b1Df.coalesce(21)  
 cloasedDf.rdd.getNumPartitions     // prints 8

 val cloasedDf1 = cloasedDf.coalesce(2) // prints 2
 cloasedDf1.rdd.getNumPartitions

 val cloasedDf2 = cloasedDf.coalesce(7) // prints 7
 cloasedDf2.rdd.getNumPartitions
于 2019-09-27T17:29:01.173 回答
1

如果您在同一个数据帧上应用多个合并而不在这些合并之间执行任何转换,Spark 将通过仅应用最新的合并来优化您的合并,给您的印象是您可以使用合并增加分区数量

解释

如果我们采用以下代码片段:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .master("local[8]") // use 8 threads
    .appName("test-app")
    .getOrCreate()

import spark.implicits._

val input = Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).toDF("MyCol")
val simpleCoalesce = input.coalesce(4)
val doubleCoalesce = simpleCoalesce.coalesce(8)

println(doubleCoalesce.rdd.getNumPartitions)

它将打印8. 为了更好地理解为什么我们得到这个结果,我们执行一个explain(true)on doubleCoalesce

doubleCoalesce.explain(true)

我们得到以下步骤来构建最终的 spark 执行计划来解决doubleCoalesce

== Parsed Logical Plan ==
Repartition 8, false
+- Repartition 4, false
   +- Project [value#1 AS MyCol#4]
      +- LocalRelation [value#1]

...

== Physical Plan ==
Coalesce 8
+- LocalTableScan [MyCol#4]

我们可以看到,在解析的逻辑计划(即对代码片段的简单解析)和物理计划(将应用于生成最终数据帧的计划)之间,只保留了最新的合并,因此coalesce(4)转换实际上是从未申请过。因此,只coalesce(8)应用了一个,我们得到一个包含 8 个分区的数据帧。

如果我们想coalesce(4)应用,我们需要在两个合并之间进行转换,例如一个 complex select

import org.apache.spark.sql.functions.col

val separatedCoalesce = simpleCoalesce
  .select((col("MyCol") + 0).as("MyCol"))
  .coalesce(8)

println(separatedCoalesce.rdd.getNumPartitions)

注意:简单的 select as.select(col("MyCol"))将不起作用,因为 spark 将在优化阶段放弃选择

此代码打印4。如果我们看一下 的物理平面图separatedCoalesce

== Physical Plan ==
Coalesce 8
+- *(1) Project [(MyCol#4 + 0) AS MyCol#9]
   +- Coalesce 4
      +- LocalTableScan [MyCol#4]

我们可以看到,这里coalesce(4)应用了,所以最后我们得到一个只有 4 个分区的数据帧,尽管我们应用了coalesce(8)

结论

Spark 优化可能很棘手,并导致您认为没有发生的事情发生了。所以请记住,spark 并不完全执行编写的代码,而是执行它的优化版本。

于 2021-12-30T15:41:27.870 回答