1

下面的代码继续创建线程,即使队列为空..直到最终发生 OutOfMemory 异常。如果我用常规 foreach 替换 Parallel.ForEach,则不会发生这种情况。有人知道为什么会发生这种情况吗?

public delegate void DataChangedDelegate(DataItem obj);

public class Consumer
{
    public DataChangedDelegate OnCustomerChanged;
    public DataChangedDelegate OnOrdersChanged;

    private CancellationTokenSource cts;
    private CancellationToken ct;
    private BlockingCollection<DataItem> queue;

    public Consumer(BlockingCollection<DataItem> queue) {
        this.queue = queue;
        Start();
    }

    private void Start() {
        cts = new CancellationTokenSource();
        ct = cts.Token;
        Task.Factory.StartNew(() => DoWork(), ct);
    }

    private void DoWork() {

        Parallel.ForEach(queue.GetConsumingPartitioner(), item => {
            if (item.DataType == DataTypes.Customer) {
                OnCustomerChanged(item);
            } else if(item.DataType == DataTypes.Order) {
                OnOrdersChanged(item);
            }
        });
    }
}
4

3 回答 3

5

我认为Parallel.ForEach()主要用于处理有界集合。并且它不会期望像由返回的集合那样GetConsumingPartitioner()阻塞MoveNext()很长时间。

问题是它Parallel.ForEach()试图找到最好的并行度,所以它开始运行的次数TaskTaskScheduler允许运行的次数一样多。但是TaskScheduler看到有很多Tasks 需要很长时间才能完成,并且他们没有做任何事情(他们阻止)所以它继续开始新的。

我认为最好的解决方案是设置MaxDegreeOfParallelism.

作为替代方案,您可以使用 TPL Dataflow 的ActionBlock. 这种情况下的主要区别在于,ActionBlock当没有要处理的项目时不会阻塞任何线程,因此线程数不会接近限制。

于 2012-06-22T07:01:23.497 回答
3

生产者/消费者模式主要用于只有一个生产者和一个消费者的情况。

但是,您想要实现的目标(多个消费者)更符合 Worklist 模式。以下代码取自犹他大学教授的并行编程课程中 unit2 幻灯片“2c - 共享内存模式”的幻灯片,可在http://ppcp.codeplex.com/下载

BlockingCollection<Item> workList;
CancellationTokenSource cts;
int itemcount

public void Run()
{
  int num_workers = 4;

  //create worklist, filled with initial work
  worklist = new BlockingCollection<Item>(
    new ConcurrentQueue<Item>(GetInitialWork()));

  cts = new CancellationTokenSource();
  itemcount = worklist.Count();

  for( int i = 0; i < num_workers; i++)
    Task.Factory.StartNew( RunWorker );
}

IEnumberable<Item> GetInitialWork() { ... }

public void RunWorker() {
  try  {
    do {
      Item i = worklist.Take( cts.Token );
      //blocks until item available or cancelled
          Process(i);
      //exit loop if no more items left
    } while (Interlocked.Decrement( ref itemcount) > 0);
  } finally {
      if( ! cts.IsCancellationRequested )
        cts.Cancel();
    }
  }
}

public void AddWork( Item item) {
  Interlocked.Increment( ref itemcount );
  worklist.Add(item);
}

public void Process( Item i ) 
{
  //Do what you want to the work item here.
}

前面的代码允许您将工作列表项添加到队列中,并允许您设置任意数量的工作人员(在本例中为四个)以将项目从队列中拉出并处理它们。

.Net 4.0 上的 Parallelism 的另一个重要资源是“使用 Microsoft .Net 进行并行编程”一书,可在以下网址免费获得:http: //msdn.microsoft.com/en-us/library/ff963553

于 2012-06-23T01:08:38.263 回答
1

在任务并行库的内部,Parallel.For 和 Parallel.Foreach 遵循爬山算法来确定操作应该使用多少并行度。

或多或少,他们从在一项任务上运行主体开始,移动到两项,依此类推,直到达到断点,他们需要减少任务的数量。

这对于快速完成的方法体非常有效,但如果方法体需要很长时间才能运行,它可能需要很长时间才能意识到它需要减少并行度。在那之前,它会继续添加任务,并可能导致计算机崩溃。

我是在 Task Parallel Library 的一位开发人员的讲座中了解到以上内容的。

指定 MaxDegreeOfParallelism 可能是最简单的方法。

于 2012-06-22T23:59:56.097 回答