5

我有一个名为“edges”的大数据

org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[(String, Int)]] = MappedRDD[27] at map at <console>:52

当我在独立模式下工作时,我能够收集、计数和保存这个文件。现在,在集群上,我收到了这个错误

edges.count
...
Serialized task 28:0 was 12519797 bytes which exceeds spark.akka.frameSize
  (10485760 bytes). Consider using broadcast variables for large values.

与 .saveAsTextFile("edges") 相同

这是来自火花壳。我尝试使用选项
--driver-java-options "-Dspark.akka.frameSize=15"

但是当我这样做时,它只是无限期地挂起。任何帮助,将不胜感激。

** 编辑 **

我的独立模式是 Spark 1.1.0,我的集群是 Spark 1.0.1。

此外,当我对 RDD 进行计数、收集或保存时会发生挂起,但定义它或对其进行过滤器工作得很好。

4

1 回答 1

9

“考虑对大值使用广播变量”错误消息通常表明您在函数闭包中捕获了一些大变量。例如,你可能写过类似的东西

val someBigObject = ...
rdd.mapPartitions { x => doSomething(someBigObject, x) }.count()

这会导致someBigObject您的任务被捕获和序列化。如果您正在做类似的事情,您可以使用广播变量,这将导致仅对对象的引用存储在任务本身中,而实际的对象数据将单独发送。

在 Spark 1.1.0+ 中,没有必要为此使用广播变量,因为任务将自动被广播(有关更多详细信息,请参阅SPARK-2521)。仍然有理由使用广播变量(例如在多个动作/作业之间共享一个大对象),但您不需要使用它来避免帧大小错误。

另一种选择是增加 Akka 帧大小。在任何 Spark 版本中,您都应该能够在创建 SparkContext 之前进行spark.akka.frameSize设置。但是,您可能已经注意到,在为您创建上下文的地方SparkConf,这有点困难。spark-shell在较新版本的 Spark(1.1.0 及更高版本)中,您可以--conf spark.akka.frameSize=16在启动时通过spark-shell. 在 Spark 1.0.1 或 1.0.2 中,您应该可以通过--driver-java-options "-Dspark.akka.frameSize=16"

于 2014-12-01T08:39:55.523 回答