2

考虑以下场景:启用了重复数据删除的 Azure 服务总线,具有单个主题、单个订阅和订阅该队列的应用程序。

如何确保应用程序从队列中接收消息一次且仅一次?

这是我在应用程序中用于接收消息的代码:

public abstract class ServiceBusListener<T> : IServiceBusListener
{
    private SubscriptionClient subscriptionClient;
    // ..... snip

    private void ReceiveMessages()
    {
        message = this.subscriptionClient.Receive(TimeSpan.FromSeconds(5));

        if (message != null)
        {
            T payload = message.GetBody<T>(message);                                    

            try
            {
                DoWork(payload);

                message.Complete();
            }
            catch (Exception exception)
            {
                // message.Complete failed
            }
        }
    }
}

我预见的问题是,如果message.Complete()由于某种原因失败,那么刚刚处理的消息将保留在 Azure 中的订阅队列中。再次ReceiveMessages()调用时,它将从队列中获取相同的消息,并且应用程序将再次执行相同的工作。

虽然最好的解决方案是具有幂等域逻辑 ( DoWork(payload)),但在这种情况下很难编写。

我能看到的确保一次且仅一次交付给应用程序的唯一方法是构建另一个队列以充当 Azure 服务总线和应用程序之间的中介。我相信这被称为“持久的客户端队列”。

但是我可以看到这对于许多使用 Azure 服务总线的应用程序来说是一个潜在的问题,那么持久的客户端队列是唯一的解决方案吗?

4

3 回答 3

3

The default behavior when you dequeue a message is called "Peek-Lock" it will lock the message so no one else can get it while your processing it and will remove it when you commit. It will unlock if you fail to commit, so it could be picked up again. This is probably what you are experiencing. You can change the behavior to use "Receive and Delete" which will delete it from the queue as soon as you receive it for processing. https://msdn.microsoft.com/en-us/library/azure/hh780770.aspx

https://azure.microsoft.com/en-us/documentation/articles/service-bus-dotnet-how-to-use-topics-subscriptions/#how-to-receive-messages-from-a-subscription

于 2015-06-19T15:54:15.247 回答
2

在我负责的大型 Azure 平台中,我遇到了类似的挑战。我使用了补偿事务模式 ( https://msdn.microsoft.com/en-us/library/dn589804.aspx ) 和事件溯源模式 ( https://msdn.microsoft.com ) 所体现的概念的逻辑组合/en-us/library/dn589792.aspx)。具体如何结合这些概念会有所不同,但最终,您可能需要计划自己的“回滚”逻辑,或者检测到先前的过程 100% 成功完成减去消息的删除。如果您可以预先检查某些内容,您将知道一条消息根本没有被删除,然后完成它并继续前进。“检查”有多昂贵可能会使这成为一个坏主意。您甚至可以“创建”一个人为的最后一步,例如向数据库添加一行,该步骤仅在 DoWork 到达末尾时运行。然后,您可以在处理任何其他消息之前检查该行。

IMO,最好的方法是确保 DoWork() 中的所有步骤都检查是否存在已经执行的工作(如果可能的话)。例如,如果它正在创建一个数据库表,请运行“IF NOT EXISTS(SELECT TABLE_NAME FROM INFORMATION_SCHEMA...”。在这种情况下,即使发生这种情况的可能性很小,再次处理该消息也是安全的。

我使用的其他方法是存储前 X 条消息(即 10,000 条)的 MessageID(每条消息上的顺序 bigint),然后在继续处理消息之前检查它们是否存在(NOT IN)。没有你想象的那么贵,而且非常安全。如果找到,只需 Complete() 消息并继续。在其他情况下,我将消息更新为“开始”类型状态(在某些队列类型中内联,在其他队列类型中持续存在),然后继续。如果您阅读了一条消息并且该消息已设置为“已启动”,则您知道某些事情要么失败要么没有正确清除。

抱歉,这不是一个明确的答案,但有很多考虑因素。

最亲切的问候...

于 2015-06-19T20:59:19.640 回答
1

如果您包含用于检测消息是否已成功处理或其已到达您的消息处理阶段的逻辑,则您可以继续使用单个订阅。

例如,我使用服务总线消息将来自外部支付系统的付款插入 CRM 系统。在插入之前,消息处理逻辑首先检查 CRM 中是否已经存在付款(使用与付款关联的唯一 ID)。这是必需的,因为有时付款会成功添加到 CRM 中,但不会这样报告(超时或连接)。在获取消息时使用接收/删除意味着付款可能会丢失,不检查付款是否已经存在可能会导致重复付款。

如果这不可能,那么我应用的另一个解决方案是更新表存储以记录处理消息的进度。收到消息时,会检查表格以查看是否已完成任何阶段。这允许消息从它之前到达的阶段继续。

您概述的场景的最可能原因是 DoWork 花费的时间超过了消息的锁定。消息锁定超时可以调整为安全地超过预期 DoWork 周期的值。如果您能够跟踪处理消息锁定到期所花费的时间,也可以在处理程序中对消息调用 RenewLock。

也许我误解了第二个队列的设计原则,但似乎这同样容易受到您概述的原始场景的影响。

在不知道您的 DoWork() 涉及什么的情况下很难给出明确的答案,但我会考虑将上述一种或组合作为更好的解决方案。

于 2015-06-19T18:13:30.437 回答