我有一个网络应用程序,它会执行大量缓慢的并发工作来计算其结果。我不想让最终用户挂起,我想通过 websocket 流回进度更新。
我的代码库是由 Scalaz 任意一个 (/) 组成的,例如:
type ProcessResult = Error \/ Int
def downloadFile(url: String): Future[Error \/ String] = ???
def doSlowProcessing(data1: String, data2: String): Future[ProcessResult] = ???
/* Very simple however doesn't give any progress update */
def execute(): Future[ProcessResult] = {
val download1 = downloadFile(...)
val download2 = downloadFile(...)
val et = for {
d1 <- download1
d2 <- download2
processed <- doSlowProcessing(d1, d2)
} yield processed
et.run
}
这很好用,但当然,在我从 Future 中得到任何东西之前,需要完成整个计算。即使我堆叠在一个 Writer monad 上进行日志记录,我也只会在完成后得到日志,不会让我的最终用户更快乐。
我玩弄了使用 scalaz-stream Queue 在代码运行时将日志作为副作用发送,但最终结果非常难看:
def execute(): Process[Task, String \/ ProcessResult] = {
val (q, src) = async.queue[String \/ ProcessResult]
val download1 = downloadFile(...)
val download2 = downloadFile(...)
val et = for {
d1 <- q.enqueue("Downloading 1".left); download1
d2 <- q.enqueue("Downloading 2".left); download2
processed <- q.enqueue("Doing processing".left); doSlowProcessing(d1, d2)
} yield processed
et.run.onSuccess {
x =>
q.enqueue(x.right)
q.close
}
src
}
感觉应该有一种惯用的方法来实现这一目标?如有必要,可以将我的 SIP-14 Scala 期货转换为任务。