2

我在我的 .NET 4 应用程序中使用并行数据结构,并且ConcurrentQueue在处理它时添加了一个。

我想做类似的事情:

personqueue.AsParallel().WithDegreeOfParallelism(20).ForAll(i => ... );

当我调用数据库来保存数据时,我限制了并发线程的数量。

但是,我希望ForAll不会出队,我担心只是做

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

因为不能保证我会弹出正确的。

那么,我如何以并行方式遍历集合和出列。

PLINQ或者,并行执行此处理会更好吗?

4

2 回答 2

5

好吧,我不是 100% 确定您尝试在此处存档的内容。您是否试图将所有项目出列,直到什么都没有?或者只是一次出列大量项目?

第一个可能出乎意料的行为始于以下语句:

 theQueue.AsParallel()

对于 ConcurrentQueue,您将获得一个“快照”枚举器。因此,当您迭代并发堆栈时,您只迭代快照,而不是“实时”队列。

一般来说,我认为迭代你在迭代过程中改变的东西不是一个好主意。

所以另一个解决方案看起来像这样:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

这避免了在迭代某些东西时对其进行操作。但是,在该语句之后,队列仍然可以包含在 for 循环期间添加的数据。

要让队列暂时清空,您可能需要做更多的工作。这是一个非常丑陋的解决方案。当队列仍有项目时,创建新任务。只要可以,每个任务都开始从队列中出列。最后,我们等待所有任务结束。为了限制并行性,我们从不创建超过 20 个任务。

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);
于 2010-06-10T15:37:54.600 回答
0

如果您的目标是高吞吐量的真实站点并且您不必立即进行数据库更新,那么您最好选择非常保守的解决方案而不是额外的层库。

制作固定大小的数组(估计大小 - 比如说 1000 个项目或 N 秒的请求)和互锁索引,以便请求只需将数据放入插槽并返回。当一个块被填满时(继续检查计数),创建另一个块并生成异步委托来处理并将刚刚填满的块发送到 SQL。根据您的数据结构,委托可以将所有数据打包到逗号分隔的数组中,甚至可能是一个简单的 XML(当然要测试那个的性能)并将它们发送到 SQL sproc,它应该最好处理它们记录根据记录 - 从不持有大锁。如果它变得很重,你可以将你的块分成几个更小的块。关键是您最大限度地减少了对 SQL 的请求数量,始终保持一定程度的分离并且没有

这将比摆弄 Parallel-s 快得多。

于 2010-08-27T13:52:15.070 回答