2

我有两个IDisposables需要按顺序处理。排序很重要,因为第一个IDisposable会杀死依赖于将被第二个杀死的服务的 Rx 订阅IDisposable。这是在 Windows 窗体应用程序中,订阅IObservable需要发生在不同的线程上,但观察和处置需要发生在 UI 线程上。(实际上,只要确保顺序,我不在乎是否在 UI 线程上进行处理。)因此,在代码中,我大致有以下内容(一旦减少):

SomeService = new DisposableService();
Subscription = Foo(someService).SubscribeOn(NewThreadScheduler.Default).ObserveOn(theForm).Subscribe(...)

在许多 UI 事件中,我需要按顺序处理这两个事件(订阅和 SomeService)。为此CompositeDisposable,除了ContextDisposable在同一线程上提供串行处理之外,我还尝试使用 Rx:

_Disposable = new CompositeDisposable(new[] {                     
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, Subscription),                     
    new ContextDisposable(WindowsFormsSynchronizationContext.Current, SomeService)});

但是,上述方法不起作用。根据我的日志记录_DisposableContextDisposableforSomeService在同一个线程上调用,但ContextDisposable仍然发生在与正在处理的服务并发的不同线程上(从而导致竞争条件和 NPE)。

我只用了几个星期的 C# 编程,所以我确信问题在于我对上下文和调度程序如何工作的误解。解决这个问题的正确方法是什么?

4

2 回答 2

0

除非我误解了某些东西,否则您可以控制哪个线程处理什么。谁订阅哪个线程并不重要。看这个例子

internal class Program
{
    private static void Main(string[] args)
    {
        ReactiveTest rx1 = null;
        ReactiveTest rx2 = null;

        var thread1 = new Thread(() => rx1 = new ReactiveTest());
        var thread2 = new Thread(() => rx2 = new ReactiveTest());

        thread1.Start();
        thread2.Start();

        Thread.Sleep(TimeSpan.FromSeconds(1));

        thread1.Join();
        thread2.Join();

        rx1.Dispose();
        rx2.Dispose();
    }
}

public class ReactiveTest : IDisposable
{
    private IDisposable _timerObservable;

    private object _lock = new object();

    public ReactiveTest()
    {
        _timerObservable = Observable.Interval(TimeSpan.FromMilliseconds(250)).Subscribe(i => 
            Console.WriteLine("[{0}] - {1}", Thread.CurrentThread.ManagedThreadId, i));
    }

    public void Dispose()
    {
        lock (_lock)
        {
            _timerObservable.Dispose();
            Console.WriteLine("[{0}] - DISPOSING", Thread.CurrentThread.ManagedThreadId);
        }
    }
}

这输出

[14] - 0
[7] - 0
[15] - 1
[7] - 1
[14] - 2
[15] - 2
[10] - DISPOSING
[10] - DISPOSING

你可以看到我们订阅了两个单独的线程,然后处理了第三个线程。我只锁定了处置,以防您在订阅中需要发生线程安全的事情。在这个例子中,它真的没有必要。

于 2013-01-02T20:15:03.920 回答
0

SubscribeOn安排对Subscribe和的调用Dispose。因此,调用Dispose您的Subscription变量,无论当前是否在 UI 线程上执行,都会导致订阅被安排为由NewThreadScheduler.Default.

SubscribeOn使用;几乎从来都不是一个好主意。但是,在您的情况下,您声称它解决了 50% 的问题 - 这比我见过的大多数用途多 50% - 所以我必须质疑您是否真的需要订阅才能在后台线程上执行第一名。与直接在 UI 线程上调用方法相比,如果方法所做的只是开始一些异步工作,例如发送网络请求或读取文件,那么创建一个全新的线程然后在其上调用一个方法是非常昂贵的。也许如果计算要发送的网络消息被证明过于耗时,那么使用SubscribeOn可能是正确的;不过,当然,前提是您也希望安排处置。

如果对 observable 的订阅必须在后台线程上执行,但处置必须保持自由线程,那么请考虑使用以下运算符(未经测试)。

public static class ObservableExtensions
{
  public static IObservable<TSource> SubscribeOn<TSource>(
    this IObservable<TSource> source,
    bool doNotScheduleDisposal, 
    IScheduler scheduler)
  {
    if (!doNotScheduleDisposal)
    {
      return source.SubscribeOn(scheduler);
    }

    return Observable.Create<TSource>(observer =>
      {
        // Implementation is based on that of the native SubscribeOn operator in Rx
        var s = new SingleAssignmentDisposable();
        var d = new SerialDisposable();
        d.Disposable = s;
        s.Disposable = scheduler.Schedule(() =>
        {
          d.Disposable = source.SubscribeSafe(observer);
        });
        return d;
      });
  }
}
于 2014-08-29T14:18:36.487 回答