问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
268 浏览

python-3.x - Python 在生产环境中从命令式转变为函数式

我被函数式编程和反应式方法感染了。对于灵感和想法,我使用Haskell一篇很棒的 Rick Hickey 文章。在 python 世界中,我为自己找到了RxPyfuncy库。现在我有数千行命令式代码,我想让它们发挥作用。如果一切都被简化了,我就有了一个用于数据库的 getter 和 setter 接口,以及一个像状态机一样工作的内核。这是它在伪代码上的样子:

我有很多iffor我的状态处理程序中的命令式业务逻辑。我真的很尴尬如何从当前模型转向反应性和功能性。这里一切都非常简单,但是有过类似经历的人会理解我的。我需要关于下一步的建议,从想法到实践,不仅仅是简单的实用程序或功能风格的琐碎 REST api。此外,我将通过链接到真正大型项目的源代码获得很大帮助,只要我能得到想法。感谢所有回复的人,他们在将命令式代码移植到函数式代码中获得了真实的体验。是的,我知道它不会移植代码,而是从头开始重写。同样,我需要具有大量业务逻辑的项目示例,其中涉及数据和数据突变。无论如何,谢谢你。

0 投票
0 回答
114 浏览

python - VisualStudio 中是否有针对 Reactive Extension rx.py 的智能感知/自动完成功能?

我是 Visual Studio,我在 Python 3 环境中启动了一个新的 Python 项目。Rx 已安装 pip,我有以下代码

它运行良好,但是当我输入时,Observable.ran...我希望看到 Intellisense 弹出 `range(...)" 作为选项,就像通常那样,但它从来没有出现过。

为什么会这样,我怎样才能让 Intellisense 为它工作?

0 投票
1 回答
240 浏览

operators - rxpy 将项目注入 observable

这个问题是关于rxpy的。

我正在尝试构建一个反应系统来处理来自可观察源的消息。除此之外,我正在尝试将它与基于 zookeeper 的领导选举系统集成。

这种组合将只允许进程群中的一个领导者处理消息流。以下是我正在尝试构建的代码的要点。

它工作正常,但我需要在两者之间注入skip_until一块take_until来处理回填。这旨在处理领导进程失败与另一个担任领导的进程之间的潜在差距。每条处理过的消息都会留下一条记录,以便新的领导者可以在继续处理流之前赶上丢失的消息(如果有的话)。

我尝试了start_with运算符但没有成功。我不是以一种不适合使用的方式接近它吗?

最终,我正在寻找的解决方案是在由另一个流中的事件触发的流中注入特定数量的项目。

0 投票
1 回答
325 浏览

python-3.x - 如何正确重组分组的 observables?

我正在尝试创建一个分析股票价格的工具。

我有不同股票的价格数据流,并且我希望有一个可观察的,以便在它收到一组新的、不同的和完整的价格时发出事件。

我的计划:将流分组为不同股票的不同子流,并重新组合它们的最新值。

假设我有这样的事件流:

这是我的第一个(天真的)方法:

这正确地给了我:

然而,我觉得这应该更好地处理group_by,而不是几个过滤,所以这里是一个重写:

但这一次,我得到:

我在这里错过了什么?我如何“解开”嵌套的 observables?

0 投票
3 回答
2938 浏览

python - 如何等待 RxPy 并行线程完成

基于这个优秀的 SO 答案,我可以让多个任务在 RxPy 中并行工作,我的问题是你如何等待它们全部完成?我知道我可以使用线程,.join()但 Rx 调度程序似乎没有任何这样的选项。.to_blocking()也无济于事, MainThread 在所有通知都被触发并且完整的处理程序被调用之前完成。这是一个例子:

预期产出

实际输出

如果我取消注释睡眠呼叫,则实际输出

0 投票
0 回答
654 浏览

python - 使用 Python (RxPy) 中的响应式编程流式传输 csv 文件

我开始使用 RxPy,但我找不到流式传输表示带有时间戳的股票行情的 csv 文件的方法。有人可以帮助我了解如何使用调度程序返回到我的第一个刻度,然后以确切的时间戳从 csv 流式传输每一行吗?

您可以在此处下载 csv 文件。

编辑

我试图从这个博客转换 RxJS 代码但没有成功:

有人可以帮我解决这个问题吗?

0 投票
1 回答
772 浏览

python - RxPy 将值传递给观察者

有没有办法根据用户输入将值传递给观察者(这意味着传递的值并不总是固定的)?

我试图四处寻找以了解 RxPy,但网络中几乎没有任何示例......

0 投票
5 回答
529 浏览

python - 如何使用 rxpython 使长时间运行的程序超时?

假设我有一个看起来像这样的长时间运行的 python 函数?

我希望能够设置超时1000ms

所以我在做类似的事情,创建一个 observable 并通过上述密集计算对其进行映射。

现在对于每个发出的值,如果它需要超过 1000 毫秒,我想结束可观察的,一旦我达到1000ms使用on_erroron_completed

上面的语句确实超时,并调用on_error,但它继续完成计算强度计算,然后才返回到下一条语句。有没有更好的方法来做到这一点?

最后一条语句打印以下内容

这个想法是,如果一个特定的函数超时,我希望能够继续我的程序,并忽略结果。

我想知道intns函数是否在单独的线程上完成,我猜主线程在超时后继续执行,但我仍然想停止intns线程上的计算函数,或者以某种方式杀死它。

0 投票
1 回答
55 浏览

system.reactive - Rx:获取第一个项目,但是当第二个项目到达时抛出异常

使用 Rx observable 流,有没有一种有效的方法来获取第一项,完成源 observable,但如果第二项到达则抛出异常?听起来使用 Rx 的情况不太好,但有没有聪明的方法来处理它?

0 投票
0 回答
151 浏览

python - 使用关闭选择器功能的 RxPY 缓冲区示例?

寻找一个使用 RxPy 的示例,其中使用关闭选择器函数来决定何时关闭缓冲区来完成缓冲。在我的示例中,我希望能够根据在 observables 数据本身中看到的值来关闭缓冲区边界。我假设使用关闭选择器是这样做的方法。