问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
249 浏览

python - 为什么 RxPY 的 TwistedScheduler 会引发 AlreadyCalled 错误?

最小的工作示例 - 使用RxPY (v.1.2.4) 逐字发送'Hello world'到回显服务器的客户端。 Observable

客户

服务器:来自扭曲示例的简单回显服务器——echoserv.py

在服务器运行的情况下,启动客户端会给出:

我究竟做错了什么?

0 投票
1 回答
292 浏览

python - 如何找到我正在使用的 RxPY 版本?

我不记得我安装了哪个版本的RxPY,很明显似乎不起作用:

编辑

此外,help不包含版本:

0 投票
2 回答
2485 浏览

rx-java - Rx:一个类似 zip 的运算符,在其中一个流结束后继续?

我正在寻找组合异步开始和结束的流(observables):

我需要它:将音频流添加在一起。它们是音频“块”流,但我将在这里用整数表示它们。所以播放了第一个剪辑:

然后第二个开始,稍晚一点:

将它们按总和组合的结果应该是:

但是,如果任何压缩流结束,标准 zip 就会完成。即使其中一个流结束,我也希望这个 optional_zip 继续运行。有没有办法在 Rx 中做到这一点,或者我必须通过修改现有的 Zip 自己实现它?

注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在各种语言中都很通用,所以我也将它标记为 rx-java 和 rx-js。

0 投票
2 回答
260 浏览

python - 在 RX 中构建双重无限轮询行为

问题是用 RX 模拟双循环行为:

如果将两个循环替换为两个可观察对象,则将是干净的,其中一个充当外部的观察者,而do_something(value)可以自己替换为观察者。任何异常也可以很好地处理。外循环需要阻塞,但内循环可能不是,因为我正在尝试使用外循环来处理异常,使用带有退避函数的重试函数。

到目前为止,我可以使用以下方法构建序列:

但是我怎样才能在外部的阻塞模式下制作类似的结构呢?

0 投票
2 回答
1231 浏览

python - 为什么 RxPY Observable 充当无限迭代?

我有一个错误,它不小心将 anObservable用作可迭代对象。对于大多数对象,这通常很容易检测到:

然而,对于一个 Rx observable,它会默默地使 Python 崩溃:


MWE:

没有回溯,也没有迹象表明有任何问题。这使得已经很难调试的并发响应式代码更难调试——我花了 2 个小时才最终找到这个。


仔细观察,迭代 anObservable似乎会创建新的 observables,尽管我不知道从哪里可以观察到 observable 没有__iter__方法。

这是一个错误,还是一个功能?Observables是否可以迭代?

0 投票
1 回答
385 浏览

python - 如何在完成另一个 observable 时处理 observable?

我有一个sourceobservable,我订阅了一个logger观察者来进行日志记录。

我也订阅了,source所以我可以执行计算。当我的计算完成后,我已经完成source并且我想处理logger

但是,logger并没有在正确的时间完全处理——通常会发生一两个额外的滴答声:

MWE

但我得到:

7是如何被记录的?我们的计算应该在发出 5 次后终止source,此时logger会处理掉。

我究竟做错了什么?

0 投票
1 回答
288 浏览

python - 在 RxPy 中使用 first()

如何从ObservableRxPy 中的序列中恢复元素

应该打印 1,但它返回另一个AnonymousObservable,而不是元素。

一般来说,从Observable序列中恢复元素的最佳运算符是什么?

0 投票
1 回答
976 浏览

python - 如何从 Rx Observable 中提取值?

我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清洁的方法。

我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息以返回特定组。我一生都无法弄清楚如何从 Observable 中获取这些值(不将它们分配给全局变量)。

代替.subscribe()该流的末尾,我尝试使用.to_list()来获取一个列表,我可以在该列表上迭代“正常方式”,但它只返回一个类型的值:

<class 'rx.anonymousobservable.AnonymousObservable'>

我在这里做错了什么?

我见过的每个 Rx 示例都只打印结果。如果我希望它们在我可以同步使用的数据结构中怎么办?

0 投票
1 回答
138 浏览

python - RxPy with_latest_from 产生不一致的结果

我正在使用group_by一个Observable但是对于每个新创建的组,我想捕捉导致该组创建使用的元素(使用新键)with_latest_from

我希望看到以下两个都被打印出来,但我每次只看到一个。

在奇怪的情况下,我还看到 snap 元素为 2:

任何想法出了什么问题?

0 投票
1 回答
158 浏览

python - RxPy 有 `flatten` 运算符吗?

RxPy 是否有一个方便的flatten运算符,相当于flat_map(identity)?还是已经默认了flat_mapselectoridentity