73

我正在尝试将图像存储在数据库中的数据库迁移到数据库中指向硬盘驱动器上文件的记录。我试图使用这种方法Parallel.ForEach来加快进程来查询数据。

但是,我注意到我遇到了OutOfMemory异常。我知道Parallel.ForEach如果有一个用于间隔查询的枚举将查询一批可枚举以减轻开销成本(因此,如果您一次执行一堆查询而不是间隔它们,您的源更有可能将下一条记录缓存在内存中出去)。问题是由于我返回的记录之一是一个 1-4Mb 字节数组,缓存导致整个地址空间被用完(程序必须在 x86 模式下运行,因为目标平台将是 32 位机器)

有什么方法可以禁用缓存或使 TPL 更小?


这是一个显示问题的示例程序。这必须在 x86 模式下编译以显示问题,如果它需要很长时间或在您的机器上没有发生增加阵列的大小(我发现1 << 20在我的机器上大约需要 30 秒并且4 << 20几乎是瞬时的)

class Program
{

    static void Main(string[] args)
    {
        Parallel.ForEach(CreateData(), (data) =>
            {
                data[0] = 1;
            });
    }

    static IEnumerable<byte[]> CreateData()
    {
        while (true)
        {
            yield return new byte[1 << 20]; //1Mb array
        }
    }
}
4

4 回答 4

106

仅当任务受 CPU 限制且线性扩展时,默认选项Parallel.ForEach 才能正常工作。当任务受 CPU 限制时,一切正常。如果您有一个四核并且没有其他进程在运行,则Parallel.ForEach使用所有四个处理器。如果您有一个四核并且您的计算机上的某些其他进程正在使用一个完整的 CPU,那么Parallel.ForEach大约使用三个处理器。

但如果任务不受 CPU 限制,则Parallel.ForEach继续启动任务,努力使所有 CPU 保持忙碌。然而,无论有多少任务并行运行,总是有更多未使用的 CPU 马力,因此它不断创建任务。

如何判断您的任务是否受 CPU 限制?希望只是通过检查它。如果您要分解素数,那是显而易见的。但其他情况并不那么明显。判断您的任务是否受 CPU 限制的经验方法是限制最大并行度ParallelOptions.MaximumDegreeOfParallelism并观察程序的行为方式。如果您的任务受 CPU 限制,那么您应该在四核系统上看到类似这样的模式:

  • ParallelOptions.MaximumDegreeOfParallelism = 1:使用一个完整的 CPU 或 25% 的 CPU 利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 2: 使用两个 CPU 或 50% CPU 利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 4:使用所有 CPU 或 100% CPU 利用率

如果它表现得像这样,那么您可以使用默认Parallel.ForEach选项并获得良好的结果。线性 CPU 利用率意味着良好的任务调度。

但是如果我在我的 Intel i7 上运行你的示例应用程序,无论我设置的最大并行度是多少,我都会得到大约 20% 的 CPU 利用率。为什么是这样?分配了太多内存,以至于垃圾收集器阻塞了线程。应用程序是资源绑定的,资源是内存。

同样,对数据库服务器执行长时间运行查询的 I/O 密集型任务也永远无法有效利用本地计算机上所有可用的 CPU 资源。在这种情况下,任务调度程序无法“知道何时停止”启动新任务。

如果您的任务不受 CPU 限制,或者 CPU 利用率没有随着最大并行度线性扩展,那么您应该建议Parallel.ForEach不要一次启动太多任务。最简单的方法是指定一个数字,该数字允许重叠 I/O 绑定任务的一些并行性,但不要太多,以免压倒本地计算机对资源的需求或使任何远程服务器负担过重。需要反复试验才能获得最佳结果:

static void Main(string[] args)
{
    Parallel.ForEach(CreateData(),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        (data) =>
            {
                data[0] = 1;
            });
}
于 2011-08-08T04:08:56.803 回答
47

因此,尽管 Rick 的建议绝对是重要的一点,但我认为缺少的另一件事是对partitioning的讨论。

Parallel::ForEach将使用默认Partitioner<T>实现,对于IEnumerable<T>没有已知长度的,将使用块分区策略。这意味着Parallel::ForEach将用于处理数据集的每个工作线程将从其中读取一些元素,IEnumerable<T>然后仅由该线程处理(暂时忽略工作窃取)。它这样做是为了节省不断返回源并分配一些新工作并将其安排给另一个工作线程的费用。因此,通常,这是一件好事。但是,在您的特定场景中,假设您在一个四核上,并且您MaxDegreeOfParallelism为工作设置了 4 个线程,现在每个线程都从您的IEnumerable<T>. 好吧,对于那个特定的工作线程来说,那是 100-400 兆,对吧?

那么你如何解决这个问题呢?很简单,你编写一个自定义Partitioner<T>实现。现在,分块在您的情况下仍然有用,因此您可能不想使用单个元素分区策略,因为这样您将引入开销以及所有必要的任务协调。相反,我会编写一个可配置的版本,您可以通过应用设置对其进行调整,直到找到适合您的工作负载的最佳平衡点。好消息是,虽然编写这样的实现非常简单,但实际上您甚至不必自己编写它,因为 PFX 团队已经完成了它并将其放入并行编程示例项目中。

于 2011-08-08T14:52:20.100 回答
15

这个问题与分区器有关,与并行度无关。解决方案是实现自定义数据分区器。

如果数据集很大,似乎TPL的单声道实现肯定会耗尽内存。这发生在我最近(本质上我在运行上面的循环,发现内存线性增加,直到它给了我一个OOM异常)。

跟踪问题后,我发现默认情况下,mono 将使用 EnumerablePartitioner 类划分枚举器。这个类有一个行为,每次它向任务提供数据时,它都会以不断增加(且不可更改)的因子 2 来“分块”数据。因此,当任务第一次请求数据时,它会获得一大块大小1、下一次大小2*1=2,下一次2*2=4,然后2*4=8,以此类推。结果就是交给任务的数据量,因此存放在同时内存,随着任务的长度而增加,如果正在处理大量数据,则不可避免地会出现内存不足的异常。

据推测,这种行为的最初原因是它希望避免让每个线程多次返回以获取数据,但它似乎是基于所有正在处理的数据都可以放入内存的假设(从大文件)。

如前所述,使用自定义分区程序可以避免此问题。一个简单地将数据一次返回到每个任务的通用示例如下:

https://gist.github.com/evolvedmicrobe/7997971

只需先实例化该类并将其交给 Parallel.For 而不是可枚举本身

于 2013-12-17T00:45:46.563 回答
-2

虽然使用自定义分区器无疑是最“正确”的答案,但更简单的解决方案是让垃圾收集器迎头赶上。在我尝试过的情况下,我在函数内重复调用 parallel.for 循环。尽管每次退出该函数,程序使用的内存都保持线性增长,如此处所述。我补充说:

//Force garbage collection.
GC.Collect();
// Wait for all finalizers to complete before continuing.
GC.WaitForPendingFinalizers();

虽然速度不是很快,但它确实解决了内存问题。大概在 CPU 使用率和内存使用率很高的情况下,垃圾收集器无法有效运行。

于 2019-04-05T17:06:51.330 回答