我试图找到一种方法来处理多个线程中的队列,动态调整消费者的数量。基本上这个任务是众所周知的:多个生产者创建消息并将它们提交到一个队列中,多个消费者处理来自队列的消息。现在,我考虑了使用不同组件(如 System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue 和 System.Collections.Concurrent.BlockingCollection)的不同方法,但我无法决定如何正确使用最大效率,所以我很高兴通过您的意见收到一些好主意。
以下是更多详细信息:
- 在某些情况下,消息速率预计会非常密集,但处理会相对简单;
- 我不知道我应该拥有多少消费者;
- 我希望进程调整当前消费者的数量,而不是阻止它们,具体取决于排队的消息数量(这意味着我想为每百条消息填充额外的消费者 fe,如果数量,消费者应该停止入队消息的数量比填充它所需的数量少 50,当消息数量超过 300 时,将填充第三个消费者,当它下降到 250 时它应该停止)。
这就是想法。现在,我考虑将 ConcurrentQueue 包装到一个类中,该类将封装 Enqueue 方法,并检查入队后的消息数量,并决定是否启动额外的消费者。并且消费者应该在循环中进行检查,该检查应该决定停止它。我认为您会提出一些更有趣的解决方案。
顺便说一句,我仍然不知道如何处理的情况之一是理论上当最后一条消息正在排队并且同时最后一个消费者已经停止。另一种情况也是关于停止 - 如果多个消费者同时进行停止检查,他们将被停止。我应该如何处理这些情况?
为了说明我的意思,请考虑以下示例:
class MessageController
{
private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();
int amountOfConsumers;
public void Enqueue(IMessage message)
{
messageQueue.Add(message); // point two
if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
{
Task.Factory.StartNew(() =>
{
IMessage msg;
while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
{
msg = messageQueue.Take();
//process msg...
}
ConsumerQuit(); // point four
});
Interlocked.Increment(ref amountOfConsumers);
}
}
public void ConsumerQuit()
{
Interlocked.Decrement(ref amountOfConsumers);
}
}
所以现在当我可以指出具体的代码行时,这些是问题:
- 当最后一个消费者发现没有消息入队(@point one)并且在它调用 ConsumerQuit 方法之前,最后一条消息到达并入队,然后检查额外的消费者,结果是(@point 3)仍然有一个消费者在工作,并且因为单个消息的一个消费者绰绰有余 - 没有任何反应,然后最终调用 ConsumerQuit,并且我有一条消息卡在队列中。
ConsumerTask | LastMessageThread ------------------------------------------------------ @point one(messageQueue.Count=0) | @point two no time | @point three(amountOfConsumers=1) @point four | ended; ended; | ended;
- 当其中一个应该停止时,几个消费者同时到达“第一点”检查(fe messageQueue.Count 为 249),其中几个将停止,因为在其中一个上调用 ConsumerQuit 之前,其他几个也将执行此检查.
ConsumerTask1 | ConsumerTask2| ConsumerTask3 | ConsumerTask4| ------------------------------------------------------------------------------ @point one(.Count=249;amount=4)| no time | no time | @point one | no time | @point one | processing msg| @point four | @point four | no time | @point one | ended; | ended; | @point four | processing msg| ended; | ended; | ended; | ... | ended; |
在这里,如果最后一条消息已经入队,我们还剩下一个消费者任务,它必须单独处理 249 条消息,但最坏的情况可能是所有消息都停止,在最后一条消息之后,可能有数百条消息会卡住。