我希望能够在已经恰好是 BehaviorSubject 的流上使用重播运算符。本质上,我希望打印订阅接收预先发送的排放量:0、1、2、3、4、5。
我试图使 observable 变热.subscribe()
。另外,我确实理解将源设为 aReplaySubject
可以解决问题,但这不是我的选择。
与大多数语言不同,Pythonrx v3.0
使用管道命令来链接操作 - 例如replay()
或ref_count()
或publish()
- 而不是通常的 '.' 链。这是管道命令的链接:https ://rxpy.readthedocs.io/en/latest/migration.html#pipe-based-operator-chaining
我很确定这个问题与我给 replay(): 的第一个论点有关lambda x: x
。
import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(lambda x: x, buffer_size=100))
replayable_observable.subscribe()
stream.subscribe()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)
我希望收到0-10
;或者也许1-10
。但相反,我收到了5-10
.
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10