0

我目前正在尝试将一个可调用对象包装到一个 rxpy observable 中,以通知其观察者必须处理一个新值,但我看不出这是如何轻松完成的,从观点来看,这对我来说似乎很奇怪相对经验丰富的 C++/Qt 开发人员。

我设法让像下面这样的样本工作,使用yield关键字,这在大多数情况下并不方便:

import subprocess
import rx

def run_ping():
    p = subprocess.Popen(["ping", "8.8.8.8"], stdout=subprocess.PIPE, universal_newlines=True)
    for line in iter(p.stdout.readline, ""):
        yield line

source = rx.from_iterable(run_ping())
source.subscribe(lambda s : print(f"!! {s}"))

但是,如果您采用以下示例:

import subprocess
import rx

class MyObservable:
    def __init__(self, my_list: list):
        self.i = 0
        self.l = my_list
    def __call__(self, **kwargs):
        obs = kwargs.get('obs', None)
        scheduler = kwargs.get('scheduler', None)
        i = self.i
        self.i = self.i + 1
        if obs is None:
            return self.l[i]
        if i == len(self.l):
            obs.on_next(-1)
        else:
            obs.on_next(self.l[i])
        obs.on_completed()

o = MyObservable([1, 2, 3, 4])
source = rx.from_callable(o)
source.subscribe(lambda s : print(f"!! {s}"))

我只得到!! 1输出,2并且 3从未4被处理过,我并没有真正看到如何让我的观察者处理它们,但此外,我看不到如何在MyObservable中实现一个将值附加到我的列表的方法,并且触发观察者的处理。

我查看了 的 API Observable,认为也许可以从 启动某些东西source,但似乎并非如此。

通过尝试应用与 Qt 中相同的逻辑,我是否在rx/的哲学中遗漏了一些明显的东西,或者是否有一些简单的方法可以做到这一点,或者我的问题的解决方案是否需要实现适合我需要的专业化?rxpyObservable

无论如何,提前感谢您的任何回答!

编辑:我找到了一种不能完全满足我的方法,使用Subject类,它既是一个Observable又是一个Observer,所以我们可以使用它的on_next方法来通知外部订阅者一个新数据在这里:

import subprocess
import rx
from rx.subject import Subject

class MyBetterObservable:
    def __init__(self):
        self.i = 0
        self.l = []
        self.subject = Subject()
    def subscribe(self, f):
        self.subject.subscribe(f)
    def append(self, i):
        self.l.append(i)
        self.subject.on_next(i)

o = MyBetterObservable()
o.subscribe(lambda s : print(f"!! {s}"))
o.append(1)
o.append(17)

此示例显示了我的期望:

!! 1
!! 17

但是,这绝对是rxpy我们期望我们做这种事情的方式,还是有比包装Subject对象更优雅的方式来做到这一点?

4

0 回答 0