1

我有一个控制台应用程序来读取 Azure 服务总线上订阅中存在的所有代理消息。我那里有大约 3500 条消息。这是我阅读消息的代码:

SubscriptionClient client = messagingFactory.CreateSubscriptionClient(topic, subscription);   
long count = namespaceManager.GetSubscription(topic, subscription).MessageCountDetails.ActiveMessageCount;
Console.WriteLine("Total messages to process : {0}", count.ToString()); //Here the number is showing correctly
IEnumerable<BrokeredMessage> dlIE = null;
dlIE = client.ReceiveBatch(Convert.ToInt32(count));

当我执行代码时,在 dlIE 中,我只能看到 256 条消息。我也尝试过像这样给出预取计数,client.PrefetchCount但它也只返回 256 条消息。

我认为一次可以检索的消息数量有一些限制。但是该方法的msdn页面上没有提到这样的事情RecieveBatch。我可以做些什么来一次检索所有消息?

笔记:

  1. 我只想阅读消息,然后让它存在于服务总线上。因此我不使用message.complete方法。

  2. 我无法从服务总线中删除并重新创建主题/订阅。

编辑:

我像这样使用 PeekBatch 而不是 ReceiveBatch:

    IEnumerable<BrokeredMessage> dlIE = null;
                            List<BrokeredMessage> bmList = new List<BrokeredMessage>();
  long i = 0;
   dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count)); // count is the total number of messages in the subscription.
  bmList.AddRange(dlIE);
  i = dlIE.Count();
 if(i < count)
  {           
 while(i < count)
  {
  IEnumerable<BrokeredMessage> dlTemp = null;
   dlTemp = subsciptionClient.PeekBatch(i, Convert.ToInt32(count));
    bmList.AddRange(dlTemp);
    i = i + dlTemp.Count();
    }
    }

我在订阅中有3255条消息。当第一次调用 peekBatch 时,它会收到 250 条消息。所以它进入while循环PeekBatch(250,3225)。每次只收到 250 条消息。我在输出列表中的最终总消息是3500条重复消息。我无法理解这是如何发生的。

4

3 回答 3

3

我已经想通了。订阅客户端会记住它检索到的最后一批,并在再次调用时检索下一批。

所以代码是:

    IEnumerable<BrokeredMessage> dlIE = null;
List<BrokeredMessage> bmList = new List<BrokeredMessage>();
  long i = 0;  
  while (i < count)
  {
   dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count));
   bmList.AddRange(dlIE);
   i = i + dlIE.Count();
  }

感谢 MikeWo 的指导

注意:一次可以查看的消息数量似乎有某种大小限制。我尝试了不同的订阅,并且每个订阅的消息数量都不同。

于 2015-10-19T12:21:52.617 回答
2

你写的主题是偶然划分的吗?当您从分区实体接收消息时,它一次只会从其中一个分区中获取。来自 MSDN

“当客户端想要从分区队列或订阅分区主题接收消息时,服务总线会查询所有片段以获取消息,然后将从任何消息传递存储返回的第一条消息返回给接收者。服务总线缓存其他消息并在收到其他接收请求时返回它们。接收客户端不知道分区;分区队列或主题的面向客户端的行为(例如,读取、完成、延迟、死信、预取)与常规实体的行为相同。”

假设即使使用非分区实体,您也可以使用 Receive 或 Peek 方法一次性获取所有消息,这可能不是一个好主意。以更小的批次循环遍历消息会更有效率,特别是如果您的消息对它们有任何合适的大小或大小不确定。

由于您实际上并不想从队列中删除消息,我建议使用PeekBatch而不是 ReceiveBatch。这使您可以获取消息的副本并且不会锁定它。我强烈建议将相同的 SubscriptionClient 与 PeekBatch 结合使用的循环。通过在引擎盖下使用与 PeekBatch 相同的 SubscriptionClient,最后拉出的序列号被保留,因为您循环遍历它应该跟踪并遍历整个队列。这基本上可以让您通读整个队列。

于 2015-10-19T11:07:40.170 回答
0

我遇到了一个类似的问题,即client.ReceiveBatchAsync(....)不会从 azure 服务总线中的订阅中检索任何数据。

经过一番挖掘,我发现每个订阅者都有一点可以启用批处理操作。这只能通过 powershell 启用。以下是我使用的命令:

$subObject = Get-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName' -SubscriptionName '#subscriptionName' 
$subObject.EnableBatchedOperations = $True

Set-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName'-SubscriptionObj $subObject

更多细节可以在这里找到。虽然它仍然没有加载所有消息,但至少它开始清除队列。据我所知,批量大小参数只是对服务总线的建议,而不是规则。

希望能帮助到你!

于 2017-09-05T13:42:56.453 回答