3

我试图找出一种很好的方法来并行化处理大数据集的代码,然后将结果数据导入 RavenDb。

数据处理受 CPU 限制和数据库导入 IO 限制。

我正在寻找一种在 Environment.ProcessorCount 线程数上并行处理的解决方案。然后,应将生成的数据导入到与上述过程并行的 x(比如说 10)个池线程上的 RavenDb 中。

这里的主要内容是我希望在导入完成的数据时继续处理,以便在等待导入完成时继续处理下一个数据集。

另一个问题是成功导入后需要丢弃每个批次的内存,因为私有工作内存很容易达到 >5GB。

下面的代码是我到目前为止所得到的。请注意,它不能满足上述并行化要求。

datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .ForAll(batch =>
    {
        Task.Run(() =>
        {
            ...
        }
    }

GetDataItem 产生可枚举的数据项,这些数据项被划分为批处理数据集。GetDataItem 将产生约 2,000,000 个项目,每个项目平均处理大约 0.3 毫秒。

该项目在 x64 平台上的最新 .NET 4.5 RC 上运行。

更新。

我当前的代码(见上文)将获取项目并分批对其进行分区。每个批次在八个线程上并行处理(i7 上的 Environment.ProcessorCount)。处理速度很慢,CPU 密集型和内存密集型。当单个批次的处理完成时,将启动一个任务以将结果数据异步导入 RavenDb。批量导入作业本身是同步的,如下所示:

using (var session = Store.OpenSession())
{
    foreach (var data in batch)
    {
        session.Store(data);
    }
    session.SaveChanges();
}

这种方法存在一些问题:

  1. 每次完成一个批次时,都会启动一个任务来运行导入作业。我想限制并行运行的任务数量(例如,最多 10 个)。此外,即使启动了许多任务,它们似乎也永远不会并行运行。

  2. 内存分配是一个大问题。处理/导入批次后,它似乎仍保留在内存中。

我正在寻找解决上述问题的方法。理想情况下,我想要:

  • 每个逻辑处理器一个线程执行繁重的批量数据处理。
  • 十个左右的并行线程等待完成的批次导入 RavenDb。
  • 将内存分配保持在最低限度,这意味着在导入任务完成后取消分配批次。
  • 不在线程之一上运行导入作业以进行批处理。已完成批次的导入应与正在处理的下一个批次并行运行。

解决方案

var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
    bc.GetConsumingEnumerable()
        .AsParallel()
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .ForAll(batch =>
        {
            using (var session = Store.OpenSession())
            {
                foreach (var i in batch) session.Store(i);
                session.SaveChanges();
            }
        });
});
var processTask = Task.Run(() =>
{
    datasupplier.GetDataItems()
        .Partition(batchSize)
        .AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount)
        .ForAll(batch =>
        {
            bc.Add(batch.Select(i => new Data()
            {
                ...
            }).ToList());
        });
});

processTask.Wait();
bc.CompleteAdding();
importTask.Wait();
4

3 回答 3

3

Your task overall sounds like a producer-consumer workflow. Your batch processors are producers, and your RavenDB data "import" are the consumers of the output of the producers.

Consider using a BlockingCollection<T> as the connection between your batch proccesors and your db importers. The db importers will wake up as soon as the batch processors push completed batches into the blocking collection, and will go back to sleep when they have "caught up" and emptied the collection.

批处理器生产者可以全速运行,并且始终与处理先前完成的批处理的 db 导入器任务并行运行。如果您担心批处理器可能比数据库导入器领先太多(b/c 数据库导入比处理每个批处理花费的时间要长得多),您可以在阻塞集合上设置一个上限,以便生产者在添加时会阻塞超过这个限制,给消费者一个追赶的机会。

不过,您的一些评论令人担忧。启动一个 Task 实例以异步执行数据库导入到批处理并没有什么特别的错误。任务!=线程。创建新任务实例不会产生与创建新线程相同的巨大开销。

不要过于精确地控制线程。即使您指定您想要的存储桶数量与您拥有的核心数量完全相同,您也无法独占使用这些核心。来自其他进程的数百个其他线程仍将安排在您的时间片之间。使用任务指定逻辑工作单元并让 TPL 管理线程池。让自己免于因错误的控制感而感到沮丧。;>

在您的评论中,您指出您的任务似乎没有彼此异步运行(您如何确定这一点?)并且在每批完成后似乎没有释放内存。我建议放弃一切,直到您首先弄清楚这两个问题的原因。您是否忘记在某处调用 Dispose() ?你是否持有一个让整个对象树不必要地存活的引用?你在测量正确的东西吗?并行任务是否被阻塞的数据库或网络 I/O 序列化?在解决这两个问题之前,您的并行计划是什么并不重要。

于 2012-07-26T23:49:29.250 回答
1

For each batch you are starting a task. This means that your loop completes very quickly. It leaves (number of batches) tasks behind which is not what you wanted. You wanted (number of CPUs).

Solution: Don't start a new task for each batch. The for loop is already parallel.

In response to your comment, here is an improved version:

//this runs in parallel
var processedBatches = datasupplier.GetDataItems()
    .Partition(batchSize)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .Select(x => ProcessCpuBound(x));

foreach (var batch in processedBatches) {
 PerformIOIntensiveWorkSingleThreadedly(batch); //this runs sequentially
}
于 2012-07-26T20:09:48.767 回答
0

我最近构建了类似的东西,我使用了 Queue 类 vs List 和 Parallel.Foreach。我发现过多的线程实际上会减慢速度,这是一个甜蜜点。

于 2012-08-02T23:33:18.543 回答