0

我正在使用 ServiceBusReceiver(我想继续使用)从服务总线上的死信队列中读取。

但是,receiver.PeekMessagesAsync(550) 最多只能从死信队列中获取 250 条消息???我怎样才能改变这个?我已经将它设置为获取 550 条消息,所以不知道该怎么做。

        string topic = "myTopic";
        string subscriptionAndDeadletterPath = @"mySubscription/$deadletterqueue";
        var paginationSize = 1000;
        var dlqCount = 0;
        var numberOfMessagesToFetchFromDlq = 550;

        try
        {
            await using var client = new ServiceBusClient(connectionString);
            var receiver = client.CreateReceiver(topic, subscriptionAndDeadletterPath, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.PeekLock, PrefetchCount = paginationSize });
           
            var messagesFromDlq = await receiver.PeekMessagesAsync(numberOfMessagesToFetchFromDlq);
            dlqCount = messagesFromDlq.Count();
       

dlqCount 由于某种原因不会超过 250?int maxMessages 参数似乎在这个数字之上不起作用?

4

2 回答 2

1

您传递给PeekMessagesAsync的消息数是将返回的最大消息数。无法保证批次的最小尺寸;包含的消息数量取决于几个因素,其中影响最大的是网络将消息流式传输到客户端的速度。

我对Optmize 在批处理模式下从 Azure 服务总线接收消息的回答更详细地讨论了上下文,并提供了一些可能适用于您的场景的想法。

重要的是要注意,与收到的消息相比,偷看的消息存在一些行为差异。最有影响的是偷看的消息没有被锁定,也无法解决,所以关于锁定令牌过期的考虑在这里不适用。

于 2022-02-08T17:09:08.247 回答
0

这就是我现在解决它的方法:

        string topic = "myTopic";
        string subscriptionAndDeadletterPath = @"mySubscription/$deadletterqueue";
        var sequence = 0;
        var continueWithNextIteration = true;
        var paginationSize = 100;
        var numberOfMessagesToFetchFromDlq = 250

            await using var client = new ServiceBusClient(connectionString);
            var receiver = client.CreateReceiver(topic, subscriptionAndDeadletterPath, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.PeekLock, PrefetchCount = paginationSize });

            do
            {
                var messagesFromDlq = await receiver.PeekMessagesAsync(numberOfMessagesToFetchFromDlq, sequence);
                if (messagesFromDlq.Count() > 249)
                {
                    var nextSequenceSetting = await receiver.PeekMessagesAsync(2, Convert.ToInt32(messagesFromDlq.Last().SequenceNumber));
                    sequence = Convert.ToInt32(nextSequenceSetting.Last().SequenceNumber);
                    continueWithNextIteration = nextSequenceSetting.Count() > 1 ? true : false;
                }
             
            } while (continueWithNextIteration);
于 2022-02-09T11:08:34.753 回答