0

(有关解决方案,请参阅我在此原始帖子底部的编辑)

设置

我的 Microsoft SQL Server 2005 Express 数据库中有两个存储过程:

  • WaitForMyMessage(@myName NVARCHAR(50), @myMessage NVARCHAR(MAX) OUTPUT)
  • ProvideMessage(@name NVARCHAR(50), @message NVARCHAR(MAX))

我想WaitForMyMessage()阻止,直到有人ProvideMessage()用相应的名字打电话。如果有人已经ProvideMessage()使用该名称呼叫过,那么WaitForMyMessage()将立即返回提供的值。

我最初考虑使用具有 FIFO 队列行为的简单表来实现这一点,但找不到阻止INSERT进入该表的方法。因此WaitForMyMessage()必须进行民意调查,出于显而易见的原因,这是不可接受的。

Q1:

有没有一种有效的方法来阻止直到某个记录出现在表中?WAITFOR语句会很棒,但 SQL 似乎不支持它进行查询(仅支持DELAYTIMERECEIVE)。但类似的东西会很棒,

例如:

-- It would be great is SQL supported this, but as far as I can tell it doesn't.
DECLARE @t TABLE (ans NVARCHAR(MAX));
WAITFOR (
  WITH A AS (
    SELECT TOP (1) * 
    FROM ProviderMessage A 
    WHERE ProviderMessage.Name = @myName
    ORDER BY A.ID
  )
  DELETE FROM A 
  OUTPUT deleted.ID INTO @t
);
SET @myMessage = (SELECT ans FROM @t);

所以它会一直闲置,直到有人将具有适当的记录插入NameProviderMessage中,一旦发生这种情况,该记录将被上面删除,同时检索其Value字段以返回给调用者。

其他想法

唉,我找不到Q1的答案,所以我继续使用 Service Broker 提供的实际消息队列来实现这个。考虑到 Service Broker 的力量和影响力,这似乎有点过头了,但是如果没有Q1的答案,我不得不试一试。我将我的服务和简单​​队列定义如下:

CREATE QUEUE q1
CREATE SERVICE s1 ON QUEUE q1 ([DEFAULT])

然后WaitForMyMessage()变成了:

DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (
  SELECT FarHandle 
  FROM ProviderInfo 
  WHERE ProviderInfo.Name = @myName
);
WAITFOR (
  RECEIVE @myMessage = CONVERT(NVARCHAR(MAX), message_body) 
  FROM q1
  WHERE conversation_handle = @farHandle
);

ProvideMessage()会发送消息,如下所示:

DECLARE @nearHandle UNIQUEIDENTIFIER;
SET @nearHandle = (
  SELECT NearHandle 
  FROM ProviderInfo 
  WHERE ProviderInfo.Name = @name
);
SEND ON CONVERSATION @nearHandle (@message)

这一切都运行良好,除了一件事:似乎 Service Broker 不支持发现给定对话的近端和远端句柄。我必须两者都知道,这样我才能为两个过程填充ProviderInfo表格以进行私下通信。

Q2:

如何才能同时获得新对话的近端对话句柄和远端对话句柄? 现在我通过这样的查询来做到这sys.conversation_endpoints一点:

-- Create the conversation
DECLARE @nearHandle UNIQUEIDENTIFIER; 
BEGIN DIALOG CONVERSATION @nearHandle 
FROM SERVICE s1 
TO SERVICE 's1' 
WITH ENCRYPTION = OFF; 

-- Queue an initialization message
SEND ON CONVERSATION @nearHandle ('');

-- Figure out the handle to the receiving side of this conversation
DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (
  SELECT conversation_handle 
  FROM sys.conversation_endpoints 
  WHERE conversation_id = (
    SELECT A.conversation_id 
    FROM sys.conversation_endpoints A 
    WHERE A.conversation_handle = @nearHandle 
  ) AND conversation_handle <> @nearHandle 
);

-- Get our initialization message out of the queue
DECLARE @unused TINYINT;
WAITFOR (
  RECEIVE @unused = status 
  FROM q1
  WHERE conversation_handle = @farHandle
);

-- Store both conversation handles, associated with this name
INSERT INTO ProviderInfo (Name, NearHandle, FarHandle)
 SELECT @name, @nearHandle, @farHandle

但是由于 Service Broker 架构旨在支持更复杂的场景,包括分布式服务等,消息sys.transmission_queue甚至可能被放置在本地,以及其他实现的复杂性,我对我的方法对于生产来说足够健壮不是很有信心.

因此,如果我这样做的方式不健全,是否有“正确”的方式?我考虑尝试通过使用对话组来避免这种需要,但由于基本相同的问题而无法解决(对话组 ID 也没有传达给远端),并且我发现这些主题没有提供解决方案任何一个:

