1

我刚刚开始使用任务并行库。所讨论的任务是以尽可能并行的方式处理结果,但要保持结果的顺序。

此外,可以随时添加一个项目,直到设置标志表示不再接受更多项目。

此外,一旦完成所有结果,将需要通知一些客户(只有在没有更多项目被接受时才会发生这种情况)。

我想出了下面的简化示例,它似乎在我的所有测试中都运行良好。

class Program
{
    static void Main(string[] args)
    {
        for (int i = 1; i < random.Next(5, 21); ++i)
        {
            AddItem(i);
        }

        finishedAddingItems = true;

        completion.Task.Wait();
        Console.WriteLine("Finished");
        Console.ReadKey();
    }

    static TaskCompletionSource<bool> completion = 
                            new TaskCompletionSource<bool>();

    static bool finishedAddingItems = false;

    static Random random = new Random();

    class QueueResult
    {
        public int data;
        public int IsFinished;
    }

    static ConcurrentQueue<QueueResult> queue = 
                           new ConcurrentQueue<QueueResult>();

    static object orderingLockObject = new object();

    static void AddItem(int i)
    {
        var queueItem = new QueueResult { data = i, IsFinished = 0 };

        queue.Enqueue(queueItem);

        Task.Factory
            .StartNew(() => 
            { 
                for (int busy = 0; 
                     busy <= random.Next(9000000, 90000000); 
                     ++busy) 
                { }; 
                Interlocked.Increment(ref queueItem.IsFinished); 
            })
            .ContinueWith(t =>
            {
                QueueResult result;

                //the if check outside the lock is to avoid tying up resources
                //needlessly, since only one continuation can actually process
                //the queue at a time.
                if (queue.TryPeek(out result) 
                    && result.IsFinished == 1)
                {
                    lock (orderingLockObject)
                    {
                        while (queue.TryPeek(out result) 
                               && result.IsFinished == 1)
                        {
                            Console.WriteLine(result.data);
                            queue.TryDequeue(out result);
                        }

                        if (finishedAddingItems && queue.Count == 0)
                        {
                            completion.SetResult(true);
                        }
                    }
                }
            });
    }
}

但是,我无法说服自己是否存在潜在的竞争条件,即项目可能无法得到处理?

4

1 回答 1

2

我认为您的代码可能无法正常工作,因为您没有声明IsFinishedvolatile并且您直接在锁之外访问它。无论如何,正确使用双重检查锁定是很困难的,所以除非你真的必须这样做,否则你不应该这样做。

此外,您的代码也很混乱(将所有内容都放在一个类中,使用int而不是bool,不必要的ContinueWith(),......)并且至少包含一个线程安全问题(Random不是线程安全的)。

因此,我建议您了解 TPL 更高级的部分。在您的情况下,PLINQ 听起来像是正确的解决方案:

var source = Enumerable.Range(1, random.Next(5, 21)); // or some other collection

var results = source.AsParallel()
                    .AsOrdered()
                    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                    .Select(i => /* perform your work here */);

foreach (int i in results)
    Console.WriteLine(i);
于 2013-01-26T18:57:18.657 回答