我的团队也刚刚完成了一本关于这方面的书......
使用 Microsoft® .NET 进行并行编程:
多核架构上的分解和协调设计模式
科林·坎贝尔、拉尔夫·约翰逊、阿德·米勒和斯蒂芬·图布。托尼·嘿的前言
您可以在此处下载草稿和示例:http: //parallelpatterns.codeplex.com/
整本书将于本月晚些时候在 MSDN 上和 10 月在亚马逊上提供。
为公然的插件道歉,但我认为您可能会发现这些内容真的很有帮助。
更新...
为了回答您选择的问题(如下),聚合的实现(从列表创建基于列表内容的聚合)恰好比替代方案 Parallel.ForEach 更好地映射到 PLinq。PDF 的 p72 上有一个使用 Parallel.ForEach 聚合的示例。
如果您只是想使用 PLinq 更新集合的内容,则映射要简单得多。例如,以下代码循环遍历帐户列表,计算余额趋势并标记预测余额超过其透支限制的帐户。
顺序:
static void UpdatePredictionsSequential(AccountRepository accounts)
{
foreach (Account account in accounts.AllAccounts)
{
Trend trend = SampleUtilities.Fit(account.Balance);
double prediction = trend.Predict(account.Balance.Length + NumberOfMonths);
account.SeqPrediction = prediction;
account.SeqWarning = prediction < account.Overdraft;
}
}
普林克:
static void UpdatePredictionsPlinq(AccountRepository accounts)
{
accounts.AllAccounts
.AsParallel()
.ForAll(account =>
{
Trend trend = SampleUtilities.Fit(account.Balance);
double prediction = trend.Predict(account.Balance.Length + NumberOfMonths);
account.PlinqPrediction = prediction;
account.PlinqWarning = prediction < account.Overdraft;
});
}
Parallel.ForEach:
static void UpdatePredictionsParallel(AccountRepository accounts)
{
Parallel.ForEach(accounts.AllAccounts, account =>
{
Trend trend = SampleUtilities.Fit(account.Balance);
double prediction = trend.Predict(account.Balance.Length + NumberOfMonths);
account.ParPrediction = prediction;
account.ParWarning = prediction < account.Overdraft;
});
}
在某些情况下,PLinq 可能是最具表现力的选择。在其他情况下,Parallel.For/ForEach 可能更好,在某些情况下,这在很大程度上取决于程序员的偏好。
但是,Task Parallel Library 支持的不仅仅是 Parallel Loop 和 Aggregation 模式。它允许您构建将被安排在多核硬件上并行执行的任务(任务并行模式):
static int ParallelTaskImageProcessing(Bitmap source1, Bitmap source2,
Bitmap layer1, Bitmap layer2, Graphics blender)
{
Task toGray = Task.Factory.StartNew(() => SetToGray(source1, layer1));
Task rotate = Task.Factory.StartNew(() => Rotate(source2, layer2));
Task.WaitAll(toGray, rotate);
Blend(layer1, layer2, blender);
return source1.Width;
}
它允许您创建任务图,其中一个任务的输出被馈送到另一个任务(任务图或期货模式):
public static int Example4()
{
var a = 22;
var cf = Task<int>.Factory.StartNew(() => F2(a));
var df = cf.ContinueWith((t) => F3(t.Result));
var b = F1(a);
var f = F4(b, df.Result);
return f;
}
其中 F1-F4 是输入和输出具有依赖性的函数。
它支持创建依赖任务树,用于分治问题,如排序(动态任务并行模式):
static void ParallelWalk<T>(Tree<T> tree, Action<T> action)
{
if (tree == null) return;
var t1 = Task.Factory.StartNew(
() => action(tree.Data));
var t2 = Task.Factory.StartNew(
() => ParallelWalk(tree.Left, action));
var t3 = Task.Factory.StartNew(
() => ParallelWalk(tree.Right, action));
Task.WaitAll(t1, t2, t3);
}
它还实现了几个(线程安全的)集合以在并行程序中使用。例如,它允许直接实现流水线模式:
static void Chapter7Example01Pipeline(int seed)
{
Console.Write("Begin Pipelined Sentence Builder");
var buffer1 = new BlockingCollection<string>(BufferSize);
var buffer2 = new BlockingCollection<string>(BufferSize);
var buffer3 = new BlockingCollection<string>(BufferSize);
var f = new TaskFactory(TaskCreationOptions.LongRunning,
TaskContinuationOptions.None);
var stage1 = f.StartNew(() => ReadStrings(buffer1, seed));
var stage2 = f.StartNew(() => CorrectCase(buffer1, buffer2));
var stage3 = f.StartNew(() => CreateSentences(buffer2, buffer3));
var stage4 = f.StartNew(() => WriteSentences(buffer3));
Task.WaitAll(stage1, stage2, stage3, stage4);
}
上述所有功能还包含对异常处理和取消的支持,尽管为了清楚起见,我没有在这里展示它们。