我希望提高针对 ActiveMQ 编写的高吞吐量生产者的性能,根据这个useAsyncSend 将:
强制使用异步发送,从而大幅提升性能;但意味着无论消息是否已发送,send() 方法都会立即返回,这可能导致消息丢失。
但是我看不出它对我的简单测试用例有什么影响。
使用这个非常基本的应用程序:
const string QueueName = "....";
const string Uri = "....";
static readonly Stopwatch TotalRuntime = new Stopwatch();
static void Main(string[] args)
{
TotalRuntime.Start();
SendMessage();
Console.ReadLine();
}
static void SendMessage()
{
var session = CreateSession();
var destination = session.GetQueue(QueueName);
var producer = session.CreateProducer(destination);
Console.WriteLine("Ready to send 700 messages");
Console.ReadLine();
var body = new byte[600*1024];
Parallel.For(0, 700, i => SendMessage(producer, i, body, session));
}
static void SendMessage(IMessageProducer producer, int i, byte[] body, ISession session)
{
var message = session.CreateBytesMessage(body);
var sw = new Stopwatch();
sw.Start();
producer.Send(message);
sw.Stop();
Console.WriteLine("Running for {0}ms: Sent message {1} blocked for {2}ms",
TotalRuntime.ElapsedMilliseconds,
i,
sw.ElapsedMilliseconds);
}
static ISession CreateSession()
{
var connectionFactory = new ConnectionFactory(Uri)
{
AsyncSend = true,
CopyMessageOnSend = false
};
var connection = connectionFactory.CreateConnection();
connection.Start();
var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
return session;
}
我得到以下输出:
Ready to send 700 messages
Running for 2430ms: Sent message 696 blocked for 12ms
Running for 4275ms: Sent message 348 blocked for 1858ms
Running for 5106ms: Sent message 609 blocked for 2689ms
Running for 5924ms: Sent message 1 blocked for 2535ms
Running for 6749ms: Sent message 88 blocked for 1860ms
Running for 7537ms: Sent message 610 blocked for 2429ms
Running for 8340ms: Sent message 175 blocked for 2451ms
Running for 9163ms: Sent message 89 blocked for 2413ms
.....
这表明每条消息发送大约需要 800 毫秒,并且调用session.Send()
阻塞大约需要两秒半。即使文档说
“send() 方法将立即返回”
AsyncSend = true
如果我将并行 for 更改为正常的 for 循环或更改为,这些数字也基本相同,AlwaysSyncSend = true
所以我不相信异步开关在所有工作......
谁能看到我在这里缺少什么来使发送异步?
进一步测试后:
根据 ANTS 性能分析器,绝大多数运行时间都花在等待同步上。看来问题在于各种传输类通过监视器在内部阻塞。特别是我似乎迷上了MutexTransport的 OneWay 方法,该方法一次只允许一个线程访问它。
看起来对 Send 的调用将阻塞,直到上一条消息完成,这解释了为什么我的输出显示第一条消息阻塞了 12 毫秒,而下一条消息阻塞了 1858 毫秒。我可以通过实现一个每消息连接模式来进行多种传输,这种模式可以改善问题并使消息发送并行工作,但大大增加了发送单个消息的时间,并且消耗了如此多的资源,看起来不像正确的解决方案。
我已经用 1.5.6 重新测试了所有这些,没有发现任何区别。