关于订阅的一些观察
(对不起,无法抗拒双关语!)IObservable<out T>
,几乎每个 Rx 运算符实现的接口,只有一个重要的方法:
IDisposable Subscribe(IObserver<T> observer);
观察者(实现IObserver<T>
)完全通过此方法及其返回值的处置可以确定订阅何时开始和结束。
当订阅属于链的一部分的 Observable 时,通常(直接或间接),这将导致订阅更进一步的链。确切地说,是否以及何时发生这种情况取决于给定的 Observable。
在许多情况下,收到的订阅与做出的订阅之间的关系不是一对一的。这方面的一个例子是 Publish(),它最多只能有一个对其源的订阅,而不管它收到多少订阅。这确实是 Publish 的全部意义所在。
在其他情况下,这种关系具有时间方面。例如, Concat() 在第一个流订阅之前不会订阅它的第二个流OnCompleted()
——这可能永远不会!
值得花一点时间来研究一下Rx 设计指南,因为它们有一些非常相关的内容要说:
Rx 设计指南
4.4. 假设尽最大努力停止取消订阅上的所有未完成工作。当对可观察订阅调用取消订阅时,可观察序列将尽最大努力尝试停止所有未完成的工作。这意味着任何尚未开始的排队工作都不会开始。
任何已经在进行中的工作可能仍会完成,因为中止正在进行的工作并不总是安全的。这项工作的结果不会发送给任何先前订阅的观察者实例。
底线
注意这里的含义;底线是,当任何上游订阅可能被制作或处置时,它完全取决于 Observable 的实现。换句话说,绝对不能保证处置订阅会导致 Observable 处置它直接或间接进行的任何或所有订阅。这适用于运营商使用的任何其他资源(例如计划的操作)或其上游订阅。
最好的希望是,每个上游运营商的作者确实尽了最大的努力,停止了所有优秀的工作。
回到问题(终于!)
如果没有看到SomeMoreRXFunctions
我无法确定的内容,但您看到的异常似乎很可能是由于 - 尽管处理了您知道的订阅 - 通过处理调度程序,您已经从下面撕下了地毯英尺仍在运行的订阅。实际上,您导致了这种情况:
void Main()
{
var scheduler = new EventLoopScheduler();
// Decide it's time to stop
scheduler.Dispose();
// The next line will throw an ObjectDisposedException
scheduler.Schedule(() => {});
}
编写一个可能导致此问题的完全合理的运算符很容易——即使是不直接使用调度程序的运算符!考虑一下:
public static class ObservableExtensions
{
public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
(this IObservable<TSource> source, IObservable<TDelay> delay)
{
return Observable.Create<TSource>(observer =>
{
var subscription = new SerialDisposable();
subscription.Disposable = delay
.IgnoreElements()
.Subscribe(_ => {}, () => {
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
});
return subscription;
});
}
}
一旦传递的延迟 observable 完成,此运算符将订阅源。看看它有多合理 - 它使用 aSerialDisposable
将两个潜在的时间上独立的订阅正确地呈现给它的观察者作为一个一次性的。
然而,颠覆这个运算符并让它引发异常是微不足道的:
void Main()
{
var scheduler = new EventLoopScheduler();
var rx = Observable.Range(0, 10, scheduler)
.ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
var subs = rx.Subscribe();
Thread.Sleep(TimeSpan.FromSeconds(2));
subs.Dispose();
scheduler.Dispose();
}
这里发生了什么事?我们正在Range
EventLoopScheduler 上创建一个,但附加我们使用它的默认调度程序ReasonableDelay
创建的延迟流。Timer
现在我们订阅,等到我们的延迟流完成,然后我们以“正确的顺序”处理我们的订阅和 EventLoopScheduler。
我插入的人为延迟Thread.Sleep
确保了很容易自然发生的竞争条件 - 延迟已完成,订阅已被处理,但阻止Range
操作员访问已处理的 EventLoopScheduler 为时已晚。
一旦延迟部分完成,我们甚至可以加强我们的合理努力来检查观察者是否取消订阅:
// In the ReasonableDelay method
.Subscribe(_ => {}, () => {
if(!subscription.IsDisposed) // Check for unsubscribe
{
Console.WriteLine("Waiting to subscribe to source");
// Artifical sleep to create a problem
Thread.Sleep(TimeSpan.FromSeconds(2));
Console.WriteLine("Subscribing to source");
// Is this line safe?
subscription.Disposable = source.Subscribe(observer);
}
});
它不会有帮助。也没有办法纯粹在这个运算符的上下文中使用锁定语义。
你做错了什么
你没有处理那个 EventLoopScheduler 的事情!一旦您将其传递给其他 Rx 操作员,您就已将其责任转嫁给了其他人。由 Rx 操作员遵循指导方针并尽可能及时地清理他们的订阅 - 这意味着直接或间接取消 EventLoopScheduler 上的任何未决计划项目并停止任何进一步的调度,以便队列尽快清空可能的。
在上面的示例中,您可以将问题归因于对多个调度程序的有些人为的使用以及在 ReasonableDelay 中的强制休眠 - 但不难想象一个操作员无法立即清理的真实场景。
本质上,通过处理 Rx 调度程序,您正在执行与线程中止等效的 Rx。就像在那种情况下一样,您可能需要处理异常!
正确的做法是揭开神秘面纱SomeMoreRXFunctions()
,并确保他们尽可能地遵守准则。