问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
176 浏览

python - 如何存储 Observer on_completed 结果

我对 Rx 和 RxPy 比较陌生 - 我正在尝试做一件基本的事情,那就是访问我在 Observer 末尾存储的值on_completed。我有一种感觉,要么我遗漏了一些非常明显的东西,要么我可能将 Observable 概念弯曲成它不应该成为的东西。无论哪种方式,希望我能得到一些指导。

我已经查看了文档,materialize但它们似乎并不完全匹配。也研究过DoDisposable但找不到很多接近我需要的例子。

无论如何可以从 on_completed 方法中访问一个值吗?没有将某些东西保存为全局变量(这对我来说似乎是个坏主意)我不确定这是否可能?基本上无论它输出什么on_completed或类似的东西。也许DoFinally

0 投票
1 回答
28 浏览

observable - Observable - 仅订阅和处理每次调用可用的最新消息

假设您有一个 Observable 值流,它可以非常快速地推送值。

现在假设您有一个订阅者需要使用来自该可观察流的最新值来更新数据库中的记录,即有点慢的 I/O 绑定消费者。

换句话说,想象

除了可观察对象在不同的​​ on_next 调用之间创建的最新值之外,还有什么方法可以“跳过”所有内容?

换句话说:假设在上面的示例中,obs 开始推送值 1,2,3... 订阅者调用值为“1”的 do_some_io_bound_operation。这需要一段时间 - 完成时,值 2 和 3 可用。但是,与其对两个新的可用值都调用 do_some_io_bound_operation,理想情况下,订阅者应该跳过值“2”并直接移动到值 3。

有点难以描述 - 我希望意图很明确。有什么办法可以做到这一点?

我想 .Buffer() 进入这个方向 - 但我见过的大多数应用程序只是缓冲一些固定数量的元素或时间跨度,而我需要动态缓冲(缓冲在执行 on_next 时发生的所有事情)

谢谢

0 投票
2 回答
1066 浏览

python-3.x - 我在 python rx=3.0.1 中收到“AttributeError: type object 'Observable' has no attribute 'from_'”错误

这是我在笔记本电脑上尝试过的“使用 python 进行反应式编程”一书中的代码。

如果我跑

输出应如下所示:

0 投票
1 回答
465 浏览

javascript - 如何将 RxPy 数据流发送到前端 javascript

我正在尝试将 python ReactiveX 流(使用 RxPy 库)发送到 Web UI 组件上的 javascript,但我似乎找不到这样做的方法。此外,我可能需要将进入 Javascript 的数据流放入各种 RxJS Observable 中以进行进一步处理。你能帮我理解如何实现这一目标吗?我仍然掌握 ReactiveX,所以也许我缺少一些基本概念,但我正在努力在网上找到类似的东西。

这个问题出现在我正在开发一个桌面应用程序时,该应用程序从 csv 或 zeromq 端点获取数据,并将其流式传输到将动态绘制数据的 UI(随着新数据的到来更新绘图)。我正在使用 Electron 构建我的应用程序,使用 python 作为我的后端代码。Python 是必须的,因为我将使用一些 TensorFlow 模型扩展应用程序。

多年来,作为初始结构的示例非常好,我编写了一些示例代码来玩,但我似乎无法让它工作。我设法从 UI 按钮一直到 python 脚本,但我陷入了 PriceApi.get_stream(...) 方法的返回。

索引.html

前端是笔直的。

api.py:

ZeroRPC 服务器文件与上述链接中的文件类似。

core_operator.py

这是主要逻辑将从 zeroMQ 订阅获取价格的文件,但目前只是从 csv 创建一个 Observable。

渲染.js

最后,应该接收流的 javascript:

我一直在研究使用 WebSockets,但未能理解如何实现它。我确实找到了一些使用 Tornado 服务器的示例,但是我试图使其尽可能纯净,而且,我已经有了来自 Electron 的客户端/服务器结构,我无法直接使用它,这感觉很奇怪。此外,我正在尝试将整个系统维护为 PUSH 结构,因为数据要求不允许 PULL 类型的模式,以及定期轮询等。

非常感谢您随时可以为此付出代价,如果您需要任何进一步的详细信息或解释,请告诉我。

0 投票
1 回答
560 浏览

python-3.x - 我可以将 python 3.7 代码转换为较低版本吗?

在基于 python 的项目中,我想使用功能数据类和库,例如当前的 RxPY 版本。

但是,我们绑定到只有 Python 3.5.5 的环境(Yocto 系统)

从 TypeScript/JavaScript 我知道某些语言允许将代码转换为较低的语言版本。

可以将 Python 3.7 代码转换为较低版本吗?

非常感谢

0 投票
0 回答
218 浏览

python - 如何发布带有可观察的芹菜状态?

我正在构建一个使用 celery 执行异步任务的进度条应用程序。该任务以 GraphQL 突变开始,并每秒更新自己的状态:

当我想通过 GraphQL 订阅发布该状态时,属性设置不正确。我究竟做错了什么?

我想我不明白 Observable 的权利。有人能帮我吗?提前致谢

0 投票
1 回答
902 浏览

python - 在 Python Flask 服务器中使用 RxPY 处理基于推送的事件的 websocket 断开连接

语境

我正在使用RxPY启用通过 websockets 发送基于推送的事件。我在带有geventFlask服务器中使用烧瓶套接字。events 类包含一个充当事件发布者的rx.subject.BehaviorSubject,而 websocket 客户端订阅更改。

问题

我希望能够检测到客户端何时断开连接,以便正确处理资源。问题是当套接字断开连接并ws.send引发异常但它在 lambda 内时。

解决方案?

有没有办法将异常传递给父函数?

另一种解决方案是在不调用的情况下检测 websocket 断开连接,并且可以在 lambda 之外进行检查,尽管我在烧瓶套接字库中ws.send找不到这种方法。

0 投票
1 回答
141 浏览

python - 在 RxPy 中使用 combine_latest 和超过 2 个 observables

如果我有这样的事情:

它给了我这样的输出:

不过,如何将其扩展到 2 个以上的可观察对象?例如,如果我这样做:

我得到:

但是,我真正想要的是平面列表中的最新 3 个,例如:

我怎样才能做到这一点?

0 投票
2 回答
317 浏览

python - 如何在 Python 中同步 rx 管道?

我使用 RxPy 来处理文件,我想构建管道加载序列

但是我得到了一个我没有预料到的结果:似乎每个管道都是异步运行的,因为我没有表示“停止点”。我希望第二个和第三个管道只有在第一个管道完成后才开始。如何解决?

0 投票
0 回答
79 浏览

python - python中基于事件的反应式应用程序的无限循环实现

我想编写一个可以无限生成事件并以不同方式处理它们的应用程序。准确地说,假设我想每分钟测量一次温度,并根据调整气候控制。我决定使用RxPY,想法是定期从一个线程发送事件,但将它们的处理安排在池中的其他线程上。

为此实现连续无限过程的最佳方法是什么?我目前的想法是有一个好的旧while True:循环,但由于某种原因,它似乎不适合我。如果可以以不同的方式实施,有人有什么建议吗?

从主要方法我这样做:

一段时间后,以下函数会像这样处理这些事件:

像这样的 main 方法似乎更合适,但是如何使主线程不立即终止?