
For the following code snippet:

import spark.implicits._
import org.apache.spark.sql.functions.col

case class SomeRow(key: String, value: String)

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val ds1 = Seq(SomeRow("A", "1")).toDS().repartition(col("key"))
val ds2 = Seq(SomeRow("A", "1"), SomeRow("B","2")).toDS().repartition(col("key"))

val dataSetJoined = ds1.joinWith(ds2, ds1("key")===ds2("key"), "left")
val dataFrameJoined = ds1.join(ds2, ds1("key")===ds2("key"), "left")
dataSetJoined.explain(true)
dataFrameJoined.explain(true)

Spark generates the following plan for the Dataset join:

== Physical Plan ==
SortMergeJoin [_1#132.key], [_2#133.key], LeftOuter
:- *(2) Sort [_1#132.key ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(_1#132.key, 2)
:     +- *(1) Project [named_struct(key, key#122, value, value#123) AS _1#132]
:        +- Exchange hashpartitioning(key#122, 2)
:           +- LocalTableScan [key#122, value#123]
+- *(4) Sort [_2#133.key ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(_2#133.key, 2)
      +- *(3) Project [named_struct(key, key#128, value, value#129) AS _2#133]
         +- Exchange hashpartitioning(key#128, 2)
            +- LocalTableScan [key#128, value#129]

And for the DataFrame join:

== Physical Plan ==
SortMergeJoin [key#122], [key#128], LeftOuter
:- *(1) Sort [key#122 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(key#122, 2)
:     +- LocalTableScan [key#122, value#123]
+- *(2) Sort [key#128 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(key#128, 2)
      +- LocalTableScan [key#128, value#129]

When joining two Datasets with joinWith, is there a way to avoid the second, identical Exchange?
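One possible workaround, sketched below and not verified against every Spark version, is to run the untyped DataFrame join (which in the plans above only produces one Exchange per side) and then rebuild the typed tuple shape with struct and as[...]. Whether the resulting plan avoids the extra shuffle, and whether nulls from the left outer join decode the same way as with joinWith, would need to be checked with explain:

```scala
// Sketch of a possible workaround; assumes the same spark, ds1, ds2 as above.
import org.apache.spark.sql.functions.struct

// Untyped join first, then reassemble the (SomeRow, SomeRow) pair and re-type it.
val typedAgain = ds1.join(ds2, ds1("key") === ds2("key"), "left")
  .select(
    struct(ds1("key"), ds1("value")).as("_1"),
    struct(ds2("key"), ds2("value")).as("_2")
  )
  .as[(SomeRow, SomeRow)]

typedAgain.explain(true)
```

The idea is that the Project wrapping columns into named_struct happens after the join instead of before it, so the join keys stay as plain top-level columns and the earlier repartition(col("key")) output can be reused.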

