3

你好!我正在尝试完成我的第一个 RxPY 项目,但是我在
理解 Python 中 flat_map 的行为时遇到了一些问题。

在这个项目中,有一个从生成器(Kafka 消费者)创建的 Observable。它在收到消息时发出值,然后根据消息执行查询,并为每个结果发出一个值。

我对代码进行了一些更改,以使其更易于重现。Kafka 消费者被一个生成器取代,该生成器在两次发射之间需要很长时间,查询结果被一个发出 3 个值的 Observable 取代。行为仍然相同。

from rx import Observable

generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) \
    .flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) \
    .subscribe(on_next=lambda i: print(i))

输出:

a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c

我期待这样的事情:

a
b
c
(...waits a long time...)
a
b
c

这种行为的原因是什么?我应该怎么做才能得到预期的结果?

谢谢!:)

4

1 回答 1

0

最近遇到了 flat_map 运算符和 ImmediateScheduler 在这里帮助的相同问题。

为 RxPy 3 更新了一些初始代码:

import rx
from rx.operators import flat_map


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'])
    )
).subscribe(on_next=lambda i: print(i))

输出略有不同,但问题是相同的:

(... waits a long time ...)
a
b
c
a
b
c

对 flat_map 内部的 observable 应用 ImmediateScheduler:

import rx
from rx.operators import flat_map
from rx.scheduler import ImmediateScheduler


generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'], scheduler=ImmediateScheduler())
    )
).subscribe(on_next=lambda i: print(i))

并得到了预期的结果:

a
b
c
(...waits a long time...)
a
b
c
于 2021-01-12T11:22:20.033 回答