我可以为Apache Spark发言。它可以用下面的代码做你正在寻找的东西。但它不是为这种并行计算而设计的。它专为并行计算而设计,您还可以将大量并行数据分布在许多机器上。所以这个解决方案看起来有点傻,因为我们在一台机器上分配一个整数,例如(for f(1)
)。
此外,Spark 旨在对所有数据运行相同的计算。所以运行g1()
和g2()
并行有点违背设计。(如您所见,这是可能的,但并不优雅。)
// Distribute the input (1) across 1 machine.
val rdd1 = sc.parallelize(Seq(1), numSlices = 1)
// Run f() on the input, collect the results and take the first (and only) result.
val fx = rdd1.map(f(_)).collect.head
// The next stage's input will be (1, fx), (2, fx) distributed across 2 machines.
val rdd2 = sc.parallelize(Seq((1, fx), (2, fx)), numSlices = 2)
// Run g1() on one machine, g2() on the other.
val gxs = rdd2.map {
case (1, x) => g1(x)
case (2, x) => g2(x)
}.collect
val g1x = gxs(0)
val g2x = gxs(1)
// Same deal for h() as for f(). The input is (g1x, g2x), distributed to 1 machine.
val rdd3 = sc.parallelize(Seq((g1x, g2x)), numSlices = 1)
val res = rdd3.map { case (g1x, g2x) => h(g1x, g2x) }.collect.head
您可以看到 Spark 代码基于RDD的概念。一个 RDD 就像一个数组,除了它是在多台机器上分区的。sc.parallelize()
从本地集合创建这样的并行集合。例如rdd2
,在上面的代码中,将从本地集合中创建Seq((1, fx), (2, fx))
并拆分到两台机器上。一台机器将拥有Seq((1, fx))
,另一台将拥有Seq((2, fx))
。
接下来我们对 RDD进行转换。map
是一种常见的转换,它通过对每个元素应用一个函数来创建一个相同长度的新 RDD。(与 Scala 的 相同。map
)map
我们运行的rdd2
将替换(1, x)
为g1(x)
和(2, x)
。g2(x)
因此,它会在一台机器g1()
上运行,而在另一台机器上g2()
运行。
只有当您想要访问结果时,转换才会延迟运行。访问结果的方法称为操作。最直接的例子是collect
,它将整个 RDD 的内容从集群下载到本地机器。(正好相反sc.parallelize()
。)
如果您下载 Spark、startbin/spark-shell
并将您的函数定义和上述代码复制到 shell 中,您可以尝试查看所有这些。