4

我正在尝试使用 C# 应用程序尽可能快地处理数字。我使用 aThread.Sleep()来模拟一个处理和随机数。我使用 3 种不同的技术。

这是我使用的测试代码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Test
{
    internal class Program
    {
        private static void Main()
        {
            var data = new int[500000];
            var random = new Random();

            for (int i = 0; i < 500000; i++)
            {
                data[i] = random.Next();
            }

            var partialTimes = new Dictionary<int, double>();
            var iterations = 5;

            for (int i = 1; i < iterations + 1; i++)
            {
                Console.Write($"ProcessData3 {i}\t");
                StartProcessing(data, partialTimes, ProcessData3);
                GC.Collect();
            }

            Console.WriteLine();
            Console.WriteLine("Press Enter to Exit");
            Console.ReadLine();
        }

        private static void StartProcessing(int[] data, Dictionary<int, double> partialTimes, Action<int[], Dictionary<int, double>> processData)
        {
            var stopwatch = Stopwatch.StartNew();

            try
            {
                processData?.Invoke(data, partialTimes);
                stopwatch.Stop();

                Console.WriteLine($"{stopwatch.Elapsed.ToString(@"mm\:ss\:fffffff")} total = {partialTimes.Sum(s => s.Value)} max = {partialTimes.Values.Max()}");
            }
            finally
            {
                partialTimes.Clear();
            }
        }

        private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
        {
            Parallel.ForEach(data, number =>
            {
                var partialStopwatch = Stopwatch.StartNew();

                Thread.Sleep(1);

                partialStopwatch.Stop();

                lock (partialTimes)
                {
                    partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                }
            });
        }

        private static void ProcessData3(int[] data, Dictionary<int, double> partialTimes)
        {
            // Partition the entire source array.
            var rangePartitioner = Partitioner.Create(0, data.Length);

            // Loop over the partitions in parallel.
            Parallel.ForEach(rangePartitioner, (range, loopState) =>
            {
                // Loop over each range element without a delegate invocation.
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    var number = data[i];
                    var partialStopwatch = Stopwatch.StartNew();

                    Thread.Sleep(1);

                    partialStopwatch.Stop();

                    lock (partialTimes)
                    {
                        partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                    }
                }
            });
        }

        private static void ProcessData2(int[] data, Dictionary<int, double> partialTimes)
        {
            var tasks = new Task[data.Count()];
            for (int i = 0; i < data.Count(); i++)
            {
                var number = data[i];

                tasks[i] = Task.Factory.StartNew(() =>
                {
                    var partialStopwatch = Stopwatch.StartNew();

                    Thread.Sleep(1);

                    partialStopwatch.Stop();

                    lock (partialTimes)
                    {
                        partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
                    }
                });
            }

            Task.WaitAll(tasks);
        }
    }
}

对于每种技术,我都会重新启动程序。我得到了这些结果,
有一个Thread.Sleep( 1 )

ProcessData1 1  00:56:1796688 total = 801335,282599955 max = 16,8783
ProcessData1 2  00:23:5390014 total = 816167,642100022 max = 14,5913
ProcessData1 3  00:14:7090566 total = 827589,675899998 max = 13,2617
ProcessData1 4  00:10:8929177 total = 829296,528300007 max = 15,0175
ProcessData1 5  00:10:6333310 total = 839282,123200008 max = 29,2738

ProcessData2 1  00:37:8084153 total = 824507,174200022 max = 112,071
ProcessData2 2  00:16:3762096 total = 849272,47810001  max = 77,1514
ProcessData2 3  00:12:9177717 total = 854012,353100029 max = 67,5684
ProcessData2 4  00:10:4798701 total = 857396,642899983 max = 92,9408
ProcessData2 5  00:09:2206146 total = 870966,655499989 max = 51,8945

ProcessData3 1  01:13:6814541 total = 803581,718699918 max = 25,6815
ProcessData3 2  01:07:9809277 total = 814069,532899922 max = 26,0671
ProcessData3 3  01:07:9857984 total = 814148,329399928 max = 21,3116
ProcessData3 4  01:07:4812183 total = 808042,695499966 max = 16,8601
ProcessData3 5  01:07:2954614 total = 805895,325499903 max = 23,8517

其中是在每个函数中
total花费的总时间,并且是每个函数的最长时间。Parallel.ForEach()
max

为什么第一个循环这么慢?其他尝试怎么可能处理得这么快?如何在第一次尝试时实现更快的并行处理?


编辑:

所以我也试过了,Thread.Sleep( 10 )
结果是:

ProcessData1 1  02:50:2845698 total = 5109831,95429994 max = 12,0612
ProcessData1 2  00:56:3361645 total = 5125884,05919954 max = 12,7666
ProcessData1 3  00:53:4911541 total = 5131105,15209993 max = 12,7486
ProcessData1 4  00:49:5665628 total = 5144654,75829992 max = 13,2678
ProcessData1 5  00:46:0218194 total = 5152955,19509996 max = 13,702

ProcessData2 1  01:21:7207557 total = 5121889,31579983 max = 73,8152
ProcessData2 2  00:39:6660074 total = 5175557,68889969 max = 59,369
ProcessData2 3  00:31:9036416 total = 5193819,89889973 max = 56,2895
ProcessData2 4  00:27:4616803 total = 5207168,56969977 max = 65,5495
ProcessData2 5  00:24:4270755 total = 5222567,9044998  max = 65,368

