8

我正在运行 Spark 作业来聚合数据。我有一个名为 Profile 的自定义数据结构,它基本上包含一个mutable.HashMap[Zone, Double]. 我想使用以下代码合并所有共享给定密钥(UUID)的配置文件:

def merge = (up1: Profile, up2: Profile) => { up1.addWeights(up2); up1}
val aggregated = dailyProfiles
  .aggregateByKey(new Profile(), 3200)(merge, merge).cache()

奇怪的是,Spark 失败并出现以下错误:

org.apache.spark.SparkException:作业因阶段故障而中止:116318 个任务的序列化结果的总大小(1024.0 MB)大于 spark.driver.maxResultSize(1024.0 MB)

显而易见的解决方案是增加“spark.driver.maxResultSize”,但有两件事让我感到困惑。

  1. 我得到的 1024.0 大于 1024.0 太巧合了
  2. 我在谷歌上搜索此特定错误和配置参数时发现的所有文档和帮助表明它会影响将值返回给驱动程序的函数。(比如说take()collect()),但我没有向驱动程序带任何东西,只是从 HDFS 读取、聚合、保存回 HDFS。

有谁知道我为什么会收到这个错误?

4

1 回答 1

1

的,它失败了,因为我们在异常消息中看到的值被四舍五入,并且以字节为单位进行比较

该序列化输出必须大于 1024.0 MB 且小于 1024.1 MB

检查添加的 Apache Spark 代码片段,非常有趣且很少出现此错误。:)

这里totalResultSize > maxResultSize两者都是 Long 类型,并且 in 保存以字节为单位的值。但msg保留从 的四舍五入值 Utils.bytesToString()

//TaskSetManager.scala
  def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
        s"(${Utils.bytesToString(totalResultSize)}) is bigger than spark.driver.maxResultSize " +
        s"(${Utils.bytesToString(maxResultSize)})"
      logError(msg)
      abort(msg)
      false
    } else {
      true
    }
  }

Apache Spark 1.3 - 源代码


//Utils.scala
  def bytesToString(size: Long): String = {
    val TB = 1L << 40
    val GB = 1L << 30
    val MB = 1L << 20
    val KB = 1L << 10

    val (value, unit) = {
      if (size >= 2*TB) {
        (size.asInstanceOf[Double] / TB, "TB")
      } else if (size >= 2*GB) {
        (size.asInstanceOf[Double] / GB, "GB")
      } else if (size >= 2*MB) {
        (size.asInstanceOf[Double] / MB, "MB")
      } else if (size >= 2*KB) {
        (size.asInstanceOf[Double] / KB, "KB")
      } else {
        (size.asInstanceOf[Double], "B")
      }
    }
    "%.1f %s".formatLocal(Locale.US, value, unit)
  }

Apache Spark 1.3 - 源代码

于 2016-11-12T16:59:37.180 回答