问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2909 浏览

python - How can I make an rx.py Observable from a stream such as stdin?

I'm trying to get my head around the rxpy library for functional reactive programming (FRP) and I've already hit a roadblock. I'm writing a small program that expects data to be streamed in via standard input (sys.stdin).

My question is therefore simple: how can I create an rx.Observable instance that will asynchronously read from stdin? Are there built-in mechanisms to create Observable instances from streams?

0 投票
2 回答
802 浏览

reactive-programming - ReactiveX (Rx) - 检测长按事件

我想知道在 Rx 中解决以下问题的规范方法是什么:假设我有两个 observablesmouse_downmouse_up,其元素代表鼠标按钮按下。在一个非常简单的场景中,如果我想检测长按,我可以通过以下方式进行(在这种情况下使用 RxPy,但在任何 Rx 实现中概念上都是相同的):

但是,当我们需要将一些信息从mouse_down可观察对象提升到可观察对象时,就会出现问题mouse_up。例如,考虑是否可观察的元素存储了有关按下了哪个鼠标按钮的信息。显然,我们只想mouse_downmouse_up相应的按钮配对。我想出的一个解决方案是:

如果有更直接的解决方案,我很想听听 - 但据我所知,这是可行的。但是,如果我们还想检测鼠标在mouse_down和之间移动了多远,事情就会变得更加复杂mouse_up。为此,我们需要引入一个新的 observable mouse_move,它携带有关鼠标位置的信息。

但是,这几乎是我卡住的地方。每当一个按钮被按住超过 1 秒时,我都会得到一堆布尔值。但是,我只想在它们全部为假时检测长按,这听起来像是all 运算符的完美案例。感觉好像只少了一小步,但到目前为止我还没有弄清楚如何让它工作。也许我也在以一种倒退的方式做事。期待任何建议。

0 投票
1 回答
1374 浏览

python - 不是 Python 函数

我正在尝试构建一个函数,该函数可以用作我正在映射的 RxPy 流的处理程序。我拥有的函数需要访问定义该变量的范围之外的变量,对我来说,这意味着我需要使用某种闭包。所以我到达 functools.partial 以关闭一个变量并返回一个部分函数,​​我可以将其作为观察者传递给我的流。

但是,这样做会导致以下结果:

这是一些重现问题的示例代码:

问题似乎是我的部分函数False在调用inspect.isfunction.

如何使我的部分函数通过此检查?有没有办法轻松地将部分函数转换为“真实”函数类型?

0 投票
2 回答
1524 浏览

reactive-programming - 当两个 Observables 同时改变时合并 Observables

我正在尝试使用 RxPY 将 ReactiveX 集成到我的 GUI 中。这是一个通用的 ReactiveX 问题。

假设我有一个依赖于多个 Observable 流的可视化,使用combine_latest(stream1, stream2, plot_function). 当一个 Observable 发生变化时,这很有效,比如当用户修改一个值时;使用新值更新可视化。但是,有时两个 Observable 会同时更新,例如当用户从单个文件加载两个流的数据时。从技术上讲,一个 Observable 将在另一个之前更新(以导入函数中的先到者为准)。结果,情节将被更新两次,但出于所有意图和目的,它只需要更新一次。

我的一些可视化计算成本很高,所以我想确保如果两个流同时更新那么组合流只发出一个值。我能想到几个选择:

  1. 在组合流上使用debounce()小超时(如 50 毫秒)。这种方法对我来说似乎很脏。

  2. 不要combine_latest直接使用。将两个流包装在一个新对象中,该对象也具有某种updating标志。如果我将标志设置为 True,那么在我将标志设置为 Falseupdating之前不要发出任何内容。updating这种方法感觉是有状态的,它破坏了流的可组合性。

  3. 告诉所有可视化在所有流都更新之前不要更新。同样,这打破了封装,因为可视化不应该关心上游发生了什么。它应该只从组合流中接收新值并制作漂亮的图片。

  4. 使可视化足够细化,以至于首先更新一个流只会带来很小的性能损失。这对于某些可视化来说是不可能的,例如基于点和网格大小计算网格的可视化。如果点网格大小发生变化,则需要重新计算整个网格。