ProcessData3 1  02:44:9985645 total = 5110117,19019997 max = 11,7172
ProcessData3 2  02:25:6533128 total = 5237779,27010012 max = 26,3171
ProcessData3 3  02:22:2771259 total = 5116123,45259975 max = 12,0581
ProcessData3 4  02:22:1678911 total = 5112574,93779995 max = 11,5334
ProcessData3 5  02:21:9418178 total = 5104980,07120004 max = 11,5583

所以第一个循环仍然比其他循环花费更多的秒数..

4

1 回答 1

4

您看到的行为完全可以通过以下事实来解释:ThreadPool该类延迟创建新线程,直到经过一小段时间(大约 1 秒……多年来一直在变化)。

将仪器添加到一个人的程序中可以提供丰富的信息。在您的示例中,一个非常有用的工具是计算线程池管理的并发线程数,确定“高水位线”(即最终确定的最大线程数),然后使用该数字覆盖线程池的行为。

当我这样做时,我发现在第一种方法的第一次运行时,你会得到大约 25 个线程。但是由于线程池的默认设置是仅创建与计算机上的内核数相等的线程数(在我的情况下为 8 个),因此创建额外的线程可能需要相当长的时间。当然,在此期间,您获得的吞吐量明显低于其他情况(因此您会产生比仅仅 20 秒左右的线程数导致更大的延迟)。

在该测试的后续运行中,最大线程数逐渐增加(因为每次新运行都从线程池中的更多线程开始,从上一次运行开始)高达 53 左右。

如果您事先知道线程池需要多少线程才能有效地执行您的工作,您可以使用该SetMinThreads()方法来增加它将立即按需创建的线程数,然后再切换到受限制的线程创建算法。例如,手头有 53 个线程的高水位标记,您可以将最小线程数设置为该数字(或一个不错的圆形,例如 50)。

当我这样做时,您的第一个测试的所有五次运行,以前需要 25 秒到 1 分钟(当然,更长的运行时间更早),大约需要 19 秒才能完成。

我想强调的是,你应该SetMinThreads()非常小心地使用。一般来说,线程池非常适合管理工作负载。您在上面提供的场景显然只是为了举例而不是现实,但它确实存在一个问题,即您Parallel.ForEach()首先在每次迭代中并没有真正做那么多工作。它似乎不太适合并发,因为大部分时间都花在了开销上。在任何类似的情况下使用SetMinThreads()只是解决一个更隐蔽的潜在问题。

您会发现,如果您定制工作负载以更好地匹配可用资源,并最大限度地减少任务和线程之间的转换,您可以获得良好的吞吐量,而无需覆盖默认线程池数量。


有关此特定测试的其他一些说明……

请注意,如果您将程序更改为在同一会话中运行所有三个测试(每个运行五个),则“第一次运行时间更长”仅发生在第一次测试中。为了将来参考,您应该始终处理这种“第一次较慢”的问题,着眼于测试不同的组合和排序,以验证它是否是一个特定的实现受到影响,或者您是否第一次看到效果测试,无论哪个实现首先运行。有许多实现和平台细节,包括 JIT、线程池、磁盘缓存,它们会影响任何算法的初始运行,您需要确保快速缩小搜索范围以了解您是否正在处理与您自己的算法中的其中一个或一些真正的问题。

顺便说一句,这对你的问题并不重要,但我觉得你选择使用data数组中的随机数作为你的时间字典的键很奇怪。由于随机数的冲突,恕我直言,这些时间值无用。您不会每次都计算(发生碰撞时,只会存储该数字的最后一个实例),这意味着显示的“总”时间少于实际花费的总时间,甚至最大值也不会一定是正确的(如果真正的最大值被使用相同键的后续值覆盖,你会错过它)。


这是我对您的第一个测试的修改版本,它显示了我添加的诊断代码和(注释掉)设置线程池计数以产生更快、更一致的行为的语句:

private static int _threadCount1;
private static int _maxThreadCount1;

private static void ProcessData1(int[] data, Dictionary<int, double> partialTimes)
{
    const int minOverride = 50;
    int minMain, minIOCP, maxMain, maxIOCP;

    ThreadPool.GetMinThreads(out minMain, out minIOCP);
    ThreadPool.GetMaxThreads(out maxMain, out maxIOCP);

    WriteLine($"cores: {Environment.ProcessorCount}");
    WriteLine($"threads: {minMain} min, {maxMain} max");

    // Uncomment two lines below to see uniform behavior across test runs:

    //ThreadPool.SetMinThreads(minOverride, minIOCP);
    //ThreadPool.SetMaxThreads(minOverride, maxIOCP);

    _threadCount1 = _maxThreadCount1 = 0;

    Parallel.ForEach(data, number =>
    {
        int threadCount = Interlocked.Increment(ref _threadCount1);

        var partialStopwatch = Stopwatch.StartNew();

        Thread.Sleep(1);

        partialStopwatch.Stop();

        lock (partialTimes)
        {
            partialTimes[number] = partialStopwatch.Elapsed.TotalMilliseconds;
            if (_maxThreadCount1 < threadCount)
            {
                _maxThreadCount1 = threadCount;
            }
        }

        Interlocked.Decrement(ref _threadCount1);
    });

    ThreadPool.SetMinThreads(minMain, minIOCP);
    ThreadPool.SetMaxThreads(maxMain, maxIOCP);
    WriteLine($"max thread count: {_maxThreadCount1}");
}
于 2017-10-31T19:36:51.543 回答