我目前正在尝试将一个可调用对象包装到一个 rxpy observable 中,以通知其观察者必须处理一个新值,但我看不出这是如何轻松完成的,从观点来看,这对我来说似乎很奇怪相对经验丰富的 C++/Qt 开发人员。
我设法让像下面这样的样本工作,使用yield
关键字,这在大多数情况下并不方便:
import subprocess
import rx
def run_ping():
p = subprocess.Popen(["ping", "8.8.8.8"], stdout=subprocess.PIPE, universal_newlines=True)
for line in iter(p.stdout.readline, ""):
yield line
source = rx.from_iterable(run_ping())
source.subscribe(lambda s : print(f"!! {s}"))
但是,如果您采用以下示例:
import subprocess
import rx
class MyObservable:
def __init__(self, my_list: list):
self.i = 0
self.l = my_list
def __call__(self, **kwargs):
obs = kwargs.get('obs', None)
scheduler = kwargs.get('scheduler', None)
i = self.i
self.i = self.i + 1
if obs is None:
return self.l[i]
if i == len(self.l):
obs.on_next(-1)
else:
obs.on_next(self.l[i])
obs.on_completed()
o = MyObservable([1, 2, 3, 4])
source = rx.from_callable(o)
source.subscribe(lambda s : print(f"!! {s}"))
我只得到!! 1
输出,2
并且 3
从未4
被处理过,我并没有真正看到如何让我的观察者处理它们,但此外,我看不到如何在MyObservable
中实现一个将值附加到我的列表的方法,并且触发观察者的处理。
我查看了 的 API Observable
,认为也许可以从 启动某些东西source
,但似乎并非如此。
通过尝试应用与 Qt 中相同的逻辑,我是否在rx
/的哲学中遗漏了一些明显的东西,或者是否有一些简单的方法可以做到这一点,或者我的问题的解决方案是否需要实现适合我需要的专业化?rxpy
Observable
无论如何,提前感谢您的任何回答!
编辑:我找到了一种不能完全满足我的方法,使用Subject
类,它既是一个Observable
又是一个Observer
,所以我们可以使用它的on_next
方法来通知外部订阅者一个新数据在这里:
import subprocess
import rx
from rx.subject import Subject
class MyBetterObservable:
def __init__(self):
self.i = 0
self.l = []
self.subject = Subject()
def subscribe(self, f):
self.subject.subscribe(f)
def append(self, i):
self.l.append(i)
self.subject.on_next(i)
o = MyBetterObservable()
o.subscribe(lambda s : print(f"!! {s}"))
o.append(1)
o.append(17)
此示例显示了我的期望:
!! 1
!! 17
但是,这绝对是rxpy
我们期望我们做这种事情的方式,还是有比包装Subject
对象更优雅的方式来做到这一点?