0

我在 C# 应用程序中使用 2008 R2 中的 SQL Server Broker,并尝试处理 SQL Server 检测到有害消息并禁用目标队列的情况。

发生这种情况时,当我尝试接收消息时会引发 SqlException。那时,我正在使用的 SqlTransaction 似乎不再是可提交的。

我将使用本教程与我的 C# 代码一起演示。

首先使用教程中的 T-SQL 代码创建必要的服务代理对象并发送消息,使其位于目标队列中。

CREATE MESSAGE TYPE
       [//AWDB/1DBSample/RequestMessage]
       VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE
       [//AWDB/1DBSample/ReplyMessage]
       VALIDATION = WELL_FORMED_XML;
GO

CREATE CONTRACT [//AWDB/1DBSample/SampleContract]
      ([//AWDB/1DBSample/RequestMessage]
       SENT BY INITIATOR,
       [//AWDB/1DBSample/ReplyMessage]
       SENT BY TARGET
      );
GO

CREATE QUEUE TargetQueue1DB;

CREATE SERVICE
       [//AWDB/1DBSample/TargetService]
       ON QUEUE TargetQueue1DB
       ([//AWDB/1DBSample/SampleContract]);
GO

CREATE QUEUE InitiatorQueue1DB;

CREATE SERVICE
       [//AWDB/1DBSample/InitiatorService]
       ON QUEUE InitiatorQueue1DB;
GO

DECLARE @InitDlgHandle UNIQUEIDENTIFIER;
DECLARE @RequestMsg NVARCHAR(100);

BEGIN TRANSACTION;

BEGIN DIALOG @InitDlgHandle
     FROM SERVICE
      [//AWDB/1DBSample/InitiatorService]
     TO SERVICE
      N'//AWDB/1DBSample/TargetService'
     ON CONTRACT
      [//AWDB/1DBSample/SampleContract]
     WITH
         ENCRYPTION = OFF;

SELECT @RequestMsg =
       N'<RequestMsg>Message for Target service.</RequestMsg>';

SEND ON CONVERSATION @InitDlgHandle
     MESSAGE TYPE 
     [//AWDB/1DBSample/RequestMessage]
     (@RequestMsg);

SELECT @RequestMsg AS SentRequestMsg;

COMMIT TRANSACTION;
GO

接下来运行这个 C# 代码,它是一个控制台应用程序。

using System.Data.SqlClient;

namespace ServerConsoleApplication
{
    class Program
    {
        static SqlConnection conn = null;

        static void Main(string[] args)
        {
            conn = new SqlConnection("connection string");
            conn.Open();

            Receive(); // 1
            Receive(); // 2
            Receive(); // 3
            Receive(); // 4
            Receive(); // 5
            Receive(); // 6 - Poison Message exception invoked

            conn.Close();
        }

        static void Receive()
        {
            using (SqlTransaction tran = conn.BeginTransaction())
            {
                try
                {
                    using (SqlCommand waitCommand = conn.CreateCommand())
                    {
                        waitCommand.Transaction = tran;
                        waitCommand.CommandText = string.Format("WAITFOR (RECEIVE TOP (1) conversation_handle, convert(xml,message_body) FROM TargetQueue1DB), TIMEOUT 1000");

                        using (SqlDataReader reader = waitCommand.ExecuteReader())
                        {
                        }
                    }

                    // Rollback on purpose to cause the poison message
                    tran.Rollback();
                }
                catch (SqlException ex)
                {
                    if (ex.Number == 9617)
                    {
                        // Re-Enable the queue
                        using (SqlCommand enableCmd = conn.CreateCommand())
                        {
                            enableCmd.Transaction = tran;
                            enableCmd.CommandText = string.Format(@"ALTER QUEUE TargetQueue1DB WITH STATUS = ON");
                            enableCmd.ExecuteNonQuery();
                        }

                        System.Data.SqlTypes.SqlGuid handle = System.Data.SqlTypes.SqlGuid.Null;

                        // Pull the poison message off the queue
                        using (SqlCommand waitCommand = conn.CreateCommand())
                        {
                            waitCommand.Transaction = tran;
                            waitCommand.CommandText = string.Format("WAITFOR (RECEIVE TOP (1) conversation_handle, convert(xml,message_body) FROM TargetQueue1DB), TIMEOUT 1000");

                            using (SqlDataReader reader = waitCommand.ExecuteReader())
                            {
                                while (reader.Read())
                                {
                                    handle = reader.GetSqlGuid(0);
                                }
                            }
                        }

                        // End the conversation just for clean up
                        using (SqlCommand endCmd = conn.CreateCommand())
                        {
                            endCmd.Transaction = tran;
                            endCmd.CommandText = "End Conversation @handle";
                            endCmd.Parameters.Add("@handle", System.Data.SqlDbType.UniqueIdentifier);
                            endCmd.Parameters["@handle"].Value = handle;
                            endCmd.ExecuteNonQuery();
                        }

                        // Commit the transaction so the message is removed from queue.
                        tran.Commit();
                    }
                }
            }
        }
    }
}

上面的代码只是行为的演示。当然,您通常不会像这样接收和调用 Rollback。

Receive 方法接收消息并在事务上调用 Rollback 以激发毒消息行为。在第六次调用 Receive 时抛出 SQLException,因为队列按预期禁用。

在这一点上,我想重新启用队列,关闭有害消息并结束对话(不需要结束)。这一切都有效,但随后我提交了事务,因为我真的希望将有毒消息从队列中删除。

结果 Commit 调用引发异常,说明

此 SqlTransaction 已完成;它不再可用。

在第 6 次调用 Receive 时没有调用 Rollback 或 Commit,这个事务是如何完成的?

另外,TargetQueue1DB 中的消息是如何被删除的?我认为接收不会将消息从队列中删除,除非它位于已提交的事务中。但是,如果您在调用该提交之前查看 TargetQueue1DB,则队列为空。

如果您稍微修改代码,以便在捕获 SqlException 时 waitCommand 在范围内,您将看到 waitCommand 实例的 Connection 和 Transaction 属性已设置为 null。这对我来说是奇怪的行为。

4

1 回答 1

2

SqlTransaction 的客户端状态不一定反映服务器上的事务状态。考虑您捕获的异常是否为 1205,死锁。在这种情况下,在服务器中引发异常之前,事务已经在服务器上回滚,即使当前帧中有一个既未提交也未回滚的 SqlTransaction 对象。

在你的 catch 块中,你需要处理你当前的事务对象并开始一个新的对象来执行你的错误处理逻辑。

该消息已被删除,因为您执行了捕获处理逻辑而没有在服务器上启动实际事务。tran您使用了不再相关的过期对象。您的 RECEIVE 立即被提交(根本没有周围的事务)。

于 2014-08-26T21:13:38.693 回答