1

试图将 ThreadPool.QueueUserWorkItem 重构为 TPL,老实说,我也不太了解(还)。

在现实生活中,这将是一个 I/O 操作。在 PartitionKey 上查询 Azure 表存储,以便它应该有效地并行。

请参阅 GetAllTPL() 方法,因为它具有我编写的简单 TPL。它通过了这个非常简单的测试用例。TPL 是错误的,我只是不知道吗?我可以做得更好吗?当它是一个实际的表查询时,任何可能使这个简单的测试用例失败的东西?

对于这个非常有限的测试用例,ThreadPool.QueueUserWorkItem 和 TPL 都产生了相同的正确答案。将秒表放在更大(但仍然简化)的测试用例上,TPL 的速度仅比 ThreadPool.QueueUserWorkItem 快两倍。TPL 似乎以 3 人一组排队,而 ThreadPool.QueueUserWorkItem 以 2 人一组排队,仅在具有超线程的 P4 上运行。由于 thread.sleep 不是真正的工作,它并不能告诉我很多。

ThreadPool.QueueUserWorkItem 来自 .NET 3.5 代码示例(TPL 之前)。我很确定 TPL 有更好的选择,但我是 TPL 的新手。谢谢

using System.Threading;
using System.Threading.Tasks;

namespace WaitForConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            string[] partitionKey = new string[] { "one", "two", "three", "four" };
            IEnumerable<string> commonRowKeys = GetAllQWI(partitionKey);          
            foreach (string commonRowKey in commonRowKeys) Console.WriteLine(commonRowKey);
            Console.ReadLine();
            commonRowKeys = GetAllTPL(partitionKey);
            foreach (string commonRowKey in commonRowKeys) Console.WriteLine(commonRowKey);
            Console.ReadLine();
        }

        public static IEnumerable<string> GetAllQWI(string[] partitionKey)
        {
            // this a a code sample from .NET 3.5 and it runs on 4.0
            IEnumerable<string> finalResults = null;
            ManualResetEvent[] resetEvents = new ManualResetEvent[partitionKey.Length];
            HashSet<string>[] rowKeys = new HashSet<string>[partitionKey.Length];
            for (int i = 0; i < rowKeys.Length; i++)
            {
                resetEvents[i] = new ManualResetEvent(false);
                ThreadPool.QueueUserWorkItem(new WaitCallback((object index) =>
                {
                    Console.WriteLine("GetAllQWI " + ((int)index).ToString());
                    rowKeys[(int)index] = TableQueryGetRowKeys(partitionKey[(int)index]);
                    resetEvents[(int)index].Set();
                }), i);
            }
            try
            {
                WaitHandle.WaitAll(resetEvents);
                Console.WriteLine("WaitAll done");
                finalResults = (IEnumerable<string>)rowKeys[0];
                foreach (var thisRowKeys in rowKeys)
                {
                    finalResults = finalResults.Intersect(thisRowKeys);
                }

            }
            catch (Exception ex)
            {
                Console.WriteLine("WaitAll ex " + ex.Message);
            }
            return finalResults;
        }

        public static IEnumerable<string> GetAllTPL(string[] partitionKey)
        {
            // this is the conversion of the ThreadPool.QueueUserWorkItem to TPL 
            // seems to be working but is this optimal
            IEnumerable<string> finalResults = null;
            HashSet<string>[] rowKeys = new HashSet<string>[partitionKey.Length];
            //  how to do this in TPL
            Parallel.For(0, partitionKey.Length, i =>
            {
                Console.WriteLine("GetAllTPL " + i.ToString());
                rowKeys[i] = TableQueryGetRowKeys(partitionKey[i]);
            }); 
            //  Do I need to do anything special to wait for all the tasks to finish?
            //  Interesting that i is not necessarily in order but it does not need to be
            finalResults = (IEnumerable<string>)rowKeys[0];
            foreach (var thisRowKeys in rowKeys)
            {
                finalResults = finalResults.Intersect(thisRowKeys);
            }
            return finalResults;
        }

        public static HashSet<string> TableQueryGetRowKeys(string partitionKey)
        {
            // in real life this is an Azure table query to get all rowKeys for a partitionKey
            Thread.Sleep(10000);
            if (DateTime.Now.Millisecond % 2 == 0)
            {
                return new HashSet<string> { "alph", "beta", "gamma", "delta" };
            }
            else
            {
               return new HashSet<string> { "beta", "gamma", "delta", "epsilon" };
            }
        }
    }
}

花了几个小时才走到这一步。如果 TPL 会这样做,那么我将更多地了解 TPL 而不是学习 ThreadPool.QueueUserWorkItem。TPL 太简单了,这让我有点害怕。只是想检查一下我没有遗漏什么。我从一个计算密集型样本中得到了这个 TPL。

关于 TPL 和 Azure 的不规则结果的一篇关于 SO 的帖子,但最终无法重现。 Parallel.ForEach 能否与 CloudTableQuery 一起安全使用

Azure 团队的另一篇文章指出 TPL 和 Azure 没问题 Windows Azure:代码并行化

4

0 回答 0