5

我们最近将系统从 RX 1.11111 移植到 RX 2.0 并发现了这个问题。我们为 ObserveOn 使用 EventLoopScheduler,如下所示:

IDisposable subscription = someSubject
    .ObserveOn(m_eventLoopScheduler)
    .SomeMoreRXFunctions()
    .Subscribe((something)=>something)

调度程序位于应用程序出口 ( m_eventLoopScheduler.Dispose) 上。在此之前,我们处理所有对 observable ( subscription.Dispose) 的订阅。

尽管如此,我们还是得到ObjectDisposedExceptionEventLoopScheduler.Schedule. 不可能捕获该异常,因为它源自 RX 线程。这几乎就像 Dispose 并没有摆脱某个队列中的所有项目。

我们试图删除对的调用,EventLoopScheduler.Dispose异常消失了。但是,SomeMoreRXFunctions()尽管所有订阅都已处理完毕,但其中的代码又执行了大约 10 次。

还有其他方法可以正确关闭EventLoopScheduler吗?

4

4 回答 4

9

关于订阅的一些观察

(对不起,无法抗拒双关语!)IObservable<out T>,几乎每个 Rx 运算符实现的接口,只有一个重要的方法:

IDisposable Subscribe(IObserver<T> observer);

观察者(实现IObserver<T>)完全通过此方法及其返回值的处置可以确定订阅何时开始和结束。

当订阅属于链的一部分的 Observable 时,通常(直接或间接),这将导致订阅更进一步的链。确切地说,是否以及何时发生这种情况取决于给定的 Observable。

在许多情况下,收到的订阅与做出的订阅之间的关系不是一对一的。这方面的一个例子是 Publish(),它最多只能有一个对其源的订阅,而不管它收到多少订阅。这确实是 Publish 的全部意义所在。

在其他情况下,这种关系具有时间方面。例如, Concat() 在第一个流订阅之前不会订阅它的第二个流OnCompleted()——这可能永远不会!

值得花一点时间来研究一下Rx 设计指南,因为它们有一些非常相关的内容要说:

Rx 设计指南

4.4. 假设尽最大努力停止取消订阅上的所有未完成工作。当对可观察订阅调用取消订阅时,可观察序列将尽最大努力尝试停止所有未完成的工作。这意味着任何尚未开始的排队工作都不会开始。

任何已经在进行中的工作可能仍会完成,因为中止正在进行的工作并不总是安全的。这项工作的结果不会发送给任何先前订阅的观察者实例。

底线

注意这里的含义;底线是,当任何上游订阅可能被制作或处置时,它完全取决于 Observable 的实现。换句话说,绝对不能保证处置订阅会导致 Observable 处置它直接或间接进行的任何或所有订阅。这适用于运营商使用的任何其他资源(例如计划的操作)或其上游订阅。

最好的希望是,每个上游运营商的作者确实尽了最大的努力,停止了所有优秀的工作。

回到问题(终于!)

如果没有看到SomeMoreRXFunctions我无法确定的内容,但您看到的异常似乎很可能是由于 - 尽管处理了您知道的订阅 - 通过处理调度程序,您已经从下面撕下了地毯英尺仍在运行的订阅。实际上,您导致了这种情况:

void Main()
{
    var scheduler = new EventLoopScheduler();

    // Decide it's time to stop
    scheduler.Dispose();

    // The next line will throw an ObjectDisposedException
    scheduler.Schedule(() => {});
}

编写一个可能导致此问题的完全合理的运算符很容易——即使是不直接使用调度程序的运算符!考虑一下:

public static class ObservableExtensions
{
    public static IObservable<TSource> ReasonableDelay<TSource, TDelay>
        (this IObservable<TSource> source, IObservable<TDelay> delay)
    {
        return Observable.Create<TSource>(observer =>
        {        
            var subscription = new SerialDisposable();
            subscription.Disposable = delay
                .IgnoreElements()
                .Subscribe(_ => {}, () => {
                    Console.WriteLine("Waiting to subscribe to source");
                    // Artifical sleep to create a problem
                    Thread.Sleep(TimeSpan.FromSeconds(2));
                    Console.WriteLine("Subscribing to source");
                    // Is this line safe?
                    subscription.Disposable = source.Subscribe(observer);
                }); 
            return subscription;
        });
    }    
}

一旦传递的延迟 observable 完成,此运算符将订阅源。看看它有多合理 - 它使用 aSerialDisposable将两个潜在的时间上独立的订阅正确地呈现给它的观察者作为一个一次性的。

然而,颠覆这个运算符并让它引发异常是微不足道的:

