我在这里回答了一个关于 SO 的问题,作为一个勤奋的人,我查看了物理计划。
编码:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val df = sc.parallelize(Seq( (1.0, 2.0, 1), (0.0, -1.0, 1), (3.0, 4.0, 1), (6.0, -2.3, 4))).toDF("x", "y", "z")
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rddWithId = df.rdd.zipWithIndex
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
dfZippedWithId.show(false)
dfZippedWithId.printSchema()
val res = dfZippedWithId.as("dfZ1").join(dfZippedWithId.as("dfZ2"), $"dfZ1.z" === $"dfZ2.z" &&
$"dfZ1.rowid" === $"dfZ2.rowid" -1
,"inner")
.withColumn("newx", $"dfZ2.x" - $"dfZ1.x").explain(true)
物理计划:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [x#1223, y#1224, z#1225, rowid#1226L, x#1248, y#1249, z#1250, rowid#1251L, (x#1248 - x#1223) AS newx#1268]
+- SortMergeJoin [z#1225, rowid#1226L], [z#1250, (rowid#1251L - 1)], Inner
:- Sort [z#1225 ASC NULLS FIRST, rowid#1226L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(z#1225, rowid#1226L, 200), ENSURE_REQUIREMENTS, [id=#1761]
: +- Scan ExistingRDD[x#1223,y#1224,z#1225,rowid#1226L]
+- Sort [z#1250 ASC NULLS FIRST, (rowid#1251L - 1) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200), ENSURE_REQUIREMENTS, [id=#1762]
+- Scan ExistingRDD[x#1248,y#1249,z#1250,rowid#1251L]
以我的思维方式:
- 应该只复制或重复使用 DF
- 并且应该在同一分区内检查 rowid -1。
不知道如何解释这部分:
Exchange hashpartitioning(z#1250, (rowid#1251L - 1), 200)
好的,不是大规模的,但似乎已经为(rowid#1251L - 1)
恕我直言决定了一些不必要的聪明才智。除非排序以某种方式补偿,但我并不相信。谁能说出优化器想法的真正含义?我想我可以猜到,但这似乎是多余的。