15

我正在尝试编写一个程序,通过将项目放在来自不同线程的集合中并在一个迭代集合并处理项目的单个线程中清理它们来安排要删除的项目。

在这样做之前,我想知道什么会产生最佳性能,所以我尝试了 ConcurrentBag、ConcurrentStack 和 ConcurrentQueue,并测量了添加 10000000 个项目所需的时间。

我使用以下程序对此进行了测试:

class Program
{
    static List<int> list = new List<int>();
    static ConcurrentBag<int> bag = new ConcurrentBag<int>();
    static ConcurrentStack<int> stack = new ConcurrentStack<int>();
    static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
    static void Main(string[] args)
    {
        run(addList);
        run(addBag);
        run(addStack);
        run(addQueue);
        Console.ReadLine();
    }

    private static void addList(int obj) { lock (list) { list.Add(obj); } }

    private static void addStack(int obj) { stack.Push(obj); }

    private static void addQueue(int obj) { queue.Enqueue(obj); }

    private static void addBag(int obj) { bag.Add(obj); }



    private static void run(Action<int> action)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        Parallel.For(0, 10000000, new ParallelOptions() { MaxDegreeOfParallelism = # }, action);
        stopwatch.Stop();
        Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
    }
}

其中 # 是使用的线程数量。

但结果相当混乱:

有 8 个线程:

  • addList 需要 00:00:00.8166816
  • addBag 需要 00:00:01.0368712
  • addStack 需要 00:00:01.0902852
  • addQueue 需要 00:00:00.6555039

有 1 个线程:

  • addList 需要 00:00:00.3880958
  • addBag 需要 00:00:01.5850249
  • addStack 需要 00:00:01.2764924
  • addQueue 需要 00:00:00.4409501

因此,无论有多少线程,似乎只锁定一个普通的旧列表比使用任何并发集合都更快,但如果队列需要处理大量写入,则可能是队列。

编辑:在下面关于垃圾和调试构建的评论之后:是的,这会影响基准。调试构建影响将是线性的,垃圾会随着内存使用量的增加而增加。

然而多次运行相同的测试会得到大致相同的结果。

我将集合的初始化移到了测试运行之前,现在在运行之后收集垃圾,如下所示:

        list = new List<int>();
        run(addList);
        list = null;
        GC.Collect();

将 MaxDegreeOfParallelism 设置为 8,我得到以下结果:

  • addList 需要 00:00:7959546
  • addBag 需要 00:00:01.08023823
  • addStack 需要 00:00:01.1354566
  • addQueue 需要 00:00:00.6597145

每次运行代码时都会有 0.02 秒的偏差。

4

4 回答 4

3

并发收集并​​不总是更快。他们中的更多人只在更高级别的争用中看到性能提升,实际工作量也会产生影响。查看 pfx 团队的这篇论文 :)

http://blogs.msdn.com/b/pfxteam/archive/2010/04/26/9997562.aspx

不过要注意过早的优化。将一些可行的东西放在一起,然后进行优化。特别是因为实际工作量很重要。此外,将锁作为性能瓶颈是非常重要的,通常有一些 io 或其他算法需要更长的时间:)

于 2013-10-11T20:23:22.470 回答
2

不要忘记您不必将项目添加到集合中,但也必须检索它们。所以更公平的比较是基于Monitor的Queue<T>BlockingCollection<T>,每个都有 8 个生产者和 1 个消费者。

然后我在我的机器上得到以下结果(我将迭代次数增加了 10 倍):

  • AddQueue1 占用 00:00:18.0119159
  • AddQueue2 占用 00:00:13.3665991

但有趣的不仅仅是性能。看看这两种方法:检查 Add/ConsumeQueue1 的正确性非常困难,而由于BlockingCollection<T>提供的抽象,很容易看出 Add/ConsumeQueue2 完全按照预期进行。


static Queue<int> queue1 = new Queue<int>();
static BlockingCollection<int> queue2 = new BlockingCollection<int>();

static void Main(string[] args)
{
    Run(AddQueue1, ConsumeQueue1);
    Run(AddQueue2, ConsumeQueue2);
    Console.ReadLine();
}

private static void AddQueue1(int obj)
{
    lock (queue1)
    {
        queue1.Enqueue(obj);
        if (queue1.Count == 1)
            Monitor.Pulse(queue1);
    }
}

private static void ConsumeQueue1()
{
    lock (queue1)
    {
        while (true)
        {
            while (queue1.Count == 0)
                Monitor.Wait(queue1);
            var item = queue1.Dequeue();
            // do something with item
        }
    }
}

private static void AddQueue2(int obj)
{
    queue2.TryAdd(obj);
}

private static void ConsumeQueue2()
{
    foreach (var item in queue2.GetConsumingEnumerable())
    {
        // do something with item
    }
}

private static void Run(Action<int> action, ThreadStart consumer)
{
    new Thread(consumer) { IsBackground = true }.Start();
    Stopwatch stopwatch = Stopwatch.StartNew();
    Parallel.For(0, 100000000, new ParallelOptions() { MaxDegreeOfParallelism = 8 }, action);
    stopwatch.Stop();
    Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
于 2013-10-11T20:22:52.903 回答
1

我想看看添加和获取的性能比较。这是我使用的代码:

class Program
{
    static List<int> list = new List<int>();
    static ConcurrentBag<int> bag = new ConcurrentBag<int>();
    static ConcurrentStack<int> stack = new ConcurrentStack<int>();
    static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
    static void Main(string[] args)
    {
        list = new List<int>();
        run(addList);
        run(takeList);

        list = null;
        GC.Collect();

        bag = new ConcurrentBag<int>();
        run(addBag);
        run(takeBag);

        bag = null;
        GC.Collect();

        stack = new ConcurrentStack<int>();
        run(addStack);
        run(takeStack);

        stack = null;
        GC.Collect();

        queue = new ConcurrentQueue<int>();
        run(addQueue);
        run(takeQueue);

        queue = null;
        GC.Collect();

        Console.ReadLine();
    }

    private static void takeList(int obj)
    {
        lock (list)
        {
            if (list.Count == 0)
                return;

            int output = list[obj];
        }
    }

    private static void takeStack(int obj)
    {
        stack.TryPop(out int output);
    }

    private static void takeQueue(int obj)
    {
        queue.TryDequeue(out int output);
    }

    private static void takeBag(int obj)
    {
        bag.TryTake(out int output);
    }

    private static void addList(int obj) { lock (list) { list.Add(obj); } }

    private static void addStack(int obj) { stack.Push(obj); }

    private static void addQueue(int obj) { queue.Enqueue(obj); }

    private static void addBag(int obj) { bag.Add(obj); }



    private static void run(Action<int> action)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();
        Parallel.For(0, 10000000, new ParallelOptions()
        {
            MaxDegreeOfParallelism = 8
        }, action);
        stopwatch.Stop();
        Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
    }
}

输出是:

  • addList 需要 00:00:00.8875893
  • takeList 需要 00:00:00.7500289
  • addBag 需要 00:00:01.8651759
  • takeBag 需要 00:00:00.5749322
  • addStack 需要 00:00:01.5565545
  • takeStack 占用 00:00:00.3838718
  • addQueue 需要 00:00:00.8861318
  • takeQueue 占用 00:00:01.0510706
于 2017-08-23T23:53:02.920 回答
0

是的,但重点是您需要多个线程的并发性,长时间并发运行以查看平均性能,因为这没有考虑不同集合的锁定策略。

于 2018-07-11T14:37:53.603 回答