6

我有一个 BlockingCollection(ConcurrentBag, 50000),我试图为生产者线程使用 50,000 的非常小的有界容量,以最大限度地增加我可以在消费者线程的 ConcurrentDictionary 中处理的记录数。生产者比消费者快得多,否则会消耗大部分内存。

不幸的是,我立即注意到我的 ConcurrentDictionary 中的记录总数现在大大低于在我的测试数据执行时添加 50,000 的有界容量后应有的记录数。我读到 BlockingCollection 的 .add 方法应该无限期地阻塞,直到集合中有空间可以执行添加。然而,情况似乎并非如此。

问题:

  1. 如果在 BlockingCollection 中的容量释放之前调用了太多 add 方法,BlockingCollection 的 .add 方法最终会超时还是静默失败?

  2. 如果 #1 的答案是肯定的,那么在超出 Bounding Capacity 后我可以尝试多少次添加而不丢失数据?

  3. 如果调用了许多 BlockingCollection .add() 方法,这些方法正在等待/阻塞容量并调用 CompleteAdding() 方法,那么那些等待/阻塞的添加会继续等待然后最终添加还是静默失败?

4

1 回答 1

11

如果您将 BlockingCollection 与 ConcurrentDictionary 一起使用,请确保您的代码中没有隐藏 BlockingCollection.TryAdd(myobject) 方法,并且将其误认为是 ConcurrentDictionary.TryAdd() 方法。如果 BlockingCollection 的 Bounding Capacity 已超出, BlockingCollection.TryAdd(myobject) 将返回 false 并丢弃产生“静默失败”的添加请求。

  1. BlockingCollection 的 .Add() 方法在大量超出 Bounding Capacity 后似乎不会“静默失败”或丢失添加。add() 方法最终会导致进程耗尽内存,如果有太多的 .add() 等待添加到容量过剩的 BlockingCollection。(这必须是流量控制问题的一个非常极端的情况)
  2. 见#1。
  3. 我自己的测试似乎表明,一旦调用 CompleteAdding() 方法,所有后续添加都会失败,如 MSDN 文档中所述。

关于性能的最后说明

与在同一过程中不使用 Bounding Capacity 和 .TryAdd() 相比,似乎(在我自己的情况下)在 BlockingCollection 上使用 Bounding Capacity 和 .Add() 非常慢。

通过实施我自己的边界容量策略,我取得了更好的性能结果。有很多方法可以做到这一点。三个选项包括与 Monitor.PulseAll() 一起使用的 Thread.Sleep()、Thread.Spinwait() 或 Monitor.Wait()。当使用其中一种策略时,也可以使用 BlockingCollection.TryAdd() 而不是 BlockingCollection.Add() 并且没有边界容量,而不会丢失任何数据或内存不足。这种方法似乎也能产生更好的性能。

您可以根据哪种方案最适合生产者和消费者线程中的速度差异,从三个示例中进行选择。

Thread.Wait() 示例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the bounded capacity has been exceeded
    //place the thread in wait mode 
    Thread.Sleep(SleepTime);
}

Thread.SpinWait() 示例:

//Check to see if the BlockingCollection's bounded capacity has been exceeded.
while (Tokens.Count > 50000)
{   //If the capacity has been exceeded
    //place the thread in wait mode 
    Thread.SpinWait(SpinCount);
}  

Monitor.Wait() 示例

这个例子需要在生产者和消费者端都有一个钩子。

生产者代码

//Check to see BlockingCollection capacity has been exceeded.
if (Tokens.Count > 50000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count > 50000)
        {
            Monitor.Wait(syncLock, 1000);
        }
    }
}

消费者守则

//Check to see BlockingCollection capacity is back a normal range.
if (Tokens.Count <= 40000)
{ 
    lock (syncLock)
    {   //Double check before waiting
        if (Tokens.Count < 40000)
        {
            Monitor.PulseAll(syncLock);
        }
    }
}
于 2012-11-03T19:27:59.423 回答