4

我正在寻找实现一个生产者多消费者多线程应用程序的最佳方案。目前我正在使用一个队列来共享缓冲区,但它比一个生产者一个消费者的情况慢得多。我打算这样做:

Queue<item>[] buffs = new Queue<item>[N];
object[] _locks = new object[N];
static void Produce()
{
    int curIndex = 0;
    while(true)
    {
        // Produce item;
        lock(_locks[curIndex])
        {
            buffs[curIndex].Enqueue(curItem);
            Monitor.Pulse(_locks[curIndex]);
        }
        curIndex = (curIndex+1)%N;
    }
}

static void Consume(int myIndex)
{
    item curItem;
    while(true)
    {
        lock(_locks[myIndex])
        {
            while(buffs[myIndex].Count == 0)
                Monitor.Wait(_locks[myIndex]);
            curItem = buffs[myIndex].Dequeue();
        }
        // Consume item;
    }
}

static void main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}
4

3 回答 3

6

使用BlockingCollection

BlockingCollection<item> _buffer = new BlockingCollection<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        _buffer.Add(curItem);
    }

    // eventually stop producing
    _buffer.CompleteAdding();
}

static void Consume(int myIndex)
{
    foreach (var curItem in _buffer.GetConsumingEnumerable())
    {
        // Consume item;
    }
}

static void main()
{
    int N = 100;
    Thread[] consumers = new Thread[N];
    for(int i = 0; i < N; i++)
    {
        consumers[i] = new Thread(Consume);
        consumers[i].Start(i);
    }
    Thread producer = new Thread(Produce);
    producer.Start();
}

如果您不想从一开始就指定线程数,则可以改用 Parallel.ForEach。

static void Consume(item curItem)
{
    // consume item
}

void Main()
{
    Thread producer = new Thread(Produce);
    producer.Start();

    Parallel.ForEach(_buffer.GetConsumingPartitioner(), Consumer)
}
于 2013-10-02T14:05:20.407 回答
1

使用更多线程无济于事。它甚至可能会降低性能。我建议您尝试使用ThreadPool每个工作项都是由生产者创建的一项。但是,这并不能保证所生产的物品按照生产顺序被消费。


另一种方法可能是将消费者的数量减少到 4 个,例如并修改它们的工作方式,如下所示:

生产者将新工作添加到队列中。所有工作线程只有一个全局队列。然后它设置一个标志来指示有这样的新工作:

ManualResetEvent workPresent = new ManualResetEvent(false);
Queue<item> workQueue = new Queue<item>();

static void Produce()
{
    while(true)
    {
        // Produce item;
        lock(workQueue)
        {
            workQueue.Enqueue(newItem);
            workPresent.Set();
        }
    }
}

消费者等待工作被添加到队列中。只有一个消费者可以完成它的工作。然后它从队列中取出所有工作并重置标志。在完成之前,制作人将无法添加新作品。

static void Consume()
{
    while(true)
    {
        if (WaitHandle.WaitOne(workPresent))
        {
            workPresent.Reset();

            Queue<item> localWorkQueue = new Queue<item>();
            lock(workQueue)
            {
                while (workQueue.Count > 0)
                    localWorkQueue.Enqueue(workQueue.Dequeue());
            }

            // Handle items in local work queue
            ...
        }
    }
}    

然而,这个结果有点不可预测。可能是一个线程正在做所有的工作,而其他线程什么也不做。

于 2013-10-02T07:44:53.880 回答
0

我不明白为什么你必须使用多个队列。只需减少锁定量。这是一个示例,您可以在其中拥有大量消费者,他们都在等待新工作。

public class MyWorkGenerator
{
    ConcurrentQueue<object> _queuedItems = new ConcurrentQueue<object>();
    private object _lock = new object();

    public void Produce()
    {
        while (true)
        {
            _queuedItems.Enqueue(new object());
            Monitor.Pulse(_lock);
        }
    }

    public object Consume(TimeSpan maxWaitTime)
    {
        if (!Monitor.Wait(_lock, maxWaitTime))
            return null;

        object workItem;
        if (_queuedItems.TryDequeue(out workItem))
        {
            return workItem;
        }

        return null;
    }

}

请注意,Pulse()一次只会触发一个消费者。

示例用法:

    static void main()
    {
        var generator = new MyWorkGenerator();

        var consumers = new Thread[20];
        for (int i = 0; i < consumers.Length; i++)
        {
            consumers[i] = new Thread(DoWork);
            consumers[i].Start(generator);
        }

        generator.Produce();
    }

    public static void DoWork(object state)
    {
        var generator = (MyWorkGenerator) state;

        var workItem = generator.Consume(TimeSpan.FromHours(1));
        while (workItem != null)
        {
            // do work


            workItem = generator.Consume(TimeSpan.FromHours(1));
        }
    }

请注意,实际队列隐藏在生产者中,因为它是实现细节。消费者实际上并不需要知道工作项是如何生成的。

于 2013-10-02T14:16:29.693 回答