我有一个Process[Task, A]
,我需要A => B
在每个流上运行一个函数,其运行时间范围从瞬时到非常长,A
以产生一个Process[Task, B]
.
问题是,我想尽快处理每个A
结果,并在获得结果后立即传递结果,而不管收到 sExecutionContext
的顺序如何。A
一个具体的例子是下面的代码,我希望立即打印所有奇数,大约 500 毫秒后打印偶数。相反,会打印(奇数,偶数)对,并以 500 毫秒的暂停交错:
import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext
import scalaz.stream._
import scalaz.concurrent.Task
object Test extends App {
val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
Process.range(0, 100).flatMap { i =>
Process.eval(Task.apply {
if(i % 2 == 0) Thread.sleep(500)
i
}(executor))
}.to(io.printStreamSink(System.out)(_ println _))
.run.run
executor.shutdown()
executor.awaitTermination(10, TimeUnit.MINUTES)
}