问题标签 [rx-py]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
50 浏览

python - 如何向反应性主题添加许多值?

我们有一个s = Subject()对象和一组对其值的订阅。我们在存储桶中接收数据(比如 1000 个项目),当前执行:Observable.from_(data.items).subscribe( lambda x: s.on_next(x))激活订阅者并将数据传递给他们。是否有任何更简单(代码更短)的表达式来填充Subject新项目?

0 投票
2 回答
169 浏览

python - 反应式编程将结果存储在一个变量中

快速(琐碎)问题:我找不到将一系列操作的输出存储在外部变量中的可观察对象的方法。例如这样的:

不确定这是否非常“反应性”,但应该是微不足道的。

提前致谢

C

0 投票
1 回答
353 浏览

reactivex - ReactiveX Observable 的批处理结果

假设我有一个看起来像这样的可观察对象(这是 Python,但应该对所有语言都通用):

rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

我希望最终能够将整数批处理成长度为 5 的列表,并能够将它们传递给函数,所以是这样的:

实际上,传入的数据将是(可能为空的)列表的无限流。我只是想确保batch_function在累积 5 个实际值之前不会进行后续调用。谢谢你的帮助。

0 投票
1 回答
1238 浏览

python - 通过 rxpy 从 observable 返回一个值

将 rx.Observable 对象转换为函数中的“普通”对象的优雅方法是什么?

例如:

我知道 subscribe 的返回是一个一次性对象,实现它的“丑陋”方式是:

有没有更好的方法来做到这一点?

谢谢。

0 投票
2 回答
568 浏览

python - 在 ReactiveX 中,如何将其他参数传递给 Observer.create?

使用 RxPY 进行说明。

我想从一个函数创建一个 observable,但该函数必须带参数。这个特定示例必须以随机间隔返回我要发送给它的许多预定义代码之一。到目前为止,我的解决方案是使用闭包:

有没有更简单的方法?是否可以将不需要闭包的更多参数发送到 Observable.create 的第一个参数函数?什么是规范建议?

0 投票
1 回答
561 浏览

rxjs - 如何将 RxPY(或 RxJS)combine_latest 与 group_by observable 一起使用

在 ReactiveX 中,我可以从多个 observables 中获取最新值,每个 observables 可能会或可能不会以不同的频率发射,如下所示(使用 RxPY):

但是,当使用 group_by 创建所述可观察对象时,每当任何可观察对象发出值时,我将如何做同样的事情,即从一组可观察对象中打印最新值?

事情是这会返回一个可观察的组对象:

那么对于任何给定的 n,我将如何对这个对象执行相同的 combine_latest 操作?

我知道在这个程式化的示例中,可观察对象将以相同的速率发射,但我需要根据顶部的显式示例推广到不同发射频率的解决方案。

鉴于 RxPY 和 RxJS 的结构非常相似,我很乐意考虑类似的 RxJS 答案。

0 投票
0 回答
17 浏览

python - 为什么这个不同的 RxPY observables 列表总是发出相同的东西?

使用列表推导,我创建了一个包含 10 个不同 observables 的列表。然而,尽管它们的时间不同,它们总是打印相同的东西(“ticker_9”)。

首先,创建一个包含 10 个代码的列表,命名为ticker_0 到ticker_9。然后,它使用列表推导式创建 10 个可观察对象,这些可观察对象以 100 毫秒(加 10)的倍数递增。每个 observable(应该)发出不同的代码。然而,它们都发出ticker_9 (即列表中的最后一个ticker),即使列表理解中的x 变量识别,因为它们以不同的频率发出。

这里发生了什么?为什么我没有得到 olist[0] 的ticker_0、olist[1] 的ticker_1 等?

观察:

或者:

......同样的事情,即。整个时间ticker_9。应该是ticker_0 和ticker_5,即使在第二种情况下,排放速度要慢得多。

为什么我的列表理解不能为每个可观察对象创建不同的ticker_x,即使它间隔内有效?

0 投票
1 回答
45 浏览

python - 用于调用不带参数的 on_next 函数的 Void/Unit 类型

我觉得我可能在这里遗漏了一些非常明显的东西,但无论如何......

使用 RxPY 时,是否有等效的 C# RXSystem.Reactive.Unit类型来调用on_next不带参数的函数?

如果有人好奇我为什么要这样做,那是因为我希望在没有x参数的情况下调用下面代码中的 start 函数

0 投票
1 回答
427 浏览

rxjs - 延迟订阅的开始和停止事件的 RX 流

我正在尝试制作开始和停止事件的主题,其中迟到的订阅者只会收到未完成的开始事件。IE。那些没有相应停止事件的人。

这是一些 RxPY 代码:

这给出了输出:

而我想:

我认为像扫描运算符这样的东西是必需的,但不能完全放在一起。感激地收到任何想法:)

0 投票
4 回答
386 浏览

python - 将单词拆分成字母字典 rxpy

我正在学习 RxPY ,所以我需要编写一些代码,可以将每个单词的第一个字符分开。结果必须如下所示:

我试过的。

那么,我该如何解决这个问题呢?我正在考虑按每个单词的第一个字符对它进行分组,但我不知道如何。