7

Unlike other "FRP" libraries, Rx doesn't prevent glitches: callbacks invoked with time-mismatched data. Is there a good way to work around this?

As an example, imagine that we have a series of expensive computations derived from a single stream (e.g. instead of _.identity, below, we do a sort, or an ajax fetch). We do distinctUntilChanged to avoid recomputing the expensive things.

sub = new Rx.Subject();
a = sub.distinctUntilChanged().share();
b = a.select(_.identity).distinctUntilChanged().share();
c = b.select(_.identity).distinctUntilChanged();
d = Rx.Observable.combineLatest(a, b, c, function () { return _.toArray(arguments); });
d.subscribe(console.log.bind(console));
sub.onNext('a');
sub.onNext('b');

The second event will end up causing a number of glitchy states: we get three events out, instead of one, which wastes a bunch of cpu and requires us to explicitly work around the mismatched data.

This particular example can be worked around by dropping the distinctUntilChanged, and writing some wonky scan() functions to pass through the previous result if the input hasn't changed. Then you can zip the results, instead of using combineLatest. It's clumsy, but doable.

However if there is asynchrony anywhere, e.g. an ajax call, then zip doesn't work: the ajax call will complete either synchronously (if cached) or asynchronously, so you can't use zip.

Edit

Trying to clarify the desired behavior with a simpler example:

You have two streams, a and b. b depends on a. b is asynchronous, but the browser may cache it, so it can either update independently of a, or at the same time as a. So, a particular event in the browser can cause one of three things: a updates; b updates; both a and b update. The desired behavior is to have a callback (e.g. render method) invoked exactly once in all three cases.

zip does not work, because when a or b fires alone, we get no callback from zip. combineLatest does not work because when a and b fire together we get two callbacks.

4

1 回答 1

5

这个概念

a 和 b 都更新

其中ab都是可观察的,在 Rx 中不作为原语存在。

没有可以定义的无损通用运算符来决定它何时收到通知,a是应该将它传递给下游还是推迟到它收到来自 的通知b。Rx 中的通知本身并不携带“两种”语义,或任何超出 Rx 语法的语义。

此外,Rx 的串行合同阻止操作员利用重叠通知来实现这一目标。(尽管我怀疑依赖竞争条件并不是您想要的方法。)

请参阅Rx 设计指南中的 §§4.2、6.7 。

因此,我上面所说的“没有可以定义的无损通用运算符……”的意思是,给定两个可观察对象ab具有独立的通知,任何尝试决定何时接收通知ab是否必须推送的运算符立即或等待“其他”值,必须依赖于任意时间。这是猜测。所以这个假设的运算符必须要么丢弃值(例如,DistinctUntilChangedThrottle),要么丢弃时间(例如,ZipBuffer),尽管可能两者兼而有之。

因此,如果代理有能力a单独推送,或者b单独推送,或者ab一起推送作为一个通知单元,那么开发者有责任自己具体化这个通知单元的概念。

需要一个三态类型:a | 乙 | {a,b}

(请原谅我糟糕的JS)

var ab = function(a, b) { this.a = a; this.b = b; }
sub.onNext(new ab('a'));        // process a alone
sub.onNext(new ab('a', 'b'));   // process a and b together
sub.onNext(new ab(null, 'c'));  // process c alone

observable 查询的形状不再重要。必须定义观察者以接受此数据类型。生成器有责任根据其内部状态的语义应用任何必要的缓冲或计时计算,以便为其观察者生成正确的通知。

顺便说一句,感谢您在编辑中提供了一个简单的解释(无论如何我似乎很清楚)。我在这个 Rx 论坛讨论中第一次听说“故障” 。如您所见,它从未真正结束。现在我想知道那个OP的问题是否真的像这样简单,当然,假设我已经正确理解了你的问题。:-)

更新:

这是另一个相关的讨论,包括我对为什么 Rx 不是 FRP 的一些想法:

https://social.msdn.microsoft.com/Forums/en-US/bc2c4b71-c97b-428e-ad71-324055a3cd03/another-discussion-on-glitches-and-rx?forum=rx

于 2014-09-11T04:34:47.610 回答