4

我正在使用 Rx 2.0 的 EventLoopScheduler 来排队/序列化工作。当我需要处置调度程序时,如果还有剩余工作,我将收到未处理的 ObjectDisposedException。这是预期的行为吗?

人为/示例代码:

    EventLoopScheduler scheduler = new EventLoopScheduler();
    List<IDisposable> handles = new List<IDisposable>();

    for (int i = 0; i < 100; ++i)
    {
        var handle = Observable.Interval(TimeSpan.FromMilliseconds(1))
                               .ObserveOn(scheduler)
                               .Subscribe(Observer.Create<long>((x) => Thread.Sleep(1000)));

        handles.Add(handle);
    }

    Thread.Sleep(TimeSpan.FromSeconds(1));

    foreach (var handle in handles)
        handle.Dispose();

    scheduler.Dispose();
    Console.ReadLine();

堆栈跟踪:

   System.ObjectDisposedException
   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.<>c__DisplayClass50`1.<InvokeRec1>b__4e(TState state2)
   at System.Reactive.ScheduledObserver`1.Run(Object state, Action`1 recurse)
   at System.Reactive.Concurrency.Scheduler.<>c__DisplayClass50`1.<InvokeRec1>b__4d(TState state1)
   at System.Reactive.Concurrency.Scheduler.InvokeRec1[TState](IScheduler scheduler, Pair`2 pair)
   at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
   at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
   at System.Reactive.Concurrency.EventLoopScheduler.Run()
   at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
   at System.Threading.ThreadHelper.ThreadStart()

类似于这个未回答的问题:RX2.0: ObjectDisposedException after disposing EventLoopScheduler

4

3 回答 3

3

这并不是说还有剩余的工作(即调度程序的队列中仍然有东西),而是您仍然有未完成的订阅正在尝试东西添加到该队列中。

处置所有订阅,然后处置调度程序,它应该可以正常工作

于 2013-03-28T19:32:13.077 回答
2

是的,我以前也见过 - 我不认为有办法“清除事件线程”本身,但你可以这样做:

EventLoopScheduler scheduler = new EventLoopScheduler();
var wrappedScheduler = scheduler.Catch<Exception>((ex) => 
{
    Console.WriteLine("Got an exception:" + ex.ToString());
    return true;
});

for (int i = 0; i < 100; ++i)
{
    var handle = Observable.Interval(TimeSpan.FromMilliseconds(1))
                           .ObserveOn(wrappedScheduler)
                           .Subscribe(Observer.Create<long>((x) => Thread.Sleep(1000)));

    handles.Add(handle);
}
于 2013-03-28T17:23:56.823 回答
1

实际上,有一种简单的方法可以EventLoopScheduler在处理之前刷新队列。首先,确保向队列中添加操作的所有订阅都已释放,以便无法添加任何其他内容。

然后,只需安排调度程序本身的处置:

scheduler.Schedule(() =>
{
    // cleanup code: dispose of other stuff here
    scheduler.Dispose();
});

确保在所有其他挂起的操作都运行并关闭线程之后运行最后一个计划的操作。

于 2019-02-01T04:47:18.847 回答