你好!我正在尝试完成我的第一个 RxPY 项目,但是我在
理解 Python 中 flat_map 的行为时遇到了一些问题。
在这个项目中,有一个从生成器(Kafka 消费者)创建的 Observable。它在收到消息时发出值,然后根据消息执行查询,并为每个结果发出一个值。
我对代码进行了一些更改,以使其更易于重现。Kafka 消费者被一个生成器取代,该生成器在两次发射之间需要很长时间,查询结果被一个发出 3 个值的 Observable 取代。行为仍然相同。
from rx import Observable
generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) \
.flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) \
.subscribe(on_next=lambda i: print(i))
输出:
a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c
我期待这样的事情:
a
b
c
(...waits a long time...)
a
b
c
这种行为的原因是什么?我应该怎么做才能得到预期的结果?
谢谢!:)