结论

实现这项工作的障碍让我担心它不应该以这种方式使用,因此要么在某些生产场景中不起作用,要么将来可能不受支持。任何人都可以提供表明这种方式可靠的文档,或提供可靠且仍然有效的替代解决方案(有或没有 Service Broker)吗?

谢谢!

编辑:(解决方案)

这里的中心问题是 Q2(如何同时获得新对话的近端对话句柄和远端对话句柄?)。

多亏了几位贡献者的好主意,答案浮出水面(现在看起来很明显!)是通过在初始化消息SELECT之后立即从队列本身中获取远程句柄,因为该语句允许您过滤任何排队列!SENDSELECT

因此,不要像我原来的帖子那样这样做:

-- Queue an initialization message
SEND ON CONVERSATION @nearHandle ('');

-- Figure out the handle to the receiving side of this conversation
DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (
  SELECT conversation_handle 
  FROM sys.conversation_endpoints 
  WHERE conversation_id = (
    SELECT A.conversation_id 
    FROM sys.conversation_endpoints A 
    WHERE A.conversation_handle = @nearHandle 
  ) AND conversation_handle <> @nearHandle 
);

-- Get our initialization message out of the queue
...

我们可以更简单(更有效!)这样做:

-- Queue an initialization message with a unique identifier
DECLARE @initbin VARBINARY(36);
SET @initbin = CONVERT(VARBINARY(32), @nearHandle);
SEND ON CONVERSATION @nearHandle (@initbin);

-- Figure out the handle to the receiving side of this conversation using the known unique identifier
DECLARE @farHandle UNIQUEIDENTIFIER;
SET @farHandle = (SELECT conversation_handle FROM q1 WHERE message_body = @initbin)

-- Get our initialization message out of the queue
...

谢谢大家!

4

2 回答 2

1

如果您需要来回传递消息,Service Broker 将非常适合您。在插入行之前没有内置功能可以阻止,但是您可以使用 aquery subscription在查询的结果集发生更改时通知您。这是建立在 Service Broker 架构之上的,我认为您需要从 .NET 而不是 SQL 设置订阅。

回到Service Broker,如果它是本地的,您不必担心很多,例如路由等,因此您的案例是最简单的。传输队列只是需要发送的消息的存放区,无需担心。

我认为让你绊倒的事情是你不必担心获得对话句柄,因为无论如何你都会得到它们。

(1)让消息的接收者阻塞在队列中:

declare @status tinyint, @far_handle uniqueidentifier, @myMessage nvarchar(max);

waitfor (
    receive @status = status, 
            @far_handle = conversation_handle, 
            @myMessage = CONVERT(NVARCHAR(MAX), message_body) 
    from q1
)

(2)从你的服务开始对话回到它自己:

declare @near_handle uniqueidentifier

begin dialog conversation @near_handle
from service s1 to service 's1'
with encryption = off

send on conversation @near_handle ('hello')

现在,当您在 (2) 中开始对话时,您将获得连接侧的对话句柄,然后您可以做您想做的事情,即插入表格等。

在阻塞方面,当消息到达时,您会收集状态和消息正文,以及对话句柄,即对话方的句柄。您可以使用它来回复、存储它、用它更新表行等。

现在的问题是,因为起初它是在没有对话句柄的情况下接收的,因为它没有对话句柄,所以当对话建立时,它应该将它的对话句柄放在接收的 where 子句中。

receive @status = status, 
        @myMessage = CONVERT(NVARCHAR(MAX), message_body) 
from q1
where conversation_handle = @far_handle

否则它将开始接收自己发送的消息。这是因为您有一个服务在同一个队列中与自己对话。您可以通过使用 2 个服务相互交谈来解决这个问题。这通常是一种更清洁的方法。

这基本上消除了 go to 的需要sys.conversation_endpoints,这实际上是为了管理对话。

此外,为了干净利落地结束对话,您应该从双方结束它们。永远不要让自己陷入需要使用end conversation with cleanup.

要同时处理多个对话,您可以使用名为queue activation. 如果您不需要同时处理它们,则不需要这个。要使用激活,最好使用两个服务和队列。

完整示例

(1)做一些设置

create queue srcq
create service src on queue srcq([DEFAULT])
GO

create queue destq
create service dest on queue destq([DEFAULT])
GO

(2)创建一个过程来处理收到的消息

create procedure messageHandler as 

declare @far_handle uniqueidentifier,
        @message xml,
        @message_type nvarchar(256),
        @name varchar(32),
        @payload nvarchar(max),
        @handler varchar(128)

waitfor (
    receive @far_handle = conversation_handle, @message_type = message_type_name, @message = cast(message_body as xml) 
    from destq
)

