我的程序中使用了一个Parallel.ForEach()
语句。它使用一些对象的列表作为输入。我不关心输出顺序,但我需要这个循环以与输入列表中相同的顺序获取输入元素。是否有可能实现这一目标Parallel.ForEach()
?
问问题
1844 次
3 回答
3
如果您需要保留IEnumerable<T>
您的. 此类的示例代码包括一个简单的示例,该示例以递增的顺序从枚举中一次检索一个。OrderablePartitioner<T>
然而,这可能是一个简单的生产者 - 消费者模型,例如ConcurrentQueue<T>
:
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
X x;
while (queue.TryDequeue(out x))
{
x.Frob();
}
};
// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
使用此代码,您将得到保证先进先出的行为,并且您的请求最终几乎按照收到的顺序“在飞行中”进行处理。一旦并行放置,您将无法保证它们保持按顺序排列。
或者,您可以使用以下内容(限制队列项目的数量保持固定):
// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
queue,
_ =>
{
X x;
if (queue.TryDequeue(out x))
{
x.Frob();
}
});
如果您想继续在一个线程中生产,并在其他线程中使用,请使用BlockingCollection<T>
带有队列的 a 作为其支持集合:
var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());
// add to it
Task.Factory.StartNew( () =>
{
foreach (var x in yourEnumerableOfX)
{
queue.Add(x);
Thread.Sleep(200);
}
// Signal to our consumers we're done:
queue.CompleteAdding();
});
现在我们需要“无界”消费者,因为我们不确定到底有多少队列项可能存在:
// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
while (!queue.IsCompleted)
{
X x;
try
{
// blocking form, switch to TryTake and maybe Thread.Sleep()
x = queue.Take();
}
catch (InvalidOperationException)
{
// none left
break;
}
x.Frob();
}
};
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);
于 2013-09-10T14:41:28.510 回答
2
此功能不存在,因为 OS 调度程序可以暂停第 10 项的执行并将第 11 项放在 CPU 内核上,以便 11 在 10 之前执行。没有库可以抵消这一点。任何工作项都可以随时无限期暂停。
你可以得到大概的排序。请参阅其他答案。
于 2013-09-10T14:57:42.723 回答
0
不,根据文档,所有并行迭代器(基本上是 for 和 foreach)不保证任何订单处理项目,它总是不可预测的
并行循环的第二段msdn 文档
于 2013-09-10T13:21:50.793 回答