我正在编写一些代码来处理大量数据,并且我认为让 Parallel.ForEach 为它创建的每个线程创建一个文件会很有用,因此输出不需要同步(至少由我同步)。
它看起来像这样:
Parallel.ForEach(vals,
new ParallelOptions { MaxDegreeOfParallelism = 8 },
()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
(item, state, writer)=>
{
if(something)
{
state.Break();
return writer;
}
List<Result> results = new List<Result>();
foreach(var subItem in item.SubItems)
results.Add(ProcessItem(subItem));
if(results.Count > 0)
{
foreach(var result in results)
result.Write(writer);
}
return writer;
},
(writer)=>writer.Dispose());
我期望发生的是最多 8 个文件将被创建,并会在整个运行时持续存在。然后,当整个 ForEach 调用完成时,每个都将被处置。真正发生的是 localInit 似乎为每个项目调用一次,所以我最终得到了数百个文件。在处理的每个项目结束时,作者也会得到处置。
这表明发生了同样的事情:
var vals = Enumerable.Range(0, 10000000).ToArray();
long sum = 0;
Parallel.ForEach(vals,
new ParallelOptions { MaxDegreeOfParallelism = 8 },
() => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
(i, state, common) =>
{
Thread.Sleep(10);
return common + i;
},
(common) => Interlocked.Add(ref sum, common));
我懂了:
init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18
注意:如果我省略 Thread.Sleep 调用,它有时似乎“正常”运行。对于它决定在我的电脑上使用的 4 个线程,localInit 只被调用一次。然而,并非每次。
这是函数的期望行为吗?导致它这样做的幕后发生了什么?最后,获得所需功能 ThreadLocal 的好方法是什么?
顺便说一下,这是在 .NET 4.5 上的。