8

我正在编写一些代码来处理大量数据,并且我认为让 Parallel.ForEach 为它创建的每个线程创建一个文件会很有用,因此输出不需要同步(至少由我同步)。

它看起来像这样:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

我期望发生的是最多 8 个文件将被创建,并会在整个运行时持续存在。然后,当整个 ForEach 调用完成时,每个都将被处置。真正发生的是 localInit 似乎为每个项目调用一次,所以我最终得到了数百个文件。在处理的每个项目结束时,作者也会得到处置。

这表明发生了同样的事情:

var vals = Enumerable.Range(0, 10000000).ToArray();
        long sum = 0;
        Parallel.ForEach(vals,
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
            (i, state, common) =>
            {
                Thread.Sleep(10);
                return common + i;
            },
                (common) => Interlocked.Add(ref sum, common));

我懂了:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

注意:如果我省略 Thread.Sleep 调用,它有时似乎“正常”运行。对于它决定在我的电脑上使用的 4 个线程,localInit 只被调用一次。然而,并非每次。

这是函数的期望行为吗?导致它这样做的幕后发生了什么?最后,获得所需功能 ThreadLocal 的好方法是什么?

顺便说一下,这是在 .NET 4.5 上的。

4

4 回答 4

8

Parallel.ForEach不像你想象的那样工作。重要的是要注意该方法是建立在Task类之上的,并且 和之间的关系不是 1:1TaskThread。例如,您可以有 10 个任务在 2 个托管线程上运行。

尝试在方法体中使用这一行而不是当前行:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

您应该看到ThreadId将在许多不同的任务中重用,由它们的唯一 ID 显示。如果您离开或增加对 的调用,您会看到更多Thread.Sleep

Parallel.ForEach方法如何工作的(非常)基本思想是,它需要您的枚举创建一系列任务,这些任务将运行枚举的进程部分,完成方式很大程度上取决于输入。还有一些特殊的逻辑可以检查任务超过一定毫秒数而没有完成的情况。如果这种情况属实,那么可能会产生一项新任务来帮助减轻工作量。

如果您查看 中localinit函数的文档Parallel.ForEach,您会注意到它说的是它returns the initial state of the local data for each _task_,而不是每个线程

您可能会问为什么会产生超过 8 个任务。该答案与上一个类似,可在ParallelOptions.MaxDegreeOfParallelism.

从默认值更改仅限制将使用MaxDegreeOfParallelism多少并发任务。

此限制仅针对并发任务的数量,而不是对在整个处理过程中将创建的任务数量的硬限制。正如我上面提到的,有时会产生一个单独的任务,这会导致您的localinit函数被多次调用并将数百个文件写入磁盘。

写入磁盘肯定是一个有一点延迟的操作,尤其是在您使用同步 I/O 时。当磁盘操作发生时,它会阻塞整个线程;同样的情况发生在Thread.Sleep. 如果 aTask这样做,它将阻塞它当前正在运行的线程,并且没有其他任务可以在其上运行。通常在这些情况下,调度程序会产生一个新Task的来帮助弥补松弛。

最后,获得所需功能 ThreadLocal 的好方法是什么?

底线是线程局部变量没有意义,Parallel.ForEach因为您没有处理线程;你正在处理任务。本地线程可以在任务之间共享,因为许多任务可以同时使用同一个线程。此外,任务的本地线程可能会在执行过程中发生变化,因为调度程序可以抢占它的运行,然后在另一个线程上继续执行,该线程将具有不同的本地线程。

我不确定最好的方法,但是您可以依靠该localinit函数来传递您想要的任何资源,一次只允许一个资源在一个线程中使用。您可以使用 将localfinally其标记为不再使用,从而可供其他任务获取。这就是这些方法的设计目的;每个方法仅在每个生成的任务中调用一次(请参阅Parallel.ForEachMSDN 文档的备注部分)。

您也可以自己拆分工作,创建自己的线程集并运行您的工作。但是,在我看来,这不是一个好主意,因为Parallel课程已经为您完成了这项繁重的工作。

于 2013-01-20T09:30:34.280 回答
2

您所看到的是试图让您的工作尽快完成的实现。

为此,它尝试使用不同数量的任务来最大化吞吐量。它从线程池中获取一定数量的线程并运行您的工作一段时间。然后它尝试添加和删除线程以查看会发生什么。它会继续这样做,直到您完成所有工作。

该算法非常愚蠢,因为它不知道您的工作是使用大量 CPU 还是大量 IO,或者即使有很多同步并且线程相互阻塞。它所能做的就是添加和删除线程并测量每个工作单元完成的速度。

这意味着它在注入和退出线程时不断调用你的localInitlocalFinally函数——这就是你所发现的。

不幸的是,没有简单的方法来控制这个算法。Parallel.ForEach是一种高级构造,它有意隐藏了大部分线程管理代码。


使用 a可能会有所帮助,但它依赖于线程池在请求新ThreadLocal线程时将重用相同线程的事实。Parallel.ForEach这不是保证 - 事实上,线程池不太可能在整个调用中使用 8 个线程。这意味着您将再次创建不必要的文件。


可以保证的一件事在任何时候Parallel.ForEach都不会使用超过线程。MaxDegreeOfParallelism

您可以通过创建一个固定大小的文件“池”来利用这一点,这些文件可以由在特定时间运行的任何线程重复使用。您知道MaxDegreeOfParallelism一次只能运行线程,因此您可以在调用ForEach. 然后抓住你的一个localInit并在你的localFinally.

当然,您必须自己编写这个池,并且它必须是线程安全的,因为它将被并发调用。不过,一个简单的锁定策略应该足够好,因为与锁定的成本相比,线程不会很快注入和退出。

于 2013-01-20T09:25:25.453 回答
1

根据MSDN,该localInit方法为每个任务调用一次,而不是每个线程:

localInit 委托对参与循环执行的每个任务调用一次,并返回每个任务的初始本地状态。

于 2013-01-20T09:26:09.687 回答
-1

localInit 在线程创建时调用。如果 body 需要很长时间,它必须创建另一个线程并挂起当前线程,如果它创建另一个线程,它会调用 localInit

同样,当 Parallel.ForEach 调用它时,它会创建与 MaxDegreeOfParallelism 值一样多的线程,例如:

var k = Enumerable.Range(0, 1);
Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....

它在第一次调用时创建了 4 个线程

于 2013-01-20T08:04:44.603 回答