问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2523 浏览

python - 如何使用 asyncio 在单独的线程上通知 RxPY 观察者?

(注意:这个问题的背景很冗长,但是底部有一个SSCCE可以跳过)

背景

我正在尝试开发一个基于 Python 的 CLI 来与 Web 服务交互。在我的代码库中,我有一个CommunicationService类可以处理与 Web 服务的所有直接通信。它公开了一个received_response属性,该属性返回一个Observable(来自 RxPY),其他对象可以订阅该属性,以便在从 Web 服务接收到响应时得到通知。

我的 CLI 逻辑基于click库,其中我的子命令之一实现如下:

这里发生的事情(在response_handler不是的情况下None)是子命令表现得像一个协程,它等待来自 Web 服务 ( self.on_response == CommunicationService.received_response) 的响应并从它可以处理的第一个响应中返回一些处理后的值。

CommunicationService我试图通过创建完全模拟的测试用例来测试我的 CLI 的行为;一个假Subject的被创建(它可以作为一个Observable)并被CommunicationService.received_response模拟返回它。作为测试的一部分,on_next调用主体的方法将模拟 Web 服务响应传回生产代码:

我使用了一个 click 'result callback' 函数,该函数在 CLI 调用结束时被调用并阻塞,直到协程(子命令)完成:

问题

在测试开始时,我跑去CliRunner.invoke发射整个shebang。问题是这是一个阻塞调用,并且会阻塞线程,直到 CLI 完成并返回结果,如果我需要我的测试线程继续运行以便它可以同时产生模拟 Web 服务响应,这将没有帮助。

我想我需要做的是CliRunner.invoke使用ThreadPoolExecutor. 这允许测试逻辑在原始线程上继续并执行@when上面发布的步骤。但是,发布的通知 mock_received_response_subject.on_next 似乎不会触发在子命令中继续执行

我相信解决方案将涉及使用 RxPY's AsyncIOScheduler,但我发现这方面的文档有点稀疏且无益。

SSCCE

下面的片段抓住了我希望是问题的本质。如果可以对其进行修改以使其工作,我应该能够将相同的解决方案应用于我的实际代码,以使其按我想要的方式运行。

当前行为

程序在运行时挂起,在result = loop.run_until_complete(task).

验收标准

程序终止并foostdout.

更新 1

根据文森特的帮助,我对我的代码进行了一些更改。

Relay.enabled(等待来自 Web 服务的响应以处理它们的子命令)现在实现如下:

我不太确定observable 的await行为如何RxPY——它们会在生成的每个元素上将执行返回给调用者,还是仅在 observable 完成(或出错?)时返回执行。我现在知道是后者,老实说,这感觉像是更自然的选择,并且让我让这个函数的实现感觉更加优雅和反应灵敏。

我还修改了生成模拟 Web 服务响应的测试步骤:

不幸的是,这将无法正常工作,因为 CLI 正在其自己的线程中被调用...

并且 CLI 在调用时会创建自己的线程私有事件循环......

我认为我需要一种方法来允许我的测试步骤在新线程上调用 CLI ,然后获取它正在使用的事件循环

更新 2

似乎没有一种简单的方法来获取特定线程为其自身创建和使用的事件循环,因此我接受了 Victor 的建议并模拟asyncio.new_event_loop返回一个我的测试代码创建和存储的事件循环:

我将“收到的模拟网络响应”测试步骤更改为执行以下操作:

好消息是,当执行此步骤时,我实际上正在Relay.enabled触发协程!

现在唯一的问题是最后的测试步骤,我等待在自己的线程中执行 CLI 并验证 CLI 是否正在发送它的未来stdout

我已经尝试过解决这个问题,但我似乎无法context.async_result(从 存储未来loop.run_in_executor)很好地过渡到done并返回结果。在当前的实现中,第一个测试( )出现错误,第二个测试(1.1)出现无限期挂起1.2

第 3 章:结局

搞砸所有这些异步多线程的东西,我太笨了。

首先,不要像这样描述场景......

我们是这样描述的:

实施新的给定步骤:

繁荣

0 投票
2 回答
135 浏览

python - 为什么重复(n)不适用于反应式扩展中的创建

74 78 94 59 79 76

序列,而我希望每个数字将重复 6 次

所以“重复”永远不会对使用 create 方法创建的 observables 起作用。

0 投票
1 回答
291 浏览

