我正在尝试通过在 spark 数据帧上使用分区来调整 spark 的性能。这是代码:
file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
.where((func.col("organization") == organization))
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()
在执行期间,first()
我检查了 Spark UI 中的作业阶段,在这里我发现了什么:
- 为什么舞台上没有
repartition
台阶? - 为什么还有第8阶段?我只请求了一项操作
first()
。是因为 shuffle 造成的repartition
吗? - 有没有办法更改镶木地板文件的重新分区而不必进行此类操作?最初当我阅读时,
df
您可以看到它已分区超过 43k 分区,这确实很多(与我将其保存到 csv 文件时的大小相比:4 MB 和 13k 行)并在进一步的步骤中产生问题,这就是为什么我想重新分区它。 - 我应该
cache()
在重新分区后使用吗?df = df.repartition(10).cache()
? 当我df.first()
第二次执行时,我也得到了一个带有 43k 分区的预定阶段,尽管df.rdd.getNumPartitions()
它返回了 10 个。编辑:分区的数量只是为了尝试。我的问题旨在帮助我了解如何进行正确的重新分区。
注意:最初 Dataframe 是从 Hadoop 中的一系列 parquet 文件中读取的。
我已经将此作为参考阅读Spark partition(ing) 如何处理 HDFS 中的文件?