您需要非阻塞 Db 处理的常见原因包括:
- 因为每次数据库操作都需要很长时间
- 因为要处理的数据很多,并行性可以提高吞吐量。(即我们假设瓶颈不会仅仅移动到数据库中)
正如您的帖子所建议的那样,SqlConnections
需要考虑管理资源,因为共享SqlConnection
或SqlCommand
跨线程不是一个好主意。同步访问SqlConnection
是不可取的,因为它会取消任何并行化的好处。
问题 1. 的一个简单解决方案是强制每个线程建立自己的SqlConnection
,尽管这不利于高数据库吞吐量:
Task.Factory.StartNew(() =>
{
using (var conn = new SqlConnection(connectionString))
using (var cmd = conn.CreateCommand())
{
conn.Open();
SetupCmd(cmd);
SaveStat(cmd, statToSave);
}
});
存在后台写入的替代方案(案例 1.),例如通过拥有一个或多个长寿命的写入器线程,例如侦听队列ConcurrentQueue
,或者更好的是,由 ASP.Net 页面线程馈送的BlockingCollection
迭代 。GetConsumingEnumerable
writer(s) 线程将始终保持SqlConnection
打开状态。
在像 2. 这样的大容量情况下,重用SqlConnection
和SqlCommands
对性能至关重要。然后数据需要在多个线程(或任务,如果使用 TPL)之间进行分区。Parallel.ForEach为我们完成了大部分艰苦的工作 - 下面,重载 withlocalInit
用于建立SqlConnection
and SqlCommand
,然后将其传递给每个主体,并localFinally
在任务结束时调用 (经过 0..N 次迭代Body - 使用默认分区器,因此 TPL 决定需要多少个 Task,以及将多少项传递给每个 Task 主体)。localInit
允许与使用线程本地存储类似的范例。
一个警告 - 如果处理仅用于大量插入操作,则SqlBulkCopy可能是一种更好的方法。
以下是使用 TPL 的几个选项: 给定表格:
create table StatsData
(
x int ,
y decimal(20,5),
name nvarchar(50)
)
和一个模型:
public class StatsData
{
public int X { get; private set; }
public double Y { get; private set; }
public string Name { get; private set; }
public StatsData(int x, double y, string name)
{
X = x;
Y = y;
Name = name;
}
}
以下类提供了 2 个异步选项(针对第 1 点和第 2 点):
public class Dispatcher
{
// Helpers - refactoring
private static void SetupCmd(SqlCommand cmd)
{
cmd.CommandText = "insert into dbo.statsdata(x, y, Name) values (@x, @y, @Name);";
cmd.CommandType = CommandType.Text;
cmd.Parameters.Add("@x", SqlDbType.Int);
cmd.Parameters.Add("@y", SqlDbType.Decimal);
cmd.Parameters.Add("@Name", SqlDbType.NVarChar, 30);
}
private static void SaveStat(SqlCommand cmd, StatsData statToSave)
{
cmd.Parameters["@x"].Value = statToSave.X;
cmd.Parameters["@y"].Value = statToSave.Y;
cmd.Parameters["@Name"].Value = statToSave.Name;
cmd.ExecuteNonQuery();
}
// 1. Save 1 stat at a time on a background task. Use for low / intermittent volumes
public void SaveStatAsynch(string connectionString, StatsData statToSave)
{
Task.Factory.StartNew(() =>
{
using (var conn = new SqlConnection(connectionString))
using (var cmd = conn.CreateCommand())
{
conn.Open();
SetupCmd(cmd);
SaveStat(cmd, statToSave);
}
});
}
// 2. For background writing of large volumes of stats. Uses the default partitioner in parallel foreach
public void SaveStatsParallel(string connectionString, IEnumerable<StatsData> statsToSave)
{
Parallel.ForEach(
statsToSave,
// localInit. Return value is passed to each body invocation
() =>
{
var conn = new SqlConnection(connectionString);
var cmd = conn.CreateCommand();
SetupCmd(cmd);
conn.Open();
return new
{
Conn = conn,
Cmd = cmd
};
},
// Body, 0..N per Task decided by TPL
(stat, loopState, initData) =>
{
SaveStat(initData.Cmd, stat);
return initData;
},
// Disposables
(initData) =>
{
initData.Cmd.Dispose();
initData.Conn.Dispose();
}
);
}
使用示例:
const string connString = @"Server=.\SqlExpress;DataBase=StatsDb;Integrated Security=true";
// Create some dummy data
var statsToSave =
Enumerable
.Range(0, 10000)
.Select(i => new StatsData(i, i*Math.PI, string.Format("Stat #{0}", i)));
// Insert this in parallel on background tasks / threads as determined by the TPL
new Dispatcher().SaveStatsParallel(connString, statsToSave);