
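I am learning Spark using a Databricks Community Edition notebook. I created sample data with only a few rows. Since the data is very small, I expected no Exchange stage in the query plan. I also tried broadcasting, but I still see the Exchange stage. Do these configurations not work on Databricks Community Edition notebooks?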

import org.apache.spark.sql.functions.{col, regexp_replace}

// Author -> book mapping
val authorBook = sc.parallelize(Seq(("a1", "b1"), ("a1", "b2"), ("a2", "b3"), ("a3", "b4")))
val schemaColumn = Seq("author", "book")
val df = authorBook.toDF(schemaColumn: _*)

// Book -> copies sold
val bookSold = sc.parallelize(Seq(("b1", 100), ("b2", 500), ("b3", 400), ("b4", 500)))
val bookSchema = Seq("book", "sold_copy")
val dfBook = bookSold.toDF(bookSchema: _*)

// val totalBookSold = df.repartition(4, col("book")).join(dfBook.repartition(4, col("book")), "book")

// Attempt to broadcast the smaller DataFrame before joining
sc.broadcast(dfBook)
val totalBookSold = df.join(dfBook, "book")
totalBookSold.explain(true)

The query plan is the same with and without the broadcast:

== Physical Plan ==
*(3) Project [book#698, author#697, sold_copy#708]
+- *(3) SortMergeJoin [book#698], [book#707], Inner
   :- Sort [book#698 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(book#698, 200), [id=#2071]
   :     +- *(1) Project [_1#694 AS author#697, _2#695 AS book#698]
   :        +- *(1) Filter isnotnull(_2#695)
   :           +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#694, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#695]
   :              +- Scan[obj#693]
   +- Sort [book#707 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(book#707, 200), [id=#2079]
         +- *(2) Project [_1#704 AS book#707, _2#705 AS sold_copy#708]
            +- *(2) Filter isnotnull(_1#704)
               +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#704, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#705]
                  +- Scan[obj#703]

This link solved my problem:

Broadcast not happening while joining dataframes in Spark 1.6


1 Answer


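You are most likely seeing this because the broadcast is never actually used in the join: sc.broadcast(dfBook) creates a broadcast variable, but df.join(dfBook, "book") still joins against the plain dfBook, so the planner receives no broadcast hint.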
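A related point is why Spark does not broadcast such a tiny table on its own. The sketch below is an assumption about the likely cause rather than something confirmed for this notebook: as far as I know, Spark has no size statistics for a DataFrame built from an RDD, so the planner's size estimate can be far above spark.sql.autoBroadcastJoinThreshold even for four rows. The local SparkSession setup is only there so the snippet can run in spark-shell; in a notebook the provided spark session would be used instead. The stats call is the form available from Spark 2.3 onward.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for experimenting; getOrCreate() reuses an existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-size-check")
  .getOrCreate()
import spark.implicits._

// Same construction as in the question: a DataFrame built from an RDD.
val dfBook = spark.sparkContext
  .parallelize(Seq(("b1", 100), ("b2", 500), ("b3", 400), ("b4", 500)))
  .toDF("book", "sold_copy")

// Size below which Spark picks a broadcast join automatically.
val threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Size estimate the planner works with for dfBook (Spark 2.3+ API).
val estimatedSize = dfBook.queryExecution.optimizedPlan.stats.sizeInBytes

println(s"autoBroadcastJoinThreshold: $threshold")
println(s"estimated size of dfBook:   $estimatedSize")
```

If the printed estimate is larger than the threshold, the planner treats dfBook as too big to broadcast automatically, and an explicit hint on the join is needed.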

Try the following instead:

import org.apache.spark.sql.functions.broadcast
val totalBookSold = df.join(broadcast(dfBook), "book")
totalBookSold.explain(true)

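You should see the hint passed through in the logical plan (ResolvedHint (broadcast)), and the physical plan should change from SortMergeJoin to BroadcastHashJoin.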

It should look similar to this:

== Parsed Logical Plan ==
'Join UsingJoin(Inner,List(book))
:- Project [_1#3 AS author#6, _2#4 AS book#7]
:  +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
:     +- ExternalRDD [obj#2]
+- ResolvedHint (broadcast)
   +- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
      +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#13, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#14]
         +- ExternalRDD [obj#12]

== Analyzed Logical Plan ==
book: string, author: string, sold_copy: int
Project [book#7, author#6, sold_copy#17]
+- Join Inner, (book#7 = book#16)
   :- Project [_1#3 AS author#6, _2#4 AS book#7]
   :  +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4]
   :     +- ExternalRDD [obj#2]
   +- ResolvedHint (broadcast)
      +- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
         +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#13, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#14]
            +- ExternalRDD [obj#12]

== Optimized Logical Plan ==
Project [book#7, author#6, sold_copy#17]
+- Join Inner, (book#7 = book#16)
   :- Project [_1#3 AS author#6, _2#4 AS book#7]
   :  +- Filter isnotnull(_2#4)
   :     +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
   :        +- ExternalRDD [obj#2]
   +- ResolvedHint (broadcast)
      +- Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
         +- Filter isnotnull(_1#13)
            +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#13, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#14]
               +- ExternalRDD [obj#12]

== Physical Plan ==
*(2) Project [book#7, author#6, sold_copy#17]
+- *(2) BroadcastHashJoin [book#7], [book#16], Inner, BuildRight
   :- *(2) Project [_1#3 AS author#6, _2#4 AS book#7]
   :  +- *(2) Filter isnotnull(_2#4)
   :     +- *(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4]
   :        +- Scan ExternalRDDScan[obj#2]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *(1) Project [_1#13 AS book#16, _2#14 AS sold_copy#17]
         +- *(1) Filter isnotnull(_1#13)
            +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#13, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#14]
               +- Scan ExternalRDDScan[obj#12]
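
Instead of reading the plans by eye, the difference can also be checked in code. This is a minimal sketch, assuming a local SparkSession (in a notebook the provided spark session would be used); the helper usesBroadcastJoin is a hypothetical name introduced here, and matching on the plan's string form is only a rough check, not an official API for inspecting join strategies.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast

// Assumption: a local session for experimenting; getOrCreate() reuses an existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("broadcast-hint-check")
  .getOrCreate()
import spark.implicits._
val sc = spark.sparkContext

// Same sample data as in the question, built from RDDs.
val df = sc
  .parallelize(Seq(("a1", "b1"), ("a1", "b2"), ("a2", "b3"), ("a3", "b4")))
  .toDF("author", "book")
val dfBook = sc
  .parallelize(Seq(("b1", 100), ("b2", 500), ("b3", 400), ("b4", 500)))
  .toDF("book", "sold_copy")

// Hypothetical helper: true if the physical plan text mentions a broadcast hash join.
def usesBroadcastJoin(joined: DataFrame): Boolean =
  joined.queryExecution.executedPlan.toString.contains("BroadcastHashJoin")

val plainJoin  = df.join(dfBook, "book")            // no hint
val hintedJoin = df.join(broadcast(dfBook), "book") // explicit broadcast hint

println(s"plain join uses BroadcastHashJoin:  ${usesBroadcastJoin(plainJoin)}")
println(s"hinted join uses BroadcastHashJoin: ${usesBroadcastJoin(hintedJoin)}")
```

The hinted join should report a broadcast hash join; what the plain join reports depends on the Spark version and on the size estimate the planner has for dfBook.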
Answered 2020-01-29T06:38:48.027