4

目前我们正在使用服务代理来回发送消息,这工作正常。但我们想使用 RELATED_CONVERSATION_GROUP 对这些消息进行分组。我们想使用我们自己的数据库持久化 uuid 作为数据库中的 RELATED_CONVERSATION_GROUP = @uuid,但即使每次收到队列时,每次转换组 ID 都不同时,我们使用相同的 uuid。

你们知道我创建代理或接收呼叫的方式有什么问题吗,我在下面提供了代理创建代码和接收呼叫代码。谢谢

下面是代码“服务代理创建代码”

CREATE PROCEDURE dbo.OnDataInserted

@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS

BEGIN

SET NOCOUNT ON;

 DECLARE @conversation UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;

SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))

下面是代码“接收代码”

WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON

BEGIN TRANSACTION;

DECLARE 
@cID as uniqueidentifier, 
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)

RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT

;WAITFOR (RECEIVE TOP (1) 
@cID = Substring(CAST(message_body as nvarchar(max)),4,36), 
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)

RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; 
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp  <> ''
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
    else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);

RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END

WAITFOR DELAY '000:00:10'

COMMIT

RAISERROR ('Committed' , 16, 1) WITH NOWAIT

细化

我们的情况是,我们需要循环接收来自该 Service Broker 队列的项目,阻塞 WAITFOR,然后通过不可靠的网络将它们交给另一个系统。从队列中接收到的项目的目的地是与该远程系统的许多连接之一。如果该项目未成功交付到其他系统,则应回滚该单个项目的事务并将该项目返回到队列中。我们在成功交付后提交事务,解锁由后续循环迭代获取的消息序列。

相关项目序列的延迟不应影响不相关序列的交付。单个项目一旦可用就会立即发送到队列中并立即转发。项目应该以单个文件的形式转发,尽管即使在一个序列中的交付顺序也不是很重要。

从一次接收一条消息的循环中,从我们的打开连接列表中选择一个新的或现有的 TcpClient,然后将消息和打开的连接通过异步 IO 回调链传递,直到传输完成。然后我们完成从 Service Broker Queue 接收到 Item 的 DB Transaction。

如何使用 Service Broker 和对话组来协助这种情况?

4

1 回答 1

9

对话组是本地的仅概念,专门用于锁定:相关对话属于一个组,因此当您在一个对话上处理消息时,另一个线程无法处理相关消息。没有关于两个端点交换的对话组的信息,因此在您的示例中,所有发起者端点最终都属于一个对话组,但目标端点都是一个不同的对话组(每个组只有一个对话)。系统之所以如此,是因为对话组旨在解决诸如旅行预订服务之类的问题:当它收到“预订旅行”的消息时,它必须预订航班、酒店和汽车出租。它必须发送三个消息,每个服务('flights'、'hotels'、'cars' ) 然后响应将异步返回。当它们返回时,处理必须确保它们不会被单独的线程同时处理,这些线程将尝试更新“旅行”记录状态。在消息传递中,这个问题被称为“消息关联问题”。

但是,会话组通常仅出于性能原因而部署在 SSB 中:它们允许更大的 RECEIVE 结果。目标端点可以通过使用一起移动到一个组中,MOVE CONVERSATION但实际上有一个更简单的技巧:反转对话的方向。让您的目的地开始对话(分组),并且来源发送其关于由目的地开始的对话的“更新”。

一些注意事项:

  • 不要使用 BEGIN/SEND/END 的即发即弃模式。您使将来无法诊断任何问题,请参阅Fire and Forget: Good for the military, but not for Service Broker conversations
  • 永远不要在生产代码中使用 WITH CLEANUP。它旨在用于管理上的最后手段,例如灾难恢复。如果您滥用它,您将拒绝 SSB 正确跟踪消息以正确重试传递的任何机会(如果消息在目标上反弹,无论出于何种原因,它将永远丢失)。
  • SSB 不保证对话之间的顺序,只保证一个对话中的顺序。为每个 INSERT 事件启动新对话并不能保证在目标上保留插入操作的顺序。
于 2011-06-23T00:11:54.777 回答