0

说我有以下代码。

def f(x): (Array[Double], Array[Int])={
    val   data: Array[Double]   //1D array
    val   index: Array[Int]    //Data element's index

    //Read data from a file into "data"
    //Generate index (based on value "x") into "index"

    (dset_datas, index)
}

sc.range(0, 10, 1, 10).flatMap.(x => (f(x)._1 zip f(x)._2))

问题:

1) 对于平面图中的每个 x,函数 f(x) 会被调用两次吗?因为我先调用了 f(x)._1,然后调用了 f(x)._2。

2)flapmap会并行执行(尤其是数据读取部分)吗?假设我有 3 个节点,每个节点有 32 个核心。我设置了--num-executors=2 和--executor-cores=32。另一个节点用作驱动节点。

为了回答上述问题,我在文档中搜索了很多 Spark/Scala,但没有从那里得到任何答案。我试图在我自己的系统上运行代码。看起来像这样

1) f(x) 被调用了两次,因为我发现数据分区被处理了两次。但是,我不确定。

2)我注意到在火花日志文件系统下创建了两个执行程序文件夹,每个执行程序也有一些标准输出。但是,我也不确定。

谢谢 !

4

1 回答 1

0

1)每个工作人员将执行f(x)两次,因为它在您的函数文字中被调用两次 - 每次提取结果元组的不同元素。

range2) 你方法的最后一个参数是10,这意味着你的范围 RDD 将有 10 个分区。这意味着并行执行的上限flatMap是 10(如果你有 10 个执行器,flatMap可以在每个执行器上并行执行)。由于您有两个执行器,flatMap因此仍将并行执行,但仅在这两个执行器上执行。

于 2016-06-03T19:27:35.003 回答