使用分桶
如果您使用的是 spark v2.3 及更高版本,则可以使用分桶来避免写入后在连接上发生的洗牌。
通过分桶,您可以根据一列(通常是您要加入的列)将数据放入桶中。然后当 spark 再次从存储桶中读取数据时,您将不需要执行交换。
1. 样本数据
交易(事实)
t1.sample(n=5)
ad_id impressions
30 528749
1 552233
30 24298
30 311914
60 41661
名称(维度)
t2.sample(n=5)
ad_id brand_name
1 McDonalds
30 McDonalds
30 Coca-Cola
1 Coca-Cola
30 Levis
2.禁用广播加入
由于一张桌子很大而另一张桌子很小,因此您需要禁用broadcastJoin
.
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
3. 不分桶
t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)
t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')
unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")
unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()
+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
+- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
:- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336] <-- 0_0
: +- Project [ad_id#1842L, impressions#1843L]
: +- Filter isnotnull(ad_id#1842L)
: +- FileScan parquet default.unbucketed_transactions
+- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337] <-- 0_0
+- Project [ad_id#1846L, brand_name#1847]
+- Filter isnotnull(ad_id#1846L)
+- FileScan parquet default.unbucketed_brands
正如您所看到的,由于未存储的连接而发生了交换。
4. 带桶
# The number 30 tells spark how large the buckets should be.
# The second parameter is what column the bucket should be based on.
unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')
unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')
transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")
transactions.join(brands, 'ad_id').explain()
+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
+- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
:- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
: +- Project [ad_id#1867L, impressions#1868L]
: +- Filter isnotnull(ad_id#1867L)
: +- FileScan parquet default.bucketed_transactions
+- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
+- Project [ad_id#1871L, brand_name#1872]
+- Filter isnotnull(ad_id#1871L)
+- FileScan parquet default.bucketed_brands
从上面的计划可以看出,没有更多的交换发生。因此,您将通过避免交换来提高您的绩效。