6

我根据this question中的代码采用了并行/消费者的实现

class ParallelConsumer<T> : IDisposable
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

这是一个测试代码

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));



    }
}

测试总是失败 - 它总是停留在 100 项测试中的第 93 项左右。知道我的代码的哪一部分导致了这个问题,以及如何解决它?

4

2 回答 2

8

正如您所发现的,您不能使用Parallel.Foreach()with 。BlockingCollection.GetConsumingEnumerable()

有关解释,请参阅此博客文章:

https://devblogs.microsoft.com/pfxteam/parallelextensionsextras-tour-4-blockingcollectionextensions/

摘自博客:

BlockingCollection 的 GetConsumingEnumerable 实现是使用 BlockingCollection 的内部同步,它已经支持多个消费者并发,但是 ForEach 不知道,它的可枚举分区逻辑在访问可枚举时也需要加锁。

因此,这里的同步比实际需要的要多,从而导致潜在的不可忽略的性能损失。

[另外] Parallel.ForEach 和 PLINQ 默认使用的分区算法使用分块来最小化同步成本:不是每个元素获取一次锁,而是获取锁,获取一组元素(一个块) , 然后释放锁。

虽然这种设计有助于提高整体吞吐量,但对于更关注低延迟的场景,这种分块可能会令人望而却步。

该博客还提供了一种方法的源代码GetConsumingPartitioner(),您可以使用它来解决问题。

public static class BlockingCollectionExtensions
{

    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }


    public class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null)
                throw new ArgumentNullException("collection");

            _collection = collection;
        }

        public override bool SupportsDynamicPartitions
        {
            get { return true; }
        }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1)
                throw new ArgumentOutOfRangeException("partitionCount");

            var dynamicPartitioner = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
        }

        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }

    }
}
于 2013-07-24T08:21:42.423 回答
2

失败的原因是因为这里解释的以下原因

Parallel.ForEach 和 PLINQ 默认使用的分区算法使用分块来最小化同步成本:不是每个元素获取一次锁,而是获取锁,获取一组元素(一个块),然后释放锁。

为了让它工作,你可以在你的ParallelConsumer<T>类上添加一个方法来指示添加完成,如下所示

    public void StopAdding()
    {
        _entries.CompleteAdding();
    }

现在在你之后调用这个方法for loop,如下

        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        consumer.StopAdding();

否则,Parallel.ForEach()将等待达到阈值以便抓取块并开始处理。

于 2013-07-24T09:09:54.533 回答