python-3.x - 当对象的任何属性随 RxPy 发生变化时触发函数

在 RxPy 中,有什么类似于这里INotifyPropertyChanged提到的 .NET 框架的东西吗?我正在尝试向对象添加观察者,以便对象的任何属性发生更改,都会调用一个函数。

0 投票
1 回答
502 浏览

python - RxPy - 为什么排放与合并运营商交错?

所以我在做 RxJava 和 RxKotlin 两年后开始学习 RxPy。我注意到的一件事是某些运算符会导致在 RxJava 中不会发生的疯狂交错。

例如,对于一个简单的源,flat_map()将导致排放无序交错。Observable

输出:

然而,使用 RxJava 或 RxKotlin,一切都保持有序。

输出:

我确认一切都在运行MainThread并且没有奇怪的异步调度正在进行(我认为)。

为什么 RxPy 会这样?我注意到这几乎发生在任何处理多个Observable源合并在一起的操作员身上。默认调度程序到底在做什么?

另外,为什么concat_map()RxPy 中没有?我的印象是,调度的工作方式在某种程度上是不可能的......

0 投票
1 回答
99 浏览

python - buffer_with_count 行为在可观察的区间上与在可观察的范围上不同。为什么?

我正在尝试RxPy,但我不理解buffer_with_count操作员的这种行为:

场景 1:可观察的区间

这个按我的预期工作。

无缓冲

带缓冲区(计数=3)

场景 2:可观察的范围

这个不会产生预期的缓冲输出

无缓冲

带缓冲区(计数=3)

在第二种情况下发生了什么?

非常感谢!

0 投票
0 回答
111 浏览

python - Rx - 实时触发股票价格变化?

作为一个类的一个例子,我试图找出最强大的方法来创建一个热点Observable,它为给定的股票代码发出当前价格,但只有在它发生变化时才会发出价格。我自己能想出的最好办法是创建一个间隔源Observable,它将每 3 秒查询一次 Google Finance,解析价格,并且只有在价值通过distinct_until_changed().

有谁知道一种更有效的方法来做到这一点,并且不必每隔一段时间重新查询,而是在价格实际变化时触发源?如果它过于复杂并且需要一些专门的库,我会坚持我所拥有的。我只是觉得我错过了一个更好的方法来做到这一点......

0 投票
1 回答
1139 浏览

python - RxPy - 将实时 Twitter 流变成 Rx Observable?

我按照这个很棒的教程使用 tweepy 在 Python 中利用实时 Twitter 流。这将实时打印提及 RxJava、RxPy、RxScala 或 ReactiveX 的推文。

这是通过RxPy变成ReactiveX Observable的完美候选。但是我究竟如何把它变成一个热源呢?我似乎无法在任何地方找到有关如何执行...ObservableObservable.create()

0 投票
1 回答
382 浏览

python - 从 RxPy observable 更新 pyqtgraph 图

在以下脚本中,将创建一个绘图窗口并values正确传递给plot.update. 然而,剧情并没有更新。我究竟做错了什么?

0 投票
1 回答
298 浏览

python - RxPY - 如何使用 stop_and_wait?

我尝试在 Python 中翻译以下 JavaScript 代码示例:

该片段取自RxJS 发行说明。我的 Python 解释如下所示:

不幸的是,虽然 JavaScript 版本运行良好,但 Python 版本由于“StopAndWaitObservable”对象没有“订阅”属性而失败。

0 投票
2 回答
394 浏览

python - 在 reactx python 中总结 Observables

使用 Python 中的 ReactiveX,我如何总结 Observables 流?

我有一个字典流是 {"user": "...", "date": ...}。我想创建一个函数,我可以应用该函数为每个用户累积具有最新日期的字典,然后在流结束时发出累积的 observables(它就像 max,但必须查看用户字段,并且会发出多个值)。

示例 - 输入流:

预期输出(顺序无关紧要)

我在https://ninmesara.github.io/RxPY/api/operators/index.html阅读了“Filtering Observables”、“Transforming Observables”、“Combining Observables”和“Decision Tree of Observable Operators” ,并查看了 reduce /aggregate(仅在末尾发出单个值)和 flat_map(不知道如何检测流的结尾)。many_select 和 window (尤其是)看起来很有希望,但我很难理解它们。

我如何使用 rx 来做到这一点(通过使用现有的运算符之一,或者通过制作自定义运算符 [我还不知道该怎么做]?)