问题标签 [rx-py]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - Python 在生产环境中从命令式转变为函数式
我被函数式编程和反应式方法感染了。对于灵感和想法,我使用Haskell和一篇很棒的 Rick Hickey 文章。在 python 世界中,我为自己找到了RxPy和funcy库。现在我有数千行命令式代码,我想让它们发挥作用。如果一切都被简化了,我就有了一个用于数据库的 getter 和 setter 接口,以及一个像状态机一样工作的内核。这是它在伪代码上的样子:
我有很多if
和for
我的状态处理程序中的命令式业务逻辑。我真的很尴尬如何从当前模型转向反应性和功能性。这里一切都非常简单,但是有过类似经历的人会理解我的。我需要关于下一步的建议,从想法到实践,不仅仅是简单的实用程序或功能风格的琐碎 REST api。此外,我将通过链接到真正大型项目的源代码获得很大帮助,只要我能得到想法。感谢所有回复的人,他们在将命令式代码移植到函数式代码中获得了真实的体验。是的,我知道它不会移植代码,而是从头开始重写。同样,我需要具有大量业务逻辑的项目示例,其中涉及数据和数据突变。无论如何,谢谢你。
python - VisualStudio 中是否有针对 Reactive Extension rx.py 的智能感知/自动完成功能?
我是 Visual Studio,我在 Python 3 环境中启动了一个新的 Python 项目。Rx 已安装 pip,我有以下代码
它运行良好,但是当我输入时,Observable.ran...
我希望看到 Intellisense 弹出 `range(...)" 作为选项,就像通常那样,但它从来没有出现过。
为什么会这样,我怎样才能让 Intellisense 为它工作?
operators - rxpy 将项目注入 observable
这个问题是关于rxpy的。
我正在尝试构建一个反应系统来处理来自可观察源的消息。除此之外,我正在尝试将它与基于 zookeeper 的领导选举系统集成。
这种组合将只允许进程群中的一个领导者处理消息流。以下是我正在尝试构建的代码的要点。
它工作正常,但我需要在两者之间注入skip_until
一块take_until
来处理回填。这旨在处理领导进程失败与另一个担任领导的进程之间的潜在差距。每条处理过的消息都会留下一条记录,以便新的领导者可以在继续处理流之前赶上丢失的消息(如果有的话)。
我尝试了start_with
运算符但没有成功。我不是以一种不适合使用的方式接近它吗?
最终,我正在寻找的解决方案是在由另一个流中的事件触发的流中注入特定数量的项目。
python-3.x - 如何正确重组分组的 observables?
我正在尝试创建一个分析股票价格的工具。
我有不同股票的价格数据流,并且我希望有一个可观察的,以便在它收到一组新的、不同的和完整的价格时发出事件。
我的计划:将流分组为不同股票的不同子流,并重新组合它们的最新值。
假设我有这样的事件流:
这是我的第一个(天真的)方法:
这正确地给了我:
然而,我觉得这应该更好地处理group_by
,而不是几个过滤,所以这里是一个重写:
但这一次,我得到:
我在这里错过了什么?我如何“解开”嵌套的 observables?
python - 如何等待 RxPy 并行线程完成
基于这个优秀的 SO 答案,我可以让多个任务在 RxPy 中并行工作,我的问题是你如何等待它们全部完成?我知道我可以使用线程,.join()
但 Rx 调度程序似乎没有任何这样的选项。.to_blocking()
也无济于事, MainThread 在所有通知都被触发并且完整的处理程序被调用之前完成。这是一个例子:
预期产出
实际输出
如果我取消注释睡眠呼叫,则实际输出
python - RxPy 将值传递给观察者
有没有办法根据用户输入将值传递给观察者(这意味着传递的值并不总是固定的)?
我试图四处寻找以了解 RxPy,但网络中几乎没有任何示例......
python - 如何使用 rxpython 使长时间运行的程序超时?
假设我有一个看起来像这样的长时间运行的 python 函数?
我希望能够设置超时1000ms
。
所以我在做类似的事情,创建一个 observable 并通过上述密集计算对其进行映射。
现在对于每个发出的值,如果它需要超过 1000 毫秒,我想结束可观察的,一旦我达到1000ms
使用on_error
或on_completed
上面的语句确实超时,并调用on_error
,但它继续完成计算强度计算,然后才返回到下一条语句。有没有更好的方法来做到这一点?
最后一条语句打印以下内容
这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果。
我想知道intns
函数是否在单独的线程上完成,我猜主线程在超时后继续执行,但我仍然想停止intns
线程上的计算函数,或者以某种方式杀死它。
system.reactive - Rx:获取第一个项目,但是当第二个项目到达时抛出异常
使用 Rx observable 流,有没有一种有效的方法来获取第一项,完成源 observable,但如果第二项到达则抛出异常?听起来使用 Rx 的情况不太好,但有没有聪明的方法来处理它?
python - 使用关闭选择器功能的 RxPY 缓冲区示例?
寻找一个使用 RxPy 的示例,其中使用关闭选择器函数来决定何时关闭缓冲区来完成缓冲。在我的示例中,我希望能够根据在 observables 数据本身中看到的值来关闭缓冲区边界。我假设使用关闭选择器是这样做的方法。