5

我在 Azure 上运行服务总线,每秒发送大约 10-100 条消息

最近我已经切换到.net 4.5并且所有人都兴奋地重构了所有代码以在每行中至少有两次“async”和“await ”,以确保它“正确”完成:)

现在我想知道它实际上是好还是坏。如果您可以查看代码片段并让我知道您的想法。我特别担心如果线程上下文切换没有给我带来更多的痛苦而不是好处,从所有的异步中......(看看!dumpheap 这绝对是一个因素)

只是一点描述 - 我将发布 2 种方法 - 一种在 ConcurrentQueue 上执行 while 循环,等待新消息,另一种方法一次发送一条消息。我也完全按照 Azure 博士的规定使用瞬态故障处理模块。

发送循环(从头开始,等待新消息):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码来自每秒发送 1 条消息的“Sender”类。在任何给定时间我都有大约 50-100 个实例在运行,所以它可能是相当多的线程。

顺便说一句,不要担心 EnsureMessageSender、RecreateMessageFactory、EnsureTopicExists 太多,它们不会经常被调用。

如果我只需要一次发送一条消息,而不用担心异步内容并避免随之而来的开销,那么让一个后台线程处理消息队列并同步发送消息不是更好吗?

请注意,将一条消息发送到 Azure 服务总线通常只需几毫秒,这并不昂贵。(除了速度很慢、超时或服务总线后端出现问题时,它可能会在尝试发送内容时挂起一段时间)。

感谢和抱歉这么长的帖子,

斯蒂沃

建议的解决方案

这个例子可以解决我的情况吗?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }
4

2 回答 2

5

你说:

上面的代码来自每秒发送 1 条消息的“Sender”类。在任何给定时间我都有大约 50-100 个实例在运行,所以它可能是相当多的线程。

这是异步的一个很好的例子。你在这里保存了很多线程。异步减少了上下文切换,因为它不是基于线程的。在需要等待的情况下,它不会进行上下文切换。相反,下一个工作项正在同一个线程上处理(如果有的话)。

出于这个原因,您的异步解决方案肯定会比同步解决方案更好地扩展。需要衡量它是否在 50-100 个工作流实例中实际使用更少的 CPU。实例越多,异步速度越快的可能性就越高。

现在,实现存在一个问题:您使用的ConcurrentQueue不是异步就绪的。因此,即使在异步版本中,您实际上也确实使用了 50-100 个线程。它们要么阻塞(你想避免)要么忙等待燃烧 100% 的 CPU(这在你的实现中似乎就是这种情况!)。您需要摆脱这个问题并使排队也异步。也许 aSemaphoreSlim在这里有帮助,因为它可以异步等待。

于 2013-03-25T12:01:15.097 回答
4

首先,请记住Task!= Thread。任务(和async方法延续)被安排到线程池中,微软在线程池中进行了大量优化,只要您的任务相当短,它们就会产生奇迹。

查看您的代码,一行会引发一个标志:semaphore.WaitOne. 我假设您将其用作一种信号,表明队列中有可用数据。这很糟糕,因为它是方法内部的阻塞async等待。通过使用阻塞等待,代码从轻量级延续变为更重的线程池线程。

所以,我会按照@usr 的建议,用async-ready 队列替换队列(和信号量)。TPL 数据流BufferBlock<T>是一个可通过 NuGetasync获得的生产者/消费者队列。我首先推荐这个,因为听起来您的项目可以从更广泛地使用数据流中受益,而不仅仅是作为队列(但队列是一个很好的起点)。

存在其他async现成的数据结构;我的AsyncEx 库有几个。自己构建一个简单的也不难;我有一篇关于这个主题的博文。但我建议在您的情况下使用 TPL Dataflow。

于 2013-03-25T13:35:42.503 回答