59

看起来这两个功能非常相似。它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func),并且它们的大理石图看起来完全相同。无法在此处粘贴图片,但这是concatMap的一张,这是flatMap的一张。对 results的描述似乎存在一些细微的差别Observable,其中一个由concatMapcontains 产生的项是由连接结果 Observables 产生的,而一个由flatMapcontains 产生的项是由首先合并产生的 Observables 并发出该合并的结果而产生的。

但是,我完全不清楚这种微妙之处。谁能更好地解释这种差异,最好举一些例子来说明这种差异。

4

7 回答 7

84


正如您所写,这两个函数非常相似,细微的区别在于如何创建输出(在应用映射函数之后)。

平面地图使用合并运算符,而 concatMap 使用concat 运算符

如您所见,concatMap 输出序列是有序的——第一个 Observable 发出的所有项目都在第二个 Observable 发出的任何项目之前发出,
而 flatMap 输出序列是合并的——合并后的 Observable 发出的项目可能出现在任何顺序,无论它们来自哪个源 Observable。

于 2014-07-06T12:07:22.393 回答
35

尽管这里的答案很好,但如果没有示例,也不容易发现差异。因此,我为此创建了一个简单的示例:

@Test
public void flatMapVsConcatMap() throws Exception {
    System.out.println("******** Using flatMap() *********");
    Observable.range(1, 15)
            .flatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
            .subscribe(x -> System.out.print(x + " "));

    Thread.sleep(100);

    System.out.println("\n******** Using concatMap() *********");
    Observable.range(1, 15)
            .concatMap(item -> Observable.just(item).delay(1, TimeUnit.MILLISECONDS))
            .subscribe(x -> System.out.print(x + " "));

    Thread.sleep(100);
}

********* 使用 flatMap() *********

1 2 3 4 5 6 7 9 8 11 13 15 10 12 14

********* 使用 concatMap() *********

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

从输出中可以看出, for 的结果flatMap是无序的,而 forconcatMap他们是。

于 2018-07-13T10:13:59.473 回答
24

一个非常重要的区别:concatMap等待当前发出的 observable 完成而flatMap不是。flatMap尝试尽可能多地开始。简单地说 - 你不能连接无限的东西。只需确保您发出的 observableconcatMap可以完成,否则整个流程将卡住,等待当前 observable 完成连接下一个。

于 2017-02-01T19:38:44.187 回答
13

我发现大多数赞成的答案中的例子不太清楚,所以我发布了一个帮助我理解 flatMap 和 concatMap 之间区别的例子。

FlatMap 从源 observable 获取排放,然后创建新的 observable 并将其合并到原始链,而 concatMap其连接到原始链。

主要区别在于 concatMap() 将按顺序合并每个映射的 Observable 并一次触发一个。它只会在当前调用 onComplete() 时移动到下一个 Observable。

这是flatMap示例:

private void flatMapVsConcatMap() throws InterruptedException {
    Observable.just(5, 2, 4, 1)
            .flatMap(
                    second ->
                            Observable.just("Emit delayed with " + second + " second")
                                    .delay(second, TimeUnit.SECONDS)
            )
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace
            );

    Thread.sleep(15_000);
}

将导致:

发射延迟 1 秒
发射延迟 2 秒
发射延迟 4 秒
发射延迟 5 秒

这是concatMap示例:

private void flatMapVsConcatMap() throws InterruptedException {
    Observable.just(5, 2, 4, 1)
            .concatMap(
                    second ->
                            Observable.just("Emit delayed with " + second + " second")
                                    .delay(second, TimeUnit.SECONDS)
            )
            .subscribe(
                    System.out::println,
                    Throwable::printStackTrace
            );

    Thread.sleep(15_000);
}

将导致:

发射延迟 5 秒
发射延迟 2 秒
发射延迟 4 秒
发射延迟 1 秒

注意使用 Thread.sleep()因为delay()默认在计算调度器上运行

于 2020-03-03T20:46:35.023 回答
0

首先,flatMap 和 Rxjs 中的 mergeMap 是一样的。这样就少了一种混乱。所以有两个可观察的..

1) o1:来自(['Kitty','Donald','Batman']) 的简单项目列表

2) process_o1():process_o1() 是一个函数,它接受一个参数“item”并对其进行处理并返回一个 Observable,该 Observable 在完成时发出“done with [item]”。

o1.pipe(mergeMap(item => process_o1(item))).subscribe(data => {
console.log(data);
});

在这里,我们将看到:- 用 Kity 完成。

与唐纳德完成。

和蝙蝠侠一起完成。

没有任何保证凯蒂会先于唐纳德,而唐纳德会先于蝙蝠侠。这是因为,一旦外部 observable 发出一个项目,内部 observable 就会被订阅。

=== 但是在 concatMap 的情况下:-

o1.pipe(concatMap(item => process_o1(item))).subscribe(data => {
console.log(data);
});

我们有以下顺序的保证:-

和凯蒂一起完成。

与唐纳德完成。

和蝙蝠侠一起完成。

因为,使用 concatMap 运算符,在之前的内部 Observable 返回之前,内部的 Observable 不会被订阅。

外部 observable 可以自由地继续并发出它的所有值,但是 concatMap 将确保它一个一个地处理这些值中的每一个并保持顺序。因此名称为 concatMap。

关键是如果你热衷于维护做事的顺序,你应该使用concatMap。但是如果你不关心顺序,你可以继续使用 mergeMap,它会立即订阅所有内部的 Observable,并在它们返回时继续发出值。

于 2019-11-15T14:04:08.037 回答
0

flatMap 与 concatMap

flatMap- 合并 - 如果发出新项目,则它具有优先级

concatMap- 连接 - 添加到末尾 - 发出完整序列,只有在那之后(前一个完成)才能发出下一个序列

[地图与平面地图]

于 2021-02-06T16:02:47.370 回答
0

其他人已经指出了答案,但如果不是很明显,则存在使用 flatMap 创建不希望的并行性的风险,如果不希望出现这种情况,您可以使用 concatMap 或重载flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency)

于 2021-08-23T20:19:28.837 回答