
I'm new to Spark jobs and I'm running into the following problem.

When I run a count on any of the newly joined DataFrames, the job runs for a very long time and spills memory to disk. Is there a logic error here?

    // pass spark configuration
    val conf = new SparkConf()
      .setMaster(threadMaster)
      .setAppName(appName)

    // Create a new spark context
    val sc = new SparkContext(conf)

    // Specify a SQL context and pass in the spark context we created
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)


    // Create three dataframes for sent and clicked files. Mark them as raw, since they will be renamed
    val dfSentRaw = sqlContext.read.parquet(inputPathSent)
    val dfClickedRaw = sqlContext.read.parquet(inputPathClicked)
    val dfFailedRaw  = sqlContext.read.parquet(inputPathFailed)



    // Rename the columns to avoid ambiguity when accessing the fields later
    val dfSent = dfSentRaw.withColumnRenamed("customer_id", "sent__customer_id")
      .withColumnRenamed("campaign_id", "sent__campaign_id")
      .withColumnRenamed("ced_email", "sent__ced_email")
      .withColumnRenamed("event_captured_dt", "sent__event_captured_dt")
      .withColumnRenamed("riid", "sent__riid")


    val dfClicked = dfClickedRaw.withColumnRenamed("customer_id", "clicked__customer_id")
      .withColumnRenamed("event_captured_dt", "clicked__event_captured_dt")
    val dfFailed = dfFailedRaw.withColumnRenamed("customer_id", "failed__customer_id")


    // LEFT Join with CLICKED on two fields, customer_id and campaign_id
    val dfSentClicked = dfSent.join(dfClicked, dfSent("sent__customer_id") === dfClicked("clicked__customer_id")
      && dfSent("sent__campaign_id") === dfClicked("campaign_id"), "left")
    dfSentClicked.count() //THIS WILL NOT WORK

    val dfJoined = dfSentClicked.join(dfFailed, dfSentClicked("sent__customer_id") === dfFailed("failed__customer_id")
      && dfSentClicked("sent__campaign_id") === dfFailed("campaign_id"), "left")

Why can I no longer count these two/three DataFrames? Did I mess up some index by renaming the columns?

Thanks!



1 Answer


The count call is the only actual materialization of your Spark job here, so it isn't really count that is the problem but the shuffle required for the join right before it. You don't have enough memory to perform the join without spilling to disk. Spilling to disk during a shuffle is a very easy way to make your Spark jobs take forever =).
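To see that the work lives in the join rather than in count itself, you can print the physical plan before calling the action. A minimal sketch, reusing the DataFrame names from the question:

    // The join is lazy: nothing has executed yet at this point.
    // explain() prints the physical plan; the Exchange (shuffle) for the join shows up there.
    dfSentClicked.explain()

    // Only the action triggers the parquet reads, the shuffle, and the join.
    dfSentClicked.count()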

One thing that really helps prevent shuffle spills is having more partitions, so that less data moves through the shuffle at any given time. You can set spark.sql.shuffle.partitions, which controls the number of partitions Spark SQL uses for joins and aggregations. It defaults to 200, so you can try a higher setting. http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options
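For example, you could raise this setting on the sqlContext from the question before running the joins; a sketch where 800 is just an illustrative value, not a recommendation:

    // More (and therefore smaller) shuffle partitions mean each task holds less data
    // in memory at once, so it is less likely to spill to disk.
    sqlContext.setConf("spark.sql.shuffle.partitions", "800")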

You can also increase the heap size allocated to Spark locally and/or increase the fraction of memory available to shuffles by increasing spark.shuffle.memoryFraction (default 0.2) and decreasing spark.storage.memoryFraction (default 0.6). The storage fraction is used, for example, when you call .cache, which you may not care about here.
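A sketch of how both fractions could be set on the SparkConf from the question, before the SparkContext is created; the 0.5/0.3 split and the 4g heap are illustrative values only:

    // Give the shuffle a larger share of the heap and the cache a smaller one,
    // since nothing is cached in this job.
    val conf = new SparkConf()
      .setMaster(threadMaster)
      .setAppName(appName)
      .set("spark.shuffle.memoryFraction", "0.5") // default 0.2
      .set("spark.storage.memoryFraction", "0.3") // default 0.6
      .set("spark.executor.memory", "4g")         // optionally also raise the heap itself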

If you are absolutely set on avoiding spills entirely, you can turn spilling off by setting spark.shuffle.spill to false. I believe this throws an exception if you run out of memory and would otherwise need to spill, instead of silently taking forever, and it can help you tune your memory allocation faster.
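The corresponding sketch, again on the SparkConf before the SparkContext is created, assuming conf is the one from the question:

    // Fail fast with an exception instead of spilling, which makes a memory
    // misconfiguration obvious rather than letting the job run forever.
    conf.set("spark.shuffle.spill", "false")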

answered 2015-10-28T16:14:54.943