我有一组字符串,我需要对其执行两个操作。
其中第一个可以安全地以任何顺序独立处理(耶),但输出必须按原始顺序顺序处理(boo)。
以下 Plinq 让我大部分时间到达那里:
myStrings.AsParallel().AsOrdered()
.Select( str => Operation1(str) )
.AsSequential()
.Select( str => Operation2(str) );
//immagine Operation2() maintains some sort of state and must take the outputs from Operation1 in the original order
这让我很顺利,但问题是由于 AsOrdered(),Operation1 首先在每个字符串上执行,然后将结果元素排序回原来的顺序,最后 Operation2 开始执行。
理想情况下,只要 Operation1 调用返回第一个字符串(即 myStrings[0],而不是返回的第一个),我希望 Operation2 开始它的工作。
所以这是我尝试一般地解决这个问题:
public static class ParallelHelper
{
public static IEnumerable<U> SelectAsOrdered<T, U>(this ParallelQuery<T> query, Func<T, U> func)
{
var completedTasks = new Dictionary<int, U>();
var queryWithIndexes = query.Select((x, y) => new { Input = x, Index = y })
.AsParallel()
.Select(t => new { Value = func(t.Input), Index = t.Index })
.WithMergeOptions(ParallelMergeOptions.NotBuffered);
int i = 0;
foreach (var task in queryWithIndexes)
{
if (i==task.Index)
{
Console.WriteLine("immediately yielding task: {0}", i);
i++;
yield return task.Value;
U previouslyCompletedTask;
while (completedTasks.TryGetValue(i, out previouslyCompletedTask))
{
completedTasks.Remove(i);
Console.WriteLine("delayed yielding task: {0}", i);
yield return previouslyCompletedTask;
i++;
}
}
else
{
completedTasks.Add(task.Index, task.Value);
}
}
yield break;
}
}
然后我可以将我的原始代码块重写为:
myStrings.AsParallel()
.SelectAsOrdered( str => Operation1(str) )
.Select(str => Operation2(str));
一旦 myStrings[0] 从 Operation1 出来,Operation2 就会启动。
我想知道的是:
- 这是并行化中相当常见的问题/模式,我是否错过了在 .Net 框架中开箱即用的东西?或者有没有更简单的方法?
- 虽然上述扩展方法似乎可以完成这项工作,但如何改进呢?代码中的任何内容看起来都是个坏主意吗?
谢谢!
安迪
以防万一您感兴趣:
如果没有对 .WithMergeOptions(ParallelMergeOptions.NotBuffered) 的调用,Operation2 直到所有 Operation1 调用都开始后才开始工作(这比等到它们全部完成的原始代码要好)。
现实生活中的问题:
Operation1 正在大量文本(例如:“children act 1989”)中搜索合法引用和参考。
这些参考文献通常是独立的,但有时抄本会包含类似“前面提到的法案的第 6 节”之类的内容。Operation2 依靠来自 Operation1 的捕获来获取这些部分引用。