我将在 android 应用程序中使用 rxandroid。我现在正在尝试在 rxpy 中对行为进行建模,因为它对我来说是最容易设置和使用的。在下面的示例中,source3 发出了正确的数据;这是需要一些时间的初始化和我刚刚伪造的永久订阅的串联。我想要 BehaviorSubject 因为我需要立即使用最后一个值进行字段初始化。
我无法弄清楚如何将 BehaviorSubject 链接到 source3 之上,以便它在记住最后一个值的同时发出源 3。我在互联网上搜索了两天,没有找到关于这个用例的明确方向。这是我的代码,问题是为什么我没有从观察者那里得到任何排放。
from rx import Observable, Observer
from rx.subjects import BehaviorSubject
import time, random
def fake_initialization(observer):
time.sleep(5) # It takes some time
observer.on_next("Alpha")
observer.on_completed()
def fake_subscription(observer):
iter = 0 # Subscription emits forever
while True:
observer.on_next("message %02d"%(iter))
time.sleep(random.randrange(2,5))
iter += 1
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
#bsubject.on_next(value)
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source1 = Observable.create(fake_initialization)
source2 = Observable.create(fake_subscription)
source3 = source1 + source2
bsubject = BehaviorSubject(False)
source4 = source3.multicast(bsubject)
source4.connect()
source4.subscribe(PrintObserver())