1

我正在从工作角色向服务总线队列发送消息。我注意到随机丢失了一些消息。

当我调试时,我在 Send 方法之后设置了一个断点并登录到我的 Azure 面板以检查消息队列是否增加。我发现奇怪的是,有时消息没有添加到队列中。但这不是随机的。模式是:一条消息添加正确,下一条丢失,然后再次下一条正常,下一条丢失。

我已经实现了如下所示的重试模式,但显然并非所有消息都正确发送。

我的代码是:

var baseAddress = RoleEnvironment.GetConfigurationSettingValue("namespaceAddress");
var issuerName = RoleEnvironment.GetConfigurationSettingValue("issuerName");
var issuerKey = RoleEnvironment.GetConfigurationSettingValue("issuerKey");
var retryStrategy = new FixedInterval(5, TimeSpan.FromSeconds(2));
var retryPolicy = new RetryPolicy<ServiceBusTransientErrorDetectionStrategy>(retryStrategy);
Uri namespaceAddress = ServiceBusEnvironment.CreateServiceUri("sb", baseAddress, string.Empty);

this.namespaceManager = new NamespaceManager(namespaceAddress, TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey));
this.messagingFactory = MessagingFactory.Create(namespaceAddress, TokenProvider.CreateSharedSecretTokenProvider(issuerName, issuerKey));
//namespaceManager.GetQueue("chatmessage");
QueueClient client = messagingFactory.CreateQueueClient("chatmessage");
APPService.Model.MessageReceived messageReceived = new Model.MessageReceived();
messageReceived.From= e.From;
messageReceived.Op = e.Operator;
messageReceived.Message = e.Body;
BrokeredMessage msg = null;

// Use a retry policy to execute the Send action in an asynchronous and reliable fashion.
retryPolicy.ExecuteAction
(
    (cb) =>
    {
        // A new BrokeredMessage instance must be created each time we send it. Reusing the original BrokeredMessage instance may not 
        // work as the state of its BodyStream cannot be guaranteed to be readable from the beginning.
        msg = new BrokeredMessage(messageReceived);

        // Send the event asynchronously.
        client.BeginSend(msg, cb, null);
    },
    (ar) =>
    {
        try
        {
            // Complete the asynchronous operation. This may throw an exception that will be handled internally by the retry policy.
            client.EndSend(ar);
        }
        finally
        {
            // Ensure that any resources allocated by a BrokeredMessage instance are released.
            if (msg != null)
            {
                msg.Dispose();
                msg = null;
            }
        }
    },
    ()=>{},
    (ex) =>
    {
        // Always dispose the BrokeredMessage instance even if the send operation has completed unsuccessfully.
        if (msg != null)
        {
            msg.Dispose();
            msg = null;
        }

        // Always log exceptions.
        Trace.TraceError(ex.Message);
    }
);

有什么问题?

4

1 回答 1

2

仅查看您的代码,我无法注意到云导致该问题的任何内容。我想门户上的计数器没有实时更新,所以我不会将其视为测试结果。

我知道我现在没有帮助,但如果我有你表达的疑问,我会建立一个端到端的测试来发送和接收消息,比较 messageId。

您是否已经在 MSDN 上使用 Paolo Salvatori 的Service Bus Explorer ?您可以使用它来测试实体。它是探索和测试服务总线的可靠工具。

于 2012-09-04T07:48:41.287 回答