3

简介: 你好。我正在为我的用例探索 python rxpy库 - 我正在使用反应式编程概念构建执行管道。这样我希望我不必操纵太多的状态。虽然我的解决方案似乎是有效的,但我在尝试从其他 Observable 组成一个新的 Observable 时遇到了麻烦。

问题是我编写可观察数据的方式导致一些昂贵的计算被重复两次。为了性能,我真的想防止触发昂贵的计算。

我对反应式编程很陌生。试图挠头并查看互联网资源和参考文档 - 对我来说似乎有点太简洁了。请指教。

以下是一个玩具示例,说明了我在做什么:

import rx
from rx import operators as op
from rx.subject import Subject

root = Subject()

foo = root.pipe(
        op.map( lambda x : x + 1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r)))
    )

bar_foo = foo.pipe(
        op.map( lambda x : x * 2 ),
        op.do_action(lambda r: print("bar(foo(x)) = %s" % str(r)))
    )

bar_foo.pipe(
        op.zip(foo),
        op.map(lambda i: i[0]+i[1]),
        op.do_action(lambda r: print("foo(x) + bar(foo(x)) = %s" % str(r)))
    ).subscribe()


print("-------------")
root.on_next(10)
print("-------------")

输出:

-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) = 11 (expensive)
foo(x) + bar(foo(x)) = 33
-------------

你可以想到foo()bar()是昂贵而复杂的操作。我首先构建一个 observable foo。然后组成一个新的 observablebar_foo包含foo. 稍后将两者压缩在一起以计算最终结果foo(x)+bar(foo(x))

问题:

  1. 我能做些什么来防止foo()单个输入被多次触发?我有非常充分的理由保持foo()bar()分开。另外我也不想明确地 memoize foo()

  2. 任何有在生产中使用 rxpy 经验的人都可以分享他们的经验。与等效的手工制作(但不可维护)代码相比,使用 rxpy 会导致更好的性能或速度下降吗?

4

1 回答 1

2

op.share()在管道中进行昂贵的计算之后立即添加在foo这里可能很有用。因此将foo管道更改为:

foo = root.pipe(
        op.map( lambda x : x + 1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r))),
        op.share() # added to pipeline
    )

将导致:

-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) + bar(foo(x)) = 33
-------------

我相信这.share()使得昂贵操作的发出事件在下游订阅者之间共享,因此单个昂贵计算的结果可以多次使用。

关于你的第二个问题;我也是 RxPy 的新手,所以对更有经验的用户的回答很感兴趣。到目前为止,我注意到作为初学者,您可以轻松地创建(坏)管道,其中消息和计算在后台重复。.share()似乎在一定程度上减少了这种情况,但不确定后台发生了什么。

于 2021-07-22T09:07:38.953 回答