2

最近,我一直在使用 Akka 开发不同的示例;例如 playtool P2P 模型。为了更好地评估、基准测试和理解,我尝试使用Akka 引入的不同类型的调度程序来运行该示例。

我一直面临的问题是,特别是BalancingDispatcher与 a 一起使用thread-pool-executor(默认情况下应该用于这种类型的调度程序),应用程序往往会进入完全等待状态;这是僵局吗?!当我jstack用来获取应用程序中线程的转储时,突出的观察结果是所有调度程序线程都处于TIMED_WAITINGWAITING状态。

我很想知道这是否是由于应用程序设计造成的死锁?而且,更重要的是,我如何才能调试或深入研究处理此类问题?

此应用程序中最有用的代码片段可以是:

每个中的search方法Node向同行询问搜索查询并返回结果

  def search(query: String): Collection[String] = {
    var result: List[String] = List[String]()
    this.list().toList.foreach { q =>
        if (q.contains(query)) {
            result = query :: result
        }
    }
    implicit val timeout = Timeout(1 hour)
    // println("[" + this + "]'s peers: " + peers)
    for (p <- peers) {
      // println("asking [" + p + "] for [" + query + "]")
      val tmp: Future[Collection[String]] = ask(p.asInstanceOf[ActorRef], _search(query)).mapTo[Collection[String]]
      msgs.incrementAndGet()
      val l = Await.result(tmp, timeout.duration).asInstanceOf[Collection[String]]
      l.toList.foreach { q =>
        if (q.contains(query)) {
            result = q :: result
        }
      }
    }
    result.toList
  }

主要部分中用于评估节点网络的片段:

import context.dispatcher
var start = System.currentTimeMillis
val futures = List.fill(querySize)(ask(root, _search("_" + r.nextInt(100))).mapTo[Collection[String]])
val results = Future.sequence(futures)
Await.ready(results, Timeout(1000 hour).duration)
var end = System.currentTimeMillis
4

0 回答 0