if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    -- Deal with error
    exec dealWithError 
else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') 
begin
    -- End the Conversation
    end conversation @far_handle; 
end 
else  
begin
    set @name = @message.value('(/xml/name)[1]', 'varchar(32)');
    set @payload = @message.value('(/xml/payload)[1]', 'nvarchar(max)');

    if (select ReceiverHandle from ProviderInfo where Name = @name) is null
        update ProviderInfo
            set ReceiverHandle = @far_handle
        where Name = @name;

    -- Now Process @name however you want to
    -- This basically creates a string, say 'bobHandler', and then executes it as an sp, passing it the payload
    set @handler = @name + 'Handler';
    exec @handler @payload; 
end 
GO

(3)为与名称'bob'关联的消息创建一个处理程序

create procedure bobHandler (@payload nvarchar(max))
as
    print 'hello'
GO

(4)设置目的队列使用激活

alter queue destq 
with activation (
    status = on,
    procedure_name = messageHandler,
    max_queue_readers = 10,
    execute as 'dbo'
)
GO

(5)在Sender上,开始一个会话,存储发送句柄,然后发送一条消息

declare @near_handle uniqueidentifier
begin dialog conversation @near_handle
from service src to service 'dest'
with encryption = off

-- Store this handle somewhere for future use
merge into ProviderInfo p
using (
    select 'bob' as Name, @near_handle as SenderHandle
) t on p.Name = t.Name
when matched then
    update set SenderHandle = t.SenderHandle, ReceiverHandle = null
when not matched then
    insert (Name, SenderHandle) values (t.Name, t.SenderHandle);

send on conversation @near_handle ('<xml><name>bob</name><payload>89237981273982173</payload></xml>')
GO

发送消息将导致消息处理程序唤醒,并调用“bobHandler”存储过程。设置max_queue_readers为 10 意味着可以同时处理 10 条消息。

如果您不想使用激活,因此接收器上的一个线程处理所有传入的消息,您可以通过简单地在目标队列上将其关闭,并将“messageHandler”存储过程更改为使用wait for (receive)并运行它来做到这一点循环中的代码。

如果这一切都结束了,因为您实际上希望一个人来调用接收过程,忘记激活,然后试试这个:

create procedure handleMessage (@name varchar(32))
as
    declare @far_handle uniqueidentifier,
        @message xml,
        @message_type nvarchar(256),
        @payload nvarchar(max),
        @handler varchar(128),
        @loop bit = 1

    while (@loop = 1)
    begin
        -- Wait for a handle with our name
        select @far_handle = conversation_handle
        from destq
        where cast(message_body as xml).value('(/xml/name)[1]', 'varchar(32)') = @name

        if (@far_handle is not null)
            set @loop = 0
        else
            waitfor delay '00:00:02'
    end

    set @loop = 1

    while (@loop = 1)
    begin
        waitfor (
            receive @message_type = message_type_name, @message = cast(message_body as xml) 
            from destq
            where conversation_handle = @far_handle
        )

        if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
            -- Deal with error
            exec dealWithError
        else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            -- End the Conversation
            end conversation @far_handle;

            --Exit
            set @loop = 0
        end
        else 
        begin
            set @payload = @message.value('(/xml/payload)[1]', 'nvarchar(max)');

            if (select ReceiverHandle from ProviderInfo where Name = @name) is null
                update ProviderInfo
                    set ReceiverHandle = @far_handle
                where Name = @name;

            -- Now Process @name however you want to
            -- This basically creates a string, say 'bobHandler', and then executes it as an sp, passing it the payload
            set @handler = @name + 'Handler';
            exec @handler @payload;
        end
    end
GO
于 2013-02-28T22:08:22.470 回答
0

我认为,就好像您在谈论 SQL Server 环境中的消息交换一样,Service Broker 肯定会为您提供解决方案。这不是所有任务和所有可能要求的完美解决方案,但它肯定会起作用。

通常,需要更详细地指定您的要求,但考虑到一些假设,我们可以为您找到两种可能的解决方案。假设 SB 配置为 Sender (ProvideMessage) 和 Target (WaitForMyMessage) 之间的对话

  1. 假设我们的 Target 使用 WaitFor_Any_Message 过程。当 Target 收到任何消息时,它会调用 WaitFor_Any_Message 来检查名称并依次调用 WaitForMyMessage 来处理此消息。问题是 - 您是否需要处理其他消息?如果没有 - 没问题,请丢弃它们。如果是 - 调用另一个程序来处理它们。

  2. 第二种方法是为任何名称配置单独的队列。

无论如何,SB 非常适合与单个数据库进行本地消息交换。这在几个生产部署中对我来说效果很好。

于 2013-03-01T12:00:49.050 回答