我在 C# 中使用 .NET 的 RX 库。谁能向我解释为什么“observer.OnCompleted()”方法在以下代码中什么都不做:
var observableStream = Observable.Create<CustomMessage>(
(observer) =>
{
CustomMessage cm = new CustomMessage();
CustomMessage.Subscribe(observer.OnNext);
return Disposable.Create(
() =>
{
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
observer.OnCompleted(); //***Nothing happens here***
}
);
});
//IObserver.OnException()
public override void OnException(Exception e)
{
Console.WriteLine("Exception occurred - " + e.Message);
}
//IObserver.OnComplete()
public override void OnUnsubscribe()
{
Console.WriteLine("Unsubscribed...");
}
//IObserver.OnNext()
public override void HandleNextMsg(IRVMessage msg)
{
Console.WriteLine("Instance received a message");
}
IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);
//At some later point....
myDisposable.Dispose();
该代码旨在订阅 CustomMessages 流。在设置订阅时,它会使用我的 CustomMessage 类型注册 observer.OnNext() 方法。然后在处理订阅时取消注册观察者.OnNext()。所有这些都正常工作。每当收到 CustomMessage 时,都会调用我的“HandleNextMsg()”方法。
稍后,当我希望终止订阅时,我调用“Dispose()”并成功执行以下两行:
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);
然后我不再收到 CustomMessages。然而,尽管执行了以下行,但什么也没做:
observer.OnCompleted();
我希望它会拨打电话:
Console.WriteLine("Unsubscribed...");
在某些时候,观察者和“OnUnsubscribe”方法之间的连接丢失了,我想确切地了解发生了什么。'observer.OnNext()' 怎么能成功注销,但是'observer.OnCompleted()' 什么都不做呢?
有人向我指出,仅仅因为我正在处理流并不意味着我应该调用“OnCompleted()”,但我仍然想了解它为什么不起作用。