我想知道使用 ActiveMQ 而不是 RabbitMQ 实现更快的原始消息传递吞吐量(对于发布和消费)是否不寻常?我之所以这么问,是因为我遇到的所有其他在线参考都以 RabbitMQ 的速度更快而自豪。
我没有使用合法的基准测试工具进行测试;相反,我修改了两者的基本发布者/消费者示例,以测试具有 3 KB 消息正文的 100,000 条消息。请注意,我正在两个不同的 Amazon EC2 x-large 实例上测试发布和使用。也许我没有正确设置我的代码?请在下面查看我的结果和代码。
ActiveMQ Send 3kb
Average Time per Message (ns): 497276.1179
Average # Messages per second: 2010.935101
Total Time (s): 49.72810906
ActiveMQ Recv 3kb
Average Time per Message (ns): 43813.35476
Average # Messages per second: 22823.86285
Total Time (s): 4.381379289
RabbitMQ Send 3kb
Average Time per Message (ns): 1041524.626
Average # Messages per second: 960.1309229
Total Time (s): 104.1524626
RabbitMQ Recv 3kb
Average Time per Message (ns): 612559.3732
Average # Messages per second: 1632.494814
Total Time (s): 61.25593732
在 RabbitMQ Send.java 和 Recv.java 中删除 queueDeclare() 后更新了数字:
这极大地改善了 RabbitMQ 的时间,但是 ActiveMQ 的消耗时间只有 4 秒,这一定是有问题的......
ActiveMQ Send 3kb
Average Time per Message (ns): 491404.5666
Average # Messages per second: 2034.983124
Total Time (s): 49.14045666
ActiveMQ Recv 3kb
Average Time per Message (ns): 41976.17158
Average # Messages per second: 23823.03965
Total Time (s): 4.197617158
RabbitMQ Send 3kb
Average Time per Message (ns): 354795.8818
Average # Messages per second: 2818.522005
Total Time (s): 35.47958818
RabbitMQ Recv 3kb
Average Time per Message (ns): 440349.3892
Average # Messages per second: 2270.924009
Total Time (s): 44.03493892
ActiveMQ 发送.java
public class Send implements Runnable {
private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
public static void main(String[] argv) throws java.io.IOException {
(new Thread(new Send())).start();
}
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i=0; i <= NUMBER_OF_MESSAGES; i++){
startTime = System.nanoTime();
// 3kb
String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC..."
TextMessage message = session.createTextMessage(text);
// Tell the producer to send the message
//System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
producer.send(message);
stopTime = System.nanoTime();
totalTime = totalTime + stopTime-startTime;
System.out.println(i + "," + Long.toString(stopTime-startTime));
}
// Clean up
session.close();
connection.close();
//System.out.println("");
//System.out.println("Total Time: " + totalTime + "ns");
//System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns");
//System.out.println("");
}
catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}
ActiveMQ Recv.java
public class Recv implements Runnable {
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;
public static void main(String[] argv)
throws java.io.IOException {
(new Thread(new Recv())).start();
}
public void run() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue("TEST.FOO");
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
// Message Listener
MyListener listener = new MyListener();
consumer.setMessageListener(listener);
// Wait for a message
//Message message = consumer.receive(1000);
// consumer.close();
// session.close();
// connection.close();
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public class MyListener implements MessageListener {
public void onMessage(Message message) {
try {
startTime = System.nanoTime();
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
stopTime = System.nanoTime();
totalTime = totalTime + stopTime-startTime;
System.out.println(numMessages + "," + Long.toString(stopTime-startTime));
numMessages++;
} else {
System.out.println("Received: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
RabbitMQ 发送.java
public class Send implements Runnable {
private final static String QUEUE_NAME = "hello";
private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
// 3kb
private static final String message = "AMFu8UlKW2zJB..."
public static void main(String[] argv)
throws java.io.IOException {
(new Thread(new Send())).start();
}
public void run() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
for (int i=1; i <= NUMBER_OF_MESSAGES; i++){
startTime = System.nanoTime();
// No Persistence
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
stopTime = System.nanoTime();
totalTime = totalTime + stopTime-startTime;
System.out.println(i + "," + Long.toString(stopTime-startTime));
}
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
RabbitMQ Recv.java
private final static String QUEUE_NAME = "hello";
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;
public static void main(String[] argv) {
(new Thread(new Recv())).start();
}
public void run(){
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("x.x.x.x");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// No Persistence
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
startTime = System.nanoTime();
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
stopTime = System.nanoTime();
totalTime = totalTime + stopTime-startTime;
System.out.println(numMessages + "," + Long.toString(stopTime-startTime));
numMessages++;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}