0

在 ReactiveX 中,我可以从多个 observables 中获取最新值,每个 observables 可能会或可能不会以不同的频率发射,如下所示(使用 RxPY):

from __future__ import print_function

from rx import Observable
import time
import IPython
import random

random.seed(123)

x1 = Observable.interval(random.randint(50, 500))
x2 = Observable.interval(random.randint(50, 500))
x3 = Observable.interval(random.randint(50, 500))

xc = Observable.combine_latest(x1, x2, x3, lambda a1, a2, a3: [a1, a2, a3])

xc.subscribe(lambda s: print(s))

input("Press Enter to end")

但是,当使用 group_by 创建所述可观察对象时,每当任何可观察对象发出值时,我将如何做同样的事情,即从一组可观察对象中打印最新值?

from __future__ import print_function

from rx import Observable
import time
import IPython
import random

random.seed(123)

n = 5
xx = Observable.interval(random.randint(50, 500)).group_by(lambda x: x % 5) # create n observables
print(xx)

事情是这会返回一个可观察的组对象:

<rx.core.anonymousobservable.AnonymousObservable object at 0xb631bb10>

那么对于任何给定的 n,我将如何对这个对象执行相同的 combine_latest 操作?

我知道在这个程式化的示例中,可观察对象将以相同的速率发射,但我需要根据顶部的显式示例推广到不同发射频率的解决方案。

鉴于 RxPY 和 RxJS 的结构非常相似,我很乐意考虑类似的 RxJS 答案。

4

1 回答 1

1

编辑:如果组的数量是有限的并且非常小,这是一种更简洁的方法,您可以在不牺牲太多性能的情况下进行此操作:

groups$.scan(lambda prev, group$: [
               Observable.combineLatest(prev + [group$])\
                         .map(lambda nest: nest[0] + [nest[1]] if len(nest) == 2 else nest)
             ], [])

在这里,scan构建一个 Observable,它在任何组发出的任何时候发出。它通过嵌套上一次迭代中组列表中的排放来做到这一点。因为prev是可观察的数组,所以我们使用mapwith list concat 进行展平。包装scan结果的数组适应第一组的基本情况。


编辑:在将答案写在该行下方之后,我意识到它的行为(在所有流都发出另一个项之前不会发出任何项)可能是不需要的。在我的脑海中,最短但肯定不是最干净的解决方案是存储最新的值并使用它们启动每个流,如下所示:

def group_combine(groups$):
  last = []

  def mutate_last(v): # too bad this isn't JS :/
    last = v

  return groups$.scan(lambda streams, next_stream: streams + [next_stream], [])\
                .switch_map(lambda streams: Observable.combine_latest(
                  [stream.start_with(last[i]) if i < len(last) else stream\
                     for i, stream in enumerate(streams)]
                ))\
                .for_each(mutate_last)

试试这个:

// groups$ = x.group_by(...)
groups$.scan(lambda streams, next_stream: streams + [next_stream], [])\
       .switch_map(lambda streams: Observable.combine_latest(streams))\
       .subscribe(lambda values: ...)

首先,我们将组流累积到一个不断增长的列表中,然后我们不断切换到该combine_latest列表之上。

于 2017-10-23T00:27:07.967 回答