2

我在使用 Azure 服务总线实例时遇到了一个非常令人沮丧的问题。具体来说,我设置了一个名为“commands”的主题,一段时间后,它将停止正确地将消息代理到其订阅。我观察到以下行为使我处于目前的困惑状态:

  1. 这只会在主题存在一段时间后才会发生,通常是一夜之间。晚上一切都会正常工作,但是当我早上回来时,主题已经停止运作,唯一的解决方案似乎是删除并重新创建它。

  2. 如下图所示,主题似乎正在接收消息,因为我在发送后没有收到任何异常,并且主题的“活动消息计数”和“字节大小”属性确实增加了。它只是不将消息发送到订阅:在此处输入图像描述

  3. 我认为也许我订阅的过滤器把事情搞砸了,所以为了测试这一点,我删除了这些订阅,并通过资源管理器使用默认过滤器创建了一个新订阅。完成此操作后,我通过主题发送了一些新消息,但新订阅仍未收到它们。

  4. 我在服务总线上运行了其他主题(例如上图中的“事件”),但它们似乎没有表现出相同的行为。它们以相同的方式配置,但运行良好。

我对任何可能导致这种奇怪行为的理论持开放态度。如果这有助于解决这个问题,我很乐意提供更多信息。

代码块:

创建主题:

private async Task<bool> CreateTopicAsync(NamespaceManager namespaceManager, string topicName, CancellationToken cancel, TimeSpan maxWaitTime)
        {
            var retVal = false;
            var maxTimeToCreateTopic = DateTime.UtcNow + maxWaitTime;

            while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateTopic)
            {
                try
                {
                    await namespaceManager.CreateTopicAsync(new TopicDescription(topicName)
                    {
                        EnableBatchedOperations = true,
                        EnableFilteringMessagesBeforePublishing = true
                    });
                    retVal = true;
                    break;
                }
                catch (Exception ex)
                {
                    LogError("Exception thrown when creating topic: {0}", ex);
                }

                if (!retVal)
                {
                    LogWarning("Topic still does not exist, pausing and then retrying creation.");
                    await Task.Delay(_delayMs);
                }
            }

            return retVal;
        }

创建订阅:

private async Task<bool> CreateSubscriptionAsync(NamespaceManager namespaceManager, string topicName, string subscriptionName, string filter, CancellationToken cancel, TimeSpan maxWaitTime)
        {
            var retVal = false;
            var maxTimeToCreateSubscription = DateTime.UtcNow + maxWaitTime;

            while (!cancel.IsCancellationRequested && DateTime.UtcNow < maxTimeToCreateSubscription)
            {
                try
                {
                    if (string.IsNullOrEmpty(filter))
                    {
                        namespaceManager.CreateSubscription(topicName, subscriptionName);
                    }
                    else
                    {
                        namespaceManager.CreateSubscription(topicName, subscriptionName, new SqlFilter(filter));
                    }
                    retVal = true;
                    break;
                }
                catch (Exception ex)
                {
                    LogError("Exception thrown when creating subscription: {0}", ex);
                }

                LogWarning("Subscription still does not exist, pausing and then retrying creation.");
                await Task.Delay(_delayMs);
            }

            return retVal;
        }

向主题发送消息:

BrokeredMessage brokeredMessage = null;
                    try
                    {
                        var type = nextMessage.GetType().AssemblyQualifiedName;
                        var jsonString = JsonConvert.SerializeObject(nextMessage);
                        var jsonStream = jsonString.ToStream();

                        brokeredMessage = new BrokeredMessage(jsonStream, true);
                        brokeredMessage.Properties["__messageType__"] = type;
                        if (nextData.Properties != null && nextData.Properties.Count > 0)
                        {
                            foreach (var prop in nextData.Properties)
                            {
                                brokeredMessage.Properties.Add(prop.Key, prop.Value);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        LogError("Exception thrown when creating brokered message: {0}", ex);

                        brokeredMessage = null;
                    }

                    if (brokeredMessage != null)
                    {
                        var messageSentSuccessfully = false;
                        try
                        {
                            await client.SendAsync(brokeredMessage);
                            numConsecutiveFailures = 0;
                            messageSentSuccessfully = true;
                        }
                        catch (Exception ex)
                        {
                            numConsecutiveFailures++;
                            LogError("Exception thrown from SendAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
                            await Task.Delay(_delayMs);
                        }
                    }

传入的主题客户端只是使用 TopicClient.CreateFromConnectionString 方法创建的。

从订阅接收消息:

private async Task ReceiveLoopAsync(SubscriptionClient client, CancellationToken cancel, TimeSpan maxReceiveWaitTime)
        {
            var numConsecutiveFailures = 0;
            var maxConsecutiveFailures = 5;

            while (!cancel.IsCancellationRequested && numConsecutiveFailures < maxConsecutiveFailures)
            {
                BrokeredMessage newMsg = null;

                try
                {
                    newMsg = await client.ReceiveAsync(maxReceiveWaitTime);
                    numConsecutiveFailures = 0;
                }
                catch (Exception ex)
                {
                    numConsecutiveFailures++;
                    LogError("Exception thrown from ReceiveAsync: {0}. Fail count is {1}.", ex, numConsecutiveFailures);
                    await Task.Delay(_delayMs);
                }

                // newMsg will be null if there were no messages to process after the allotted timeout expired.
                if (newMsg != null)
                {
                    // Just a function call.
                    _onMessageReceived?.Invoke(newMsg);
                }

                //LogDebug("Bottom of Receive");
            }

            //LogDebug("Exit Receive");
        }

传入的订阅客户端只是使用 SubscriptionClient.CreateFromConnectionString 方法创建的。

4

0 回答 0