0

我正在使用group_by一个Observable但是对于每个新创建的组,我想捕捉导致该组创建使用的元素(使用新键)with_latest_from

>>> from __future__ import print_function
>>> from rx import Observable

>>> # sequence 1, 2, 3, ... every half a second
>>> observable=Observable.interval(500).map(lambda x: x + 1)

>>> # groups into numbers that are divisible by 3 (True) and those that are not (False)
>>> grouped = observable.group_by(lambda x: bool(x%3))

>>> # groups paired with the first element that kicked off the group
>>> grouped.with_latest_from(observable, lambda group, element: (group, element)).subscribe(print)

我希望看到以下两个都被打印出来,但我每次只看到一个。

(<rx.linq.groupedobservable.GroupedObservable object at 0xabc>, 1)  # 1 is the element that created group with key=False
(<rx.linq.groupedobservable.GroupedObservable object at 0xdef>, 3)  # 3 is the element that created group with key=True

在奇怪的情况下,我还看到 snap 元素为 2:

(<rx.linq.groupedobservable.GroupedObservable object at 0x0313EB10>, 2)

任何想法出了什么问题?

4

1 回答 1

0

来自达格布拉特利

问题似乎是您基本上是在带有自身的流上使用 with_latest_from() 。因此,如果 source 或 latest 将首先触发,则无法保证。如果新组的源之前的最新触发器,那么它将丢失。解决它的一种方法是获取每个组中第一个元素的单独流,并将其与组流一起压缩:

# A stream with the first element if each group
firsts = grouped.flat_map(lambda group: group.first())

# groups paired with the first element that kicked off the group
grouped.zip(firsts, lambda g, k: (g, k)).subscribe(print)

另请注意,每个组还包含作为模运算符结果的键,如果它可以用来代替元素:

grouped.map(lambda gr: (gr, gr.key)).subscribe(print)
于 2016-05-28T13:12:24.743 回答