1

我在 C# 中使用 .NET 的 RX 库。谁能向我解释为什么“observer.OnCompleted()”方法在以下代码中什么都不做:

var observableStream = Observable.Create<CustomMessage>(
       (observer) =>
           {
               CustomMessage cm = new CustomMessage();
               CustomMessage.Subscribe(observer.OnNext);

               return Disposable.Create(
               () =>
                   {
                       Console.WriteLine("Disposing...");
                       CustomMessage.Unsubscribe(observer.OnNext);                          
                       observer.OnCompleted();                //***Nothing happens here***
                   }
               );
       });

    //IObserver.OnException()
    public override void OnException(Exception e)
    {
        Console.WriteLine("Exception occurred - " + e.Message);
    }

    //IObserver.OnComplete()
    public override void OnUnsubscribe()
    {
        Console.WriteLine("Unsubscribed...");            
    }

    //IObserver.OnNext()
    public override void HandleNextMsg(IRVMessage msg)
    {
        Console.WriteLine("Instance received a message");
    }

IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);

//At some later point....
myDisposable.Dispose();

该代码旨在订阅 CustomMessages 流。在设置订阅时,它会使用我的 CustomMessage 类型注册 observer.OnNext() 方法。然后在处理订阅时取消注册观察者.OnNext()。所有这些都正常工作。每当收到 CustomMessage 时,都会调用我的“HandleNextMsg()”方法。

稍后,当我希望终止订阅时,我调用“Dispose()”并成功执行以下两行:

Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext); 

然后我不再收到 CustomMessages。然而,尽管执行了以下行,但什么也没做:

observer.OnCompleted(); 

我希望它会拨打电话:

Console.WriteLine("Unsubscribed...");

在某些时候,观察者和“OnUnsubscribe”方法之间的连接丢失了,我想确切地了解发生了什么。'observer.OnNext()' 怎么能成功注销,但是'observer.OnCompleted()' 什么都不做呢?

有人向我指出,仅仅因为我正在处理流并不意味着我应该调用“OnCompleted()”,但我仍然想了解它为什么不起作用。

4

2 回答 2

4

OnCompleted() 旨在通知订阅者上游序列(在您的情况下为 CustomMessage )已经结束。这并不意味着确认订阅者请求取消订阅已成功,这似乎是您尝试使用它的方式。OnCompleted() 是关于序列的通知,针对所有订阅者,而不是针对该序列的单个订阅。

换句话说,您不应该在 Dispose 中调用它。毕竟订阅者自己在退订,为什么需要通知呢?

至于什么都没有发生的实际技术原因,我猜测回调(按设计)在您已经处置时不会执行。只是一个理论,它不是很相关。

于 2012-11-26T15:12:36.760 回答
2

您看到的问题是由Observable.Create包装您传递的函数的方式以及您订阅结果的任何观察者引起的IObservable。基本上,流程是:

  • Observable.Create 返回一个 AnonymousObservable。
  • AnonymousObservable 将观察者包装在订阅中的 AutoDetachObserver 中。
  • AnonymousObservable 从订阅返回 AutoDetachObserver(实现 IDisposable)。
  • AutoDetachObserver.Dispose 设置它的停止标志,然后释放原始订阅函数返回的对象。此标志导致观察者忽略对 OnError 和 OnCompleted 的未来调用,从而导致不调用包装的观察者方法。

这个答案基于 RX 的 v1.x,但我希望这在 v2.0 中没有改变。

如果您有一些无论订阅如何结束(OnError、OnCompleted 或 Dispose)都需要运行的代码,我建议您使用Observable.Finally

于 2012-11-26T16:55:55.793 回答