2

我的程序中使用了一个Parallel.ForEach()语句。它使用一些对象的列表作为输入。我不关心输出顺序,但我需要这个循环以与输入列表中相同的顺序获取输入元素。是否有可能实现这一目标Parallel.ForEach()

4

3 回答 3

3

如果您需要保留IEnumerable<T>您的. 此类的示例代码包括一个简单的示例,该示例以递增的顺序从枚举中一次检索一个。OrderablePartitioner<T>

然而,这可能是一个简单的生产者 - 消费者模型,例如ConcurrentQueue<T>

var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
    X x;
    while (queue.TryDequeue(out x))
    {
        x.Frob();
    }
};

// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

使用此代码,您将得到保证先进先出的行为,并且您的请求最终几乎按照收到的顺序“在飞行中”进行处理。一旦并行放置,您将无法保证它们保持按顺序排列。

或者,您可以使用以下内容(限制队列项目的数量保持固定):

// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
    queue,
    _ =>
    {
        X x;
        if (queue.TryDequeue(out x))
        {
            x.Frob();
        }
    });

如果您想继续在一个线程中生产,并在其他线程中使用,请使用BlockingCollection<T>带有队列的 a 作为其支持集合:

var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());

// add to it
Task.Factory.StartNew( () =>
    {
         foreach (var x in yourEnumerableOfX)
         {
             queue.Add(x);
             Thread.Sleep(200);
         }

         // Signal to our consumers we're done:
         queue.CompleteAdding();
    });

现在我们需要“无界”消费者,因为我们不确定到底有多少队列项可能存在:

// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
    while (!queue.IsCompleted)
    {
        X x;
        try
        {
            // blocking form, switch to TryTake and maybe Thread.Sleep()
            x = queue.Take();
        }
        catch (InvalidOperationException)
        {
            // none left
            break;
        }

        x.Frob();
    }
};

int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
于 2013-09-10T14:41:28.510 回答
2

此功能不存在,因为 OS 调度程序可以暂停第 10 项的执行并将第 11 项放在 CPU 内核上,以便 11 在 10 之前执行。没有库可以抵消这一点。任何工作项都可以随时无限期暂停。

你可以得到大概的排序。请参阅其他答案。

于 2013-09-10T14:57:42.723 回答
0

不,根据文档,所有并行迭代器(基本上是 for 和 foreach)不保证任何订单处理项目,它总是不可预测的

并行循环的第二段msdn 文档

于 2013-09-10T13:21:50.793 回答