最近,我一直在使用 Akka 开发不同的示例;例如 playtool P2P 模型。为了更好地评估、基准测试和理解,我尝试使用Akka 引入的不同类型的调度程序来运行该示例。
我一直面临的问题是,特别是BalancingDispatcher
与 a 一起使用thread-pool-executor
(默认情况下应该用于这种类型的调度程序),应用程序往往会进入完全等待状态;这是僵局吗?!当我jstack
用来获取应用程序中线程的转储时,突出的观察结果是所有调度程序线程都处于TIMED_WAITING
或WAITING
状态。
我很想知道这是否是由于应用程序设计造成的死锁?而且,更重要的是,我如何才能调试或深入研究处理此类问题?
此应用程序中最有用的代码片段可以是:
每个中的search
方法Node
向同行询问搜索查询并返回结果
def search(query: String): Collection[String] = {
var result: List[String] = List[String]()
this.list().toList.foreach { q =>
if (q.contains(query)) {
result = query :: result
}
}
implicit val timeout = Timeout(1 hour)
// println("[" + this + "]'s peers: " + peers)
for (p <- peers) {
// println("asking [" + p + "] for [" + query + "]")
val tmp: Future[Collection[String]] = ask(p.asInstanceOf[ActorRef], _search(query)).mapTo[Collection[String]]
msgs.incrementAndGet()
val l = Await.result(tmp, timeout.duration).asInstanceOf[Collection[String]]
l.toList.foreach { q =>
if (q.contains(query)) {
result = q :: result
}
}
}
result.toList
}
主要部分中用于评估节点网络的片段:
import context.dispatcher
var start = System.currentTimeMillis
val futures = List.fill(querySize)(ask(root, _search("_" + r.nextInt(100))).mapTo[Collection[String]])
val results = Future.sequence(futures)
Await.ready(results, Timeout(1000 hour).duration)
var end = System.currentTimeMillis