我在 C# 应用程序中使用 2008 R2 中的 SQL Server Broker,并尝试处理 SQL Server 检测到有害消息并禁用目标队列的情况。
发生这种情况时,当我尝试接收消息时会引发 SqlException。那时,我正在使用的 SqlTransaction 似乎不再是可提交的。
我将使用本教程与我的 C# 代码一起演示。
首先使用教程中的 T-SQL 代码创建必要的服务代理对象并发送消息,使其位于目标队列中。
CREATE MESSAGE TYPE
[//AWDB/1DBSample/RequestMessage]
VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE
[//AWDB/1DBSample/ReplyMessage]
VALIDATION = WELL_FORMED_XML;
GO
CREATE CONTRACT [//AWDB/1DBSample/SampleContract]
([//AWDB/1DBSample/RequestMessage]
SENT BY INITIATOR,
[//AWDB/1DBSample/ReplyMessage]
SENT BY TARGET
);
GO
CREATE QUEUE TargetQueue1DB;
CREATE SERVICE
[//AWDB/1DBSample/TargetService]
ON QUEUE TargetQueue1DB
([//AWDB/1DBSample/SampleContract]);
GO
CREATE QUEUE InitiatorQueue1DB;
CREATE SERVICE
[//AWDB/1DBSample/InitiatorService]
ON QUEUE InitiatorQueue1DB;
GO
DECLARE @InitDlgHandle UNIQUEIDENTIFIER;
DECLARE @RequestMsg NVARCHAR(100);
BEGIN TRANSACTION;
BEGIN DIALOG @InitDlgHandle
FROM SERVICE
[//AWDB/1DBSample/InitiatorService]
TO SERVICE
N'//AWDB/1DBSample/TargetService'
ON CONTRACT
[//AWDB/1DBSample/SampleContract]
WITH
ENCRYPTION = OFF;
SELECT @RequestMsg =
N'<RequestMsg>Message for Target service.</RequestMsg>';
SEND ON CONVERSATION @InitDlgHandle
MESSAGE TYPE
[//AWDB/1DBSample/RequestMessage]
(@RequestMsg);
SELECT @RequestMsg AS SentRequestMsg;
COMMIT TRANSACTION;
GO
接下来运行这个 C# 代码,它是一个控制台应用程序。
using System.Data.SqlClient;
namespace ServerConsoleApplication
{
class Program
{
static SqlConnection conn = null;
static void Main(string[] args)
{
conn = new SqlConnection("connection string");
conn.Open();
Receive(); // 1
Receive(); // 2
Receive(); // 3
Receive(); // 4
Receive(); // 5
Receive(); // 6 - Poison Message exception invoked
conn.Close();
}
static void Receive()
{
using (SqlTransaction tran = conn.BeginTransaction())
{
try
{
using (SqlCommand waitCommand = conn.CreateCommand())
{
waitCommand.Transaction = tran;
waitCommand.CommandText = string.Format("WAITFOR (RECEIVE TOP (1) conversation_handle, convert(xml,message_body) FROM TargetQueue1DB), TIMEOUT 1000");
using (SqlDataReader reader = waitCommand.ExecuteReader())
{
}
}
// Rollback on purpose to cause the poison message
tran.Rollback();
}
catch (SqlException ex)
{
if (ex.Number == 9617)
{
// Re-Enable the queue
using (SqlCommand enableCmd = conn.CreateCommand())
{
enableCmd.Transaction = tran;
enableCmd.CommandText = string.Format(@"ALTER QUEUE TargetQueue1DB WITH STATUS = ON");
enableCmd.ExecuteNonQuery();
}
System.Data.SqlTypes.SqlGuid handle = System.Data.SqlTypes.SqlGuid.Null;
// Pull the poison message off the queue
using (SqlCommand waitCommand = conn.CreateCommand())
{
waitCommand.Transaction = tran;
waitCommand.CommandText = string.Format("WAITFOR (RECEIVE TOP (1) conversation_handle, convert(xml,message_body) FROM TargetQueue1DB), TIMEOUT 1000");
using (SqlDataReader reader = waitCommand.ExecuteReader())
{
while (reader.Read())
{
handle = reader.GetSqlGuid(0);
}
}
}
// End the conversation just for clean up
using (SqlCommand endCmd = conn.CreateCommand())
{
endCmd.Transaction = tran;
endCmd.CommandText = "End Conversation @handle";
endCmd.Parameters.Add("@handle", System.Data.SqlDbType.UniqueIdentifier);
endCmd.Parameters["@handle"].Value = handle;
endCmd.ExecuteNonQuery();
}
// Commit the transaction so the message is removed from queue.
tran.Commit();
}
}
}
}
}
}
上面的代码只是行为的演示。当然,您通常不会像这样接收和调用 Rollback。
Receive 方法接收消息并在事务上调用 Rollback 以激发毒消息行为。在第六次调用 Receive 时抛出 SQLException,因为队列按预期禁用。
在这一点上,我想重新启用队列,关闭有害消息并结束对话(不需要结束)。这一切都有效,但随后我提交了事务,因为我真的希望将有毒消息从队列中删除。
结果 Commit 调用引发异常,说明
此 SqlTransaction 已完成;它不再可用。
在第 6 次调用 Receive 时没有调用 Rollback 或 Commit,这个事务是如何完成的?
另外,TargetQueue1DB 中的消息是如何被删除的?我认为接收不会将消息从队列中删除,除非它位于已提交的事务中。但是,如果您在调用该提交之前查看 TargetQueue1DB,则队列为空。
如果您稍微修改代码,以便在捕获 SqlException 时 waitCommand 在范围内,您将看到 waitCommand 实例的 Connection 和 Transaction 属性已设置为 null。这对我来说是奇怪的行为。