0

我有一个场景,其中一个活动函数检索了一组记录,这些记录可以从 1000 到 100 万不等,并存储在一个对象中。然后下一个活动函数使用该对象将消息并行发送到服务总线。

目前,我在该对象上使用 for 循环将对象中的每条记录发送到服务总线。请让我知道是否有更好的替代模式,其中对象或内容(无论它存储在何处)被清空以发送到服务总线,并且该功能自动扩展而不将处理限制为 for 循环。

  • 使用了来自一个函数的 for 循环,该函数编排为对象中的记录调用活动函数。
  • 查看了活动函数的扩展,对于一组 18000 条记录,它已扩展到 15 个实例并在 4 分钟内处理了整个集合。
  • 目前该功能正在使用消费计划。检查发现只有该功能应用正在使用该计划并且未共享。
  • 消息发送到的主题有另一个服务正在监听它,以读取消息。
  • 编排和活动功能的实例计数默认情况下可用。
    for(int i=0;i<number_messages;i++)
    {
       taskList[i] = 
    context.CallActivityAsync<string>("Sendtoservicebus", 
       (messages[i],runId,CorrelationId,Code));
     }

    try
     {
      await Task.WhenAll(taskList);
     }
    catch (AggregateException ae)
     {
      ae.Flatten();
     }

应通过适当地扩展活动功能,将消息快速发送到服务总线。

4

1 回答 1

1

我建议您使用 Batch 发送消息。

Azure 服务总线客户端支持批量发送消息(QueueClient 和 TopicClient 的 SendBatch 和 SendBatchAsync 方法)。但是,单个批次的大小必须保持在 256k 字节以下,否则整个批次将被拒绝。

我们将从一个简单的用例开始:我们知道每条消息的大小。它由假设的 Func getSize 函数定义。这是一个有用的扩展方法,它将根据度量函数和最大块大小拆分任意集合:

public static List<List<T>> ChunkBy<T>(this IEnumerable<T> source, Func<T, long> metric, long maxChunkSize)
{
    return source
        .Aggregate(
            new
            {
                Sum = 0L,
                Current = (List<T>)null,
                Result = new List<List<T>>()
            },
            (agg, item) =>
            {
                var value = metric(item);
                if (agg.Current == null || agg.Sum + value > maxChunkSize)
                {
                    var current = new List<T> { item };
                    agg.Result.Add(current);
                    return new { Sum = value, Current = current, agg.Result };
                }

                agg.Current.Add(item);
                return new { Sum = agg.Sum + value, agg.Current, agg.Result };
            })
        .Result;
}

现在,SendBigBatchAsync 的实现很简单:

public async Task SendBigBatchAsync(IEnumerable<T> messages, Func<T, long> getSize)
{
    var chunks = messages.ChunkBy(getSize, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        var brokeredMessages = chunk.Select(m => new BrokeredMessage(m));
        await client.SendBatchAsync(brokeredMessages);
    }
}

private const long MaxServiceBusMessage = 256000;
private readonly QueueClient client;

我们如何确定每条消息的大小?我们如何实现 getSize 函数?

BrokeredMessage 类公开了 Size 属性,因此可能很想用以下方式重写我们的方法:

public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
{
    var brokeredMessages = messages.Select(m => new BrokeredMessage(m));
    var chunks = brokeredMessages.ChunkBy(bm => bm.Size, MaxServiceBusMessage);
    foreach (var chunk in chunks)
    {
        await client.SendBatchAsync(chunk);
    }
}

我要考虑的最后一种可能性实际上是允许自己违反批处理的最大大小,然后处理异常,重试发送操作并根据失败消息的实际测量大小调整未来的计算。尝试 SendBatch 后大小是已知的,即使操作失败,我们也可以使用此信息。

// Sender is reused across requests
public class BatchSender
{
    private readonly QueueClient queueClient;
    private long batchSizeLimit = 262000;
    private long headerSizeEstimate = 54; // start with the smallest header possible

    public BatchSender(QueueClient queueClient)
    {
        this.queueClient = queueClient;
    }

    public async Task SendBigBatchAsync<T>(IEnumerable<T> messages)
    {
        var packets = (from m in messages
                     let bm = new BrokeredMessage(m)
                     select new { Source = m, Brokered = bm, BodySize = bm.Size }).ToList();
        var chunks = packets.ChunkBy(p => this.headerSizeEstimate + p.Brokered.Size, this.batchSizeLimit);
        foreach (var chunk in chunks)
        {
            try
            {
                await this.queueClient.SendBatchAsync(chunk.Select(p => p.Brokered));
            }
            catch (MessageSizeExceededException)
            {
                var maxHeader = packets.Max(p => p.Brokered.Size - p.BodySize);
                if (maxHeader > this.headerSizeEstimate)
                {
                    // If failed messages had bigger headers, remember this header size 
                    // as max observed and use it in future calculations
                    this.headerSizeEstimate = maxHeader;
                }
                else
                {
                    // Reduce max batch size to 95% of current value
                    this.batchSizeLimit = (long)(this.batchSizeLimit * .95);
                }

                // Re-send the failed chunk
                await this.SendBigBatchAsync(packets.Select(p => p.Source));
            }

        }
    }
}

您可以使用此博客作进一步参考。希望能帮助到你。

于 2019-06-18T09:38:00.150 回答