对我来说真正的问题是为什么 Test1 通过?对我来说,该类型似乎与Subject<T>
所有其他实现的规则不同IObservable<T>
。
在更深入的检查(实际上是很好的反射)中,您可以将Subject<T>
DotPeek/Reflector 中的类型分开,并看到在进行OnNext(T)
调用时,它被直接委托给它的_observer
实例。在任何订阅之前,这只是一个 NullObject/NopObserver。订阅后(通常),观察者是一个Observer<T>
实现。这个实现实际上是IObserver<T>
接口的复合模式实现,它只调用OnNext(T)
它的每个实例。
此外,如果我们考虑使用仅接受 OnNext 处理程序的 Subscribe 的扩展方法,我们现在知道我们对 the 的真正实现IObserver<T>
是一个AnonymousObserver<T>
. 打开它,我们看到任何对的调用在OnNext(T)
很大程度上是不受保护的。
现在让我们将其与or运算符的IObservable<T>
实现进行比较。这两种扩展方法都会返回一个扩展类的实现。当订阅这些可观察序列之一时,订阅观察者被一个实现包装。这是关键的区别。Where
Cast
IObservable<T>
Producer<T>
SafeObserver<T>
如果我们查看这个实现,我们会看到对于我们的代码路径,我们拥有的匿名观察者将调用其 MakeSafe 方法。这现在用 try/finally 包装对 OnNext 的任何调用。
public void OnNext(T value)
{
if(this.isStopped!=0)
return;
bool flag = true;
try
{
this._onNext(value);
flag=true; //Flag only set if OnNext doesn't throw!!
}
finally
{
if(!flag)
this._disposable.Dispose();
}
}
请注意,一旦我们有一个如上所述的安全观察者,如果任何OnNext
处理程序抛出,那么flag
将不会设置为 true 并且_disposable
实例被释放。在这种情况下,_disposable
实例代表订阅。
因此,您可以解释为什么 rawSubject
通过测试以及看似无害的操作员在哪里导致行为改变。
As to why Subject<T>
does not behave like this by default, I imagine this is due to the performance improvements that were made in the 2.0 release. I get the feeling that subjects are tuned for raw performance and take the assumption that if you are brave enough to be using them, then you know what you are doing (i.e. not throwing in your OnNext
handlers!). I base this assumption that they also removed the concurrency safety from subjects by default and you have to turn it on with the Synchronize()
extension method, maybe they also thought all of those extra try/finally calls should only be paid for if you opt in. One way of opting in to these safety features is to do what you have done above Where(_=>true)
or more commonly AsObservable()
.