6

我目前正在使用 spark 和 foreach 分区处理数据,打开与 mysql 的连接并将其以 1000 个批量插入数据库。如SparkDocumentation中所述,默认值为spark.sql.shuffle.partitions200,但我想保持动态。那么,我该如何计算呢。因此,既不选择非常高的值导致性能下降,也不选择非常小的值导致OOM.

4

2 回答 2

-1

试试下面的选项 -

val numExecutors         = spark.conf.get("spark.executor.instances").toInt

val numExecutorsCores    = spark.conf.get("spark.executor.cores").toInt

val numShufflePartitions = (numExecutors * numExecutorsCores)

spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)

这将帮助您根据用于 spark 作业的 executor 和 executors 核心设置正确数量的 shuffle 分区,而不会影响性能并导致 Out Of Memory 问题。

如果您仍然没有记忆,它们将设置在属性下方 -

spark.conf.set("spark.executor.memoryOverhead", "3G")

其他选项是通过块大小计算Dataframe大小并使用结果数来设置。didviehdfsspark.sql.shuffle.partitions

于 2019-06-22T21:26:18.363 回答
-3

您可以使用df.repartition(numPartitions)方法来执行此操作。您可以根据输入/中间输出做出决定,并将numPartitions 传递给 repartition() 方法。

df.repartition(numPartitions)   or rdd.repartition(numPartitions)
于 2016-06-15T04:46:39.687 回答