1

在我的文章站点中,我有一些需要数据库更新的统计操作,我想用线程异步执行此操作。

问题是当我试图暗示这里建议的内容时:在 C# 中做火并忘记方法的最简单方法?


ThreadPool.QueueUserWorkItem(o => FireAway());

因为 FireAway() 包含数据库更新,所以我收到“数据库连接已打开”错误。

我的问题是:编写一个线程作为原子操作在后台工作而不会发生冲突的最简单方法是什么。

我的网站:www.mentallica.co.il

添加了信息..... FireAway() 包含对存储我所有数据库更新的 dll 的调用。

我正在使用一个名为 bll MyBuisnessLogic bll = new MyBuisnessLogic() 的共享类

在 FireAway() 里面

bll.RunStatistics();

bll.RunStatistics() 函数打开和关闭 sql 连接。

我猜当一个线程打开 sql 连接而另一个线程试图打开一个已经打开的连接时,问题就出现了。

也许我应该为新线程创建一个单独的 MyBuisnessLogic 实例?也许我需要在内部使用 () 执行此操作?有点像使用 (MyBuisnessLogic bll = new MyBuisnessLogic) ?

---- 检查后我看到 MyBuisnessLogic 需要 Idisposible 才能工作......我应该那样做吗?

4

1 回答 1

1

您需要非阻塞 Db 处理的常见原因包括:

  1. 因为每次数据库操作都需要很长时间
  2. 因为要处理的数据很多,并行性可以提高吞吐量。(即我们假设瓶颈不会仅仅移动到数据库中)

正如您的帖子所建议的那样,SqlConnections需要考虑管理资源,因为共享SqlConnectionSqlCommand跨线程不是一个好主意。同步访问SqlConnection是不可取的,因为它会取消任何并行化的好处。

问题 1. 的一个简单解决方案是强制每个线程建立自己的SqlConnection,尽管这不利于高数据库吞吐量:

        Task.Factory.StartNew(() =>
        {
            using (var conn = new SqlConnection(connectionString))
            using (var cmd = conn.CreateCommand())
            {
                conn.Open();
                SetupCmd(cmd);
                SaveStat(cmd, statToSave);
            }
        });

存在后台写入的替代方案(案例 1.),例如通过拥有一个或多个长寿命的写入器线程,例如侦听队列ConcurrentQueue,或者更好的是,由 ASP.Net 页面线程馈送的BlockingCollection迭代 。GetConsumingEnumerablewriter(s) 线程将始终保持SqlConnection打开状态。

在像 2. 这样的大容量情况下,重用SqlConnectionSqlCommands对性能至关重要。然后数据需要在多个线程(或任务,如果使用 TPL)之间进行分区。Parallel.ForEach为我们完成了大部分艰苦的工作 - 下面,重载 withlocalInit用于建立SqlConnectionand SqlCommand,然后将其传递给每个主体,并localFinally在任务结束时调用 (经过 0..N 次迭代Body - 使用默认分区器,因此 TPL 决定需要多少个 Task,以及将多少项传递给每个 Task 主体)。localInit允许与使用线程本地存储类似的范例。

一个警告 - 如果处理仅用于大量插入操作,则SqlBulkCopy可能是一种更好的方法。

以下是使用 TPL 的几个选项: 给定表格:

create table StatsData
(
    x int ,
    y decimal(20,5),
    name nvarchar(50)
)

和一个模型:

public class StatsData
{
    public int X { get; private set; }
    public double Y { get; private set; }
    public string Name { get; private set; }
    public StatsData(int x, double y, string name)
    {
        X = x;
        Y = y;
        Name = name;
    }
}

以下类提供了 2 个异步选项(针对第 1 点和第 2 点):

public class Dispatcher
{
    // Helpers - refactoring
    private static void SetupCmd(SqlCommand cmd)
    {
        cmd.CommandText = "insert into dbo.statsdata(x, y, Name) values (@x, @y, @Name);";
        cmd.CommandType = CommandType.Text;
        cmd.Parameters.Add("@x", SqlDbType.Int);
        cmd.Parameters.Add("@y", SqlDbType.Decimal);
        cmd.Parameters.Add("@Name", SqlDbType.NVarChar, 30);
    }

    private static void SaveStat(SqlCommand cmd, StatsData statToSave)
    {
        cmd.Parameters["@x"].Value = statToSave.X;
        cmd.Parameters["@y"].Value = statToSave.Y;
        cmd.Parameters["@Name"].Value = statToSave.Name;
        cmd.ExecuteNonQuery();
    }

    // 1. Save 1 stat at a time on a background task. Use for low / intermittent volumes 
    public void SaveStatAsynch(string connectionString, StatsData statToSave)
    {
        Task.Factory.StartNew(() =>
        {
            using (var conn = new SqlConnection(connectionString))
            using (var cmd = conn.CreateCommand())
            {
                conn.Open();
                SetupCmd(cmd);
                SaveStat(cmd, statToSave);
            }
        });
    }

    // 2. For background writing of large volumes of stats. Uses the default partitioner in parallel foreach
    public void SaveStatsParallel(string connectionString, IEnumerable<StatsData> statsToSave)
    {
        Parallel.ForEach(
            statsToSave,
            // localInit. Return value is passed to each body invocation
            () =>
                {
                    var conn = new SqlConnection(connectionString);
                    var cmd = conn.CreateCommand();
                    SetupCmd(cmd);
                    conn.Open();
                    return new
                    {
                        Conn = conn,
                        Cmd = cmd
                    };
                },
            // Body, 0..N per Task decided by TPL
            (stat, loopState, initData) =>
                {
                    SaveStat(initData.Cmd, stat);
                    return initData;
                },
            // Disposables
            (initData) =>
                {
                    initData.Cmd.Dispose();
                    initData.Conn.Dispose();
                }
            );
    }

使用示例:

        const string connString = @"Server=.\SqlExpress;DataBase=StatsDb;Integrated Security=true";
        // Create some dummy data
        var statsToSave = 
            Enumerable
                .Range(0, 10000)
                .Select(i => new StatsData(i, i*Math.PI, string.Format("Stat #{0}", i)));
        // Insert this in parallel on background tasks / threads as determined by the TPL
        new Dispatcher().SaveStatsParallel(connString, statsToSave);
于 2013-09-21T10:27:37.693 回答