我目前正在使用 spark 和 foreach 分区处理数据,打开与 mysql 的连接并将其以 1000 个批量插入数据库。如SparkDocumentation中所述,默认值为spark.sql.shuffle.partitions
200,但我想保持动态。那么,我该如何计算呢。因此,既不选择非常高的值导致性能下降,也不选择非常小的值导致OOM
.
问问题
4393 次
2 回答
-1
试试下面的选项 -
val numExecutors = spark.conf.get("spark.executor.instances").toInt
val numExecutorsCores = spark.conf.get("spark.executor.cores").toInt
val numShufflePartitions = (numExecutors * numExecutorsCores)
spark.conf.set("spark.sql.shuffle.partitions", numShufflePartitions)
这将帮助您根据用于 spark 作业的 executor 和 executors 核心设置正确数量的 shuffle 分区,而不会影响性能并导致 Out Of Memory 问题。
如果您仍然没有记忆,它们将设置在属性下方 -
spark.conf.set("spark.executor.memoryOverhead", "3G")
其他选项是通过块大小计算Dataframe
大小并使用结果数来设置。didvie
hdfs
spark.sql.shuffle.partitions
于 2019-06-22T21:26:18.363 回答
-3
您可以使用df.repartition(numPartitions)方法来执行此操作。您可以根据输入/中间输出做出决定,并将numPartitions 传递给 repartition() 方法。
df.repartition(numPartitions) or rdd.repartition(numPartitions)
于 2016-06-15T04:46:39.687 回答