0

我正在尝试通过在 spark 数据帧上使用分区来调整 spark 的性能。这是代码:

file_path1 = spark.read.parquet(*paths[:15])
df = file_path1.select(columns) \
    .where((func.col("organization") == organization)) 
df = df.repartition(10)
#execute an action just to make spark execute the repartition step
df.first()

在执行期间,first()我检查了 Spark UI 中的作业阶段,在这里我发现了什么: 工作详情 阶段 7 细节

  • 为什么舞台上没有repartition台阶?
  • 为什么还有第8阶段?我只请求了一项操作first()。是因为 shuffle 造成的repartition吗?
  • 有没有办法更改镶木地板文件的重新分区而不必进行此类操作?最初当我阅读时,df您可以看到它已分区超过 43k 分区,这确实很多(与我将其保存到 csv 文件时的大小相比:4 MB 和 13k 行)并在进一步的步骤中产生问题,这就是为什么我想重新分区它。
  • 我应该cache()在重新分区后使用吗?df = df.repartition(10).cache()? 当我df.first()第二次执行时,我也得到了一个带有 43k 分区的预定阶段,尽管 df.rdd.getNumPartitions()它返回了 10 个。编辑:分区的数量只是为了尝试。我的问题旨在帮助我了解如何进行正确的重新分区。

注意:最初 Dataframe 是从 Hadoop 中的一系列 parquet 文件中读取的。

我已经将此作为参考阅读Spark partition(ing) 如何处理 HDFS 中的文件?

4

1 回答 1

0
  • 只要有洗牌,就有新的舞台。并且
    重新分区会导致洗牌,这就是为什么你有两个阶段。
  • 当您多次使用数据帧以避免读取两次时,将使用缓存。

使用合并而不是重新分配。我认为它会减少洗牌,因为它只会减少分区的数量。

于 2019-02-25T14:12:10.753 回答