I am learning Spark in a Databricks Community Edition notebook. I created a sample dataset of just a few rows. Since the data is so small, I expected no Exchange (shuffle) stage in the query plan. I also tried broadcasting, but I still see the Exchange stage. Do these settings not work in Databricks Community Edition notebooks?
import org.apache.spark.sql.functions.{col, regexp_replace}

val authorBook = sc.parallelize(Seq(("a1", "b1"), ("a1", "b2"), ("a2", "b3"), ("a3", "b4")))
val schemaColumn = Seq("author", "book")
val df = authorBook.toDF(schemaColumn: _*)

val bookSold = sc.parallelize(Seq(("b1", 100), ("b2", 500), ("b3", 400), ("b4", 500)))
val bookSchema = Seq("book", "sold_copy")
val dfBook = bookSold.toDF(bookSchema: _*)

// val totalBookSold = df.repartition(4, col("book")).join(dfBook.repartition(4, col("book")), "book")
sc.broadcast(dfBook)
val totalBookSold = df.join(dfBook, "book")
totalBookSold.explain(true)
The query plan is identical with and without the broadcast:
== Physical Plan ==
*(3) Project [book#698, author#697, sold_copy#708]
+- *(3) SortMergeJoin [book#698], [book#707], Inner
:- Sort [book#698 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(book#698, 200), [id=#2071]
: +- *(1) Project [_1#694 AS author#697, _2#695 AS book#698]
: +- *(1) Filter isnotnull(_2#695)
: +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#694, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#695]
: +- Scan[obj#693]
+- Sort [book#707 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(book#707, 200), [id=#2079]
+- *(2) Project [_1#704 AS book#707, _2#705 AS sold_copy#708]
+- *(2) Filter isnotnull(_1#704)
+- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#704, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#705]
+- Scan[obj#703]
This link solved my problem.
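For anyone hitting the same wall: `sc.broadcast(dfBook)` creates a broadcast *variable* for use in your own closures; the Catalyst optimizer ignores it when planning the join, which is why the plan above still shows a SortMergeJoin with two Exchange stages. To request a broadcast join you pass the DataFrame through the `broadcast()` hint from `org.apache.spark.sql.functions`. A minimal sketch (assuming a local SparkSession rather than the Databricks-provided one):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-join-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(("a1", "b1"), ("a1", "b2"), ("a2", "b3"), ("a3", "b4"))
  .toDF("author", "book")
val dfBook = Seq(("b1", 100), ("b2", 500), ("b3", 400), ("b4", 500))
  .toDF("book", "sold_copy")

// broadcast() marks dfBook for the optimizer: ship the whole table to
// every executor and use a BroadcastHashJoin, so no Exchange is needed
// on the join key.
val totalBookSold = df.join(broadcast(dfBook), "book")
totalBookSold.explain(true)
```

With the hint in place, the physical plan should show a BroadcastHashJoin (plus a BroadcastExchange for the small side) instead of the SortMergeJoin with `hashpartitioning` Exchanges on both inputs. Note that Spark also broadcasts automatically when a side is below `spark.sql.autoBroadcastJoinThreshold`, but tables built from `sc.parallelize` have no size statistics, which is one common reason the automatic path does not kick in.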