问题标签 [rx-py]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - RxPy3 group_by 创建“groupedobservables”但 flat_map 不起作用 - Reactive Python for Data Science Refactor
在决定学习 RxPy 时,我参加了 O'Reilly 的免费课程 Reactive Python for Data Science
我很快意识到代码是为 Python 3.5 和 RxPy2 编写的,所以我分叉了原始仓库并决定通过重构 RxPy3 的代码来学习
版本 2 的原始代码是:
我已经学会了导入from_
和operators
使用 `.pipe 将运算符串在一起。
到目前为止,我必须:
问题是它ops.group_by
提供了一组“groupedobservables”,这些ops.flat_map
代码grp.to_list()
不会映射到分组列表中。
原始代码在这里:Reactive Python for Data Science
我的重构代码是在这里派生的Reactive Python RxPy3,课程是 code_examples 文件6.4A_grouping_into_lists.py
python - 为什么 rx.interval 创建任何输出需要这么长时间?
我想创建一个以 100 毫秒间隔发出事件的可观察对象。
我尝试使用rx.interval
:
该过程未运行并提供一次性作为输出:
我怎样才能完成这项工作?
python - 使用 rxpy 创建间隔和倒计时
我正在制作一个 Discord 机器人,它允许您创建投票。用户可以将投票的持续时间作为参数。因此,我想每隔 5 秒或 10 秒(或者更多)刷新一次消息,并通过投票编辑用户还剩多少时间。例如,我想从 3600 秒开始倒计时,每 5 秒或 10 秒执行一个函数,该函数将编辑消息,直到时间变为 0。机器人端的一切都在我的控制之下,或多或少我知道如何实施它。
所以,我的想法是在当前时间等于开始时间+轮询持续时间时进行间隔并停止。所以我可以使用 rx.interval() 来创建 observable 并使用像 .take_while() 这样的运算符。
这是我的代码:
但我明白了AttributeError: 'Observable' object has no attribute 'take_while'
。
我想我应该把它放在管道或类似的东西中:
但我明白了TypeError: 'bool' object is not callable
如何使用 take_while?谢谢!
python - rxpy 高效地组合 observables
简介: 你好。我正在为我的用例探索 python rxpy库 - 我正在使用反应式编程概念构建执行管道。这样我希望我不必操纵太多的状态。虽然我的解决方案似乎是有效的,但我在尝试从其他 Observable 组成一个新的 Observable 时遇到了麻烦。
问题是我编写可观察数据的方式导致一些昂贵的计算被重复两次。为了性能,我真的想防止触发昂贵的计算。
我对反应式编程很陌生。试图挠头并查看互联网资源和参考文档 - 对我来说似乎有点太简洁了。请指教。
以下是一个玩具示例,说明了我在做什么:
输出:
你可以想到foo()
和bar()
是昂贵而复杂的操作。我首先构建一个 observable foo
。然后组成一个新的 observablebar_foo
包含foo
. 稍后将两者压缩在一起以计算最终结果foo(x)+bar(foo(x))
。
问题:
我能做些什么来防止
foo()
单个输入被多次触发?我有非常充分的理由保持foo()
和bar()
分开。另外我也不想明确地 memoizefoo()
。任何有在生产中使用 rxpy 经验的人都可以分享他们的经验。与等效的手工制作(但不可维护)代码相比,使用 rxpy 会导致更好的性能或速度下降吗?
python - Python反应式编程条件执行
RxPy
我开始使用该模块学习 Python 中的反应式编程。现在,我有一个案例,我想根据接收到的元素运行不同的功能。我以非常直接的方式实现了它
对我来说,这似乎有点难看,但是,我在操作/Observables 中找不到任何情况、开关或其他方法来根据收到的元素触发不同的执行。
有没有更好的方法来做到这一点?
python - Python:如何在 rx.subject 订阅 on_next 中调用异步函数
我需要在on_next
pythonrx
订阅中调用一个异步函数,如下所示:
但我收到异步错误:
我真的不想使用asyncio.get_event_loop().run_until_complete(...)
,因为我已经在运行主循环并且我不想开始一个新的循环也不使用嵌套循环。
我搜索了一下,发现一个lambda
函数不能是 async。而且我认为这可能是这里的问题,因为我真的不知道如何在没有使用库on_next
的 lambda 函数的情况下获取值。rx
我还搜索了async-rx库,但这看起来唯一可以做的不同的事情是await subscribe(...)
,但这不是我想要的。我想要类似的东西subscribe(on_next=await...)
。
那可能吗?从我的背景来看,在 a 中启动一个函数javascript
很容易,所以这对我来说似乎是一项可能的任务。希望有人能找到解决方案。async
subscribe
非常感谢!
reactive-programming - RxPy:使用可变计数进行缓冲
有谁知道如何使用 RxPy 缓冲具有可变计数的可观察流,其中计数基于缓冲数据的值?
例如,给定序列:
(d,1)-(d,2)-(d,2)-(d,3)-(d,3)-(d,3)-(d,2)-(d,2)-(d,3)-(d,3)-(d,3)
,
其中d
表示一些数据,缓冲区必须根据整数值顺序组合发射的项目,因此我们将得到 5 个缓冲发射,如下所示:
((d,1))-((d,2),(d,2))-((d,3),(d,3),(d,3))-((d,2),(d,2))-((d,3),(d,3),(d,3))
rxjs - 如何使用 RxPY 重写这个倒数计时器?
这是我试图用 RxPY 重现的 RxJS 代码。
这就是我所经历的,但不是
有人可以帮助我了解我在这里缺少什么吗?
python - rx.interval 和 rx.timer 不产生结果
我目前正在尝试学习使用 RxPy,但遇到了以下问题。我尝试使用两者rx.timer
,rx.interval
但都没有产生输出。
我的代码:
产生这个结果:
我在这里想念什么?
python - 使用 RxPY 将函数应用于字典列表
我有一个字典列表,并且我有一个要应用于此列表并获得结果的函数。
我可以使用RxPY得到这个结果吗?谢谢您的帮助。