-1

我试图使用 Chunkify 方法来“捕获”所有“待处理”项目。但我发现了一个问题,消耗了一个线程的所有资源,有谁知道为什么会发生这种情况,我该如何防止这种情况发生?

事实上,我的目标是为我的事件创建一个“垃圾邮件过滤器”,只选择最后 5 个值,并忽略超过两个连续重复。

问题如何发生的示例:

注意力!下面的代码既愚蠢又毫无意义。这只是为了演示问题,并表明事件可以被多个线程调用(请运行上面的代码并观察输出窗口,这就是问题所在)。

    [TestMethod]
    public void ThreadSpinning()
    {

     var subs = Observable.FromEventPattern(add => this.Raise += add, rem => this.Raise -= rem)
                           .Select((item, countRaise) => countRaise)
                           .Chunkify()
                           .ToObservable(Scheduler.Default)
                           .Select((countRaise, countChunkify) => new { raiseItems = countRaise, countChunkify })
                           .Do(obj => Trace.Write("Chunkify = " + obj.countChunkify + " | "))
                           .Select(a => a.raiseItems)
                           .Where(a => a.Any())
                           .Do(obj =>
                           {
                               Trace.WriteLine("[ Start do something.. Raise = " + Dump(obj) + " ] " +
                                               Environment.NewLine + Environment.NewLine);

                               Thread.Sleep(700);
                           }).Subscribe();

        Thread.Sleep(2000);

        var handle = new ManualResetEventSlim(false);
        ThreadPool.QueueUserWorkItem(r =>
            {
                Thread.Sleep(500);

                Task.Factory.StartNew(() =>
                {
                    OnRaise();
                    OnRaise();

                }).Wait();

                OnRaise();
                Thread.Sleep(500);
                OnRaise();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(1500);
                OnRaise();
                OnRaise();

                Thread.Sleep(500);

                OnRaise();

                Thread.Sleep(250);

                OnRaise();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(500);
                Task.Factory.StartNew(OnRaise).Wait();
                Task.Factory.StartNew(OnRaise).Wait();
                Thread.Sleep(2000);

                handle.Set();
            });

        handle.Wait();
        Thread.Sleep(3000); 
        subs.Dispose();

        Thread.Sleep(1000); 
    }

    private event EventHandler Raise;

    protected virtual void OnRaise()
    {
        EventHandler handler = Raise;
        if (handler != null) 
            handler(this, EventArgs.Empty);
    }

    public static string Dump<T>(IEnumerable<T> source)
    {
        return source.Select(a => a.ToString()).Aggregate((a, b) => a + ", " + b);
    }
4

1 回答 1

2

我不确定您到底要做什么,但是您的代码存在一些问题:

  • Chunkify用于从 a 转换IObservableIEnumerable,但随后将其转换回IObservablea ,这有点奇怪。

  • 声明

              .Select((item, count) => new { item, count })
              .Do(obj => Trace.Write(obj.count + " | "))
              .Select(a => a.item)
              .Where(a => a.Any())
              .Do(obj => Trace.WriteLine("Do something.. " + obj.Dump()))
    

    有很多代码似乎只是为了调试目的而进行转换。您可以在一个Do调用中的一个语句 lambda 中编写所有调试代码。

  • 您不应该创建新Random对象,而是重用一个并调用Next()它:http: //msdn.microsoft.com/en-us/library/h343ddh9.aspx

  • 您使用的Thread.Sleep非常过度,并且在可观察的序列中执行它是一种代码气味。尝试将您的代码转换为使用各种时间运算符,例如ThrottleDelay。您可能还想使用Observable.Generate.

  • 有可能这Chunkify实际上不是你所想的——你考虑过Buffer运营商吗?这里有一个很好的时间运算符列表:http: //introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html#TimeShiftedSequences

  • 要测试您的代码,您不需要实际引发您的事件处理程序,您只需通过生成可观察序列并订阅它来测试您的订阅代码。例如,如果您有一个方法SubscribeToMyEvent(IObservable<T>),那么您可以通过传递一个使用创建的可观察对象FromEventPattern或使用类似IntervalGenerate

你的场景到底是什么?是什么触发了事件?您究竟是如何尝试更改事件流的?绘制大理石图(例如http://channel9.msdn.com/blogs/j.van.gogh/reactive-extensions-api-in-depth-marble-diagrams-select--where )会很有帮助,以便想想你的算法。

于 2013-08-21T21:52:34.873 回答