0

我的 spark 工作目前在 59 分钟内运行。我想优化它,以便我花费更少的时间。我注意到作业的最后一步需要很长时间(55 分钟)(请参阅下面 Spark UI 中的 spark 作业的屏幕截图)。

我需要将一个大数据集与一个较小的数据集连接起来,在这个连接的数据集上应用转换(创建一个新列)。

最后,我应该有一个基于列重新分区的数据集PSP参见下面的代码片段)。我还在最后执行排序(根据 3 列对每个分区进行排序)。

所有详细信息(基础架构、配置、代码)都可以在下面找到。

我的代码片段:

    spark.conf.set("spark.sql.shuffle.partitions", 4158)

    val uh = uh_months
      .withColumn("UHDIN", datediff(to_date(unix_timestamp(col("UHDIN_YYYYMMDD"), "yyyyMMdd").cast(TimestampType)),
        to_date(unix_timestamp(col("january"), "yyyy-MM-dd").cast(TimestampType))))
"ddMMMyyyy")).cast(TimestampType)))
      .withColumn("DVA_1", date_format(col("DVA"), "dd/MM/yyyy"))
      .drop("UHDIN_YYYYMMDD")
      .drop("january")
      .drop("DVA")
      .persist()

    val uh_flag_comment = new TransactionType().transform(uh)
    uh.unpersist()

    val uh_joined = uh_flag_comment.join(broadcast(smallDF), "NO_NUM")
      .select(
        uh.col("*"),
        smallDF.col("PSP"),
        smallDF.col("minrel"),
        smallDF.col("Label"),
        smallDF.col("StartDate"))
      .withColumnRenamed("DVA_1", "DVA")

    smallDF.unpersist()

    val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP"))
    val uh_final = uh_joined.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    uh_final

已编辑 - 重新分区逻辑

    val sqlContext = spark.sqlContext
    sqlContext.udf.register("randomUDF", (partitionCount: Int) => {
      val r = new scala.util.Random
      r.nextInt(partitionCount)
      // Also tried with r.nextInt(partitionCount) + col("PSP")
    })

    val uh_to_be_sorted = uh_joined
        .withColumn("tmp", callUDF("RandomUDF", lit("4158"))
        .repartition(4158, col("tmp"))
        .drop(col("tmp"))
    val uh_final = uh_to_be_sorted.sortWithinPartitions(col("NO_NUM"), col("UHDIN"), col("HOURMV"))

    uh_final

smallDF是我广播的一个小数据集(535MB)。

TransactionTypeuh是一个类,我根据 3 列 ( MMED, DEBCRED, ) 的值向我的数据框中添加一列新的字符串元素,NMTGP使用正则表达式检查这些列的值。

由于未找到随机播放块,我以前遇到过很多问题(工作失败)。我发现我正在溢出到磁盘并且有很多 GC 内存问题,所以我将“spark.sql.shuffle.partitions”增加到 4158。

为什么是 4158?

Partition_count = (stage input data) / (target size of your partition)

所以Shuffle partition_count = (shuffle stage input data) / 200 MB = 860000/200=4300

我有16*24 - 6 =378 cores availaible。因此,如果我想一次性运行所有任务,我应该将 4300 除以 378,大约是 11。然后11*378=4158

火花版本:2.1

集群配置:

  • 24 个计算节点(工作者)
  • 每个 16 个 vcore
  • 每个节点 90 GB RAM
  • 6 个内核已被其他进程/作业使用

当前 Spark 配置:

-主人:纱线

-执行器内存:26G

-执行器核心:5

-驱动内存:70G

-num-executors:70

-spark.kryoserializer.buffer.max=512

-spark.driver.cores=5

-spark.driver.maxResultSize=500m

-spark.memory.storageFraction=0.4

-spark.memory.fraction=0.9

-spark.hadoop.fs.permissions.umask-mode=007

作业如何执行:

我们使用 IntelliJ 构建一个工件(jar),然后将其发送到服务器。然后执行一个 bash 脚本。这个脚本:

  • 导出一些环境变量(SPARK_HOME、HADOOP_CONF_DIR、PATH 和 SPARK_LOCAL_DIRS)

  • 使用上面 spark 配置中定义的所有参数启动 spark-submit 命令

  • 检索应用程序的纱线日志

Spark 用户界面截图

有向无环图

有向无环图

分阶段所有作业

需要改进的工作的详细阶段

需要很多时间的阶段

4

1 回答 1

1

@阿里

从摘要指标中,我们可以说您的数据是倾斜的(最大持续时间:49 分钟和最大随机读取大小/记录:2.5 GB/23,947,440,平均而言,它需要大约 4-5 分钟并且处理少于 200 MB/1.2 MM 行)

既然我们知道问题可能是少数分区中的数据倾斜,我认为我们可以val uh_to_be_sorted = uh_joined.repartition(4158, col("PSP"))通过选择某些东西(如其他列或向 PSP 添加任何其他列)来更改重新分区逻辑来解决这个问题

关于数据倾斜和修复的几个链接

https://dzone.com/articles/optimize-spark-with-distribute-by-cluster-by

https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

希望这可以帮助

于 2019-10-22T18:36:37.393 回答