问题标签 [rx-py]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 为什么 RxPY 的 TwistedScheduler 会引发 AlreadyCalled 错误?
最小的工作示例 - 使用RxPY (v.1.2.4) 逐字发送'Hello world'
到回显服务器的客户端。 Observable
客户:
服务器:来自扭曲示例的简单回显服务器——echoserv.py
在服务器运行的情况下,启动客户端会给出:
我究竟做错了什么?
rx-java - Rx:一个类似 zip 的运算符,在其中一个流结束后继续?
我正在寻找组合异步开始和结束的流(observables):
我需要它:将音频流添加在一起。它们是音频“块”流,但我将在这里用整数表示它们。所以播放了第一个剪辑:
然后第二个开始,稍晚一点:
将它们按总和组合的结果应该是:
但是,如果任何压缩流结束,标准 zip 就会完成。即使其中一个流结束,我也希望这个 optional_zip 继续运行。有没有办法在 Rx 中做到这一点,或者我必须通过修改现有的 Zip 自己实现它?
注意:我使用的是 RxPy,但这里的社区似乎很小,而且 Rx 运算符似乎在各种语言中都很通用,所以我也将它标记为 rx-java 和 rx-js。
python - 在 RX 中构建双重无限轮询行为
问题是用 RX 模拟双循环行为:
如果将两个循环替换为两个可观察对象,则将是干净的,其中一个充当外部的观察者,而do_something(value)
可以自己替换为观察者。任何异常也可以很好地处理。外循环需要阻塞,但内循环可能不是,因为我正在尝试使用外循环来处理异常,使用带有退避函数的重试函数。
到目前为止,我可以使用以下方法构建序列:
但是我怎样才能在外部的阻塞模式下制作类似的结构呢?
python - 为什么 RxPY Observable 充当无限迭代?
我有一个错误,它不小心将 anObservable
用作可迭代对象。对于大多数对象,这通常很容易检测到:
然而,对于一个 Rx observable,它会默默地使 Python 崩溃:
MWE:
没有回溯,也没有迹象表明有任何问题。这使得已经很难调试的并发响应式代码更难调试——我花了 2 个小时才最终找到这个。
仔细观察,迭代 anObservable
似乎会创建新的 observables,尽管我不知道从哪里可以观察到 observable 没有__iter__
方法。
这是一个错误,还是一个功能?Observable
s是否可以迭代?
python - 如何在完成另一个 observable 时处理 observable?
我有一个source
observable,我订阅了一个logger
观察者来进行日志记录。
我也订阅了,source
所以我可以执行计算。当我的计算完成后,我已经完成source
并且我想处理logger
:
但是,logger
并没有在正确的时间完全处理——通常会发生一两个额外的滴答声:
MWE
但我得到:
7是如何被记录的?我们的计算应该在发出 5 次后终止source
,此时logger
会处理掉。
我究竟做错了什么?
python - 在 RxPy 中使用 first()
如何从Observable
RxPy 中的序列中恢复元素
应该打印 1,但它返回另一个AnonymousObservable
,而不是元素。
一般来说,从Observable
序列中恢复元素的最佳运算符是什么?
python - 如何从 Rx Observable 中提取值?
我正在尝试将一些 ReactiveX 概念集成到现有项目中,认为这可能是一种很好的做法,也是一种使某些任务更清洁的方法。
我打开一个文件,从它的行创建一个 Observable,然后进行一些过滤,直到我得到我想要的行。现在,我想使用 re.search() 从其中两行中提取一些信息以返回特定组。我一生都无法弄清楚如何从 Observable 中获取这些值(不将它们分配给全局变量)。
代替.subscribe()
该流的末尾,我尝试使用.to_list()
来获取一个列表,我可以在该列表上迭代“正常方式”,但它只返回一个类型的值:
<class 'rx.anonymousobservable.AnonymousObservable'>
我在这里做错了什么?
我见过的每个 Rx 示例都只打印结果。如果我希望它们在我可以同步使用的数据结构中怎么办?
python - RxPy with_latest_from 产生不一致的结果
我正在使用group_by
一个Observable
但是对于每个新创建的组,我想捕捉导致该组创建使用的元素(使用新键)with_latest_from
:
我希望看到以下两个都被打印出来,但我每次只看到一个。
在奇怪的情况下,我还看到 snap 元素为 2:
任何想法出了什么问题?
python - RxPy 有 `flatten` 运算符吗?
RxPy 是否有一个方便的flatten
运算符,相当于flat_map(identity)
?还是已经默认了flat_map
?selector
identity