2

我正在使用 BlockingCollection 处理一些文件,然后将它们上传到服务器。

现在我有一个 Producer 递归文件系统并将某些文件压缩到临时位置。一旦它完成了一个文件,它就会将我自己的对象添加到 BlockingCollection 中,其中包含诸如文件名、文件路径、修改日期等信息。消费者然后抓住这个对象并使用它来上传文件。当 Producer 完成搜索文件系统并处理文件时,它调用 BlockingCollection.CompleteAdding() 方法向 Consumer 发出它已经完成的信号。

我想做的是将生产者的数量增加到2个或更多。原因是压缩过程需要一段时间,而且在多核处理器上我只利用 1 个核心。这会导致生产者有时在更快的网络上落后于消费者。

我的问题是,当我有多个生产者且只有一个消费者时,我如何向消费者发出所有生产者都已完成工作的信号?如果我在其中一个生产者上调用 BlockingCollection.CompleteAdding() 方法,我仍然可以让一个或多个其他生产者仍在工作。

4

2 回答 2

2

Producer调用BlockingCollection.CompleteAdding(). 信号量由所有Producer实例共享,当最后一个生产者完成时,它可以调用该方法。信号量可以实现为一个简单的计数器,在创建生产者时增加计数器,在生产者结束其工作时减少它。如果计数器达到零,则BlockingCollection.CompleteAdding()可以调用。

于 2012-02-08T15:02:24.720 回答
0

我使用这样的东西来拥有多个生产者和消费者。这只是一个非常简单的解决方案,并未针对生产代码进行优化。

public class ManageBatchProcessing 
{
    private  BlockingCollection<Action> blockingCollection;

    public void Process()
    {
        blockingCollection = new BlockingCollection<Action>();
        int numberOfBatches = 10;
        Process(HandleProducers, HandleConsumers, numberOfBatches);
    }

    private void Process(Action<int> produce, Action<int> consume, int numberOfBatches)
    {
        produce(numberOfBatches);
        consume(numberOfBatches);
    }

    private void HandleConsumers(int numberOfBatches)
    {
        var consumers = new List<Task>();

        for (var i = 1; i <= numberOfBatches; i++)
        {
            consumers.Add(Task.Factory.StartNew(() =>
            {
                foreach (var action in blockingCollection.GetConsumingEnumerable())
                {
                    action();
                }
            }));
        }

        Task.WaitAll(consumers.ToArray());
    }

    private void HandleProducers(int numberOfBatches)
    {
        var producers = new List<Task>();

        for (var i = 0; i <= numberOfBatches; i++)
        {
            producers.Add(Task.Factory.StartNew(() =>
            {
                blockingCollection.Add(() => YourProdcerMethod());
            }));
        }

        Task.WaitAll(producers.ToArray());
        blockingCollection.CompleteAdding();
    }
}
于 2014-05-30T12:22:52.100 回答