4

我正在尝试通过将一个数据源与来自同一数据源的过滤版本合并来使用 scalaz-stream 创建一个循环过程。这是我到目前为止的一个简单示例:

val s1 = Process.emitAll(1 to 10).toSource

val w = wye.merge[Int]

val s2 = w.filter(_ < 5)

val w2 = s1.wye(s2)(w)

但它不会按原样编译,而是必须是s2a 。Process[Process.Env[Int,Int]#Y,Int]Process[Task,Int]

如何指定 s2 既是输入(带有s1)又是输出w

4

2 回答 2

3

我认为问题出在 st2 中,它被定义为用 t2 压缩 wye (w)。这没有任何意义,因此 wye 只是描述如何合并流程。

我认为 t2 是 Process[Task,Duration] 所以你需要在左侧另一个 Process[Task,Duration] 然后你可以使用 wye.merge[Duration] 将它们合并在一起,如:

val t1: Process[Task,Duration] = ???
val t2: Process[Task,Duration] = Process.awakeEvery(3 second)

val st2: Process[Task.Duration] = t1.filter(_ < 5 seconds).zip(t2).map(_._1)
val w2: Process[Task.Duration] = t1.wye(st2)(wye.merge) //or simply t1.merge(t2)

也许在每一行上键入注释将指导您的路径。

于 2014-09-08T05:11:55.410 回答
2

在 scalaz-mailing list 上查看这个答案简短的回答是不可能完全按照您的要求做,但通常有其他方式来表达您的问题。

于 2014-09-17T13:36:20.197 回答