我专门使用 twitter 的 AsyncStream,我需要获取并发处理的结果,并将其转换为 Seq,但我设法开始工作的唯一方法是可怕的。这感觉就像它应该是一个班轮,但我对 Await 和 force 的所有尝试要么挂起要么没有处理工作。
这就是我的工作 - 这样做更惯用的方式是什么?
def processWork(work: AsyncStream[Work]): Seq[Result] = {
// TODO: make less stupid
val resultStream = work.flatMap { processWork }
var results : Seq[Result] = Nil
resultStream.foreach {
result => {
results = results :+ result
}
}
results
}