首先:不要DataTable
用于这样的操作:
- 很慢
- 它消耗太多内存
- 你需要等待很长时间才能真正开始处理数据
- 在此期间,额外的核心什么也不做,因为将数据读入 a
DataTable
没有被并行化。
- 此外,在读取数据时,CPU 通常几乎没有得到充分利用,因为网络或其他 I/O 延迟通常是主要因素。
再说一遍:不要DataTable
用于这样的操作。
而是使用DataReader
. 这使您可以立即开始使用/处理数据,而不是等待它被加载。最简单的版本是(MS SQL Server 示例):
var command = new SqlCommand()
{
CommandText = "SELECT * FROM Table";
Connection = new SqlConnection("InsertConnectionString");
};
using(var reader = command.ExecuteReader())
{
while(reader.Read())
{
var values = new object[reader.FieldCount];
reader.GetValues(values);
// process values of row
}
}
在执行您的处理代码时,阅读器将被阻止,这意味着不再从数据库中读取行。
如果处理代码很繁重,使用该库创建执行检查的任务可能是值得Task
的,这将使您能够使用多个内核。但是,创建一个 有一个开销Task
,如果一个Task
不包含足够的“工作”,您可以将几行批处理在一起:
public void ReadData()
{
var taskList = new List<Task<SomeResultType>>();
var command = new SqlCommand()
{
CommandText = "SELECT * FROM Table";
Connection = new SqlConnection("InsertConnectionString");
};
using(var reader = command.ExecuteReader())
{
var valueList = new List<object[]>(100);
while(reader.Read())
{
var values = new object[reader.FieldCount];
reader.GetValues(values);
valueList.Add(values);
if(valueList.Count == 100)
{
var localValueList = valueList.ToList();
valueList.Clear();
taskList.Add(Task<SomeResultType>.Factory.StartNew(() => Process(localValueList));
}
}
if(valueList.Count > 0)
taskList.Add(Task<SomeResultType>.Factory.StartNew(() => Process(valueList));
}
// this line completes when all tasks are done
Task.WaitAll(taskList.ToArray());
}
public SomeResultType Process(List<object[]> valueList)
{
foreach(var vals in valueList)
{
// put your processing code here, be sure to synchronize your actions properly
}
}
- 批量大小(当前为 100)取决于正在完成的实际处理,可能需要进行调整。
- 同步有它自己的挑战,你需要非常小心共享资源