问题标签 [rx-py]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - How can I make an rx.py Observable from a stream such as stdin?
I'm trying to get my head around the rxpy
library for functional reactive programming (FRP) and I've already hit a roadblock. I'm writing a small program that expects data to be streamed in via standard input (sys.stdin
).
My question is therefore simple: how can I create an rx.Observable
instance that will asynchronously read from stdin? Are there built-in mechanisms to create Observable
instances from streams?
reactive-programming - ReactiveX (Rx) - 检测长按事件
我想知道在 Rx 中解决以下问题的规范方法是什么:假设我有两个 observablesmouse_down
和mouse_up
,其元素代表鼠标按钮按下。在一个非常简单的场景中,如果我想检测长按,我可以通过以下方式进行(在这种情况下使用 RxPy,但在任何 Rx 实现中概念上都是相同的):
但是,当我们需要将一些信息从mouse_down
可观察对象提升到可观察对象时,就会出现问题mouse_up
。例如,考虑是否可观察的元素存储了有关按下了哪个鼠标按钮的信息。显然,我们只想mouse_down
与mouse_up
相应的按钮配对。我想出的一个解决方案是:
如果有更直接的解决方案,我很想听听 - 但据我所知,这是可行的。但是,如果我们还想检测鼠标在mouse_down
和之间移动了多远,事情就会变得更加复杂mouse_up
。为此,我们需要引入一个新的 observable mouse_move
,它携带有关鼠标位置的信息。
但是,这几乎是我卡住的地方。每当一个按钮被按住超过 1 秒时,我都会得到一堆布尔值。但是,我只想在它们全部为假时检测长按,这听起来像是all 运算符的完美案例。感觉好像只少了一小步,但到目前为止我还没有弄清楚如何让它工作。也许我也在以一种倒退的方式做事。期待任何建议。
python - 不是 Python 函数
我正在尝试构建一个函数,该函数可以用作我正在映射的 RxPy 流的处理程序。我拥有的函数需要访问定义该变量的范围之外的变量,对我来说,这意味着我需要使用某种闭包。所以我到达 functools.partial 以关闭一个变量并返回一个部分函数,我可以将其作为观察者传递给我的流。
但是,这样做会导致以下结果:
这是一些重现问题的示例代码:
问题似乎是我的部分函数False
在调用inspect.isfunction
.
如何使我的部分函数通过此检查?有没有办法轻松地将部分函数转换为“真实”函数类型?
reactive-programming - 当两个 Observables 同时改变时合并 Observables
我正在尝试使用 RxPY 将 ReactiveX 集成到我的 GUI 中。这是一个通用的 ReactiveX 问题。
假设我有一个依赖于多个 Observable 流的可视化,使用combine_latest(stream1, stream2, plot_function)
. 当一个 Observable 发生变化时,这很有效,比如当用户修改一个值时;使用新值更新可视化。但是,有时两个 Observable 会同时更新,例如当用户从单个文件加载两个流的数据时。从技术上讲,一个 Observable 将在另一个之前更新(以导入函数中的先到者为准)。结果,情节将被更新两次,但出于所有意图和目的,它只需要更新一次。
我的一些可视化计算成本很高,所以我想确保如果两个流同时更新,那么组合流只发出一个值。我能想到几个选择:
在组合流上使用
debounce()
小超时(如 50 毫秒)。这种方法对我来说似乎很脏。不要
combine_latest
直接使用。将两个流包装在一个新对象中,该对象也具有某种updating
标志。如果我将标志设置为 True,那么在我将标志设置为 Falseupdating
之前不要发出任何内容。updating
这种方法感觉是有状态的,它破坏了流的可组合性。告诉所有可视化在所有流都更新之前不要更新。同样,这打破了封装,因为可视化不应该关心上游发生了什么。它应该只从组合流中接收新值并制作漂亮的图片。
使可视化足够细化,以至于首先更新一个流只会带来很小的性能损失。这对于某些可视化来说是不可能的,例如基于点和网格大小计算网格的可视化。如果点或网格大小发生变化,则需要重新计算整个网格。
Rx 中是否有一些工具可以处理“同时”更新多个流?我觉得我要求的是魔法。
对于使用 Rx 制作 GUI 程序的任何人:除了通过流发送新值之外,我还应该为模型使用一些更好的架构吗?
如果这个问题不清楚,请在评论中告诉我,我会尝试做一个更具体的例子。
例子
这是一个示例 Python RxPY 程序:
这打印:
我怎样才能同时更新 和 的值,stream1
以便stream2
以下成为结果?
换句话说,我怎么能combine_latest
以这样一种方式进行修改,以便我可以在下游告诉它“嘿,等一下,我会在你发出下一个值之前更新其他流”?
python - subscribe_on 与 RxPY 中的 from_iterable/range
我正在尝试在 python 的响应式扩展中进行调度。我想用来subscribe_on
并行处理多个可观察对象。如果 observable 是用创建的,这很好用just
,但如果使用range
orfrom_
则不行。
just
默认为Scheduler.immediate
,而其他生成器默认为Scheduler.current_thread
. 这导致了差异,但对我来说感觉不一致。可能是因为我没有掌握完整的问题。
考虑以下示例:
如果调度程序直接传递给生成器,它可以工作observe_on
,但我想将可观察的创建与处理分离,并实现如下效果:
我误会了subscribe_on
吗?我的方法错了吗?
python - 集成响应式扩展和扭曲的基本示例?
我正在寻找一个关于如何使用响应式扩展(RxPY)和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小 hello 应用程序。
我想使用 RxPY 库来挂钩这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作......
我可以找到的所有 RxPY 示例都可以生成它们自己的流,例如从可迭代的,以下代码流整数 0-9:
或者包含在更复杂的示例中(例如子类化 WebSocket 处理程序)。知道如何拦截打印消息吗?EG,从 Twisted 反应器生成一个可观察的流?
python - 如何使用 Reactive Extensions for Python (RxPY) 创建滴答时间序列?
我的设置:
我有一个(时间,价格)元组中的股票价格数据列表:
我想把它做成一个RxPY Observable
,以便我可以逐个回测交易策略:
我希望从 2015 年 1 月 12 日开始回测,所以我假设我必须使用以下调度程序:
要运行我的回测,我会:
问题:
我希望看到:
但我得到了
问题是:
- 时间戳错误:我得到的是今天的日期
2015-12-20 08:43:45.882000
而不是历史日期(例如datetime(2015, 1, 12)
) - 价格仍然包含时间成分
- 调度程序没有按照我的要求在 2015 年 1 月 12 日启动,因为我看到 2015 年 1 月 9 日的数据点仍在使用中。
我也尝试过使用scheduler.now()
:
date=2015-01-12 00:00:00
但后来由于某种原因日期被卡住了:
如何解决上述问题并获得我最初预期的结果?
python - 当多个输入同时滴答时,如何防止 combine_latest 多次触发?
每当任何输入打勾时,我都会使用Reactive Extensions来combine_latest
执行操作。问题是,如果多个输入同时滴答,然后combine_latest
在每个单独的输入滴答后触发多次。这会让人头疼,因为combine_latest
它实际上是用过时的值虚假地滴答作响。
最小工作示例,其中fast
每 10 毫秒一个slow
可观察的滴答声,每 30 毫秒一个可观察的滴答声:
每 30 毫秒,当fast
和slow
同时滴答时,combine_latest
不希望地触发两次——输出如下:
如何防止combine_latest
虚假滴答作响?
python - 使用 RxPY 观察按钮按下
如何使用 Python“收听”按键事件流?
我想做这样的事情:
但我怎样才能创造这个click_stream
?