在 ReactiveX 中,我可以从多个 observables 中获取最新值,每个 observables 可能会或可能不会以不同的频率发射,如下所示(使用 RxPY):
from __future__ import print_function
from rx import Observable
import time
import IPython
import random
random.seed(123)
x1 = Observable.interval(random.randint(50, 500))
x2 = Observable.interval(random.randint(50, 500))
x3 = Observable.interval(random.randint(50, 500))
xc = Observable.combine_latest(x1, x2, x3, lambda a1, a2, a3: [a1, a2, a3])
xc.subscribe(lambda s: print(s))
input("Press Enter to end")
但是,当使用 group_by 创建所述可观察对象时,每当任何可观察对象发出值时,我将如何做同样的事情,即从一组可观察对象中打印最新值?
from __future__ import print_function
from rx import Observable
import time
import IPython
import random
random.seed(123)
n = 5
xx = Observable.interval(random.randint(50, 500)).group_by(lambda x: x % 5) # create n observables
print(xx)
事情是这会返回一个可观察的组对象:
<rx.core.anonymousobservable.AnonymousObservable object at 0xb631bb10>
那么对于任何给定的 n,我将如何对这个对象执行相同的 combine_latest 操作?
我知道在这个程式化的示例中,可观察对象将以相同的速率发射,但我需要根据顶部的显式示例推广到不同发射频率的解决方案。
鉴于 RxPY 和 RxJS 的结构非常相似,我很乐意考虑类似的 RxJS 答案。