3

我构建了一个服务来支持 Azure 服务总线中的多个队列订阅,但我遇到了一些奇怪的行为。

我的订阅单例类有一个如下所示的方法:

    public void Subscribe<TMessage>(Func<TMessage, Task> execution, int maxDop = 1, int ttl = 60) where TMessage : IServiceBusMessage
    {
        try
        {
            var messageLifespan = TimeSpan.FromSeconds(ttl);
            var messageType = typeof(TMessage);
            if (!_activeSubscriptionClients.TryGetValue(messageType, out var subscriptionClient))
            {
                subscriptionClient = _subscriptionClientFactory.Create(typeof(TMessage)).GetAwaiter().GetResult();
                if (subscriptionClient.OperationTimeout < messageLifespan) subscriptionClient.OperationTimeout = messageLifespan;
                if (subscriptionClient.ServiceBusConnection.OperationTimeout < messageLifespan)
                    subscriptionClient.ServiceBusConnection.OperationTimeout = messageLifespan;
                _activeSubscriptionClients.AddOrUpdate(messageType, subscriptionClient, (key, value) => value);
            }

            var messageHandlerOptions = new MessageHandlerOptions(OnException)
            {
                MaxConcurrentCalls = maxDop,
                AutoComplete = false,
                MaxAutoRenewDuration = messageLifespan,
            };


            subscriptionClient.RegisterMessageHandler(
                async (azureMessage, cancellationToken) =>
                {
                    try
                    {
                        var textPayload = _encoding.GetString(azureMessage.Body);
                        var message = JsonConvert.DeserializeObject<TMessage>(textPayload);
                        if (message == null)
                            throw new FormatException($"Cannot deserialize the message payload to type '{typeof(TMessage).FullName}'.");
                        await execution.Invoke(message);
                        await subscriptionClient.CompleteAsync(azureMessage.SystemProperties.LockToken);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "ProcessMessagesAsync(Message, CancellationToken)");
                        await subscriptionClient.AbandonAsync(azureMessage.SystemProperties.LockToken);
                    }
                }
                , messageHandlerOptions);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Subscribe(Action<TMessage>)");
            throw;
        }
    }

这个想法是,您订阅 Azure 服务总线以获取特定类型的消息,并且直接对应于队列。在您的订阅中,您传入一个委托以了解如何处理消息。

这似乎工作......有一个警告。

无论我为任何给定消息在长时间运行的进程中ttlMaxAutoRenewDurationOperationTimeout

我的理解是,这正是MaxAutoRenewDuration应该阻止的……但它似乎并没有阻止任何事情。

谁能告诉我我需要做些什么来确保消费者拥有消息直到完成?

4

2 回答 2

0

我可以想到一些您可能想要查看的选项。

  1. 与其ReceiveMode = PeekLock在 SubscriptionClient 中使用默认值,不如将其设置为 ReceiveAndDelete,这样一旦消息被消费,它将从队列中删除并且不会被任何其他客户端消费,这确实意味着您必须优雅地处理异常并执行重试自己;

  2. 看看OperationTimeout根据 doco 它是针对哪个Duration after which individual operations will timeout

于 2019-01-29T23:30:19.537 回答
0

It turns out the remote process that the consumer was running was failing silently and not returning a failure status code (or anything else); the auto-refresh mechanism hung waiting for the result, so the message ended up timing out.

I'm not clear on how to prevent that, but once I fixed the issue on the remote process, the problem was no longer reproducible.

Moral of the story: If everything looks right, and it's still timing out, it seems the autorefresh mechanism shares some resources with asynchronous operations you are waiting on. It may be another place to look for failures.

于 2019-02-04T01:07:11.207 回答