5

我有一组字符串,我需要对其执行两个操作。

其中第一个可以安全地以任何顺序独立处理(耶),但输出必须按原始顺序顺序处理(boo)。

以下 Plinq 让我大部分时间到达那里:

myStrings.AsParallel().AsOrdered()
         .Select( str => Operation1(str) )
         .AsSequential()
         .Select( str => Operation2(str) );
//immagine Operation2() maintains some sort of state and must take the outputs from Operation1 in the original order    

这让我很顺利,但问题是由于 AsOrdered(),Operation1 首先在每个字符串上执行,然后将结果元素排序回原来的顺序,最后 Operation2 开始执行。

理想情况下,只要 Operation1 调用返回第一个字符串(即 myStrings[0],而不是返回的第一个),我希望 Operation2 开始它的工作。

所以这是我尝试一般地解决这个问题:

public static class ParallelHelper
{
    public static IEnumerable<U> SelectAsOrdered<T, U>(this ParallelQuery<T> query, Func<T, U> func)
    {
        var completedTasks = new Dictionary<int, U>();
        var queryWithIndexes = query.Select((x, y) => new { Input = x, Index = y })
                                    .AsParallel()
                                    .Select(t => new { Value = func(t.Input), Index = t.Index })
                                    .WithMergeOptions(ParallelMergeOptions.NotBuffered);

        int i = 0;
        foreach (var task in queryWithIndexes)
        {
            if (i==task.Index)
            {
                Console.WriteLine("immediately yielding task: {0}", i);
                i++;
                yield return task.Value;

                U previouslyCompletedTask;
                while (completedTasks.TryGetValue(i, out previouslyCompletedTask))
                {
                    completedTasks.Remove(i);
                    Console.WriteLine("delayed yielding task: {0}", i);
                    yield return previouslyCompletedTask;
                    i++;
                }
            }
            else
            {
                completedTasks.Add(task.Index, task.Value);
            }
        }
        yield break;
    }
}

然后我可以将我的原始代码块重写为:

myStrings.AsParallel()
         .SelectAsOrdered( str => Operation1(str) )
         .Select(str => Operation2(str));

一旦 myStrings[0] 从 Operation1 出来,Operation2 就会启动。

我想知道的是:

  1. 这是并行化中相当常见的问题/模式,我是否错过了在 .Net 框架中开箱即用的东西?或者有没有更简单的方法?
  2. 虽然上述扩展方法似乎可以完成这项工作,但如何改进呢?代码中的任何内容看起来都是个坏主意吗?

谢谢!

安迪

以防万一您感兴趣:

  • 如果没有对 .WithMergeOptions(ParallelMergeOptions.NotBuffered) 的调用,Operation2 直到所有 Operation1 调用都开始后才开始工作(这比等到它们全部完成的原始代码要好)。

  • 现实生活中的问题:
    Operation1 正在大量文本(例如:“children act 1989”)中搜索合法引用和参考。
    这些参考文献通常是独立的,但有时抄本会包含类似“前面提到的法案的第 6 节”之类的内容。Operation2 依靠来自 Operation1 的捕获来获取这些部分引用。

4

1 回答 1

0

如果您需要速度,您可以并行化所有流程(加载数据、准备数据、处理数据和聚合数据),我认为最好使用生产者/消费者模式。

但是,如果您使用“Linq”,则无法并行生成(以一种很好的方式执行完整的并行工作流程)数据(但是可以:准备、处理和恢复)。

另一方面,我认为尝试使用“Linq”作为“并行(A)+顺序(B)”是错误的(你可以,是的),你的过程(我认为)是

B = f(A)

那么,B必须等待A。

为什么不简单地“并行(A/B)”呢?

你可以做一个助手(扩展),但我认为它一般没有用。

在您的实际情况下,只需使用 aSemaphore来防止过早访问“文章 ID”。

并行准备、处理和恢复(无生成)的完整代码是:

class Text {
    public static Regex rx = new Regex(@" (PREVID|ACTID\=([0-9]+)) ");

    private Text prv; // previous article
    private string ot; // original text
    private int id; // act id on text
    private Semaphore isComputed = new Semaphore(0, 1);

    public int ActID {
        get {
            isComputed.WaitOne();
            int _id = id;
            isComputed.Release();
            return _id;
        }
    }

    public bool ProcessText() {
        var mx = rx.Match(ot);
        var prev = mx.Groups [1].Value == "PREVID";
        if(prev)
            id = prv == null ? 0 : prv.ActID;
        else
            if(!int.TryParse(mx.Groups [2].Value, out id))
                throw new Exception(string.Format(@"Incorrect article id ""{0}""", mx.Groups [0].Value));
        isComputed.Release();
        return !prev;
    }

    public Text(string original_text, Text previous) {
        prv = previous;
        ot = original_text;
    }

}

public static void Main(String [] args) {

    // same random stream (for debugging)
    var rnd = new Random(1);

    var noise = @"These references are usually independent, but occasionally";

    // some noise text
    var bit = new Func<string>(() =>
        noise.Substring(0, rnd.Next(noise.Length)));

    // random article
    var text = new Func<string>(() =>
        string.Format(@"{0}{1}{2}", bit(),
            rnd.Next() % 2 == 0 ? " PREVID "
                                : string.Format(@" ACTID={0} ", rnd.Next()), bit()));

    // random data input
    var data = new List<Text>();
    Text prv = null;
    for(var n = 0; n < 1000000; n++)
        // producer / consumer is better to parallelize load data step
        data.Add(prv = new Text(text(), prv));

    Console.Write("Press key to start...");
    Console.ReadKey();

    // parallel processing
    Console.WriteLine("{0} unique ID's", data.AsParallel().Where(n => n.ProcessText()).Count());

    Console.WriteLine("Process completed.");
}

如您所见,ProcessText并行处理所有文章。只有 PREVID 文章会等到其上一篇文章计算自己的 id。

抽象这种行为的真正问题(我认为)是项目关系(一个项目依赖于另一个项目),在 Linq 中,自然的方式是“无项目关系”(您必须使用“分组依据”来执行它)。

我建议你使用生产者/消费者模式。

于 2012-10-23T09:42:57.197 回答