参考静态扩展方法的如下重载:Parallel.ForEach
public static ParallelLoopResult ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> localInit,
Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody,
Action<TLocal> localFinally
)
在您的具体示例中
该行:
() => 0, // method to initialize the local variable
只是一个 lambda(匿名函数),它将返回常量整数零。这个 lambda 作为localInit
参数传递给Parallel.ForEach
- 因为 lambda 返回一个整数,所以它具有类型Func<int>
并且类型TLocal
可以int
由编译器推断(类似地,TSource
可以从作为参数传递的集合的类型推断source
)
然后将返回值 (0) 作为第三个参数(名为subtotal
)传递给taskBody
Func
. 这个 (0) 用作主体循环的初始种子:
(j, loop, subtotal) =>
{
subtotal += nums[j]; //modify local variable (Bad idea, see comment)
return subtotal; // value to be passed to next iteration
}
第二个 lambda(传递给taskBody
)被调用 N 次,其中 N 是 TPL 分区器分配给此任务的项目数。
对第二个 lambda 的每个后续调用都taskBody
将传递 的新值subTotal
,从而有效地计算此任务的运行部分总计。添加分配给此任务的所有项目后,localFinally
将再次调用第三个也是最后一个函数参数,并传递返回的最终值subtotal
from taskBody
。因为几个这样的任务将并行运行,所以还需要最后一步来将所有部分总计加到最终的“总”总计中。但是,由于多个并发任务(在不同线程上)可能会争用该grandTotal
变量,因此以线程安全的方式对其进行更改非常重要。
(我更改了 MSDN 变量的名称以使其更清晰)
long grandTotal = 0;
Parallel.ForEach(nums, // source collection
() => 0, // method to initialize the local variable
(j, loop, subtotal) => // method invoked by the loop on each iteration
subtotal + nums[j], // value to be passed to next iteration subtotal
// The final value of subtotal is passed to the localFinally function parameter
(subtotal) => Interlocked.Add(ref grandTotal, subtotal)
在 MS 示例中,修改任务主体内的参数小计是一种不好的做法,并且没有必要。即代码subtotal += nums[j]; return subtotal;
会更好,因为return subtotal + nums[j];
它可以缩写为 lambda 速记投影(j, loop, subtotal) => subtotal + nums[j]
一般来说
Parallel.For / Parallel.ForEach的localInit / body / localFinally
重载允许每个任务初始化和清理代码在任务执行之前和之后(分别)运行一次。taskBody
(注意传递给并行的 For range / Enumerable For
/Foreach
将被划分为批次IEnumerable<>
,每个批次都将分配一个任务)
在每个 Task中,localInit
都会被调用一次,body
代码会被重复调用一次,每项批量调用一次(0..N
次),localFinally
完成后调用一次。
此外,您可以通过来自的通用返回值传递任务期间所需的任何状态(即到taskBody
和委托) - 我在下面调用了这个变量。localFinally
TLocal
localInit Func
taskLocals
“localInit”的常见用法:
- 创建和初始化循环体所需的昂贵资源,例如数据库连接或 Web 服务连接。
- 保留任务局部变量以保存(无竞争的)运行总计或集合
localInit
如果您需要从taskBody
and返回多个对象localFinally
,您可以使用强类型类 a ,Tuple<,,>
或者,如果您只使用 lambdas localInit / taskBody / localFinally
,您也可以通过匿名类传递数据。请注意,如果您使用 return fromlocalInit
在多个任务之间共享一个引用类型,您将需要考虑此对象的线程安全性 - 不可变性更可取。
“localFinally”动作的常见用法:
- 释放资源,例如
IDisposables
在taskLocals
(例如数据库连接、文件句柄、Web 服务客户端等)中使用的资源
- 将每个任务完成的工作聚合/组合/减少回共享变量。这些共享变量将被争用,因此线程安全是一个问题:
- 例如
Interlocked.Increment
对于像整数这样的原始类型
lock
写操作需要或类似的
- 利用并发集合来节省时间和精力。
这taskBody
是tight
循环操作的一部分 - 您需要优化它以提高性能。
这一切都最好用一个评论的例子来总结:
public void MyParallelizedMethod()
{
// Shared variable. Not thread safe
var itemCount = 0;
Parallel.For(myEnumerable,
// localInit - called once per Task.
() =>
{
// Local `task` variables have no contention
// since each Task can never run by multiple threads concurrently
var sqlConnection = new SqlConnection("connstring...");
sqlConnection.Open();
// This is the `task local` state we wish to carry for the duration of the task
return new
{
Conn = sqlConnection,
RunningTotal = 0
}
},
// Task Body. Invoked once per item in the batch assigned to this task
(item, loopState, taskLocals) =>
{
// ... Do some fancy Sql work here on our task's independent connection
using(var command = taskLocals.Conn.CreateCommand())
using(var reader = command.ExecuteReader(...))
{
if (reader.Read())
{
// No contention for `taskLocal`
taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]);
}
}
// The same type of our `taskLocal` param must be returned from the body
return taskLocals;
},
// LocalFinally called once per Task after body completes
// Also takes the taskLocal
(taskLocals) =>
{
// Any cleanup work on our Task Locals (as you would do in a `finally` scope)
if (taskLocals.Conn != null)
taskLocals.Conn.Dispose();
// Do any reduce / aggregate / synchronisation work.
// NB : There is contention here!
Interlocked.Add(ref itemCount, taskLocals.RunningTotal);
}
还有更多例子:
每个任务的非竞争字典示例
每个任务的数据库连接示例