问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
709 浏览

python - RxPY - flat_map 排放等待下一个生成器值

你好!我正在尝试完成我的第一个 RxPY 项目,但是我在
理解 Python 中 flat_map 的行为时遇到了一些问题。

在这个项目中,有一个从生成器(Kafka 消费者)创建的 Observable。它在收到消息时发出值,然后根据消息执行查询,并为每个结果发出一个值。

我对代码进行了一些更改,以使其更易于重现。Kafka 消费者被一个生成器取代,该生成器在两次发射之间需要很长时间,查询结果被一个发出 3 个值的 Observable 取代。行为仍然相同。

输出:

我期待这样的事情:

这种行为的原因是什么?我应该怎么做才能得到预期的结果?

谢谢!:)

0 投票
0 回答
95 浏览

python - rxpy:带有随机抖动的buffer_with_time?

我正在使用buffer_with_time,它接受一个关键字参数:

但是,时间戳是固定的。我想给它添加一个随机抖动。

如果我的并行任务同时启动,那么每五秒钟就会有一个固定的任务在同一时间timespan=5000卡住。如何使timespan更多的动态,比如 5000±300ms,以便逐渐均匀地触发缓冲事件?

0 投票
1 回答
3920 浏览

python - 如何从异步函数 websocket.send() 数据

我正在学习 asyncio 并试图弄清楚如何将数据从一个异步函数传递到另一个异步函数中的 websocket 循环。

在我的场景中,某些第三方将数据发布到 Web API。我想将 POST 数据回显到连接的 websocket 客户端。

Sanic 和 Rx 不是必需的,但这是我开始的路径。到目前为止,这是我想出的:

这显然是行不通的,因为 observable_message() 需要消息作为 arg,而我正试图用它来 start_async(),所以我很难过。我怎样才能把这些东西连接起来?

客户端可以是微不足道的:

0 投票
0 回答
121 浏览

flatmap - RxPy:如何将元素集流转换为单个元素流

我有两段代码 st
- 一段在系统中生成一组活动警报流。
- 第二个消耗警报的引发/下降事件。

假设第一部分产生以下流
["a", "b"],
["c"],
["e", "f", "g"],
我想将它们推送为
("a", True),
("b", True),
("c", True),
("a", False),
("b", False),
("e", True),
("f", True),
("g", True),
("c", False).
到系统的第二部分。

我可以做到以下几点

产生以下结果,没关系

但我希望有一个没有主题的解决方案,如下所示

alerts = Observable\ .from_(events)\ .map(lambda x : set(x))\ .scan(lambda (prev, events), curr : (curr, {(i, True) for i in curr - prev} .union({(i, False) for i in prev - curr})), (set(), set()))\ .map(lambda (prev, events) : events)

但它会产生以下内容,这是不正确的,因为您无法从中重建活动事件,最终c变为活动状态。

flat_map不保留顺序,您认为还有其他解决方案吗?

谢谢你,
迈克尔

0 投票
1 回答
555 浏览

reactivex - 反应式:如何使行为主体从可观察到的发射

我将在 android 应用程序中使用 rxandroid。我现在正在尝试在 rxpy 中对行为进行建模,因为它对我来说是最容易设置和使用的。在下面的示例中,source3 发出了正确的数据;这是需要一些时间的初始化和我刚刚伪造的永久订阅的串联。我想要 BehaviorSubject 因为我需要立即使用最后一个值进行字段初始化。

我无法弄清楚如何将 BehaviorSubject 链接到 source3 之上,以便它在记住最后一个值的同时发出源 3。我在互联网上搜索了两天,没有找到关于这个用例的明确方向。这是我的代码,问题是为什么我没有从观察者那里得到任何排放。

0 投票
1 回答
1313 浏览

python - RxPy:如何从外部回调创建 hot observable 并订阅多个异步进程?

我有一个外部服务(ExternalDummyService),我在其中注册了一个回调。我想从该回调创建一个 observable 并订阅多个异步进程。

pyfiddle 中的完整代码:https ://pyfiddle.io/fiddle/da1e1d53-2e34-4742-a0b9-07838f2c13df * 请注意,在 pyfiddle 版本中,“睡眠”被替换为“for i in range(10000): foo + = i" 因为睡眠不能正常工作。

