1

我有两个从单个 Observable 创建的 Observable,如下所示

import monix.reactive.Observable

import scala.collection.immutable
val a: immutable.Seq[(String, String)] = (0 to 10).toList.map(x =>(s"left-$x", s"right-$x"))

val originalStream: Observable[(String, String)] = Observable.fromIterable(a)

val leftStream: Observable[String] = originalStream.map(_._1).map(println)

val rightStream: Observable[String] = originalStream.map(_._2).map(println)

现在如何并行运行 leftStream 和 rightStream 并将它们组合在一起以获得可以订阅的新 Observable?执行 Observable.merge 是按顺序执行它们。

4

1 回答 1

2

leftStream.zip(rightStream)并且应该为两个流提供背压。

但是使用 map 从 originalStream 创建 leftStream 和 rightStream 意味着 originalStream 中的元素将进入这些流之一,而不是两者。

于 2018-02-07T05:54:26.213 回答