在注释 1 中的这段代码中,listbuffer 项的长度正确显示,但在第二条注释中,代码永远不会执行。为什么会发生?

val conf = new SparkConf().setAppName("app").setMaster("local")
val sc = new SparkContext(conf)

var wktReader: WKTReader = new WKTReader(); 
val dataSet = sc.textFile("dataSet.txt")

val items = new ListBuffer[String]() 
dataSet.foreach { e =>
  items += e
  println("len = " + items.length) //1. here length is ok

items.foreach { x => print(x)} //2. this code doesn't be executed


Apache Spark 不提供共享内存,因此在这里:

dataSet.foreach { e =>
  items += e
  println("len = " + items.length) //1. here length is ok


items.foreach { x => print(x) }




val acc = sc.collectionAccumulator[String]("Items")
dataSet.foreach(e => acc.add(e))
于 2016-11-20T08:41:42.243 回答

Spark 在执行器中运行并返回结果。上面的代码没有按预期工作。如果需要添加元素,foreach则需要收集驱动程序中的数据并添加到current_set. 但是,当您拥有大量数据时,收集数据是个坏主意。

val items = new ListBuffer[String]()

val rdd = spark.sparkContext.parallelize(1 to 10, 4)
rdd.collect().foreach(data => items += data.toString())


ListBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
于 2018-07-06T10:54:44.443 回答