我只想将 Akka 演员用作邮箱,即我想创建 n 个线程,每个线程创建 1 个远程演员。
每个线程获取对其他线程的所有远程参与者的引用,以便它们可以通过各自的参与者相互发送消息。
演员定义如下:
case class Receive
case class GroupReceive(id: Symbol)
case class GroupMsg[T](id: Symbol, msg: T)
class FooParActor(val distributor: Distributor) extends Actor
with Stash {
import context._
val globalRank: Int = distributor.globalRank
def doReceive(realSender: ActorRef, ID: Symbol) {
unstashAll()
become({
case GroupMsg(ID, msg) =>
realSender ! msg
unbecome()
case GroupMsg(otherId, msg) =>
println(globalRank + ": stashing " + otherId)
unbecome()
case x => sys.error("bad msg: " + x)
}, discardOld = false)
}
def receive = {
case GroupReceive(id) =>
doReceive(sender, id)
case GroupMsg(id, x) =>
stash()
case x => sys.error("bad msg: " + x)
}
}
为了阅读消息,所有者线程发送GroupReceive('someSymbol)
给他的本地actor,后者又将 GroupMsg 转发给线程。从线程的角度来看,读取消息的代码如下所示:
def groupRcv[T](id:Symbol) = Await.result(aref ? GroupReceive(id), timeout.duration).asInstanceOf[T]
wherearef
是对该线程的本地参与者的引用。
我有时会遇到上述模式的死锁(超时 5 秒),即使使用极其简单且消息很少。我将问题缩小到演员在收到GroupReceive(id)
消息后停滞不前,但在进入 doReceive(...): 的第一个案例之前case GroupMsg(ID, msg) =>
。
我进行了打印输出跟踪,以检查演员在进行 doReceive 调用之前是否在存储中确实有消息,而且似乎出于某种原因,他们只是不处理它们。我上面提供的代码可以进入 a从 a的存储中GroupMsg()
丢失的状态吗?FooParActor
或者有没有其他方法可以让actor在收到GroupReceive()
消息后陷入僵局?