2

在学习 Rx 时,我遇到了一个经常重复的关于Observables的规则,该规则在The Observable Contract中有详细说明。

在发出 OnCompleted 或 OnError 通知后,它可能不会再发出任何进一步的通知。

这对我来说很有意义,因为让 Observable 在完成后继续产生值会令人困惑,但是当我在 .NET 中测试 Observable.Range 方法时,我注意到它没有表现出这种行为,事实上许多Observable 违反了这条规则。

var rangeObservable = Observable.Range(0, 5);

rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done first!"));
Console.ReadLine();

rangeObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Done second!"));
Console.ReadLine();

//Output:
//0
//1
//2
//3
//4
//Done first!

//0
//1
//2
//3
//4
//Done second!

显然在第一次之后rangeObservable调用OnComplete了两次并产生了值OnComplete。这让我相信这不是关于Observables的规则,而是关于Subscriptions的规则。也就是说,一个Observable可以产生任意多的终止消息,甚至在它产生之后产生值,只要每个Subscription只接收一个终止消息并且之后不再接收任何消息。

当它说Observable时,它​​们实际上是指Subscription吗?它们真的是不同的东西吗?我对模型有根本的误解吗?

4

2 回答 2

4

Observable.Range返回一个的observable,这意味着它为每个订阅者“重播”它的行为。由于“OnNext* OnComplete|OnError”合同仅适用于订阅,这完全没问题。

有关热/冷 observables 的更多信息,请参阅我对“Rx 中的 IConnectableObservables”的回答

于 2016-12-31T10:21:07.040 回答
4

observable 合约必须对任何被观察的 Observable 有效。在 Observable 未被观察时是否发生任何事情都留给 observable 的实现。

考虑 Enumerable 中的类比会有所帮助 - Observable 是 Enumerable 的对偶。在可枚举中,您将拥有 range = Enumerable.Range(0, 5),并且您将使用与上述类似的范围:

range.ForEach(Console.WriteLine); //prints 0 - 4

range.ForEach(Console.WriteLine); //prints 0 - 4 again

并发现这是完全可以接受的行为,因为实际的数字生成器只有在GetEnumerator被调用时才会创建。同样,在 Observable 中,等价的方法是Subscribe.

range 的实现类似于:

        static IObservable<int> Range(int start, int count)
        {
            return Observable.Create<int>(observer =>
            {
                for (int i = 0; i < count; i++)
                    observer.OnNext(start + i);

                observer.OnCompleted();

                return Disposable.Empty;
            });
        }

在这里,observer => {...}每次有订阅时都会调用该函数。工作在 subscribe 方法中完成。您可以很容易地看到它 (1) 为每个观察者推送相同的序列,(2) 每个观察者只完成一次。

这些只有在你观察到它们时才会发生某些事情的可观察对象称为冷可观察对象。这是一篇描述这个概念的文章。

笔记

Range是一个非常幼稚的实现,仅用于说明目的。该方法在完成之前不会返回一次性用品 - 所以Disposable.Empty是可以接受的。正确的实现将在调度程序上运行工作,并使用已检查的一次性来查看订阅是否已被释放,然后再继续循环。

要点是手动实现可观察合约是困难的,这就是 Rx 库存在的原因——通过组合构建功能。

于 2016-12-31T06:21:25.257 回答