4

测试 1 通过。为什么 Test2 和 Test3 失败?我正在使用 .NET 4.0 和 Rx 2.0。

[TestClass]
public class RxQuestion
{
   private Subject<string> sequence;
   [TestInitialize] public void Intialize() { sequence = new Subject<string>(); }
   [TestMethod] public void Test1() { Test(sequence); }
   [TestMethod] public void Test2() { Test(sequence.Where(s => true)); }
   [TestMethod] public void Test3() { Test(sequence.OfType<string>()); }
   private void Test(IObservable<string> observable)
   {
      var observed = string.Empty;
      observable.Subscribe(s => { observed = s; if (s == "a") throw new Exception(); });
      try { sequence.OnNext("a"); } catch { }
      sequence.OnNext("b");
      Assert.AreEqual("b", observed);
   }
}
4

2 回答 2

13

对我来说真正的问题是为什么 Test1 通过?对我来说,该类型似乎与Subject<T>所有其他实现的规则不同IObservable<T>

在更深入的检查(实际上是很好的反射)中,您可以将Subject<T>DotPeek/Reflector 中的类型分开,并看到在进行OnNext(T)调用时,它被直接委托给它的_observer实例。在任何订阅之前,这只是一个 NullObject/NopObserver。订阅后(通常),观察者是一个Observer<T>实现。这个实现实际上是IObserver<T>接口的复合模式实现,它只调用OnNext(T)它的每个实例。

此外,如果我们考虑使用仅接受 OnNext 处理程序的 Subscribe 的扩展方法,我们现在知道我们对 the 的真正实现IObserver<T>是一个AnonymousObserver<T>. 打开它,我们看到任何对的调用在OnNext(T)很大程度上是不受保护的。

现在让我们将其与or运算符的IObservable<T>实现进行比较。这两种扩展方法都会返回一个扩展类的实现。当订阅这些可观察序列之一时,订阅观察者被一个实现包装。这是关键的区别WhereCastIObservable<T>Producer<T>SafeObserver<T>

如果我们查看这个实现,我们会看到对于我们的代码路径,我们拥有的匿名观察者将调用其 MakeSafe 方法。这现在用 try/finally 包装对 OnNext 的任何调用。

public void OnNext(T value)
{
    if(this.isStopped!=0)
        return;
    bool flag = true;
    try
    {
        this._onNext(value);
        flag=true;        //Flag only set if OnNext doesn't throw!!
    }
    finally
    {
        if(!flag)
            this._disposable.Dispose();
    }
}

请注意,一旦我们有一个如上所述的安全观察者,如果任何OnNext处理程序抛出,那么flag将不会设置为 true 并且_disposable实例被释放。在这种情况下,_disposable实例代表订阅。

因此,您可以解释为什么 rawSubject通过测试以及看似无害的操作员在哪里导致行为改变。

As to why Subject<T> does not behave like this by default, I imagine this is due to the performance improvements that were made in the 2.0 release. I get the feeling that subjects are tuned for raw performance and take the assumption that if you are brave enough to be using them, then you know what you are doing (i.e. not throwing in your OnNext handlers!). I base this assumption that they also removed the concurrency safety from subjects by default and you have to turn it on with the Synchronize() extension method, maybe they also thought all of those extra try/finally calls should only be paid for if you opt in. One way of opting in to these safety features is to do what you have done above Where(_=>true) or more commonly AsObservable().

于 2013-04-02T09:52:50.900 回答
2

根据定义, anIObservable<T>不应OnErroran或 an之后发出任何通知OnCompleted。因此,当您打电话时,sequence.OnNext("b");您实际上违反了IObservable<T>应遵守的隐含合同。

这样做的原因WhereOfType行为是因为它们(适当地)在OnError您生成之后忽略任何通知。

于 2013-04-01T18:36:41.640 回答