11

下面是我们将在辅助角色中使用的 Azure 服务总线代码的基本包装器。这ServiceBusClient将在每次运行工作者角色时实例化;然后用于访问队列,直到没有剩余的项目可以枚举。

public class ServiceBusClient : IDisposable, IServiceBusClient
{
    private const int DEFAULT_WAIT_TIME_IN_SECONDS = 120;

    private const string SERVICE_BUS_CONNECTION_STRING_KEY = "service.bus.connection.string";

    private readonly MessagingFactory _messagingFactory;

    private readonly NamespaceManager _namespaceManager;

    private readonly QueueClient _queueClient;

    private readonly ISettingsManager _settingsManager;

    public ServiceBusClient(ISettingsManager settingsManager, string queueName)
    {
        _settingsManager = settingsManager;

        var connectionString = _settingsManager.GetSetting<string>(SERVICE_BUS_CONNECTION_STRING_KEY);

        _namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        _messagingFactory = MessagingFactory.CreateFromConnectionString(connectionString);

        _queueClient = GetOrCreateQueue(queueName);
    }

    public void Dispose()
    {
        _messagingFactory.Close();
    }

    public BrokeredMessage ReceiveTopMessage()
    {
        return _queueClient.Receive(TimeSpan.FromSeconds(DEFAULT_WAIT_TIME_IN_SECONDS));
    }

    public void SendMessage(object bodyObject)
    {
        var message = new BrokeredMessage(bodyObject);

        _queueClient.Send(message);
    }

    private QueueClient GetOrCreateQueue(string queueName)
    {
        var queue = !_namespaceManager.QueueExists(queueName)
                        ? _namespaceManager.CreateQueue(queueName)
                        : _namespaceManager.GetQueue(queueName);

        return _messagingFactory.CreateQueueClient(queue.Path, ReceiveMode.PeekLock);
    }

}

如您所见,我在构造函数内部初始化了NamespaceManager,MessagingFactoryQueueClient:然后在调用时重用它们,SendMessage()ReceiveTopMessage()使用Dispose()方法关闭连接。

我的问题是我使用的方法是否安全;将保持打开的单个实例,QueueClient同时工作角色枚举队列上的所有消息(一个可以保持连接打开很长一段时间并在调用之间等待很长时间的进程ReceiveTopMessage())始终如一地工作而没有暂时的问题,或者是每次都谨慎地打开和关闭连接?

作为旁白; Microsoft 服务总线代码中的瞬态故障处理是如何进行的?它是默认执行的还是我们需要实现瞬态故障处理框架

4

1 回答 1

11

该类使用由用于创建它的 MessagingFactory 对象管理的连接QueueClient建议对多个请求重用相同的客户端对象。如使用 Service Bus Brokered Messaging 提高性能的最佳实践中所述

服务总线使客户端能够通过两种协议发送和接收消息:服务总线客户端协议和 HTTP。Service Bus 客户端协议更高效,因为只要消息工厂存在,它就会保持与 Service Bus 服务的连接。它还实现了批处理和预取。服务总线客户端协议可用于使用 .NET 托管 API 的 .NET 应用程序。(...) Service Bus 客户端对象,例如 QueueClient 或 MessageSender,是通过 MessagingFactory 对象创建的,该对象还提供连接的内部管理。您不应在发送消息后关闭消息传递工厂或队列、主题和订阅客户端,然后在发送下一条消息时重新创建它们。关闭消息传递工厂会删除与服务总线服务的连接,并在重新创建工厂时建立新的连接。建立连接是一项昂贵的操作,可以通过为多个操作重用相同的工厂和客户端对象来避免。(...) 由同一工厂创建的所有客户端(发送者和接收者)共享一个 TCP 连接。

关于瞬态故障处理,QueueClient有一个RetryPolicy 属性,用于确定是否应重试请求。还有瞬态故障处理应用程序块,它取代了瞬态故障处理框架。

关于消息接收循环,Implementing Reliable Message Receive Loops中有指导。Microsoft 已经认识到很难实现一个编写良好、有弹性的消息接收循环,因此他们引入了事件驱动消息编程模型作为替代方案。

于 2013-07-11T19:51:27.653 回答