我的 spark 工作目前在 59 分钟内运行。我想优化它,以便我花费更少的时间。我注意到作业的最后一步需要很长时间(55 分钟)(请参阅下面 Spark UI 中的 spark 作业的屏幕截图)。
我需要将一个大数据集与一个较小的数据集连接起来,在这个连接的数据集上应用转换(创建一个新列)。
最后,我应该有一个基于列重新分区的数据集PSP
(参见下面的代码片段)。我还在最后执行排序(根据 3 列对每个分区进行排序)。
所有详细信息(基础架构、配置、代码)都可以在下面找到。
我的代码片段:
spark.conf.set("spark.sql.shuffle.partitions", 4158)
val uh = uh_months
.withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
"ddMMMyyyy")).cast(TimestampType)))
.withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
.drop("UHDIN_YYYYMMDD")
.drop("january")
.drop("DVA")
.persist()
val uh_flag_comment = new TransactionType().transform(uh)
uh.unpersist()
val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
.select(
uh.col("*"),
smallDF.col("PSP"),
smallDF.col("minrel"),
smallDF.col("Label"),
smallDF.col("StartDate"))
.withColumnRenamed("DVA_1", "DVA")
smallDF.unpersist()
val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP"))
val uh_final = uh_joined.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))
uh_final
已编辑 - 重新分区逻辑
val sqlContext = spark.sqlContext
sqlContext.udf.register("randomUDF", (partitionCount: Int) => {
val r = new scala.util.Random
r.nextInt(partitionCount)
// Also tried with r.nextInt(partitionCount) + col("PSP")
})
val uh_to_be_sorted = uh_joined
.withColumn("tmp", callUDF("RandomUDF", lit("4158"))
.repartition(4158, col("tmp"))
.drop(col("tmp"))
val uh_final = uh_to_be_sorted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))
uh_final
smallDF
是我广播的一个小数据集(535MB)。
TransactionType
uh
是一个类,我根据 3 列 ( MMED
, DEBCRED
, ) 的值向我的数据框中添加一列新的字符串元素,NMTGP
使用正则表达式检查这些列的值。
由于未找到随机播放块,我以前遇到过很多问题(工作失败)。我发现我正在溢出到磁盘并且有很多 GC 内存问题,所以我将“spark.sql.shuffle.partitions”增加到 4158。
为什么是 4158?
Partition_count = (stage input data) / (target size of your partition)
所以Shuffle partition_count = (shuffle stage input data) / 200 MB = 860000/200=4300
我有16*24 - 6 =378 cores availaible
。因此,如果我想一次性运行所有任务,我应该将 4300 除以 378,大约是 11。然后11*378=4158
火花版本:2.1
集群配置:
- 24 个计算节点(工作者)
- 每个 16 个 vcore
- 每个节点 90 GB RAM
- 6 个内核已被其他进程/作业使用
当前 Spark 配置:
-主人:纱线
-执行器内存:26G
-执行器核心:5
-驱动内存:70G
-num-executors:70
-spark.kryoserializer.buffer.max=512
-spark.driver.cores=5
-spark.driver.maxResultSize=500m
-spark.memory.storageFraction=0.4
-spark.memory.fraction=0.9
-spark.hadoop.fs.permissions.umask-mode=007
作业如何执行:
我们使用 IntelliJ 构建一个工件(jar),然后将其发送到服务器。然后执行一个 bash 脚本。这个脚本:
导出一些环境变量(SPARK_HOME、HADOOP_CONF_DIR、PATH 和 SPARK_LOCAL_DIRS)
使用上面 spark 配置中定义的所有参数启动 spark-submit 命令
检索应用程序的纱线日志
Spark 用户界面截图
有向无环图