4

我已经设法让 SqlDependency 工作,但只要我不使用IsolationLevel.ReadUncommited我认为与 SqlDependency 无关的 SQL 事务。

当我IsolationLevel.ReadUncommitted在事务中使用(下面有大量评论)时,SqlDependency 订阅失败,并立即OnChange通知:

sqlNotificationEventArgs.Info = "Isolation";
sqlNotificationEventArgs.Source = "Statement";
sqlNotificationEventArgs.Type = "Subscribe";

当我删除 IsolationLevel 时,一切都按预期工作(当然,隔离是不正确的)。

这是我的相关代码:

private static string connString = "the connection string";
[MTAThread]
private static void Main(string[] args)
    while(true)
    {
        using (var context = new LinqDataContext(connString))
        {
            var conn = context.Connection;
            conn.Open();
            /***********************************************************************/
            /* Remove `IsolationLevel.ReadUncommitted` and the SqlDependency works */
            /***********************************************************************/
            using (var trans = conn.BeginTransaction(IsolationLevel.ReadUncommitted))
            {
                // simplified query, the real query uses UPDATE OUTPUT INSERTED
                const string sqlCommand = "SELECT [Columns] FROM dbo.[TABLE] WHERE [Status] = 'ready'";
                results = conn.Query({transaction: trans, sql: sqlCommand});
                trans.Commit();
            }
            DoAwesomeStuffWithTheResults(results, context);
        }
        WaitForWork();
    }
}

SqlDependency 相关代码:

private static ManualResetEvent _quitEvent = new ManualResetEvent(false);

/// <summary>
/// Sets up a SqlDependency a doesn't return until it receives a Change notification
/// </summary>
private static void WaitForWork(){
    // in case we have dependency running we need to go a head and stop it first. 
    SqlDependency.Stop(connString);
    SqlDependency.Start(connString);

    using (var conn = new SqlConnection(connString))
    {
        using (var cmd = new SqlCommand("SELECT [Status] From dbo.[TABLE]", conn))
        {
            cmd.Notification = null;

            var dependency = new SqlDependency(cmd);
            dependency.OnChange += dependency_OnDataChangedDelegate;

            conn.Open();

            cmd.ExecuteReader();
        }
    }
    _quitEvent.WaitOne();
    SqlDependency.Stop(connString);
}
private static void dependency_OnDataChangedDelegate(object sender, SqlNotificationEventArgs e)
{
    ((SqlDependency)sender).OnChange -= dependency_OnDataChangedDelegate;
    _quitEvent.Set();
}

我觉得好像我已经正确处理了上下文、它的连接和事务——在设置 SqlDependency 之前,但似乎情况并非如此。

我在这里做错了什么?

4

2 回答 2

8

恭喜你开始SqlDependency工作(我一点也不讽刺,很多人在这方面都失败了)。

现在是时候阅读MSDN 上的为通知创建查询主题了。您将看到查询对通知有效的条件,包括此要求:

该语句不得在 READ_UNCOMMITTED 或 SNAPSHOT 隔离级别下运行。

我写了关于如何SqlDependency工作的基础知识,也许会澄清一些误解。而且,作为一个侧节点,由于您使用的是 Linq,您可能对LinqToCache感兴趣,它提供了Linq查询和SqlDependency.

另一条评论:不要Start()Stop()SqlDependency的傻瓜。你很快就会后悔的。Start()应该在应用程序启动期间和Stop()应用程序关闭期间仅调用一次(严格来说,是在应用程序域加载和卸载期间)。

现在,关于您的问题:重要的隔离级别是通知查询之一。这意味着,您附加订阅的查询,而不是您执行的查询UPDATE(我不会评论在脏读下执行 UPDATE的智慧......或对任何事情使用脏读的智慧)。据我所知,您显示的代码不应在 read_uncommitted 下发布查询。在您发出SET TRANSACTION ISOLATION ...该会话中的所有后续事务(ergo all 语句)后,将处于该隔离级别。您关闭连接(通过处理 DataContext),然后使用不同的连接。除非...您使用连接池。欢迎来到无辜受害者俱乐部:)。Close()Open()连接池会跨边界泄漏隔离级别更改那是你的问题。有一些简单的解决方案:

在我们讨论的同时,您还需要阅读以下内容:使用表作为队列

于 2013-11-05T14:29:03.927 回答
1

以下是根据 Remus Rusanu 在回答中给出的提示更新的代码:

private static string connString = "the connection string";
[MTAThread]
private static void Main(string[] args)
    // Start() is supposed to be called exactly once, during app startup
    // and Stop() exactly once during app shutdown:
    SqlDependency.Start(connString);
    AppDomain.CurrentDomain.ProcessExit += delegate
    {
        SqlDependency.Stop(connString);
    };

    while(true) // to infinity, and beyond.
    {
        using (var context = new LinqDataContext(connString))
        {
            var conn = context.Connection;
            // Connection pooling leaks isolation level changes across 
            // Close()/Open() boundaries, use TransactionScope to avoid this.
            using (var scope = CreateTransactionScope(TransactionScopeOption.Required, transactionOptions))
            {
                conn.Open();
                const string sqlCommand = "UPDATE TOP(1) [Table] SET [Status] = 'budy' OUTPUT INSERTED.[Column], */... MORE ...*/ WHERE [Status] = 'ready'";
                results = conn.Query(sqlCommand);
                scope.Complete();
            }
            DoAwesomeStuffWithTheResults(results, context);
        }
        WaitForWork();
    }
}

SqlDependency 相关代码:

/// <summary>
/// Sets up a SqlDependency and doesn't return until it receives 
/// a Change notification
/// </summary>
private static void WaitForWork(string connString)
{
    var changedEvent = new AutoResetEvent(false);
    OnChangeEventHandler dataChangedDelegate = (sender, e) => changedEvent.Set();
    using (var conn = new SqlConnection(connString))
    {
        using (var scope = Databases.TransactionUtils.CreateTransactionScope())
        {
            conn.Open();
            var txtCmd = "SELECT [FileID] FROM dbo.[File] WHERE [Status] = 'ready'";
            using (var cmd = new SqlCommand(txtCmd, conn))
            {
                var dependency = new SqlDependency(cmd);
                OnChangeEventHandler dataChangedDelegate = null;
                dataChangedDelegate = (sender, e) =>
                {
                    dependency.OnChange -= dataChangedDelegate;
                    changedEvent.Set();
                };
                dependency.OnChange += dataChangedDelegate;
                cmd.ExecuteScalar();
            }
            scope.Complete();
        }
    }
    changedEvent.WaitOne();
    dependency.OnChange -= dependencyOnDataChangedDelegate;
}

新的 TransactionScope 代码:

/// <summary>
/// Using {the default} new TransactionScope Considered Harmful
/// http://blogs.msdn.com/b/dbrowne/archive/2010/06/03/using-new-transactionscope-considered-harmful.aspx
/// </summary>
private static TransactionScope CreateTransactionScope(System.Transactions.IsolationLevel isolationLevel = System.Transactions.IsolationLevel.ReadCommitted)
{
    var transactionOptions = new TransactionOptions
    {
        IsolationLevel = isolationLevel,
        Timeout = TransactionManager.MaximumTimeout
    };
    return new TransactionScope(TransactionScopeOption.Required, transactionOptions);
}
于 2013-11-05T17:15:02.253 回答