如果您将 BlockingCollection 与 ConcurrentDictionary 一起使用,请确保您的代码中没有隐藏 BlockingCollection.TryAdd(myobject) 方法,并且将其误认为是 ConcurrentDictionary.TryAdd() 方法。如果 BlockingCollection 的 Bounding Capacity 已超出, BlockingCollection.TryAdd(myobject) 将返回 false 并丢弃产生“静默失败”的添加请求。
- BlockingCollection 的 .Add() 方法在大量超出 Bounding Capacity 后似乎不会“静默失败”或丢失添加。add() 方法最终会导致进程耗尽内存,如果有太多的 .add() 等待添加到容量过剩的 BlockingCollection。(这必须是流量控制问题的一个非常极端的情况)
- 见#1。
- 我自己的测试似乎表明,一旦调用 CompleteAdding() 方法,所有后续添加都会失败,如 MSDN 文档中所述。
关于性能的最后说明
与在同一过程中不使用 Bounding Capacity 和 .TryAdd() 相比,似乎(在我自己的情况下)在 BlockingCollection 上使用 Bounding Capacity 和 .Add() 非常慢。
通过实施我自己的边界容量策略,我取得了更好的性能结果。有很多方法可以做到这一点。三个选项包括与 Monitor.PulseAll() 一起使用的 Thread.Sleep()、Thread.Spinwait() 或 Monitor.Wait()。当使用其中一种策略时,也可以使用 BlockingCollection.TryAdd() 而不是 BlockingCollection.Add() 并且没有边界容量,而不会丢失任何数据或内存不足。这种方法似乎也能产生更好的性能。
您可以根据哪种方案最适合生产者和消费者线程中的速度差异,从三个示例中进行选择。
Thread.Wait() 示例:
//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{ //If the bounded capacity has been exceeded
//place the thread in wait mode
Thread.Sleep(SleepTime);
}
Thread.SpinWait() 示例:
//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{ //If the capacity has been exceeded
//place the thread in wait mode
Thread.SpinWait(SpinCount);
}
Monitor.Wait() 示例
这个例子需要在生产者和消费者端都有一个钩子。
生产者代码
//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{
lock (syncLock)
{ //Double check before waiting
if (Tokens.Count > 50000)
{
Monitor.Wait(syncLock, 1000);
}
}
}
消费者守则
//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{
lock (syncLock)
{ //Double check before waiting
if (Tokens.Count < 40000)
{
Monitor.PulseAll(syncLock);
}
}
}