4

我试图找到一种方法来处理多个线程中的队列,动态调整消费者的数量。基本上这个任务是众所周知的:多个生产者创建消息并将它们提交到一个队列中,多个消费者处理来自队列的消息。现在,我考虑了使用不同组件(如 System.Collections.Queue.Synchronized、System.Collections.Concurrent.ConcurrentQueue 和 System.Collections.Concurrent.BlockingCollection)的不同方法,但我无法决定如何正确使用最大效率,所以我很高兴通过您的意见收到一些好主意。
以下是更多详细信息:

  • 在某些情况下,消息速率预计会非常密集,但处理会相对简单;
  • 我不知道我应该拥有多少消费者;
  • 我希望进程调整当前消费者的数量,而不是阻止它们,具体取决于排队的消息数量(这意味着我想为每百条消息填充额外的消费者 fe,如果数量,消费者应该停止入队消息的数量比填充它所需的数量少 50,当消息数量超过 300 时,将填充第三个消费者,当它下降到 250 时它应该停止)。

这就是想法。现在,我考虑将 ConcurrentQueue 包装到一个类中,该类将封装 Enqueue 方法,并检查入队后的消息数量,并决定是否启动额外的消费者。并且消费者应该在循环中进行检查,该检查应该决定停止它。我认为您会提出一些更有趣的解决方案。

顺便说一句,我仍然不知道如何处理的情况之一是理论上当最后一条消息正在排队并且同时最后一个消费者已经停止。另一种情况也是关于停止 - 如果多个消费者同时进行停止检查,他们将被停止。我应该如何处理这些情况?

为了说明我的意思,请考虑以下示例:

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    int amountOfConsumers;

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message); // point two

        if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers) // point three
        {
            Task.Factory.StartNew(() =>
            {
                IMessage msg;
                while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers)) //point one
                {
                    msg = messageQueue.Take();
                    //process msg...
                }

                ConsumerQuit(); // point four
            });

            Interlocked.Increment(ref amountOfConsumers);
        }
    }

    public void ConsumerQuit()
    {
        Interlocked.Decrement(ref amountOfConsumers);
    }
}

所以现在当我可以指出具体的代码行时,这些是问题:

  • 当最后一个消费者发现没有消息入队(@point one)并且在它调用 ConsumerQuit 方法之前,最后一条消息到达并入队,然后检查额外的消费者,结果是(@point 3)仍然有一个消费者在工作,并且因为单个消息的一个消费者绰绰有余 - 没有任何反应,然后最终调用 ConsumerQuit,并且我有一条消息卡在队列中。
ConsumerTask                       | LastMessageThread
------------------------------------------------------
@point one(messageQueue.Count=0)   | @point two
no time                            | @point three(amountOfConsumers=1)
@point four                        | ended;
ended;                             | ended;
  • 当其中一个应该停止时,几个消费者同时到达“第一点”检查(fe messageQueue.Count 为 249),其中几个将停止,因为在其中一个上调用 ConsumerQuit 之前,其他几个也将执行此检查.
ConsumerTask1                  | ConsumerTask2| ConsumerTask3 | ConsumerTask4|
------------------------------------------------------------------------------
@point one(.Count=249;amount=4)| no time      | no time       | @point one   |
no time                        | @point one   | processing msg| @point four  |
@point four                    | no time      | @point one    | ended;       |
ended;                         | @point four  | processing msg| ended;       |
ended;                         | ended;       | ...           | ended;       |

在这里,如果最后一条消息已经入队,我们还剩下一个消费者任务,它必须单独处理 249 条消息,但最坏的情况可能是所有消息都停止,在最后一条消息之后,可能有数百条消息会卡住。

4

2 回答 2

1

看来我终于想出了一个解决方案,但不确定性能。请考虑以下代码,任何反馈将不胜感激!我仍然希望看到其他一些解决方案或想法,即使它们完全不同并且需要对方法进行重大改变。这是目标:“一种在多个线程中处理队列,动态调整消费者数量的方法”

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    private ManualResetEvent mre = new ManualResetEvent(true);

    private int amountOfConsumers;

    object o = new object();

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message);

        mre.WaitOne();
        if (Math.Floor((double)messageQueue.Count / 100)+1 > amountOfConsumers)
        {
            Interlocked.Increment(ref amountOfConsumers);

            var task = Task.Factory.StartNew(() =>
            {
                IMessage msg;
                bool repeat = true;

                while (repeat)
                {
                    while ((messageQueue.Count > 0) && (Math.Floor((double)((messageQueue.Count + 50) / 100)) + 1 >= amountOfConsumers))
                    {
                        msg = messageQueue.Take();
                        //process msg...
                    }

                    lock (o)
                    {
                        mre.Reset();

                        if ((messageQueue.Count == 0) || (Math.Ceiling((double)((messageQueue.Count + 51) / 100)) < amountOfConsumers))
                        {
                            ConsumerQuit();
                            repeat = false;
                        }

                        mre.Set();
                    }
                }
            });
        }
    }

    public void ConsumerQuit()
    {
        Interlocked.Decrement(ref amountOfConsumers);
    }
}
于 2013-03-05T19:05:41.537 回答
1

我最初的想法是你是在倒着设计这个。

在查看并行性时,您可能并不总是通过向单个任务添加更多线程来提高效率。有时最佳数量等于或小于您正在使用的机器上的核心数量。这样做的原因是您正在通过锁争用和线程切换创建更多开销。

通过增加更多的消费者,你可能会发现消费率实际上是下降而不是上升。

要考虑的一件事是处理一条消息需要多长时间?如果这个时间比产生一个任务的时间长得多,为什么不让一个消费者简单地创建一个新的任务来处理每条消息呢?

class MessageController
{
    private BlockingCollection<IMessage> messageQueue = new BlockingCollection<IMessage>();

    public void Enqueue(IMessage message)
    {
        messageQueue.Add(message);
    }

    public void Consume()
    {
        //This loop will not exit until messageQueue.CompleteAdding() is called
        foreach (var item in messageQueue.GetConsumingEnumerable())
        {
            IMessage message = item;
            Task.Run(() => ProcessMessage(message);
        }
    }
}
于 2013-03-05T19:06:57.113 回答