我建议您使用 Batch 发送消息。
Azure 服务总线客户端支持批量发送消息(QueueClient 和 TopicClient 的 SendBatch 和 SendBatchAsync 方法)。但是,单个批次的大小必须保持在 256k 字节以下,否则整个批次将被拒绝。
我们将从一个简单的用例开始:我们知道每条消息的大小。它由假设的 Func getSize 函数定义。这是一个有用的扩展方法,它将根据度量函数和最大块大小拆分任意集合:
public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
return source
.Aggregate(
new
{
Sum = 0L,
Current = (List<T>)null,
Result = new List<List<T>>()
},
(agg, item) =>
{
var value = metric(item);
if (agg.Current == null || agg.Sum + value > maxChunkSize)
{
var current = new List<T> { item };
agg.Result.Add(current);
return new { Sum = value, Current = current, agg.Result };
}
agg.Current.Add(item);
return new { Sum = agg.Sum + value, agg.Current, agg.Result };
})
.Result;
}
现在,SendBigBatchAsync 的实现很简单:
public async Task SendBigBatchAsync(IEnumerable<T> messages, Func<T, long> getSize)
{
var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
foreach (var chunk in chunks)
{
var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
await client.SendBatchAsync(brokeredMessages);
}
}
private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;
我们如何确定每条消息的大小?我们如何实现 getSize 函数?
BrokeredMessage 类公开了 Size 属性,因此可能很想用以下方式重写我们的方法:
public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
foreach (var chunk in chunks)
{
await client.SendBatchAsync(chunk);
}
}
我要考虑的最后一种可能性实际上是允许自己违反批处理的最大大小,然后处理异常,重试发送操作并根据失败消息的实际测量大小调整未来的计算。尝试 SendBatch 后大小是已知的,即使操作失败,我们也可以使用此信息。
// Sender is reused across requests
public class BatchSender
{
private readonly QueueClient queueClient;
private long batchSizeLimit = 262000;
private long headerSizeEstimate = 54; // start with the smallest header possible
public BatchSender(QueueClient queueClient)
{
this.queueClient = queueClient;
}
public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
var packets = (from m in messages
let bm = new BrokeredMessage(m)
select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
foreach (var chunk in chunks)
{
try
{
await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
}
catch (MessageSizeExceededException)
{
var maxHeader = packets.Max(p => p.Brokered.Size - p.BodySize);
if (maxHeader > this.headerSizeEstimate)
{
// If failed messages had bigger headers, remember this header size
// as max observed and use it in future calculations
this.headerSizeEstimate = maxHeader;
}
else
{
// Reduce max batch size to 95% of current value
this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
}
// Re-send the failed chunk
await this.SendBigBatchAsync(packets.Select(p => p.Source));
}
}
}
}
您可以使用此博客作进一步参考。希望能帮助到你。