12

我试图了解combineAll操作员是如何工作的。我正在使用官方文档中的以下示例:

import { take, map, combineAll } from 'rxjs/operators';
import { interval } from 'rxjs';

const source$ = interval(1000).pipe(take(2));

const example$ = source$.pipe(
  map(val =>
    interval(1000).pipe(
      map(i => `Result (${val}): ${i}`),
      take(5)
    )
  )
);

example$
  .pipe(combineAll())
  .subscribe(console.log);

输出是:

["Result (0): 0", "Result (1): 0"]
["Result (0): 1", "Result (1): 0"]
["Result (0): 1", "Result (1): 1"]
["Result (0): 2", "Result (1): 1"]
["Result (0): 2", "Result (1): 2"]
["Result (0): 3", "Result (1): 2"]
["Result (0): 3", "Result (1): 3"]
["Result (0): 4", "Result (1): 3"]
["Result (0): 4", "Result (1): 4"]

试图弄清楚为什么,我做了这个简单的方案:

combineAll 方案

从文档中,我读到每次任何内部 Observable 发出一个值时,这个发出的值都会与所有其他内部 observable 的最后一个值组合。

在上面的方案中,我们可以看到内部 Observables 在一段时间内发出了 10 个值,所以我希望随着时间的推移得到 10 个值的输出,但它是 9。

此外,在输出的第一行:

["Result (0): 0", "Result (1): 0"]) 

'Result (1): 0' 的 0 是否对应空值?因为 Observable 'inner 2' 还没有发出任何东西?

在这里完成是我期望的输出:

["Result (0): 0", "Result (1): 0"]
["Result (0): 1", "Result (1): 0"]
["Result (0): 1", "Result (1): 0"]
["Result (0): 2", "Result (1): 0"]
["Result (0): 2", "Result (1): 1"]
["Result (0): 3", "Result (1): 1"]
["Result (0): 3", "Result (1): 2"]
["Result (0): 4", "Result (1): 2"]
["Result (0): 4", "Result (1): 3"]
["Result (0): 4", "Result (1): 4"]

这显然是错误的,但我没有发现我的错误,有人可以解释吗?

4

1 回答 1

16

考虑一下combineAll

通过在 Observable-of-Observables 完成时应用combineLatest来展平 Observable-of-Observables。

还有那个combineLatest

实际上会等待所有输入的 Observables 至少发射一次。

因此,combineAll在“Inner 2”可观察对象发出其第一个值之前,不会发生包含“Inner 1”可观察对象的第一个值的可观察对象的第一个发射。所以只会有九个排放——不是十个。

于 2017-07-19T06:36:38.330 回答