0

我在这里回答了一个关于 SO 的问题,作为一个勤奋的人,我查看了物理计划。

编码:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}

val df = sc.parallelize(Seq( (1.0, 2.0, 1), (0.0, -1.0, 1), (3.0, 4.0, 1), (6.0, -2.3, 4))).toDF("x", "y", "z")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId =  spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)

dfZippedWithId.show(false)
dfZippedWithId.printSchema()

val res = dfZippedWithId.as("dfZ1").join(dfZippedWithId.as("dfZ2"), $"dfZ1.z" ===  $"dfZ2.z" &&
                                                                    $"dfZ1.rowid" ===  $"dfZ2.rowid" -1
                                                                   ,"inner")
                        .withColumn("newx", $"dfZ2.x" - $"dfZ1.x").explain(true) 

物理计划:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [x#1223, y#1224, z#1225, rowid#1226L, x#1248, y#1249, z#1250, rowid#1251L, (x#1248 - x#1223) AS newx#1268]
   +- SortMergeJoin [z#1225, rowid#1226L], [z#1250, (rowid#1251L - 1)], Inner
      :- Sort [z#1225 ASC NULLS FIRST, rowid#1226L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(z#1225, rowid#1226L, 200), ENSURE_REQUIREMENTS, [id=#1761]
      :     +- Scan ExistingRDD[x#1223,y#1224,z#1225,rowid#1226L]
      +- Sort [z#1250 ASC NULLS FIRST, (rowid#1251L - 1) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200), ENSURE_REQUIREMENTS, [id=#1762]
            +- Scan ExistingRDD[x#1248,y#1249,z#1250,rowid#1251L]

以我的思维方式:

  1. 应该只复制或重复使用 DF
  2. 并且应该在同一分区内检查 rowid -1。

不知道如何解释这部分:

Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200)

好的,不是大规模的,但似乎已经为(rowid#1251L - 1)恕我直言决定了一些不必要的聪明才智。除非排序以某种方式补偿,但我并不相信。谁能说出优化器想法的真正含义?我想我可以猜到,但这似乎是多余的。

4

0 回答 0