我试图使用 Chunkify 方法来“捕获”所有“待处理”项目。但我发现了一个问题,消耗了一个线程的所有资源,有谁知道为什么会发生这种情况,我该如何防止这种情况发生?
事实上,我的目标是为我的事件创建一个“垃圾邮件过滤器”,只选择最后 5 个值,并忽略超过两个连续重复。
问题如何发生的示例:
注意力!下面的代码既愚蠢又毫无意义。这只是为了演示问题,并表明事件可以被多个线程调用(请运行上面的代码并观察输出窗口,这就是问题所在)。
[TestMethod]
public void ThreadSpinning()
{
var subs = Observable.FromEventPattern(add => this.Raise += add, rem => this.Raise -= rem)
.Select((item, countRaise) => countRaise)
.Chunkify()
.ToObservable(Scheduler.Default)
.Select((countRaise, countChunkify) => new { raiseItems = countRaise, countChunkify })
.Do(obj => Trace.Write("Chunkify = " + obj.countChunkify + " | "))
.Select(a => a.raiseItems)
.Where(a => a.Any())
.Do(obj =>
{
Trace.WriteLine("[ Start do something.. Raise = " + Dump(obj) + " ] " +
Environment.NewLine + Environment.NewLine);
Thread.Sleep(700);
}).Subscribe();
Thread.Sleep(2000);
var handle = new ManualResetEventSlim(false);
ThreadPool.QueueUserWorkItem(r =>
{
Thread.Sleep(500);
Task.Factory.StartNew(() =>
{
OnRaise();
OnRaise();
}).Wait();
OnRaise();
Thread.Sleep(500);
OnRaise();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(1500);
OnRaise();
OnRaise();
Thread.Sleep(500);
OnRaise();
Thread.Sleep(250);
OnRaise();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(500);
Task.Factory.StartNew(OnRaise).Wait();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(2000);
handle.Set();
});
handle.Wait();
Thread.Sleep(3000);
subs.Dispose();
Thread.Sleep(1000);
}
private event EventHandler Raise;
protected virtual void OnRaise()
{
EventHandler handler = Raise;
if (handler != null)
handler(this, EventArgs.Empty);
}
public static string Dump<T>(IEnumerable<T> source)
{
return source.Select(a => a.ToString()).Aggregate((a, b) => a + ", " + b);
}