15

我不确定 Parallel.ForEach 中本地初始化函数的使用,如 msdn 文章中所述:http: //msdn.microsoft.com/en-us/library/dd997393.aspx

Parallel.ForEach<int, long>(nums, // source collection
   () => 0, // method to initialize the local variable
   (j, loop, subtotal) => // method invoked by the loop on each iteration
   {
      subtotal += nums[j]; //modify local variable 
      return subtotal; // value to be passed to next iteration
   },...

() => 0 如何初始化任何东西?变量的名称是什么,如何在循环逻辑中使用它?

4

4 回答 4

32

参考静态扩展方法的如下重载:Parallel.ForEach

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
    Action<TLocal> localFinally
)

在您的具体示例中

该行:

() => 0, // method to initialize the local variable

只是一个 lambda(匿名函数),它将返回常量整数零。这个 lambda 作为localInit参数传递给Parallel.ForEach- 因为 lambda 返回一个整数,所以它具有类型Func<int>并且类型TLocal可以int由编译器推断(类似地,TSource可以从作为参数传递的集合的类型推断source

然后将返回值 (0) 作为第三个参数(名为subtotal)传递给taskBody Func. 这个 (0) 用作主体循环的初始种子:

(j, loop, subtotal) =>
{
    subtotal += nums[j]; //modify local variable (Bad idea, see comment)
    return subtotal;     // value to be passed to next iteration
}

第二个 lambda(传递给taskBody)被调用 N 次,其中 N 是 TPL 分区器分配给此任务的项目数。

对第二个 lambda 的每个后续调用都taskBody将传递 的新值subTotal,从而有效地计算此任务的运行部分总计。添加分配给此任务的所有项目后,localFinally将再次调用第三个也是最后一个函数参数,并传递返回的最终值subtotalfrom taskBody。因为几个这样的任务将并行运行,所以还需要最后一步来将所有部分总计加到最终的“总”总计中。但是,由于多个并发任务(在不同线程上)可能会争用该grandTotal变量,因此以线程安全的方式对其进行更改非常重要。

(我更改了 MSDN 变量的名称以使其更清晰)

long grandTotal = 0;
Parallel.ForEach(nums,            // source collection
  () => 0,                        // method to initialize the local variable
  (j, loop, subtotal) =>          // method invoked by the loop on each iteration
     subtotal + nums[j],          // value to be passed to next iteration subtotal
  // The final value of subtotal is passed to the localFinally function parameter
  (subtotal) => Interlocked.Add(ref grandTotal, subtotal)

在 MS 示例中,修改任务主体内的参数小计是一种不好的做法,并且没有必要。即代码subtotal += nums[j]; return subtotal;会更好,因为return subtotal + nums[j];它可以缩写为 lambda 速记投影(j, loop, subtotal) => subtotal + nums[j]

一般来说

Parallel.For / Parallel.ForEachlocalInit / body / localFinally重载允许每个任务初始化和清理代码在任务执行之前和之后(分别)运行一次。taskBody

(注意传递给并行的 For range / Enumerable For/Foreach将被划分为批次IEnumerable<>,每个批次都将分配一个任务)

每个 Task中,localInit都会被调用一次,body代码会被重复调用一次,每项批量调用一次(0..N次),localFinally完成后调用一次。

此外,您可以通过来自的通用返回值传递任务期间所需的任何状态(即到taskBody和委托) - 我在下面调用了这个变量。localFinallyTLocallocalInit FunctaskLocals

“localInit”的常见用法:

  • 创建和初始化循环体所需的昂贵资源,例如数据库连接或 Web 服务连接。
  • 保留任务局部变量以保存(无竞争的)运行总计或集合
  • localInit如果您需要从taskBodyand返回多个对象localFinally,您可以使用强类型类 a ,Tuple<,,>或者,如果您只使用 lambdas localInit / taskBody / localFinally,您也可以通过匿名类传递数据。请注意,如果您使用 return fromlocalInit在多个任务之间共享一个引用类型,您将需要考虑此对象的线程安全性 - 不可变性更可取。

“localFinally”动作的常见用法:

  • 释放资源,例如IDisposablestaskLocals(例如数据库连接、文件句柄、Web 服务客户端等)中使用的资源
  • 将每个任务完成的工作聚合/组合/减少回共享变量。这些共享变量将被争用,因此线程安全是一个问题:
    • 例如Interlocked.Increment对于像整数这样的原始类型
    • lock写操作需要或类似的
    • 利用并发集合来节省时间和精力。

taskBodytight循环操作的一部分 - 您需要优化它以提高性能。

这一切都最好用一个评论的例子来总结:

public void MyParallelizedMethod()
{
    // Shared variable. Not thread safe
    var itemCount = 0; 

    Parallel.For(myEnumerable, 
    // localInit - called once per Task.
    () => 
    {
       // Local `task` variables have no contention 
       // since each Task can never run by multiple threads concurrently
       var sqlConnection = new SqlConnection("connstring...");
       sqlConnection.Open();

       // This is the `task local` state we wish to carry for the duration of the task
       return new 
       { 
          Conn = sqlConnection,
          RunningTotal = 0
       }
    },
    // Task Body. Invoked once per item in the batch assigned to this task
    (item, loopState, taskLocals) =>
    {
      // ... Do some fancy Sql work here on our task's independent connection
      using(var command = taskLocals.Conn.CreateCommand())
      using(var reader = command.ExecuteReader(...))
      {
        if (reader.Read())
        {
           // No contention for `taskLocal`
           taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
        }
      }
      // The same type of our `taskLocal` param must be returned from the body
      return taskLocals;
    },
    // LocalFinally called once per Task after body completes
    // Also takes the taskLocal
    (taskLocals) =>
    {
       // Any cleanup work on our Task Locals (as you would do in a `finally` scope)
       if (taskLocals.Conn != null)
         taskLocals.Conn.Dispose();

       // Do any reduce / aggregate / synchronisation work.
       // NB : There is contention here!
       Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
    }

还有更多例子:

每个任务的非竞争字典示例

每个任务的数据库连接示例

于 2015-06-25T07:46:52.063 回答
7

作为@Honza Brestan答案的扩展。Parallel foreach 将工作拆分为任务的方式也很重要,它将多个循环迭代分组到一个任务中,因此实际上localInit()每 n 次循环迭代调用一次,并且可以同时启动多个组。

localInita and的要点localFinally是确保并行 foreach 循环可以将每次迭代的结果组合成单个结果,而无需在 中指定锁定语句,body为此,您必须为要创建的值提供初始化 ( localInit)然后每次body迭代都可以处理本地值,然后您提供一种方法以localFinally线程安全的方式组合来自每个组 () 的值。

如果您不需要 localInit 来同步任务,则可以使用 lambda 方法照常引用周围上下文中的值,而不会出现任何问题。有关使用 localInit/Finally 的更深入教程,请参阅C# 中的线程(Parallel.For 和 Parallel.ForEach),并向下滚动到使用本地值优化,Joseph Albahari 确实是我所有线程的 goto 源。

于 2013-02-12T12:15:18.063 回答
2

您可以在 MSDN 上获得正确Parallel.ForEach重载的提示。

localInit 委托为参与循环执行的每个线程调用一次,并返回每个任务的初始本地状态。这些初始状态被传递给每个任务的第一个主体调用。然后,每个后续的主体调用都会返回一个可能已修改的状态值,该值将传递给下一个主体调用。

在您的示例() => 0中,委托只是返回0,因此该值用于每个任务的第一次迭代。

于 2013-02-12T11:24:24.923 回答
0

从我这边来看一个更简单的例子

class Program
{
    class Person
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public int Age { get; set; }
    }

    static List<Person> GetPerson() => new List<Person>()
    {
        new Person() { Id = 0, Name = "Artur", Age = 26 },
        new Person() { Id = 1, Name = "Edward", Age = 30 },
        new Person() { Id = 2, Name = "Krzysiek", Age = 67 },
        new Person() { Id = 3, Name = "Piotr", Age = 23 },
        new Person() { Id = 4, Name = "Adam", Age = 11 },
    };

    static void Main(string[] args)
    {
        List<Person> persons = GetPerson();
        int ageTotal = 0;

        Parallel.ForEach
        (
            persons,
            () => 0,
            (person, loopState, subtotal) => subtotal + person.Age,
            (subtotal) => Interlocked.Add(ref ageTotal, subtotal)
        );

        Console.WriteLine($"Age total: {ageTotal}");
        Console.ReadKey();
    }
}
于 2018-02-06T10:58:38.010 回答