我试图找出一种很好的方法来并行化处理大数据集的代码,然后将结果数据导入 RavenDb。
数据处理受 CPU 限制和数据库导入 IO 限制。
我正在寻找一种在 Environment.ProcessorCount 线程数上并行处理的解决方案。然后,应将生成的数据导入到与上述过程并行的 x(比如说 10)个池线程上的 RavenDb 中。
这里的主要内容是我希望在导入完成的数据时继续处理,以便在等待导入完成时继续处理下一个数据集。
另一个问题是成功导入后需要丢弃每个批次的内存,因为私有工作内存很容易达到 >5GB。
下面的代码是我到目前为止所得到的。请注意,它不能满足上述并行化要求。
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
Task.Run(() =>
{
...
}
}
GetDataItem 产生可枚举的数据项,这些数据项被划分为批处理数据集。GetDataItem 将产生约 2,000,000 个项目,每个项目平均处理大约 0.3 毫秒。
该项目在 x64 平台上的最新 .NET 4.5 RC 上运行。
更新。
我当前的代码(见上文)将获取项目并分批对其进行分区。每个批次在八个线程上并行处理(i7 上的 Environment.ProcessorCount)。处理速度很慢,CPU 密集型和内存密集型。当单个批次的处理完成时,将启动一个任务以将结果数据异步导入 RavenDb。批量导入作业本身是同步的,如下所示:
using (var session = Store.OpenSession())
{
foreach (var data in batch)
{
session.Store(data);
}
session.SaveChanges();
}
这种方法存在一些问题:
每次完成一个批次时,都会启动一个任务来运行导入作业。我想限制并行运行的任务数量(例如,最多 10 个)。此外,即使启动了许多任务,它们似乎也永远不会并行运行。
内存分配是一个大问题。处理/导入批次后,它似乎仍保留在内存中。
我正在寻找解决上述问题的方法。理想情况下,我想要:
- 每个逻辑处理器一个线程执行繁重的批量数据处理。
- 十个左右的并行线程等待完成的批次导入 RavenDb。
- 将内存分配保持在最低限度,这意味着在导入任务完成后取消分配批次。
- 不在线程之一上运行导入作业以进行批处理。已完成批次的导入应与正在处理的下一个批次并行运行。
解决方案
var batchSize = 10000;
var bc = new BlockingCollection<List<Data>>();
var importTask = Task.Run(() =>
{
bc.GetConsumingEnumerable()
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.ForAll(batch =>
{
using (var session = Store.OpenSession())
{
foreach (var i in batch) session.Store(i);
session.SaveChanges();
}
});
});
var processTask = Task.Run(() =>
{
datasupplier.GetDataItems()
.Partition(batchSize)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.ForAll(batch =>
{
bc.Add(batch.Select(i => new Data()
{
...
}).ToList());
});
});
processTask.Wait();
bc.CompleteAdding();
importTask.Wait();