目前我们正在使用服务代理来回发送消息,这工作正常。但我们想使用 RELATED_CONVERSATION_GROUP 对这些消息进行分组。我们想使用我们自己的数据库持久化 uuid 作为数据库中的 RELATED_CONVERSATION_GROUP = @uuid,但即使每次收到队列时,每次转换组 ID 都不同时,我们使用相同的 uuid。
你们知道我创建代理或接收呼叫的方式有什么问题吗,我在下面提供了代理创建代码和接收呼叫代码。谢谢
下面是代码“服务代理创建代码”
CREATE PROCEDURE dbo.OnDataInserted
@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS
BEGIN
SET NOCOUNT ON;
DECLARE @conversation UNIQUEIDENTIFIER
BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;
SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))
下面是代码“接收代码”
WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON
BEGIN TRANSACTION;
DECLARE
@cID as uniqueidentifier,
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)
RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT
;WAITFOR (RECEIVE TOP (1)
@cID = Substring(CAST(message_body as nvarchar(max)),4,36),
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)
RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID;
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp <> ''
BEGIN
MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;
RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);
RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END
WAITFOR DELAY '000:00:10'
COMMIT
RAISERROR ('Committed' , 16, 1) WITH NOWAIT
细化
我们的情况是,我们需要循环接收来自该 Service Broker 队列的项目,阻塞 WAITFOR,然后通过不可靠的网络将它们交给另一个系统。从队列中接收到的项目的目的地是与该远程系统的许多连接之一。如果该项目未成功交付到其他系统,则应回滚该单个项目的事务并将该项目返回到队列中。我们在成功交付后提交事务,解锁由后续循环迭代获取的消息序列。
相关项目序列的延迟不应影响不相关序列的交付。单个项目一旦可用就会立即发送到队列中并立即转发。项目应该以单个文件的形式转发,尽管即使在一个序列中的交付顺序也不是很重要。
从一次接收一条消息的循环中,从我们的打开连接列表中选择一个新的或现有的 TcpClient,然后将消息和打开的连接通过异步 IO 回调链传递,直到传输完成。然后我们完成从 Service Broker Queue 接收到 Item 的 DB Transaction。
如何使用 Service Broker 和对话组来协助这种情况?