2

我希望能够在已经恰好是 BehaviorSubject 的流上使用重播运算符。本质上,我希望打印订阅接收预先发送的排放量:0、1、2、3、4、5。

我试图使 observable 变热.subscribe()。另外,我确实理解将源设为 aReplaySubject可以解决问题,但这不是我的选择。

与大多数语言不同,Pythonrx v3.0使用管道命令来链接操作 - 例如replay()ref_count()publish()- 而不是通常的 '.' 链。这是管道命令的链接:https ://rxpy.readthedocs.io/en/latest/migration.html#pipe-based-operator-chaining

我很确定这个问题与我给 replay(): 的第一个论点有关lambda x: x

import rx.subject
from rx import operators as op

stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(lambda x: x, buffer_size=100))
replayable_observable.subscribe()
stream.subscribe()
for x in [1, 2, 3, 4, 5]:
    stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
    stream.on_next(x)

我希望收到0-10;或者也许1-10。但相反,我收到了5-10.

Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
4

1 回答 1

0

看起来replaymapper函数)的第一个参数旨在允许您在多播后将更多运算符链接到源可观察对象。例如,如果您将其从 更改为lambda x: xlambda x: x.pipe(op.map(lambda y: y * 2))您将获得翻倍的值。

文档replay似乎已经过时,因为他们提供的示例函数mapper仍然使用旧式的方法链接而不是管道方法。此外,所有实际使用参数的非默认值的测试似乎都已在该项目的 GitHub 存储库中注释掉,因此没有明确的示例说明如何正确使用该参数。replaymapper

通过查看源代码,我可以说的是,当您使用replay而不指定 a时mapper,您会得到 aConnectableObservable而不是Observable. ConnectableObservable在值通过主题正确推送之前连接它可以缓冲结果。

import rx.subject
from rx import operators as op

stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(buffer_size=100))
replayable_observable.connect()
for x in [1, 2, 3, 4, 5]:
    stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
    stream.on_next(x)

# Received 0
# Received 1
# Received 2
# Received 3
# Received 4
# Received 5
# Received 6
# Received 7
# Received 8
# Received 9
# Received 10
于 2019-08-26T19:06:17.503 回答