我对 Rx 和 RxPy 比较陌生 - 我正在尝试做一件基本的事情,那就是访问我在 Observer 末尾存储的值on_completed
。我有一种感觉,要么我遗漏了一些非常明显的东西,要么我可能将 Observable 概念弯曲成它不应该成为的东西。无论哪种方式,希望我能得到一些指导。
我已经查看了文档,materialize
但它们似乎并不完全匹配。也研究过Do
,Disposable
但找不到很多接近我需要的例子。
import rx
from rx import operators as ops
from rx.core import Observer
if __name__ == "__main__":
class PipelineObserver(Observer):
def __init__(self):
self.status = None
def on_next(self, payload):
print(payload)
def on_error(self, err):
print(err)
def on_completed(self):
self.status = "Done"
return self.status
## This returns a disposable, not the actual value I want. Which in this case is self.status
output = rx.from_([1, 2]).subscribe(
PipelineObserver()
)
print(output) ## Hoping for "Done" which is stored in self.status, not disposable class
无论如何可以从 on_completed 方法中访问一个值吗?没有将某些东西保存为全局变量(这对我来说似乎是个坏主意)我不确定这是否可能?基本上无论它输出什么on_completed
或类似的东西。也许Do
或Finally
?