我有一个两部分的问题,所以让我先介绍一些背景。我知道可以做类似于我想要的事情:
import scalaz.concurrent._
import scalaz.stream._
val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue
q.enqueueAll(1 to 2).run
val p1: Process1[Int, Int] = process1.take(1)
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input
还有其他一些p1
我可以使用的东西可以给我下面的输出而不挂起(就像process1.awaitOption
)?
Answer: Some(1)
Answer: Some(2)
Answer: None
如果是,我认为回答下一个问题会很容易。还有其他一些p1
我可以使用的东西可以给我下面的输出而不挂起(就像process1.chunkAll
)?
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
编辑:
补充问题以使其更易于理解。如果我有这样的循环:
for (i <- 1 to 4) {
p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}
结果可能是:
Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()
我希望现在很清楚我要做什么。问题是我无法控制循环,如果队列中没有值,我不能阻止它。