2

我有以下代码:

    final Observable<String> a = Observable.just("a1", "a2");   
    final Observable<String> b = Observable.just("b1");

    final Observable<String> c = Observable.combineLatest(a, b, (first, second) -> first + second);

    c.subscribe(res -> System.out.println(res));

什么是预期输出?我本来期望的

a1b1
a2b1

但实际输出是

a2b1

那有意义吗?生成预期序列的正确运算符是什么?

4

2 回答 2

1

好问题!似乎可能是一种竞争条件。combineLatest在两个源都发出之前不会输出任何东西,并且似乎在b生成它的输出时,a已经移动到它的第二个项目。在具有时间间隔的异步事件的“真实世界”应用程序中,您可能会获得所需的行为。

如果你能忍受等待,一个解决方案是a稍微延迟输出。多做一些工作,您可以仅延迟第一个输出(请参阅运算符的各种重载delay)。另外我刚刚注意到有一个运营商delaySubscription可能会做到这一点(延迟你的订阅a直到b发出一些东西)。我敢肯定还有其他的,也许是更好的解决方案(我还在学习自己)。

于 2018-02-20T16:07:38.627 回答
1

正如运营商的名字所暗示的那样,它结合了每个来源的最新值。如果源是同步的或非常快,这可能意味着一个或多个源将运行到它们的完成,并且操作员将只记住每个源的最后一个值。您必须通过某种方式交错源值,例如在项目之间使用具有充足时间的异步源,并避免多个源的项目紧密重叠。

可以通过多种方式生成预期的序列,具体取决于您的初衷。例如,如果您想要所有交叉组合,请使用flatMap

a.flatMap(aValue -> b, (aValue, bValue) -> first + second)
.subscribe(System.out::println);

如果b重新创建的成本很高,请将其缓存:

Observable<String> cachedB = b.cache();
a.flatMap(aValue -> cachedB, (aValue, bValue) -> first + second)
.subscribe(System.out::println);
于 2018-02-20T18:22:50.333 回答