1

我有一个网络应用程序,它会执行大量缓慢的并发工作来计算其结果。我不想让最终用户挂起,我想通过 websocket 流回进度更新。

我的代码库是由 Scalaz 任意一个 (/) 组成的,例如:

type ProcessResult = Error \/ Int

def downloadFile(url: String): Future[Error \/ String] = ???
def doSlowProcessing(data1: String, data2: String): Future[ProcessResult] = ???

/* Very simple however doesn't give any progress update */
def execute(): Future[ProcessResult] = {
 val download1 = downloadFile(...)
 val download2 = downloadFile(...)

 val et = for {
   d1 <- download1
   d2 <- download2
   processed <- doSlowProcessing(d1, d2)
 } yield processed   

 et.run 
}

这很好用,但当然,在我从 Future 中得到任何东西之前,需要完成整个计算。即使我堆叠在一个 Writer monad 上进行日志记录,我也只会在完成后得到日志,不会让我的最终用户更快乐。

我玩弄了使用 scalaz-stream Queue 在代码运行时将日志作为副作用发送,但最终结果非常难看:

def execute(): Process[Task, String \/ ProcessResult] = {
 val (q, src) = async.queue[String \/ ProcessResult]

 val download1 = downloadFile(...)
 val download2 = downloadFile(...)

 val et = for {
   d1 <- q.enqueue("Downloading 1".left); download1
   d2 <- q.enqueue("Downloading 2".left); download2
   processed <- q.enqueue("Doing processing".left); doSlowProcessing(d1, d2)
 } yield processed    

 et.run.onSuccess {
  x =>
   q.enqueue(x.right)
   q.close
 }

 src
}

感觉应该有一种惯用的方法来实现这一目标?如有必要,可以将我的 SIP-14 Scala 期货转换为任务。

4

1 回答 1

1

我认为您不需要使用队列,其中一种方法可以是使用 wye 使用非确定性合并,即

type Result = ???
val download1: Process[Task,File] = ???
val download2: Process[Task,File] = ???


val result: Process[Task,(File,File)] = (download1 yip download2).once 

val processed: Process[Task, Result] = result.flatMap(doSlowProcessing)

// Run asynchronously, 
processed.runLast.runAsync {
  case Some(r) => .... // result computed
  case None => .... //no result, hence download1,2 were empty.
}

//or run synchronously awaiting the result
processed.runLast.run match {
  case Some(r) => .... // result computed
  case None => .... //no result 
}

//to capture the error information while download use 
val withError: Process[Task,Throwable\/File] = download1.attempt

//or to log and recover to other file download
val withError: Process[Task,File] download1 onFailure { err => Log(err); download3 }

这有意义吗?

另请注意,从 0.5.0 开始不推荐使用 async.queue 以支持 async.unboundedQueue

于 2014-08-27T01:30:35.773 回答