2

是否有一种模式可以将并行与并行上的线程安全计算相结合?

需要计算一个结果,其中第一步将受益于并行,第二个是对并行结果的串行过程。

一种选择是并行运行并将输出保存到一个集合中,然后串行处理该集合,我就可以工作了。存在内存管理的问题,因为集合可能非常大。

下面是串行版本。基本上我想并行 TableQueryGetRowKeys 并以线程安全的方式使用该结果。尝试仅并行化 for 并锁定最终结果,但 rowKeys 可能已关闭。尝试过聚合,但我无法弄清楚如何将集合传递给聚合,更不用说在聚合中执行线程安全的相交了。

IEnumerable<string> finalResults = null;
if (partitionKey.Length == 0) return finalResults;
object lockObject = new object();
finalResults = TableQueryGetRowKeys(partitionKey[0], 0);
HashSet<string> rowKeys;
for(int i = 1; i < partitionKey.Length; i++)
{
    // IO operation to Azure Table Storage against the PartitionKey
    // so very amenable to parallel
    rowKeys = TableQueryGetRowKeys(partitionKey[i]);
    // a memory and CPU operation 
    // this should be much faster than TableQueryGetRowKeys
    // going parallel and wrapping this in a lock did not properly synch rowKeys
    finalResults = finalResults.Intersect(rowKeys); 
}
return finalResults;
4

1 回答 1

2

假设这TableQueryGetRowKeys是线程安全的:

var final = partitionKey.AsParallel()
                        // By returning AsParallel we can get parallel intersect
                        .Select(k => TableQueryGetRowKeys(k).AsParallel())
                        .Aggregate((x, y) => x.Intersect(y));

// Using fake-ish data I see about a 30% speed-up on a 4-core machine:
// static HashSet<string> TableQueryGetRowKeys(string prefix)
// {
//     // Simulate 1s of IO round-trip
//     if (useSleep) Thread.Sleep(1000);
//
//     return new HashSet<string>(
//         Enumerable.Range(0, 500)
//                   .Select(_ => random.Value.Next(0, 500).ToString()));
// }

该算法以逐步方式工作,如下所示:

  1. partitionKey.AsParallel()将常规IEnumerable<string>转换为ParallelQuery<string>允许并行处理序列的 a。
  2. 接下来,ParallelEnumerable.Select用于TableQueryGetRowKeys并行调用。
  3. 然后将每次调用的结果TableQueryGetRowKeys包装在ParallelQuery<T>using 中AsParallel()
  4. ParallelEnumerable.Intersect用作由 . 返回的每个“并行启用”枚举的聚合函数TableQueryGetRowKeys

实际上,这可以通过删除调用以串行方式替换您以前的代码,如下所示:AsParallel

var serialEquivalent = partitionKey.Select(k => TableQueryGetRowKeys(k))
                                   .Aggregate((x,y) => x.Intersect(y));

当您查看实现的肉和土豆时,您可以“说服”自己这等同于您的方法:

IEnumerable<string> results = SomeMethod(0);
for (int ii = 1; ii < count; ++ii)
{
    results = results.Intersect(SomeMethod(ii));
}

+使用而不是重写上述内容Intersect

int results = SomeMethod(0);
for (int ii = 1; ii < count; ++ii)
{
    results = results + SomeMethod(ii);
}

现在很明显,它Intersect可以用来代替其他更“常见”的聚合函数(例如数学运算符)。

于 2012-07-06T17:04:52.820 回答