2

我有一个两部分的问题,所以让我先介绍一些背景。我知道可以做类似于我想要的事情:

import scalaz.concurrent._
import scalaz.stream._

val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.dequeue

q.enqueueAll(1 to 2).run

val p1: Process1[Int, Int] = process1.take(1)

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 1

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// Answer: 2

p.pipe(p1).map(x => println(s"Answer: $x")).run.run
// hangs awaiting next input

还有其他一些p1我可以使用的东西可以给我下面的输出而不挂起(就像process1.awaitOption)?

Answer: Some(1)
Answer: Some(2)
Answer: None

如果是,我认为回答下一个问题会很容易。还有其他一些p1我可以使用的东西可以给我下面的输出而不挂起(就像process1.chunkAll)?

Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()

编辑:

补充问题以使其更易于理解。如果我有这样的循环:

for (i <- 1 to 4) {
  p.pipe(p1).map(x => println(s"Answer: $x")).run.run
}

结果可能是:

Answer: Seq()
// if someone pushes some values into the queue, like: q.enqueueAll(1 to 2).run
Answer: Seq(1, 2)
Answer: Seq()
Answer: Seq()

我希望现在很清楚我要做什么。问题是我无法控制循环,如果队列中没有值,我不能阻止它。

4

2 回答 2

1

我不确定我是否理解您尝试拥有的语义,但通常可以通过从外部关闭队列或使用 wye 来中断该过程(这意味着取消以等待某个值)。打断。

当您想让进程终止而不是等待下一个入队值时?如果假设您希望将其放在空队列中,则存在“大小”过程,如果大小为空,您可以使用它来中断等待队列,例如:

val empty : Process[Task,Boolean] = q.size.continuous.map(_ <= 0)

val deq : Process[Task,Int] = empty.wye(q.enqueue)(wye.interrupt)
于 2015-04-20T04:19:43.343 回答
0

虽然我无法让Pavel 的答案按我想要的方式工作,但这是转折点,我可以使用他的建议来使用 size 信号。

如果有人需要,我会在这里发布我的答案:

import scalaz.concurrent._
import scalaz.stream._

val q = async.unboundedQueue[Int]
val p: Process[Task, Int] = q.size.continuous.take(1).flatMap { n => q.dequeue |> process1.take(n) }

q.enqueueAll(1 to 2).run

p.map(x => println(s"Answer: $x")).run.run
// Answer: 1
// Answer: 2

p.map(x => println(s"Answer: $x")).run.run
// not hanging awaiting next input

p.map(x => println(s"Answer: $x")).run.run
// not hanging awaiting next input

q.enqueueAll(1 to 2).run

p.map(x => println(s"Answer: $x")).run.run
// Answer: 1
// Answer: 2

我意识到它并没有完全回答这个问题,因为我没有明确的p1,但这对我的目的来说很好。

于 2015-04-23T22:28:34.260 回答