0

我有一个由网络 I/O 频繁触发的事件源,基于底层设计,当然事件每次总是在不同的线程上,现在我通过 Rx 包装了这个事件:Observable.FromEventPattern(...),现在我使用TakeWhile(predict)to过滤一些特殊事件数据。目前,我对它的线程安全性、hit 和 muteTakeWhile(predict)的工作有些担心,但在并发的情况下,还能保证吗?因为我猜底层实现可能是(我无法阅读源代码,因为它太复杂了......):

    public static IObservable<TSource> TakeWhile<TSource>(this IObservable<TSource> source, Func<TSource, bool> predict)
    {
        ISubject<TSource> takeUntilObservable = new TempObservable<TSource>();
        IDisposable dps = null;
        // 0 for takeUntilObservable still active, 1 for predict failed, diposed and OnCompleted already send. 
        int state = 0;
        dps = source.Subscribe(
             (s) =>
             {
                 /* NOTE here the 'hit and mute' still not thread safe, one thread may enter 'else' and under CompareExchange, but meantime another thread may passed the predict(...) and calling OnNext(...)
                  * so the CompareExchange here mainly for avoid multiple time call OnCompleted() and Dispose();
                  */

                 if (predict(s) && state == 0)
                 {
                     takeUntilObservable.OnNext(s);
                 }
                 else
                 {
                     // !=0 means already disposed and OnCompleted send, avoid multiple times called via parallel threads.
                     if (0 == Interlocked.CompareExchange(ref state, 1, 0))
                     {
                         try
                         {
                             takeUntilObservable.OnCompleted();
                         }
                         finally
                         {
                             dps.Dispose();
                         }
                     }
                 }
             },
             () =>
             {
                 try
                 {
                     takeUntilObservable.OnCompleted();
                 }
                 finally { dps.Dispose(); }
             },
             (ex) => { takeUntilObservable.OnError(ex); });
        return takeUntilObservable;
    }

TempObservable只是 ISubject 的一个简单实现。
如果我的猜测合理,那么似乎无法保证线程安全,这意味着一些意外事件数据可能仍会传入 OnNext(...) 因为“静音”仍在进行中。然后我写了一个简单的测试来验证,但出乎意料,结果都是肯定的:

    public class MultipleTheadEventSource
    {
        public event EventHandler OnSthNew;
        int cocurrentCount = 1000;
        public void Start()
        {
            for (int i = 0; i < this.cocurrentCount; i++)
            {
                int j = i;
                ThreadPool.QueueUserWorkItem((state) =>
                {
                    var safe = this.OnSthNew;
                    if (safe != null)
                        safe(j, null);
                });
            }
        }
    }

    [TestMethod()]
    public void MultipleTheadEventSourceTest()
    {
        int loopTimes = 10;
        int onCompletedCalledTimes = 0;
        for (int i = 0; i < loopTimes; i++)
        {
            MultipleTheadEventSource eventSim = new MultipleTheadEventSource();
            var host = Observable.FromEventPattern(eventSim, "OnSthNew");
            host.TakeWhile(p => { return int.Parse(p.Sender.ToString()) < 110; }).Subscribe((nxt) =>
            {
                //try print the unexpected values, BUT I Never saw it happened!!!
                if (int.Parse(nxt.Sender.ToString()) >= 110)
                {
                    this.testContextInstance.WriteLine(nxt.Sender.ToString());
                }
            }, () => { Interlocked.Increment(ref onCompletedCalledTimes); });
            eventSim.Start();
        }

        // simply wait everything done.
        Thread.Sleep(60000);
        this.testContextInstance.WriteLine("onCompletedCalledTimes: " + onCompletedCalledTimes);
    }

在我进行测试之前,这里的一些朋友建议我尝试使用Synchronize<TSource>ObserveOn使其线程安全,所以对我的进展想法有什么想法以及为什么问题没有重现?

4

1 回答 1

3

根据您的其他问题,答案仍然相同:在 Rx 中,您应该假设 Observers 以序列化方式调用。

提供更好的答案;最初,Rx 团队确保 Observable 序列是线程安全的,但是对于行为良好/设计良好的应用程序的性能损失是不必要的。因此决定移除线程安全以消除性能成本。为了让您重新选择线程安全,您可以应用Synchronize()将序列化所有方法调用 OnNext/OnError/OnCompleted 的方法。这并不意味着它们将在同一个线程上被调用,但您不会OnNext在另一个正在处理的方法中调用您的方法。

坏消息,根据记忆,这发生在 Rx 2.0 中,而您专门询问 Rx 1.0。(我不确定是否Synchonize()存在于 1.xx 中?)

因此,如果您使用的是 Rx v1,那么您对什么是线程安全的和什么不是线程安全的有这种模糊的确定性。我很确定主题是安全的,但我不能确定工厂方法,如FromEventPattern.

我的建议是:如果您需要确保线程安全,请序列化您的数据管道。最简单的方法是使用单线程IScheduler实现,即 DispatcherScheduler 或 EventLoopScheduler 实例。

一些好消息是,当我写关于 Rx 的书时,它确实针对 v1,所以本节与您非常相关http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html

因此,如果您的查询现在看起来像这样:

Observable.FromEventPatter(....)
          .TakeWhile(x=>x>5)
          .Subscribe(....);

为确保管道被序列化,您可以创建一个 EventLoopScheduler(以为此专用线程为代价):

var scheduler = new EventLoopScheduler();
Observable.FromEventPatter(....)
          .ObserveOn(scheduler)
          .TakeWhile(x=>x>5)
          .Subscribe(....);
于 2013-09-18T17:36:59.600 回答