问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
222 浏览

python - RxPy3 group_by 创建“groupedobservables”但 flat_map 不起作用 - Reactive Python for Data Science Refactor

在决定学习 RxPy 时,我参加了 O'Reilly 的免费课程 Reactive Python for Data Science

我很快意识到代码是为 Python 3.5 和 RxPy2 编写的,所以我分叉了原始仓库并决定通过重构 RxPy3 的代码来学习

版本 2 的原始代码是:

我已经学会了导入from_operators使用 `.pipe 将运算符串在一起。

到目前为止,我必须:

问题是它ops.group_by提供了一组“groupedobservables”,这些ops.flat_map代码grp.to_list()不会映射到分组列表中。

原始代码在这里:Reactive Python for Data Science

我的重构代码是在这里派生的Reactive Python RxPy3,课程是 code_examples 文件6.4A_grouping_into_lists.py

0 投票
1 回答
227 浏览

python - 为什么 rx.interval 创建任何输出需要这么长时间?

我想创建一个以 100 毫秒间隔发出事件的可观察对象。

我尝试使用rx.interval

该过程未运行并提供一次性作为输出:

我怎样才能完成这项工作?

0 投票
1 回答
230 浏览

python - 使用 rxpy 创建间隔和倒计时

我正在制作一个 Discord 机器人,它允许您创建投票。用户可以将投票的持续时间作为参数。因此,我想每隔 5 秒或 10 秒(或者更多)刷新一次消息,并通过投票编辑用户还剩多少时间。例如,我想从 3600 秒开始倒计时,每 5 秒或 10 秒执行一个函数,该函数将编辑消息,直到时间变​​为 0。机器人端的一切都在我的控制之下,或多或少我知道如何实施它。

所以,我的想法是在当前时间等于开始时间+轮询持续时间时进行间隔并停止。所以我可以使用 rx.interval() 来创建 observable 并使用像 .take_while() 这样的运算符。

这是我的代码:

但我明白了AttributeError: 'Observable' object has no attribute 'take_while'

我想我应该把它放在管道或类似的东西中:

但我明白了TypeError: 'bool' object is not callable

如何使用 take_while?谢谢!

0 投票
1 回答
354 浏览

python - rxpy 高效地组合 observables

简介: 你好。我正在为我的用例探索 python rxpy库 - 我正在使用反应式编程概念构建执行管道。这样我希望我不必操纵太多的状态。虽然我的解决方案似乎是有效的,但我在尝试从其他 Observable 组成一个新的 Observable 时遇到了麻烦。

问题是我编写可观察数据的方式导致一些昂贵的计算被重复两次。为了性能,我真的想防止触发昂贵的计算。

我对反应式编程很陌生。试图挠头并查看互联网资源和参考文档 - 对我来说似乎有点太简洁了。请指教。

以下是一个玩具示例,说明了我在做什么:

输出:

你可以想到foo()bar()是昂贵而复杂的操作。我首先构建一个 observable foo。然后组成一个新的 observablebar_foo包含foo. 稍后将两者压缩在一起以计算最终结果foo(x)+bar(foo(x))

问题:

  1. 我能做些什么来防止foo()单个输入被多次触发?我有非常充分的理由保持foo()bar()分开。另外我也不想明确地 memoize foo()

  2. 任何有在生产中使用 rxpy 经验的人都可以分享他们的经验。与等效的手工制作(但不可维护)代码相比,使用 rxpy 会导致更好的性能或速度下降吗?

0 投票
1 回答
125 浏览

python - Python反应式编程条件执行

RxPy我开始使用该模块学习 Python 中的反应式编程。现在,我有一个案例,我想根据接收到的元素运行不同的功能。我以非常直接的方式实现了它

对我来说,这似乎有点难看,但是,我在操作/Observables 中找不到任何情况、开关或其他方法来根据收到的元素触发不同的执行。

有没有更好的方法来做到这一点?

0 投票
1 回答
163 浏览

python - Python:如何在 rx.subject 订阅 on_next 中调用异步函数

我需要在on_nextpythonrx订阅中调用一个异步函数,如下所示:

但我收到异步错误:

我真的不想使用asyncio.get_event_loop().run_until_complete(...),因为我已经在运行主循环并且我不想开始一个新的循环也不使用嵌套循环。

我搜索了一下,发现一个lambda函数不能是 async。而且我认为这可能是这里的问题,因为我真的不知道如何在没有使用库on_next的 lambda 函数的情况下获取值。rx

我还搜索了async-rx库,但这看起来唯一可以做的不同的事情是await subscribe(...),但这不是我想要的。我想要类似的东西subscribe(on_next=await...)

那可能吗?从我的背景来看,在 a 中启动一个函数javascript很容易,所以这对我来说似乎是一项可能的任务。希望有人能找到解决方案。asyncsubscribe

非常感谢!

0 投票
0 回答
42 浏览

reactive-programming - RxPy:使用可变计数进行缓冲

有谁知道如何使用 RxPy 缓冲具有可变计数的可观察流,其中计数基于缓冲数据的值?

例如,给定序列:

(d,1)-(d,2)-(d,2)-(d,3)-(d,3)-(d,3)-(d,2)-(d,2)-(d,3)-(d,3)-(d,3),

其中d表示一些数据,缓冲区必须根据整数值顺序组合发射的项目,因此我们将得到 5 个缓冲发射,如下所示:

((d,1))-((d,2),(d,2))-((d,3),(d,3),(d,3))-((d,2),(d,2))-((d,3),(d,3),(d,3))

0 投票
1 回答
52 浏览

rxjs - 如何使用 RxPY 重写这个倒数计时器?

这是我试图用 RxPY 重现的 RxJS 代码。

这就是我所经历的,但不是

有人可以帮助我了解我在这里缺少什么吗?

0 投票
1 回答
50 浏览

python - rx.interval 和 rx.timer 不产生结果

我目前正在尝试学习使用 RxPy,但遇到了以下问题。我尝试使用两者rx.timerrx.interval但都没有产生输出。

我的代码:

产生这个结果:

我在这里想念什么?

0 投票
0 回答
9 浏览

python - 使用 RxPY 将函数应用于字典列表

我有一个字典列表,并且我有一个要应用于此列表并获得结果的函数。

我可以使用RxPY得到这个结果吗?谢谢您的帮助。