Rx 中是否有一些工具可以处理“同时”更新多个流?我觉得我要求的是魔法。

对于使用 Rx 制作 GUI 程序的任何人:除了通过流发送新值之外,我还应该为模型使用一些更好的架构吗?

如果这个问题不清楚,请在评论中告诉我,我会尝试做一个更具体的例子。

例子

这是一个示例 Python RxPY 程序:

这打印:

我怎样才能同时更新 和 的值,stream1以便stream2以下成为结果?

换句话说,我怎么能combine_latest以这样一种方式进行修改,以便我可以在下游告诉它“嘿,等一下,我会在你发出下一个值之前更新其他流”?

0 投票
1 回答
2108 浏览

python - subscribe_on 与 RxPY 中的 from_iterable/range

我正在尝试在 python 的响应式扩展中进行调度。我想用来subscribe_on并行处理多个可观察对象。如果 observable 是用创建的,这很好用just,但如果使用rangeorfrom_则不行。

just默认为Scheduler.immediate,而其他生成器默认为Scheduler.current_thread. 这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:

如果调度程序直接传递给生成器,它可以工作observe_on,但我想将可观察的创建与处理分离,并实现如下效果:

我误会了subscribe_on吗?我的方法错了吗?

0 投票
1 回答
1035 浏览

python - 集成响应式扩展和扭曲的基本示例?

我正在寻找一个关于如何使用响应式扩展(RxPY)和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小 hello 应用程序。

我想使用 RxPY 库来挂钩这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作......

我可以找到的所有 RxPY 示例都可以生成它们自己的流,例如从可迭代的,以下代码流整数 0-9:

或者包含在更复杂的示例中(例如子类化 WebSocket 处理程序)。知道如何拦截打印消息吗?EG,从 Twisted 反应器生成一个可观察的流?

0 投票
2 回答
1424 浏览

python - 如何在 Python 中使用响应式扩展 (Rx) LINQ?

我下载了 RxPY并在看Rx 教程时遇到:

在此处输入图像描述

那么我将如何使用 RxPY 在 Python 中完成上述工作?特别是,查询q = from x in xs where...不是语法上有效的 Python 代码——那么如何改变呢?

0 投票
1 回答
719 浏览

python - 如何使用 Reactive Extensions for Python (RxPY) 创建滴答时间序列?

我的设置:

我有一个(时间,价格)元组中的股票价格数据列表:

我想把它做成一个RxPY Observable,以便我可以逐个回测交易策略:

我希望从 2015 年 1 月 12 日开始回测,所以我假设我必须使用以下调度程序:

要运行我的回测,我会:

问题:

我希望看到:

但我得到了

问题是:

  • 时间戳错误:我得到的是今天的日期2015-12-20 08:43:45.882000而不是历史日期(例如datetime(2015, 1, 12)
  • 价格仍然包含时间成分
  • 调度程序没有按照我的要求在 2015 年 1 月 12 日启动,因为我看到 2015 年 1 月 9 日的数据点仍在使用中。

我也尝试过使用scheduler.now()

date=2015-01-12 00:00:00但后来由于某种原因日期被卡住了:

如何解决上述问题并获得我最初预期的结果?

0 投票
1 回答
2374 浏览

python - 当多个输入同时滴答时,如何防止 combine_latest 多次触发?

每当任何输入打勾时,我都会使用Reactive Extensionscombine_latest执行操作。问题是,如果多个输入同时滴答,然后combine_latest在每个单独的输入滴答后触发多次。这会让人头疼,因为combine_latest它实际上是用过时的值虚假地滴答作响。

最小工作示例,其中fast每 10 毫秒一个slow可观察的滴答声,每 30 毫秒一个可观察的滴答声:

每 30 毫秒,当fastslow同时滴答时,combine_latest不希望地触发两次——输出如下:

如何防止combine_latest虚假滴答作响?

0 投票
1 回答
177 浏览

python - 使用 RxPY 观察按钮按下

如何使用 Python“收听”按键事件流?

我想做这样的事情:

但我怎样才能创造这个click_stream