1

我有一个Process[Task, A],我需要A => B在每个流上运行一个函数,其运行时间范围从瞬时到非常长,A以产生一个Process[Task, B].

问题是,我想尽快处理每个A结果,并在获得结果后立即传递结果,而不管收到 sExecutionContext的顺序如何。A

一个具体的例子是下面的代码,我希望立即打印所有奇数,大约 500 毫秒后打印偶数。相反,会打印(奇数,偶数)对,并以 500 毫秒的暂停交错:

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

  Process.range(0, 100).flatMap { i =>
    Process.eval(Task.apply {
      if(i % 2 == 0) Thread.sleep(500)
      i
    }(executor))
  }.to(io.printStreamSink(System.out)(_ println _))
  .run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
4

1 回答 1

1

原来答案是使用渠道。这是似乎完全符合我要求的更新代码:

import java.util.concurrent.{TimeUnit, Executors}
import scala.concurrent.ExecutionContext

import scalaz.stream._
import scalaz.concurrent.Task

object Test extends App {
  val executor = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))
  val chan = channel.lift[Task, Int, Int] { i => Task {
    if(i % 2 == 0) Thread.sleep(500)
    i
  }}

  merge.mergeN(8)(Process.range(0, 100).zipWith(chan)((i, f) => Process.eval(f(i))))
    .to(io.printStreamSink(System.out)(_ println _)).run.run

  executor.shutdown()
  executor.awaitTermination(10, TimeUnit.MINUTES)
}
于 2015-08-11T10:22:25.530 回答