试图将 ThreadPool.QueueUserWorkItem 重构为 TPL,老实说,我也不太了解(还)。
在现实生活中,这将是一个 I/O 操作。在 PartitionKey 上查询 Azure 表存储,以便它应该有效地并行。
请参阅 GetAllTPL() 方法,因为它具有我编写的简单 TPL。它通过了这个非常简单的测试用例。TPL 是错误的,我只是不知道吗?我可以做得更好吗?当它是一个实际的表查询时,任何可能使这个简单的测试用例失败的东西?
对于这个非常有限的测试用例,ThreadPool.QueueUserWorkItem 和 TPL 都产生了相同的正确答案。将秒表放在更大(但仍然简化)的测试用例上,TPL 的速度仅比 ThreadPool.QueueUserWorkItem 快两倍。TPL 似乎以 3 人一组排队,而 ThreadPool.QueueUserWorkItem 以 2 人一组排队,仅在具有超线程的 P4 上运行。由于 thread.sleep 不是真正的工作,它并不能告诉我很多。
ThreadPool.QueueUserWorkItem 来自 .NET 3.5 代码示例(TPL 之前)。我很确定 TPL 有更好的选择,但我是 TPL 的新手。谢谢
using System.Threading;
using System.Threading.Tasks;
namespace WaitForConsole
{
class Program
{
static void Main(string[] args)
{
string[] partitionKey = new string[] { "one", "two", "three", "four" };
IEnumerable<string> commonRowKeys = GetAllQWI(partitionKey);
foreach (string commonRowKey in commonRowKeys) Console.WriteLine(commonRowKey);
Console.ReadLine();
commonRowKeys = GetAllTPL(partitionKey);
foreach (string commonRowKey in commonRowKeys) Console.WriteLine(commonRowKey);
Console.ReadLine();
}
public static IEnumerable<string> GetAllQWI(string[] partitionKey)
{
// this a a code sample from .NET 3.5 and it runs on 4.0
IEnumerable<string> finalResults = null;
ManualResetEvent[] resetEvents = new ManualResetEvent[partitionKey.Length];
HashSet<string>[] rowKeys = new HashSet<string>[partitionKey.Length];
for (int i = 0; i < rowKeys.Length; i++)
{
resetEvents[i] = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(new WaitCallback((object index) =>
{
Console.WriteLine("GetAllQWI " + ((int)index).ToString());
rowKeys[(int)index] = TableQueryGetRowKeys(partitionKey[(int)index]);
resetEvents[(int)index].Set();
}), i);
}
try
{
WaitHandle.WaitAll(resetEvents);
Console.WriteLine("WaitAll done");
finalResults = (IEnumerable<string>)rowKeys[0];
foreach (var thisRowKeys in rowKeys)
{
finalResults = finalResults.Intersect(thisRowKeys);
}
}
catch (Exception ex)
{
Console.WriteLine("WaitAll ex " + ex.Message);
}
return finalResults;
}
public static IEnumerable<string> GetAllTPL(string[] partitionKey)
{
// this is the conversion of the ThreadPool.QueueUserWorkItem to TPL
// seems to be working but is this optimal
IEnumerable<string> finalResults = null;
HashSet<string>[] rowKeys = new HashSet<string>[partitionKey.Length];
// how to do this in TPL
Parallel.For(0, partitionKey.Length, i =>
{
Console.WriteLine("GetAllTPL " + i.ToString());
rowKeys[i] = TableQueryGetRowKeys(partitionKey[i]);
});
// Do I need to do anything special to wait for all the tasks to finish?
// Interesting that i is not necessarily in order but it does not need to be
finalResults = (IEnumerable<string>)rowKeys[0];
foreach (var thisRowKeys in rowKeys)
{
finalResults = finalResults.Intersect(thisRowKeys);
}
return finalResults;
}
public static HashSet<string> TableQueryGetRowKeys(string partitionKey)
{
// in real life this is an Azure table query to get all rowKeys for a partitionKey
Thread.Sleep(10000);
if (DateTime.Now.Millisecond % 2 == 0)
{
return new HashSet<string> { "alph", "beta", "gamma", "delta" };
}
else
{
return new HashSet<string> { "beta", "gamma", "delta", "epsilon" };
}
}
}
}
花了几个小时才走到这一步。如果 TPL 会这样做,那么我将更多地了解 TPL 而不是学习 ThreadPool.QueueUserWorkItem。TPL 太简单了,这让我有点害怕。只是想检查一下我没有遗漏什么。我从一个计算密集型样本中得到了这个 TPL。
关于 TPL 和 Azure 的不规则结果的一篇关于 SO 的帖子,但最终无法重现。 Parallel.ForEach 能否与 CloudTableQuery 一起安全使用
Azure 团队的另一篇文章指出 TPL 和 Azure 没问题 Windows Azure:代码并行化