
I'm using Spark 2.2 and doing a POC of Spark's bucketing. I created a bucketed table; here is the output of desc formatted my_bucketed_tbl:

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|              bundle|              string|   null|
|                 ifa|              string|   null|
|               date_|                date|   null|
|                hour|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|     my_bucketed_tbl|       |
|               Owner|            zeppelin|       |
|             Created|Thu Dec 21 13:43:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|                Type|            EXTERNAL|       |
|            Provider|                 orc|       |
|         Num Buckets|                  16|       |
|      Bucket Columns|             [`ifa`]|       |
|        Sort Columns|             [`ifa`]|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|hdfs:/user/hive/w...|       |
|       Serde Library|org.apache.hadoop...|       |
|         InputFormat|org.apache.hadoop...|       |
|        OutputFormat|org.apache.hadoop...|       |
|  Storage Properties|[serialization.fo...|       |
+--------------------+--------------------+-------+
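For context, a table with this layout can be written with Spark's DataFrameWriter bucketing API. The following is a minimal sketch rather than my exact code; the source table name some_source_tbl is a placeholder, and saveAsTable as written creates a managed table (the EXTERNAL type shown above would also need a path option):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Write 16 buckets on ifa, sorted by ifa within each bucket, as ORC,
// matching the Num Buckets / Bucket Columns / Sort Columns shown above.
spark.table("some_source_tbl")      // placeholder source table
  .write
  .format("orc")
  .bucketBy(16, "ifa")
  .sortBy("ifa")
  .saveAsTable("my_bucketed_tbl")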

When I run an explain on a group-by query, I can see that the exchange phase has been avoided:

sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
+- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
   +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
      +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>

However, when I replace Spark's max function with collect_set, I can see that the execution plan is the same as for a non-bucketed table, meaning the exchange phase is not avoided:

sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
+- Exchange hashpartitioning(ifa#1010, 200)
   +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
      +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>

Is there any configuration I've missed, or is this a current limitation of Spark's bucketing?


1 Answer


This issue has been fixed in version 2.2.1. You can find the Jira issue here.
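If you want to confirm this after upgrading, a quick sketch (assuming the same table and query from the question):

// Check the running version, then re-examine the physical plan.
println(spark.version)   // expect 2.2.1 or later

// On a fixed version, grouping by the bucket column with collect_set
// should no longer produce an Exchange hashpartitioning node.
spark.sql("select ifa, collect_set(bundle) from my_bucketed_tbl group by ifa").explain()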

Answered 2018-01-19T04:19:37.753