我正在使用 Scala 和 Akka 建模一个简单的 P2P:
class Node() extends Peer with Actor {
var peers: List[ActorRef] = List()
def receive = {
case _register(peer: ActorRef, p: Option[Int]) => {
println("registering [" + peer + "] for [" + this + "]")
peers = peer :: peers
}
}
}
sealed case class _register(val peer: ActorRef, var p: Option[Int] = None)
然后是一个简单的网络:
class Network() extends Actor {
def this(name: String) = {
this()
val system = ActorSystem(name)
val s1 = system.actorOf(Props(new Node()), name = "s1")
val s2 = system.actorOf(Props(new Node()), name = "s2")
val c1 = system.actorOf(Props(new Node()), name = "c1")
val c2 = system.actorOf(Props(new Node()), name = "c2")
val c3 = system.actorOf(Props(new Node()), name = "c3")
val c4 = system.actorOf(Props(new Node()), name = "c4")
implicit val timeout = Timeout(5 second)
s1 ? _register(c1)
s1 ? _register(c2)
s1 ? _register(c3)
val lastRegistered = s2 ? _register(c4)
Await.ready(lastRegistered, timeout.duration)
println("initialized nodes")
}
}
我得到的输出总是这样的:
registering [Actor[akka://p2p/user/c1]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c2]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c3]] for [nl.cwi.crisp.examples.p2p.scala.Node@14b5f4a]
registering [Actor[akka://p2p/user/c4]] for [nl.cwi.crisp.examples.p2p.scala.Node@13c0b53]
[ERROR] [04/10/2012 22:07:04.34] [main-akka.actor.default-dispatcher-1] [akka://main/user/p2p] error while creating actor
java.util.concurrent.TimeoutException: Futures timed out after [5000] milliseconds
at akka.dispatch.DefaultPromise.ready(Future.scala:834)
at akka.dispatch.DefaultPromise.ready(Future.scala:811)
at akka.dispatch.Await$.ready(Future.scala:64)
at nl.cwi.crisp.examples.p2p.scala.Network.<init>(Node.scala:136)
at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
at nl.cwi.crisp.examples.p2p.scala.Main$$anonfun$11.apply(Node.scala:164)
at akka.actor.ActorCell.newActor(ActorCell.scala:488)
at akka.actor.ActorCell.create$1(ActorCell.scala:506)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:591)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:191)
at akka.dispatch.Mailbox.run(Mailbox.scala:160)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:997)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1495)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
我在 Akka 参考文档上遵循了Futures的文档。交换Await.ready
没有Await.result
任何效果。日志显示上次注册成功。
我应该如何解决这个问题?