12

我在Akka 文档中读到,关闭来自封闭参与者的变量是危险的。

警告

在这种情况下,您需要小心避免关闭包含 Actor 的引用,即不要从匿名 Actor 类中调用封闭 Actor 的方法。这会破坏参与者封装,并可能引入同步错误和竞争条件,因为其他参与者的代码将同时调度到封闭参与者。

现在,我有两个演员,其中一个向第二个请求一些东西,并对结果做一些事情。在下面我整理的这个示例中,actor Accumulator从actor NumberGenerator 中检索数字并将它们相加,同时报告总和。

这可以通过至少两种不同的方式完成,如本例所示,使用两个不同的接收函数(AB)。两者的区别在于A不关闭counter变量;相反,它等待一个整数并将其相加,而B创建一个Future关闭计数器并进行求和。如果我正确理解它是如何工作的,这发生在一个仅为处理 onSuccess 而创建的匿名参与者中。

import com.esotericsoftware.minlog.Log

import akka.actor.{Actor, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import akka.util.duration._

case object Start
case object Request


object ActorTest {
  var wake = 0

  val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator")
  val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator")

  Log.info("ActorTest", "Starting !")

  accRef ! Start
}

class Accumulator extends Actor {
  var counter = 0

  implicit val timeout = Timeout(5 seconds)

  // A: WITHOUT CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
  }
  // B: WITH CLOSURE
  def receive = {
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess {
      case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start
    }
  }
}

class NumberGenerator extends Actor {
  val rand = new java.util.Random()

  def receive = {
    case Request => sender ! rand.nextInt(11)-5
  }
}

在这种情况下使用闭包绝对是邪恶的吗?当然,我可以使用 AtomicInteger 而不是 Int,或者在某些网络场景中使用netty ,在线程安全通道上发出写入操作,但这不是我的意思。

冒着问可笑的风险:Future 的 onSuccess 有没有办法在这个演员而不是匿名的中间演员中执行,而不在接收函数中定义一个案例?

编辑

更清楚地说,我的问题是:有没有办法强制一系列 Futures 与给定的 Actor 在同一个线程中运行?

4

2 回答 2

5

问题是 将在onSuccess与参与者将要运行的线程不同的线程receive中运行。您可以使用该pipeTo方法,或使用Agent。制作counteranAtomicInteger可以解决问题,但它并不那么干净——也就是说,它破坏了 Actor 模型。

于 2012-06-21T02:04:13.097 回答
5

实现这种设计的最简单方法是使用“即发即弃”语义:

class Accumulator extends Actor {
  private[this] var counter = 0

  def receive = {
    case Start => ActorTest.genRef ! Request
    case x: Int => {
      counter += x
      Log.info("Accumulator", "counter = " + counter)
      self ! Start
    }
  }
}

此解决方案是完全异步的,您不需要任何超时。

于 2012-06-21T05:31:56.180 回答