1

我希望提高针对 ActiveMQ 编写的高吞吐量生产者的性能,根据这个useAsyncSend 将:

强制使用异步发送,从而大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失。

但是我看不出它对我的简单测试用例有什么影响。

使用这个非常基本的应用程序:

const string QueueName = "....";
const string Uri = "....";

static readonly Stopwatch TotalRuntime = new Stopwatch();

static void Main(string[] args)
{
    TotalRuntime.Start();
    SendMessage();
    Console.ReadLine();
}

static void SendMessage()
{
    var session = CreateSession();
    var destination = session.GetQueue(QueueName);
    var producer = session.CreateProducer(destination);

    Console.WriteLine("Ready to send 700 messages");
    Console.ReadLine();

    var body = new byte[600*1024];

    Parallel.For(0, 700, i => SendMessage(producer, i, body, session));         
}

static void SendMessage(IMessageProducer producer, int i, byte[] body, ISession session)
{
     var message = session.CreateBytesMessage(body);

     var sw = new Stopwatch();
     sw.Start();
     producer.Send(message);
     sw.Stop();

     Console.WriteLine("Running for {0}ms: Sent message {1} blocked for {2}ms", 
            TotalRuntime.ElapsedMilliseconds, 
            i, 
            sw.ElapsedMilliseconds);
}       

static ISession CreateSession()
{
     var connectionFactory = new ConnectionFactory(Uri)
                                    {
                                        AsyncSend = true,
                                        CopyMessageOnSend = false
                                    };
     var connection = connectionFactory.CreateConnection();
     connection.Start();
     var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
     return session;
}

我得到以下输出:

Ready to send 700 messages

Running for 2430ms: Sent message 696 blocked for 12ms
Running for 4275ms: Sent message 348 blocked for 1858ms
Running for 5106ms: Sent message 609 blocked for 2689ms
Running for 5924ms: Sent message 1 blocked for 2535ms
Running for 6749ms: Sent message 88 blocked for 1860ms
Running for 7537ms: Sent message 610 blocked for 2429ms
Running for 8340ms: Sent message 175 blocked for 2451ms
Running for 9163ms: Sent message 89 blocked for 2413ms
.....

这表明每条消息发送大约需要 800 毫秒,并且调用session.Send()阻塞大约需要两秒半。即使文档说

“send() 方法将立即返回”

AsyncSend = true如果我将并行 for 更改为正常的 for 循环或更改为,这些数字也基本相同,AlwaysSyncSend = true所以我不相信异步开关在所有工作......

谁能看到我在这里缺少什么来使发送异步?


进一步测试后:

根据 ANTS 性能分析器,绝大多数运行时间都花在等待同步上。看来问题在于各种传输类通过监视器在内部阻塞。特别是我似乎迷上了MutexTransport的 OneWay 方法,该方法一次只允许一个线程访问它。

看起来对 Send 的调用将阻塞,直到上一条消息完成,这解释了为什么我的输出显示第一条消息阻塞了 12 毫秒,而下一条消息阻塞了 1858 毫秒。我可以通过实现一个每消息连接模式来进行多种传输,这种模式可以改善问题并使消息发送并行工作,但大大增加了发送单个消息的时间,并且消耗了如此多的资源,看起来不像正确的解决方案。

我已经用 1.5.6 重新测试了所有这些,没有发现任何区别。

4

1 回答 1

0

一如既往,最好的办法是更新到最新版本(撰写本文时为 1.5.6)。如果代理启用了生产者流控制并且您已达到队列大小限制,则发送可能会阻塞,尽管使用异步发送这不应该发生,除非您使用 producerWindowSize 集进行发送。获得帮助的一种好方法是创建一个测试用例并通过 Jira 问题将其提交到 NMS.ActiveMQ 站点,以便我们可以使用您的测试代码对其进行调查。自 1.5.1 以来已经有很多修复,所以我建议尝试新版本,因为它可能已经不是问题了。

于 2012-08-03T21:55:24.413 回答