I'm very new to PySpark and have been having a challenge with partitioning data.

I have 2 datasets:

  • Ad data set (very big) with ad_id and some attribute columns
  • Ad transactions data set (smaller), includes ad_id and transaction date

It appears to me that I can only partition by ad_id. My question is: how can I evenly distribute the data by ranges of ad_id for both data sets, so that when I need to compute a join between the two sets, it will be faster?

Here is what I'm trying to do:

ads.write.partitionBy("ad_id").mode('overwrite').parquet(os.path.join(output_data, 'ads_table'))

Thanks!


1 Answer


Use bucketing

If you are using Spark v2.3 and above, you can use bucketing to avoid the shuffle that happens on the join after the write.

With bucketing you put the data into buckets based on a column (usually the one you are joining on). Then, when Spark reads the data back from those buckets, it will not need to perform an exchange.
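One practical detail worth noting: as far as I know, bucketBy only works together with saveAsTable, not with a plain path-based write such as the .parquet(...) call in the question. So the write from the question would become something along these lines (the table name is just illustrative):

# Hedged sketch: bucket the big ads table on the join key and register it as a
# table, instead of writing it to a bare parquet path with partitionBy.
ads.write \
    .bucketBy(30, 'ad_id') \
    .sortBy('ad_id') \
    .mode('overwrite') \
    .saveAsTable('ads_table')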

1. Sample data

Transactions (fact)

t1.sample(n=5)

ad_id     impressions
30        528749
1         552233
30        24298
30        311914
60        41661

Names (dimension)

t2.sample(n=5)

ad_id     brand_name
1         McDonalds
30        McDonalds
30        Coca-Cola
1         Coca-Cola
30        Levis
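
For reference, t1 and t2 above are assumed to be pandas DataFrames (they are sampled with .sample(n=5) and later passed to spark.createDataFrame). A minimal, made-up reconstruction of them could look like this; the values are only meant to mirror the tables above:

import numpy as np
import pandas as pd

# Fact table: many rows, repeated ad_id values
rng = np.random.default_rng(42)
t1 = pd.DataFrame({
    'ad_id': rng.choice([1, 30, 60], size=1000),
    'impressions': rng.integers(1_000, 600_000, size=1000),
})

# Dimension table: small lookup keyed by ad_id
t2 = pd.DataFrame({
    'ad_id': [1, 30, 30, 1, 30],
    'brand_name': ['McDonalds', 'McDonalds', 'Coca-Cola', 'Coca-Cola', 'Levis'],
})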

2. Disable broadcast join

Since one table is large and the other is small, you need to disable the broadcast join.

sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
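
The same threshold can also be set through the SparkSession configuration API, which is equivalent (assuming a SparkSession named spark):

# Disable the automatic broadcast join by setting the size threshold to -1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)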

3. Without bucketing

t = spark.createDataFrame(t1)
b = spark.createDataFrame(t2)


t.write.saveAsTable('unbucketed_transactions')
b.write.saveAsTable('unbucketed_brands')

unbucketed_transactions = sqlContext.table("unbucketed_transactions")
unbucketed_brands = sqlContext.table("unbucketed_brands")


unbucketed_transactions.join(unbucketed_brands, 'ad_id').explain()



+- Project [ad_id#1842L, impressions#1843L, brand_name#1847]
   +- SortMergeJoin [ad_id#1842L], [ad_id#1846L], Inner
      :- Sort [ad_id#1842L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(ad_id#1842L, 200), true, [id=#1336]     <-- 0_0
      :     +- Project [ad_id#1842L, impressions#1843L]
      :        +- Filter isnotnull(ad_id#1842L)
      :           +- FileScan parquet default.unbucketed_transactions
      +- Sort [ad_id#1846L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(ad_id#1846L, 200), true, [id=#1337]     <-- 0_0 
            +- Project [ad_id#1846L, brand_name#1847]
               +- Filter isnotnull(ad_id#1846L)
                  +- FileScan parquet default.unbucketed_brands


As you can see, an exchange takes place because of the unbucketed join.

4. With bucketing

# The first argument (30) tells Spark how many buckets to create.
# The second argument is the column the buckets should be based on.

unbucketed_transactions.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_transactions')


unbucketed_brands.write \
.bucketBy(30,'ad_id') \
.sortBy('ad_id') \
.saveAsTable('bucketed_brands')

transactions = sqlContext.table("bucketed_transactions")
brands = sqlContext.table("bucketed_brands")

transactions.join(brands, 'ad_id').explain()


+- Project [ad_id#1867L, impressions#1868L, brand_name#1872]
   +- SortMergeJoin [ad_id#1867L], [ad_id#1871L], Inner
      :- Sort [ad_id#1867L ASC NULLS FIRST], false, 0
      :  +- Project [ad_id#1867L, impressions#1868L]
      :     +- Filter isnotnull(ad_id#1867L)
      :        +- FileScan parquet default.bucketed_transactions
      +- Sort [ad_id#1871L ASC NULLS FIRST], false, 0
         +- Project [ad_id#1871L, brand_name#1872]
            +- Filter isnotnull(ad_id#1871L)
               +- FileScan parquet default.bucketed_brands

As you can see from the plan above, no exchange takes place any more. So you will improve your performance by avoiding the shuffle.
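
If you want to double-check that the tables were actually bucketed, the table metadata should report the bucket count and the bucket column. A small, optional sanity check on the table created above:

# Look for the "Num Buckets" and "Bucket Columns" rows in the output
spark.sql("DESCRIBE EXTENDED bucketed_transactions").show(50, truncate=False)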
