
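I am reading back two datasets that I saved to disk with the bucketBy option, both bucketed on the same key and with the same number of buckets. When I read them back and join them on that key, the join should not cause a shuffle.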

However, that is not what I see. The following code reproduces the behavior:

import random

from pyspark.sql import SparkSession

# Disable broadcast joins so the planner has to use a sort-merge join.
spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold", "-1").getOrCreate()

# Two small datasets sharing the join key "a" (values 0-4, each appearing twice).
data1 = [(i, random.randint(1, 5), random.randint(1, 5)) for t in range(2) for i in range(5)]
data2 = [(i, random.randint(1, 5), random.randint(1, 5)) for t in range(2) for i in range(5)]
df1 = spark.createDataFrame(data1, schema='a int,b int,c int')
df2 = spark.createDataFrame(data2, schema='a int,b int,c int')

parquet_path1 = './bucket_test_parquet1'
parquet_path2 = './bucket_test_parquet2'

# Write both datasets bucketed by "a" into 5 buckets, each under its own table name.
df1.write.bucketBy(5, "a").format("parquet").saveAsTable('df1', path=parquet_path1, mode='overwrite')
df2.write.bucketBy(5, "a").format("parquet").saveAsTable('df2', path=parquet_path2, mode='overwrite')

# Read the files back by path and expose them as temp views.
read_parquet1 = spark.read.format("parquet").load(parquet_path1)
read_parquet1.createOrReplaceTempView("read_parquet1")
read_parquet1 = spark.sql("SELECT * from read_parquet1")

read_parquet2 = spark.read.format("parquet").load(parquet_path2)
read_parquet2.createOrReplaceTempView("read_parquet2")
read_parquet2 = spark.sql("SELECT * from read_parquet2")

# Join on the bucketing key and print the physical plan.
read_parquet1.join(read_parquet2, on='a').explain()

The output I get is:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [a#24, b#25, c#26, b#34, c#35]
   +- SortMergeJoin [a#24], [a#33], Inner
      :- Sort [a#24 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(a#24, 200), ENSURE_REQUIREMENTS, [id=#61]
      :     +- Filter isnotnull(a#24)
      :        +- FileScan parquet [a#24,b#25,c#26] Batched: true, DataFilters: [isnotnull(a#24)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/pymonsoon/bucket_test_parquet1], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:int,c:int>
      +- Sort [a#33 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(a#33, 200), ENSURE_REQUIREMENTS, [id=#62]
            +- Filter isnotnull(a#33)
               +- FileScan parquet [a#33,b#34,c#35] Batched: true, DataFilters: [isnotnull(a#33)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/home/nitin/pymonsoon/bucket_test_parquet2], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:int,c:int>

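The plan clearly has an Exchange hashpartitioning step on both sides of the join, so a shuffle is happening. Please help me understand how bucketBy is supposed to work.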
