3

问题是关于Akka 演员库。A 想要将一项大任务拆分为较小的任务,然后将它们的结果折叠成一个“大”结果。这会给我更快的计算利润。如果较小的任务是独立的,则可以并行计算它们。

假设我们需要像这样计算一些东西。函数count2X很耗时,因此在一个线程中多次使用它并不是最佳选择。

//NOT OPTIMAL
def count2X(x: Int) = {
  Thread.sleep(1000)
  x * 2
}

val sum = count2X(1) + count2X(2) + count2X(3)
println(sum)

问题来了。

如何调度任务并收集结果然后折叠它们,所有这些都使用 akka 演员? Akka 是否已经提供了这样的功能,还是我需要自己实现?这种方法的最佳做法是什么。

这是我的问题的“视觉”解释:

             /-> [SMALL_TASK_1] -\
[BIG_TASK] -+--> [SMALL_TASK_1] --> [RESULT_FOLD]
             \-> [SMALL_TASK_1] -/

下面是我的脚手架实现缺少/错误的实现:)

case class Count2X(x: Int)

class Count2XActor extends Actor {
  def receive = {
    case Count2X(x) => count2X(x); // AND NOW WHAT ?
  }
}

case class CountSumOf2X(a: Int, b: Int, c: Int)

class SumOf2XActor extends Actor {
  val aCounter = context.actorOf(Props[Count2XActor])
  val bCounter = context.actorOf(Props[Count2XActor])
  val cCounter = context.actorOf(Props[Count2XActor])

  def receive = {
    case CountSumOf2X(a, b, c) => // AND NOW WHAT ? aCounter ! Count2X(a); bCounter ! Count2X(b); cCounter ! Count2X(c);
  }
}

val aSystem = ActorSystem("mySystem")
val actor = aSystem.actorOf(Props[SumOf2XActor])

actor ! CountSumOf2X(10, 20, 30)

谢谢你的帮助。

4

2 回答 2

2

在 Akka,我会做这样的事情:

val a = aCounter ? Count2X(10) mapTo[Int]
val b = bCounter ? Count2X(10) mapTo[Int]
val c = cCounter ? Count2X(10) mapTo[Int]
Await.result(Future.sequence(a, b, c) map (_.sum), 1 second).asInstanceOf[Int]

我确信有更好的方法 - 在这里你开始对结果求和,毕竟Future-s 是并行完成的,对于简单的任务没关系,但通常你不应该等那么久

于 2012-12-24T00:49:21.437 回答
2

你可以做两件事:

1) 使用 Akka 期货。这些允许您分派操作并以异步方式折叠它们。查看http://doc.akka.io/docs/akka/2.0.4/scala/futures.html了解更多信息。

2)您可以将工作分派给多个“工人”演员,然后让“主”演员聚合它们,通过将信息存储在消息本身中来跟踪哪些消息正在等待/处理。我在这里有一个使用 Akka 演员的简单股票报价示例:https ://github.com/ryanlecompte/quotes

于 2012-12-22T19:13:34.367 回答