0

filter (which uses halt inside) terminates other branch even if it has some side-effects:

scala> val p = Process("1","2", "3")
scala> val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> val p2 = p.filter(_ => false).map(_ + "p2").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1

scala> val p2 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
scala> (p1 yip p2).run.run
1p1
1p2
2p1
2p2
3p1
3p2

Seems logical as there is no value to be returned to yip after that filter. But what about side-effects, specified with observe?

My current solution is to use flatMap to specify default value:

scala> val p1 = p.map(_ + "p1").flatMap(x => Process.emit(x).observe(io.stdOutLines))

scala> val p2 = p.map(_ + "p2").flatMap(x => Process.emit(""))

scala> (p1 yip p2).run.run
1p1
2p1
3p1

But maybe there is a way to use filter?

P.S. merge combinator executes side-effects for other branch (as it doesn't require value to be returned), but it doesn't wait for other branch if one halts (even if it has side-effects).

4

2 回答 2

0

实际上它应该是这样的:

in.map(emit).flatMap{ p =>
  val p1 = p.map(_ + "p1").filter(_ => true).observe(out)
  val p2 = p.map(_ + "p2").filter(_ => false).observe(out)
  p1 merge p2
}.run.run

它使所有副作用都井井有条,因为filter不能获得超过一个值(由发射产生)

于 2015-11-13T03:23:33.940 回答
0

要在 p2 终止后运行效果,需要有明确的default行为。所以大概有这些解决方案:

  1. 定义p2终止后提供默认值
  2. either如果我们真的不需要元组,请使用wye 来获得左右

也许(1)更接近问题,代码看起来像:

val p = Process("1","2", "3")
val p1 = p.filter(_ => true).map(_ + "p1").observe(io.stdOutLines)
val p2 = p.filter(_ => false).map(_ + "p2")
         .observe(io.stdOutLines).map(Some(_)) ++ emit(None).repeat
// alternativelly
// val p2 = p.map { v =>  if (pred(v)) right(v) else left(v) }
//          .observeO(o.stdOutLines).flatMap { _.toOption }
//          ++ emit(None).repeat             

(p1 yip p2).run.run
于 2015-11-11T11:17:33.203 回答