0

我专门使用 twitter 的 AsyncStream,我需要获取并发处理的结果,并将其转换为 Seq,但我设法开始工作的唯一方法是可怕的。这感觉就像它应该是一个班轮,但我对 Await 和 force 的所有尝试要么挂起要么没有处理工作。

这就是我的工作 - 这样做更惯用的方式是什么?

  def processWork(work: AsyncStream[Work]): Seq[Result] = {
    // TODO: make less stupid
    val resultStream = work.flatMap { processWork }
    var results : Seq[Result] = Nil
    resultStream.foreach {
      result => {
        results = results :+ result
      }
    }
    results
  }
4

1 回答 1

1

就像@MattFowler 指出的那样-您可以强制流并等到它完成使用:

Await.result(resultStream.toSeq, 1.second)

toSeq将开始实现流并返回一个Future[Seq[A]]在所有元素都解决后完成的流,如此所述。

然后,您可以使用Await.result.

确保您的流是有限的!通话Await.result(resultStream.toSeq, 1.second)将永远挂起。

于 2017-05-15T18:03:23.157 回答