要求是:要处理的项目存储在一个全局队列中。几个处理线程从全局队列中获取项目进行处理。Producer线程连续快速地将item添加到全局队列(比所有dealer线程的处理速度快得多。另外,handler线程是计算密集型的。最好的性能是完全使用CPU)。所以,我再使用一个countKeeping 线程来将队列的长度保持在特定范围内,就像从BOTTOM到TOP大致一样(只是为了防止内存使用过多)。
我ManualResetEvent
用来处理“可以添加到队列”状态更改。全局队列是
Queue<Object> mQueue = new Queue<Object>;
ManualResetEvent waitingKeeper = new ManualResetEvent(false);
处理程序线程是
void Handle()
{
while(true)
{
Object item;
lock(mQueue)
{
if(mQueue.Count > 0)
item = mQueue.Dequeue();
}
// deal with item, compute-intensive
}
}
生产者线程将调用 AddToQueue() 函数将项目添加到 mQueue。
void AddToQueue(Object item)
{
waitingKeeper.WaitOne();
lock(mQueue)
{
mQueue.Enqueue(item);
}
}
countKeeping线程主要如下
void KeepQueueingCount()
{
while(true)
{
// does not use 'lock(mQueue)'
// because I don't need that specific count of the queue
// I just need the queue does not cost too much memory
if(mQueue.Count < BOTTOM)
waitingKeeper.Set();
else if(mQueue.Count > TOP)
waitingKeeper.Reset();
Thread.Sleep(1000);
}
}
问题来了。
当我将 BOTTOM 和 TOP 设置为较小的数字时,例如 BOTTOM = 20,TOP = 100,它适用于四核 CPU(CPU 利用率高),但对于单 CPU 则效果不佳(CPU 利用率波动较大。 )。
当我将 BOTTOM 和 TOP 设置为更大的数字时,例如 BOTTOM = 100,TOP = 300,它适用于单 CPU,但不适用于四核 CPU。
两种环境,两种情况,内存都没有使用太多(最多50M左右)。
从逻辑上讲,更大的 BOTTOM 和 TOP 将有助于提高性能(当内存没有使用太多时),因为有更多的项目供处理程序线程处理。但事实似乎并非如此。
我尝试了几种方法来找到问题的原因。而且我刚刚发现,当我用于lock(mQueue)
保持线程时,它在上述两种 CPU 条件下都能正常工作。
新的countKeeping线程主要是这样的
void KeepQueueingCount()
{
bool canAdd = false;
while(true)
{
lock(mQueue)
{
if(mQueue.Count < BOTTOM)
canAdd = true;
else if(mQueue.Count > TOP)
canAdd = false;
}
if(canAdd)
waitingKeeper.Set();
else
waitingKeeper.Reset();
// I also did some change here
// when the 'can add' status changed, will sleep longer
// if not 'can add' status not changed, will sleep lesser
// but this is not the main reason
Thread.Sleep(1000);
}
}
所以我的问题是
- 当我没有
lock
在countKeeping 线程中使用时,为什么全局队列的范围会在不同的 CPU 条件下影响性能(这里,性能主要是 CPU 利用率)? - 当我
lock
在countKeeping thread中使用时,在不同的条件下性能都很好。有什么lock
真正影响这一点? - 有没有更好的方法来改变“可以添加”状态而不是使用
ManualResetEvent
? - 有没有更好的模型适合我的要求?或者当生产者线程连续快速工作 时,有没有更好的方法来防止内存不被使用?
---更新---
生产者线程的主要部分如下。STEP是数据库中每个查询的项目数。依次查询,直到查询完所有项目。
void Produce()
{
while(true)
{
// query STEP items from database
itemList = QuerySTEPFromDB();
if(itemList.Count == 0)
// stop all handler thread
// here, will wait for handler threads handle all items in queue
// then, all handler threads exit
else
foreach(var item in itemList)
AddToQueue(item);
}
}