1

所以我在做 RxJava 和 RxKotlin 两年后开始学习 RxPy。我注意到的一件事是某些运算符会导致在 RxJava 中不会发生的疯狂交错。

例如,对于一个简单的源,flat_map()将导致排放无序交错。Observable

items = Observable.from_( ("Alpha","Beta","Gamma","Delta","Epsilon"))

items.flat_map(lambda s: Observable.from_(list(s))).subscribe(print)

输出:

A
l
B
p
e
G
h
t
a
D
a
a
m
e
E
m
l
p
a
t
s
a
i
l
o
n

然而,使用 RxJava 或 RxKotlin,一切都保持有序。

fun main(args: Array<String>) {
    Observable.just("Alpha","Beta","Gamma","Delta","Epsilon")
        .flatMap {
            Observable.from(it.toCharArray().asIterable())
        }.subscribe(::println)
}

输出:

A
l
p
h
a
B
e
t
a
G
a
m
m
a
D
e
l
t
a
E
p
s
i
l
o
n

我确认一切都在运行MainThread并且没有奇怪的异步调度正在进行(我认为)。

为什么 RxPy 会这样?我注意到这几乎发生在任何处理多个Observable源合并在一起的操作员身上。默认调度程序到底在做什么?

另外,为什么concat_map()RxPy 中没有?我的印象是,调度的工作方式在某种程度上是不可能的......

4

1 回答 1

3

如前所述, flatMap不保证顺序。RxPy 没有实现concat_map为不同的运算符,但您可以使用mapandconcat_all运算符获得相同的效果

Observable.from_( ("Alpha","Beta","Gamma","Delta","Epsilon"))\
          .map(lambda s: Observable.from_(list(s)))\
          .concat_all()\
          .subscribe(print)
于 2016-10-28T09:56:10.450 回答