7

问题陈述:我有一个需要以并行方式处理的证券组合。在 Java 中,我使用线程池来处理每个安全性,并使用锁存器进行倒计时。完成后,我会进行一些合并等。

所以我向我的 SecurityProcessor(它是一个演员)发送消息,并等待所有期货完成。最后,我使用 MergeHelper 进行后处理。SecurityProcessor 接受一个安全,做一些 i/o 和处理并回复一个安全

  val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures += (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get

这种设计是否正确,是否有更好/更酷的方法来使用 akka 实现这个常见问题?

4

1 回答 1

8
val futureResult = Future.sequence(
                  portfolio.getSecurities.toList map { security => (actorOf[SecurityProcessor].start() ? security).mapTo[Security] }
                ) map { securities => MergeHelper.merge(portfolio, securities) }
于 2011-11-08T09:30:32.293 回答