1

我正在尝试使用 RxPY 将 ReactiveX 集成到我的 GUI 中。这是一个通用的 ReactiveX 问题。

假设我有一个依赖于多个 Observable 流的可视化,使用combine_latest(stream1, stream2, plot_function). 当一个 Observable 发生变化时,这很有效,比如当用户修改一个值时;使用新值更新可视化。但是,有时两个 Observable 会同时更新,例如当用户从单个文件加载两个流的数据时。从技术上讲,一个 Observable 将在另一个之前更新(以导入函数中的先到者为准)。结果,情节将被更新两次,但出于所有意图和目的,它只需要更新一次。

我的一些可视化计算成本很高,所以我想确保如果两个流同时更新那么组合流只发出一个值。我能想到几个选择:

  1. 在组合流上使用debounce()小超时(如 50 毫秒)。这种方法对我来说似乎很脏。

  2. 不要combine_latest直接使用。将两个流包装在一个新对象中,该对象也具有某种updating标志。如果我将标志设置为 True,那么在我将标志设置为 Falseupdating之前不要发出任何内容。updating这种方法感觉是有状态的,它破坏了流的可组合性。

  3. 告诉所有可视化在所有流都更新之前不要更新。同样,这打破了封装,因为可视化不应该关心上游发生了什么。它应该只从组合流中接收新值并制作漂亮的图片。

  4. 使可视化足够细化,以至于首先更新一个流只会带来很小的性能损失。这对于某些可视化来说是不可能的,例如基于点和网格大小计算网格的可视化。如果点网格大小发生变化,则需要重新计算整个网格。

Rx 中是否有一些工具可以处理“同时”更新多个流?我觉得我要求的是魔法。

对于使用 Rx 制作 GUI 程序的任何人:除了通过流发送新值之外,我还应该为模型使用一些更好的架构吗?

如果这个问题不清楚,请在评论中告诉我,我会尝试做一个更具体的例子。

例子

这是一个示例 Python RxPY 程序:

import rx

stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)

rx.Observable\
  .combine_latest(stream1, stream2, lambda x, y: (x, y))\
  .subscribe(print)

stream1.on_next(3)
stream2.on_next(4)

这打印:

(1, 2)
(3, 2)
(3, 4)

我怎样才能同时更新 和 的值,stream1以便stream2以下成为结果?

(1, 2)
(3, 4)

换句话说,我怎么能combine_latest以这样一种方式进行修改,以便我可以在下游告诉它“嘿,等一下,我会在你发出下一个值之前更新其他流”?

4

2 回答 2

0

我找到了一个可能的答案,但这不是最好的,我想要其他答案。

我发现了pausable组合器。通过传入发出 True 或 False 的流,您可以控制是否暂停序列。这是我的示例的修改:

import rx

stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)

pauser = rx.subjects.BehaviorSubject(True)
rx.Observable\
         .combine_latest(stream1, stream2, lambda x, y: (x, y))\
         .pausable(pauser)\
         .subscribe(print)

# Begin updating simultaneously
pauser.on_next(False)
stream1.on_next(3)
stream2.on_next(4)

# Update done, resume combined stream
pauser.on_next(True)

# Prints:
# (1, 2)
# (3, 4)

要应用于我的 GUI,我可以在我的模型中创建一个BehaviorSubject调用updating,它会发出整个模型是否正在更新。例如,如果stream1stream2正在同时更新,那么我可以设置updating为 True。在任何制作成本高昂的可视化上,我可以应用 的值updating来暂停组合流。

于 2015-08-19T22:52:49.363 回答
0

这适用于 C# 的 Rx:

var throttled = source.Publish(hot => hot.Buffer(() => hot.Throttle(dueTime));

这里的dueTime值是TimeSpan.NET 中的 a。它只是说明您希望在它产生价值之前处于不活动状态的时间窗口是多少。这基本上吞噬了一段时间内“同时”产生的值。

source这种情况下,这将是您的.combine_latest(...)可观察的。

于 2015-08-19T23:04:15.120 回答