0

给定以下函数对象,

val f : Int => Double = (i:Int) => i + 0.1

val g1 : Double => Double = (x:Double) => x*10

val g2 : Double => Double = (x:Double) => x/10

val h : (Double,Double) => Double = (x:Double,y:Double) => x+y

例如 3 个远程服务器或节点(IP xxx.xxx.xxx.1、IP 2 和 IP 3),如何分配该程序的执行,

val fx = f(1)
val g1x = g1( fx )
val g2x = g2( fx )
val res = h ( g1x, g2x )

以便

  • fx在 IP 1 中计算,
  • g1x在 IP 2 中计算,
  • g2x在 IP 3 中计算,
  • res在 IP 1 中计算

Scala Akka 或 Apache Spark 可以提供一个简单的方法吗?

更新

  • @pkinsky建议的RPC(远程过程调用)Finagle 可能是一个可行的选择。
  • 将负载平衡策略视为一种选择执行节点的机制,至少是任何可用的免费节点策略。
4

1 回答 1

1

我可以为Apache Spark发言。它可以用下面的代码做你正在寻找的东西。但它不是为这种并行计算而设计的。它专为并行计算而设计,您还可以将大量并行数据分布在许多机器上。所以这个解决方案看起来有点傻,因为我们在一台机器上分配一个整数,例如(for f(1))。

此外,Spark 旨在对所有数据运行相同的计算。所以运行g1()g2()并行有点违背设计。(如您所见,这是可能的,但并不优雅。)

// Distribute the input (1) across 1 machine.
val rdd1 = sc.parallelize(Seq(1), numSlices = 1)
// Run f() on the input, collect the results and take the first (and only) result.
val fx = rdd1.map(f(_)).collect.head
// The next stage's input will be (1, fx), (2, fx) distributed across 2 machines.
val rdd2 = sc.parallelize(Seq((1, fx), (2, fx)), numSlices = 2)
// Run g1() on one machine, g2() on the other.
val gxs = rdd2.map {
  case (1, x) => g1(x)
  case (2, x) => g2(x)
}.collect
val g1x = gxs(0)
val g2x = gxs(1)
// Same deal for h() as for f(). The input is (g1x, g2x), distributed to 1 machine.
val rdd3 = sc.parallelize(Seq((g1x, g2x)), numSlices = 1)
val res = rdd3.map { case (g1x, g2x) => h(g1x, g2x) }.collect.head

您可以看到 Spark 代码基于RDD的概念。一个 RDD 就像一个数组,除了它是在多台机器上分区的。sc.parallelize()从本地集合创建这样的并行集合。例如rdd2,在上面的代码中,将从本地集合中创建Seq((1, fx), (2, fx))并拆分到两台机器上。一台机器将拥有Seq((1, fx)),另一台将拥有Seq((2, fx))

接下来我们对 RDD进行转换。map是一种常见的转换,它通过对每个元素应用一个函数来创建一个相同长度的新 RDD。(与 Scala 的 相同。mapmap我们运行的rdd2将替换(1, x)g1(x)(2, x)g2(x)因此,它会在一台机器g1()上运行,而在另一台机器上g2()运行。

只有当您想要访问结果时,转换才会延迟运行。访问结果的方法称为操作。最直接的例子是collect,它将整个 RDD 的内容从集群下载到本地机器。(正好相反sc.parallelize()。)

如果您下载 Spark、startbin/spark-shell并将您的函数定义和上述代码复制到 shell 中,您可以尝试查看所有这些。

于 2014-11-04T12:42:54.973 回答