5

我意识到,当我尝试使用多个线程处理并发队列中的项目而多个线程可以将项目放入其中时,理想的解决方案是将 Reactive Extensions 与 Concurrent 数据结构一起使用。

我原来的问题是:

在使用 ConcurrentQueue 时,尝试在并行循环时出队

所以我很好奇是否有任何方法可以让 LINQ(或 PLINQ)查询随着项目的放入而不断出队。

我试图让它以这样一种方式工作,即我可以让 n 个生产者推入队列并处理有限数量的线程,这样我就不会使数据库过载。

如果我可以使用 Rx 框架,那么我希望我可以启动它,如果在 100 毫秒内放入 100 个项目,那么作为 PLINQ 查询一部分的 20 个线程将通过队列进行处理。

我正在尝试使用三种技术:

  1. Rx 框架(反应式 LINQ)
  2. PLING
  3. System.Collections.Concurrent 结构
4

2 回答 2

6

Drew 是对的,我认为 ConcurrentQueue 尽管听起来很适合这项工作,但实际上是 BlockingCollection 使用的底层数据结构。对我来说似乎也很前后。查看本书的第 7 章* http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 ,它将解释如何使用 BlockingCollection 并让多个生产者和多个消费者各自离开“队列”。您将需要查看“GetConsumingEnumerable()”方法,并可能只调用 .ToObservable() 方法。

*本书的其余部分相当平均。

编辑:

这是一个示例程序,我认为它可以满足您的需求?

class Program
{
    private static ManualResetEvent _mre = new ManualResetEvent(false);
    static void Main(string[] args)
    {
        var theQueue = new BlockingCollection<string>();
        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000));

        theQueue.GetConsumingEnumerable()
            .ToObservable(Scheduler.TaskPool)
            .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000));


        LoadQueue(theQueue, "Producer A");
        LoadQueue(theQueue, "Producer B");
        LoadQueue(theQueue, "Producer C");

        _mre.Set();

        Console.WriteLine("Processing now....");

        Console.ReadLine();
    }

    private static void ProcessNewValue(string value, string consumerName, int delay)
    {
        Thread.SpinWait(delay);
        Console.WriteLine("{1} consuming {0}", value, consumerName);
    }

    private static void LoadQueue(BlockingCollection<string> target, string prefix)
    {
        var thread = new Thread(() =>
                                    {
                                        _mre.WaitOne();
                                        for (int i = 0; i < 100; i++)
                                        {
                                            target.Add(string.Format("{0} {1}", prefix, i));
                                        }
                                    });
        thread.Start();
    }
}
于 2011-01-06T13:18:57.720 回答
3

我不知道如何最好地使用 Rx 来实现这一点,但我建议只使用BlockingCollection<T>and the producer-consumer pattern。您的主线程将项目添加到集合中,ConcurrentQueue<T>默认情况下在下面使用。然后你有一个单独Task的,你在它之前使用Parallel::ForEachBlockingCollection<T>来处理集合中尽可能多的项目,因为它对系统同时有意义。现在,您可能还想考虑使用GetConsumingPartitionerParallelExtensions 库的方法以提高效率,因为在这种情况下,默认分区程序会产生比您想要的更多的开销。您可以从这篇博文中了解更多相关信息。

当主线程完成时,您调用CompleteAdding 并您启动以等待所有消费者完成处理集合中的所有项目。BlockingCollection<T>Task::WaitTask

于 2010-12-01T05:26:49.890 回答