1

背景

我正在使用 Azure 服务总线。在命名空间内,我有一个具有活动订阅的主题。我有一个生产者将消息推送到该主题,并且我有 n 个订阅相同订阅的服务(出于可伸缩性原因)。这些服务使用 PeekLock() 来获取消息,对它们执行一些工作,然后在工作完成后通过 Complete() 释放消息锁。

问题

每当服务锁定消息时,似乎没有新服务能够从同一主题/订阅中获取消息。似乎整个主题都被锁定了,而不仅仅是消息本身被锁定。

实际的问题是我的服务只是坐在那里什么都不做,只是等待具有活动 PeekLock() 的服务完成。

问题

  1. 有没有办法在主题(和订阅)或队列中启用并发读取器(或并发锁)?

  2. 我在上面解释的(对我来说不想要的)场景 - 它是某种保证按接收顺序传递消息的结果吗?

代码片段:Csharp

public static void main(...) 
{
  // configure the options
  OnMessageOptions messageOptions = new OnMessageOptions();
  messageOptions.AutoComplete = false;

  // look for messages
  serviceBusClient.OnMessage(msg => processMessage(msg, connectionKey), messageOptions);
}


private static void processMessage(BrokeredMessage msg, string connectionKey)
{
  try
  {
    // do stuff to the message
    //...

    // for debugging: no new clients grab messages 
    // while this client has lock active. the sleep
    // is to simulate heavy work load
    Thread.Sleep(5000);

    // release lock on message
    msg.Complete();
  }

  catch (Exception e)
  {
    msg.Abandon();
  }
}
4

3 回答 3

0

您可以尝试将客户端上的PrefetchCount显式设置为 0,根据此页面,它是默认设置的,但我怀疑如果您发现没有其他客户端可以获取消息,则不是:

public static void main(...) 
{
  // configure the options
  OnMessageOptions messageOptions = new OnMessageOptions();
  messageOptions.AutoComplete = false;

  serviceBusClient.PrefetchCount = 0;

  // look for messages
  serviceBusClient.OnMessage(msg => processMessage(msg, connectionKey), messageOptions);
}
于 2017-06-29T14:09:42.433 回答
0

您是否使用Recieve()Receieve(Int64)方法(或 Async 变体)来接收您的消息?

您必须知道消息本身上,而不是在订阅/队列上。但是,如果您将重载与 Int64 一起使用并给出如此大的数字 - 您会获得(锁定)所有消息,而您的其他进程则无能为力。

无法在 Azure 服务总线中锁定订阅。并发订阅接收者限制为5000

在这里,您可以阅读更多关于服务总线消息传递中的性能的信息。

下次您提出问题时,还请附上代码片段,以便我们更好地了解您的问题!

于 2017-04-07T07:53:31.980 回答
-1

在您的情况下,最简单的解决方案可能是使用分区主题/队列,请参阅https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning

于 2017-04-07T13:07:29.427 回答