void Main()
{
    var scheduler = new EventLoopScheduler();
    var rx = Observable.Range(0, 10, scheduler)
                       .ReasonableDelay(Observable.Timer(TimeSpan.FromSeconds(1)));
    var subs = rx.Subscribe();

    Thread.Sleep(TimeSpan.FromSeconds(2));
    subs.Dispose();
    scheduler.Dispose();    
}

这里发生了什么事?我们正在RangeEventLoopScheduler 上创建一个,但附加我们使用它的默认调度程序ReasonableDelay创建的延迟流。Timer

现在我们订阅,等到我们的延迟流完成,然后我们以“正确的顺序”处理我们的订阅和 EventLoopScheduler。

我插入的人为延迟Thread.Sleep确保了很容易自然发生的竞争条件 - 延迟已完成,订阅已被处理,但阻止Range操作员访问已处理的 EventLoopScheduler 为时已晚。

一旦延迟部分完成,我们甚至可以加强我们的合理努力来检查观察者是否取消订阅:

// In the ReasonableDelay method
.Subscribe(_ => {}, () => {        
    if(!subscription.IsDisposed) // Check for unsubscribe
    {
        Console.WriteLine("Waiting to subscribe to source");
        // Artifical sleep to create a problem            
        Thread.Sleep(TimeSpan.FromSeconds(2));
        Console.WriteLine("Subscribing to source");
        // Is this line safe?                    
        subscription.Disposable = source.Subscribe(observer);
    }
}); 

它不会有帮助。也没有办法纯粹在这个运算符的上下文中使用锁定语义。

你做错了什么

你没有处理那个 EventLoopScheduler 的事情!一旦您将其传递给其他 Rx 操作员,您就已将其责任转嫁给了其他人。由 Rx 操作员遵循指导方针并尽可能及时地清理他们的订阅 - 这意味着直接或间接取消 EventLoopScheduler 上的任何未决计划项目并停止任何进一步的调度,以便队列尽快清空可能的。

在上面的示例中,您可以将问题归因于对多个调度程序的有些人为的使用以及在 ReasonableDelay 中的强制休眠 - 但不难想象一个操作员无法立即清理的真实场景。

本质上,通过处理 Rx 调度程序,您正在执行与线程中止等效的 Rx。就像在那种情况下一样,您可能需要处理异常!

正确的做法是揭开神秘面纱SomeMoreRXFunctions(),并确保他们尽可能地遵守准则。

于 2013-10-31T22:15:10.423 回答
2

刚刚注意到这个问题是这个问题的链接: Reactive Rx 2.0 EventLoopScheduler ObjectDisposedException after dispose

应在此处重新发布我在此处所做的事情-我不知道有任何方法可以“刷新”调度程序,但是您可以通过这种方式包装/处理不可避免的“对象已处理”异常:

EventLoopScheduler scheduler = new EventLoopScheduler();
var wrappedScheduler = scheduler.Catch<Exception>((ex) => 
{
    Console.WriteLine("Got an exception:" + ex.ToString());
    return true;
});

for (int i = 0; i < 100; ++i)
{
    var handle = Observable.Interval(TimeSpan.FromMilliseconds(1))
                           .ObserveOn(wrappedScheduler)
                           .Subscribe(Observer.Create<long>((x) => Thread.Sleep(1000)));

    handles.Add(handle);
}
于 2013-03-28T17:25:52.080 回答
2

我们遇到了同样的问题,最终做了以下处理 EventLoopScheduler 没有例外:

scheduler.Schedule(() => scheduler.Dispose());

如果您在执行此操作之前正确处置所有订阅(您说您已经这样做了),则 Dipose() 调用是最后一个计划的操作,并且所有其他挂起的操作都可以在调用 Dispose 之前完成。

为了使其更加健壮/可重用,您可以创建自己的 IScheduler 实现,包装 EventLoopScheduler 将所有操作委托给它 + 实现 Dispose,如上所示。最重要的是,您可以在 Schedule 方法中实现守卫,以防止在调用 Dispose 后调度操作(例如,如果您忘记取消订阅某些观察者)。

于 2015-08-13T12:33:40.453 回答
0

部分解决。案件比这里显示的要复杂得多。链条是这样的:

var 已发布 = someSubject.ObserveOn(m_eventLoopScheduler).SomeMoreRXFunctions().Publish();

IDisposable 一次性 1 = 已发布.Connect();

IDisposable 一次性2 = 已发布。订阅((某事)=>某事);

如果我同时处理了一次性 1 和一次性 2,那么 SomeMoreRXFunctions() 中的代码将不再执行。另一方面,尝试处理调度程序本身仍然会引发相同的异常。

不幸的是,我无法用更简单的代码重现该问题。这可能表明我还缺少其他东西。

这是一个我们可以接受的解决方案,但我仍然希望找到更好的方法来一次关闭调度程序,而不会出现异常。

于 2012-10-30T14:27:49.433 回答