主要代码是这样的:

我得到的输出是这个,两个进程同步运行,并且 ExternalDummyService 在两个进程完成每次执行之前不会发出新值:

我想得到这样的东西,服务发出消息而不等待进程运行并且进程异步运行:

我尝试过使用 share()、ThreadPoolScheduler 和其他我不知道我在做什么的事情。

谢谢!

0 投票
1 回答
506 浏览

python - RxPy:在(慢)扫描执行之间对热可观察对象进行排序

TL;DR 我正在寻求帮助来实现下面的大理石图。目的是尽可能对未排序的值进行排序,而无需在扫描执行之间等待时间。

我不是要求全面实施。欢迎任何指导。 未消耗最小大理石图 我有一个无限热可观察的异步慢速(强制用于测试目的)扫描。这是相关代码:

这是完整版:https ://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd

这是当前输出(值是随机生成的):

我想对扫描执行之间的待处理消息进行排序。因此,第一个发出的消息将始终是第一个被消费的消息,但下一个消费的消息将是直到该点发出的和未消费的消息的最小值(值)(所有这些消息都在当前版本中,因为即时发射)。依此类推……我认为大理石图比我的解释要好。

请注意,扫描不是在等待完成事件,在发出最后一条消息后它没有开始的唯一原因是因为睡眠。在这里,您有另一个版本,其中睡眠已从扫描中删除并放入 ExternalDummyService。您可以看到这些值在它们发出的那一刻就被消耗掉了。这也显示在大理石图中。

我尝试了to_sorted_list,这是我在 RxPy 中找到的唯一排序方法,但我无法让它工作。

我正在寻找的是这样的:

谢谢

0 投票
2 回答
163 浏览

python - 如何监控一系列功能的执行?也许是 RxPy?

我有一个想要执行的函数列表,比如在管道中。

(基本上我编写列表中的函数,请参阅:https ://mathieularose.com/function-composition-in-python/ ):

我想观察函数何时被调用、何时完成、是否失败等,并将其记录到文件或数据库或 MongoDB 等。每个函数都返回一些 python 对象,所以我想使用返回值并记录其属性,例如

如果f1返回一个list我想登录f1 was completed at 23:00 on 04/22/2018. It returned a list of length 5等等

我的问题不是关于执行函数,而是关于观察函数的行为。我希望这些功能与我如何编码管道无关。

我想知道如何在这里实现观察者模式。我知道“观察者模式”听起来太“面向对象”了,所以我的第一个想法是使用装饰器,但在寻找这方面的指南时我发现了RxPy.

因此,我正在寻找有关如何解决此问题的指南。

0 投票
1 回答
296 浏览

python - 订阅响应式源的 Python Web 服务在对象中产生奇怪的行为

我已经使用Falcon实现了一个 Web 服务。此服务存储一个状态机(pytransitions),该状态机在构造函数中传递给服务的资源。该服务使用gunicorn运行。

Web 服务在启动时使用RxPy启动一个进程。中返回的事件on_next(event)用于触发状态机中的转换。

错误

我希望状态机在服务和资源中具有一致的状态,但似乎在资源中状态永远不会改变。

我们有一个尝试重现此行为的测试,但令人惊讶的是,该测试有效

问题

TochoLevel为什么当我使用 gunicorn 启动服务并在rxpyon_next方法中传播状态时,状态机不会更改资源中 的状态

0 投票
2 回答
486 浏览

python-3.x - 如何使用 rxpy/rxjs 延迟事件发射?

我有两个事件流。一个来自电感回路,另一个来自 IP 摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相隔 N 毫秒,我想将它们组合起来(汽车总是首先进入循环),但我也希望每个流中的不匹配事件(任何一个硬件都可能发生故障)都合并到一个流中。像这样的东西:

现在,我当然可以通过良好的 ole Subject 反模式来解决问题:

这不仅相当 hacky,而且虽然我没有观察到它,但我很确定当我使用threading.Timer. 鉴于过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用的情况下做到这一点Subject,但我无法弄清楚。如何做到这一点?

编辑

尽管出于组织和操作方面的原因,我更愿意坚持使用 Python,但我将采用 JavaScript rxjs 的答案并将其移植,甚至可能在 node.js 中重写整个脚本。