7

我正在使用 Parallel.For 循环来提高计算的执行速度。

我想测量剩下的计算时间。通常,只需测量每个步骤所需的时间,并通过将步骤时间乘以总步骤数来估计总时间。

例如,如果有 100 个步骤,并且某个步骤需要 5 秒,那么除了总时间约为 500 秒之外,其他步骤也可以。(可以平均几个步骤并不断向用户报告这是我想要做的)。

我能想到的唯一方法是使用外部 for 循环,该循环基本上通过拆分 parallel.for 间隔并测量每个循环来恢复原始方式。

for(i;n;i += step)
    Time(Parallel.For(i, i + step - 1, ...))

一般来说,这不是一个很好的方法,因为一些非常长的步骤或大量的短步骤都会导致计时问题。

有人有想法么?

(请意识到我需要实时估计并行完成所花费的时间,而不是总时间。我想让用户知道执行中还剩多少时间)。

4

6 回答 6

4

这种方法似乎很有效。我们可以通过简单地让每个并行循环增加一个计数器来“线性化”并行 for 循环:

Parallel.For(0, n, (i) => { Thread.Sleep(1000); Interlocked.Increment(ref cnt); });     

(注意,感谢 Niclas,这++不是原子的,必须使用lockor Interlocked.Increment

每个并行运行的循环都会递增cnt。效果是cnt单调增加到n,并且cnt/n是 for 完成的百分比。由于没有争用cnt,因此不存在并发问题,并且非常快速且非常准确。

For我们可以通过简单的计算来测量执行过程中任何时候并行循环的完成百分比cnt/n

总计算时间可以很容易地通过将循环开始以来经过的时间除以循环所处的百分比来估算。当每个循环花费大致相同的时间量时,这两个量应该具有大致相同的变化率,并且表现相对良好(也可以平均小波动)。

显然,每个任务越不可预测,剩余计算时间就越不准确。这是意料之中的,一般来说,没有解决方案(这就是为什么它被称为近似值)。我们仍然可以完全准确地获得经过的计算时间或百分比。

“剩余时间”算法的任何估计的基本假设是每个子任务花费大约相同的计算时间(假设一个想要线性结果)。例如,如果我们有一个并行方法,其中 99 个任务非常快,1 个任务非常慢,我们的估计将非常不准确。我们的计数器会很快拉到 99,然后坐在最后一个百分比,直到缓慢的任务完成。我们可以线性插值并进行进一步的估计以获得更平滑的倒计时,但最终会有一个突破点。

以下代码演示了如何有效地测量并行度。注意 100% 的时间是真正的总执行时间,可以作为参考。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;

namespace ParallelForTiming
{
    class Program
    {       
        static void Main(string[] args)
        {
            var sw = new Stopwatch();
            var pct = 0.000001;
            var iter = 20;
            var time = 20 * 1000 / iter;
            var p = new ParallelOptions(); p.MaxDegreeOfParallelism = 4;

            var Done = false;
            Parallel.Invoke(() =>
            {
                sw.Start();
                Parallel.For(0, iter, p, (i) => { Thread.Sleep(time); lock(p) { pct += 1 / (double)iter; }});               
                sw.Stop(); 
                Done = true;

            }, () =>
            {
                while (!Done)
                {
                    Console.WriteLine(Math.Round(pct*100,2) + " : " + ((pct < 0.1) ? "oo" : (sw.ElapsedMilliseconds / pct /1000.0).ToString()));
                    Thread.Sleep(2000);
                }

            }
            );
            Console.WriteLine(Math.Round(pct * 100, 2) + " : " + sw.ElapsedMilliseconds / pct / 1000.0);


            Console.ReadKey();
        }
    }
}
于 2012-10-15T21:16:34.557 回答
1

这是衡量所有先前完成任务的平均值的可能解决方案。每个任务完成后,Action<T>都会调用一个,您可以在其中总结所有时间并将其除以完成的总任务。然而,这只是当前状态,无法预测任何未来的任务/平均值。(正如其他人提到的,这非常困难)

但是:您必须衡量它是否适合您的问题,因为在方法级别声明的变量上都有可能发生锁争用。

     static void ComputeParallelForWithTLS()
            {
                var collection = new List<int>() { 1000, 2000, 3000, 4000 }; // values used as sleep parameter
                var sync = new object();
                TimeSpan averageTime = new TimeSpan();
                int amountOfItemsDone = 0; // referenced by the TPL, increment it with lock / interlocked.increment

                Parallel.For(0, collection.Count,
                    () => new TimeSpan(),
                    (i, loopState, tlData) =>
                    {
                        var sw = Stopwatch.StartNew();
                        DoWork(collection, i);
                        sw.Stop();
                        return sw.Elapsed;
                    },
                    threadLocalData =>   // Called each time a task finishes
                    {
                        lock (sync)
                        {
                            averageTime += threadLocalData; // add time used for this task to the total.
                        }
                        Interlocked.Increment(ref amountOfItemsDone); // increment the tasks done
                        Console.WriteLine(averageTime.TotalMilliseconds / amountOfItemsDone + ms."); 
/*print out the average for all done tasks so far. For an estimation, 
multiply with the remaining items.*/
                    });
            }
            static void DoWork(List<int> items, int current)
            {
                System.Threading.Thread.Sleep(items[current]);
            }
于 2012-10-15T12:32:11.793 回答
1

这几乎是不可能回答的。

首先,不清楚所有步骤的作用。有些步骤可能是 I/O 密集型或计算密集型的。

此外,Parallel.For 是一个请求——您不确定您的代码是否会真正并行运行。代码是否真正并行运行取决于环境(线程和内存的可用性)。然后,如果您有依赖 I/O 的并行代码,则一个线程将在等待 I/O 完成时阻塞其他线程。而且您也不知道其他进程在做什么。

这就是为什么预测某件事需要多长时间非常容易出错,实际上,这是一种徒劳的练习。

于 2012-10-15T11:42:37.967 回答
1

这个问题很难回答。您提到的使用非常长的步骤或大量非常短的步骤的时间问题可能与您的循环将在并行分区器可以处理的边缘工作有关。

由于默认分区器是非常动态的,而且我们对您的实际问题一无所知,因此没有好的答案可以让您解决手头的问题,同时仍然可以通过动态负载平衡获得并行执行的好处。

如果实现对预计运行时间的可靠估计非常重要,也许您可​​以设置一个自定义分区器,然后利用您对分区的了解从一个线程上的几个块推断时间。

于 2012-10-15T11:58:42.620 回答
0

我建议在完成后在每个步骤报告中执行该方法。当然,这对于线程安全来说有点棘手,因此在实现时要记住这一点。这将使您可以跟踪总数中已完成任务的数量,并且还可以(某种程度上)轻松了解每个单独步骤所花费的时间,这对于删除异常值等很有用。

编辑:一些代码来展示这个想法

Parallel.For(startIdx, endIdx, idx => {
    var sw = Stopwatch.StartNew();
    DoCalculation(idx);
    sw.Stop();
    var dur = sw.Elapsed;
    ReportFinished(idx, dur);
});

这里的关键是ReportFinished它将为您提供有关已完成任务数量以及每个任务持续时间的连续信息。这使您可以通过对此数据进行统计来更好地猜测剩余时间。

于 2012-10-15T11:36:28.560 回答
0

在这里,我写了测量时间和速度的课程

public static class Counter
{
    private static long _seriesProcessedItems = 0;
    private static long _totalProcessedItems = 0;
    private static TimeSpan _totalTime = TimeSpan.Zero;
    private static DateTime _operationStartTime;
    private static object _lock = new object();
    private static int _numberOfCurrentOperations = 0;



    public static void StartAsyncOperation()
    {
        lock (_lock)
        {
            if (_numberOfCurrentOperations == 0)
            {
                _operationStartTime = DateTime.Now;   
            }

            _numberOfCurrentOperations++;
        }
    }

    public static void EndAsyncOperation(int itemsProcessed)
    {
        lock (_lock)
        {
            _numberOfCurrentOperations--;
            if (_numberOfCurrentOperations < 0) 
                throw new InvalidOperationException("EndAsyncOperation without StartAsyncOperation");

            _seriesProcessedItems +=itemsProcessed;

            if (_numberOfCurrentOperations == 0)
            {
                _totalProcessedItems += _seriesProcessedItems;
                _totalTime += DateTime.Now - _operationStartTime;
                _seriesProcessedItems = 0;
            }
        }
    }

    public static double GetAvgSpeed()
    {
        if (_totalProcessedItems == 0) throw new InvalidOperationException("_totalProcessedItems is zero");
        if (_totalProcessedItems == 0) throw new InvalidOperationException("_totalTime is zero");
        return _totalProcessedItems / (double)_totalTime.TotalMilliseconds;
    }

    public static void Reset()
    {
        _totalProcessedItems = 0;
        _totalTime = TimeSpan.Zero;
    }
}

使用和测试示例:

    static void Main(string[] args)
    {
        var st = Stopwatch.StartNew();
        Parallel.For(0, 100, _ =>
        {
            Counter.StartAsyncOperation();
            Thread.Sleep(100);
            Counter.EndAsyncOperation(1);
        });

        st.Stop();
        Console.WriteLine("Speed correct {0}", 100 / (double)st.ElapsedMilliseconds);

        Console.WriteLine("Speed to test {0}", Counter.GetAvgSpeed());
    }
于 2014-02-11T22:10:36.250 回答