1

我只想将 Akka 演员用作邮箱,即我想创建 n 个线程,每个线程创建 1 个远程演员。

每个线程获取对其他线程的所有远程参与者的引用,以便它们可以通过各自的参与者相互发送消息。

演员定义如下:

case class Receive
case class GroupReceive(id: Symbol)
case class GroupMsg[T](id: Symbol, msg: T)
class FooParActor(val distributor: Distributor) extends Actor
  with Stash {
  import context._

  val globalRank: Int = distributor.globalRank

  def doReceive(realSender: ActorRef, ID: Symbol) {
    unstashAll()
    become({
      case GroupMsg(ID, msg) =>
        realSender ! msg
        unbecome()
      case GroupMsg(otherId, msg) =>
        println(globalRank + ": stashing " + otherId)
        unbecome()
      case x => sys.error("bad msg: " + x)
    }, discardOld = false)
  }

  def receive = {
    case GroupReceive(id) =>
      doReceive(sender, id)
    case GroupMsg(id, x) =>
      stash()
    case x => sys.error("bad msg: " + x)
  }

}

为了阅读消息,所有者线程发送GroupReceive('someSymbol)给他的本地actor,后者又将 GroupMsg 转发给线程。从线程的角度来看,读取消息的代码如下所示:

def groupRcv[T](id:Symbol) = Await.result(aref ? GroupReceive(id), timeout.duration).asInstanceOf[T]

wherearef是对该线程的本地参与者的引用。

我有时会遇到上述模式的死锁(超时 5 秒),即使使用极其简单且消息很少。我将问题缩小到演员在收到GroupReceive(id)消息后停滞不前,但在进入 doReceive(...): 的第一个案例之前case GroupMsg(ID, msg) =>

我进行了打印输出跟踪,以检查演员在进行 doReceive 调用之前是否在存储中确实有消息,而且似乎出于某种原因,他们只是不处理它们。我上面提供的代码可以进入 a从 a的存储中GroupMsg()丢失的状态吗?FooParActor或者有没有其他方法可以让actor在收到GroupReceive()消息后陷入僵局?

4

1 回答 1

2

您正在使用Await.result()但不共享您执行此操作的位置:如果您调用groupRcv了您的演员应该在其上运行的线程,那么您当然可以饿死(即目标演员没有可用于运行的线程,所以它永远不会完成请求)。

似乎您正在以一种不健康的方式将基于线程的并发与参与者混合,但由于您只是暗示它并且没有显示代码,因此我只能给您广泛的建议,不要这样做。在给actor编程时,忘记线程;这些由 Akka 管理。特别是不要滥用 Akka 的线程(即Await.result可能会在您自己的外部线程池上工作,尽管几乎总是有更好的选择)。

最后,如果您使用 actor 只是为了创建“带有邮箱的线程”,那么 Akka 无法帮助您,您将遇到所有常见的传统并发陷阱。

于 2013-03-06T20:08:29.957 回答