1

我想知道使用 ActiveMQ 而不是 RabbitMQ 实现更快的原始消息传递吞吐量(对于发布和消费)是否不寻常?我之所以这么问,是因为我遇到的所有其他在线参考都以 RabbitMQ 的速度更快而自豪。

我没有使用合法的基准测试工具进行测试;相反,我修改了两者的基本发布者/消费者示例,以测试具有 3 KB 消息正文的 100,000 条消息。请注意,我正在两个不同的 Amazon EC2 x-large 实例上测试发布和使用。也许我没有正确设置我的代码?请在下面查看我的结果和代码。

ActiveMQ Send 3kb   
Average Time per Message (ns):  497276.1179
Average # Messages per second:  2010.935101
Total Time (s):                 49.72810906

ActiveMQ Recv 3kb   
Average Time per Message (ns):  43813.35476
Average # Messages per second:  22823.86285
Total Time (s):                 4.381379289

RabbitMQ Send 3kb   
Average Time per Message (ns):  1041524.626
Average # Messages per second:  960.1309229
Total Time (s):                 104.1524626

RabbitMQ Recv 3kb   
Average Time per Message (ns):  612559.3732
Average # Messages per second:  1632.494814
Total Time (s):                 61.25593732

在 RabbitMQ Send.java 和 Recv.java 中删除 queueDeclare() 后更新了数字:

这极大地改善了 RabbitMQ 的时间,但是 ActiveMQ 的消耗时间只有 4 秒,这一定是有问题的......

ActiveMQ Send 3kb   
Average Time per Message (ns):  491404.5666
Average # Messages per second:  2034.983124
Total Time (s):                 49.14045666

ActiveMQ Recv 3kb   
Average Time per Message (ns):  41976.17158
Average # Messages per second:  23823.03965
Total Time (s):                 4.197617158

RabbitMQ Send 3kb   
Average Time per Message (ns):  354795.8818
Average # Messages per second:  2818.522005
Total Time (s):                 35.47958818

RabbitMQ Recv 3kb   
Average Time per Message (ns):  440349.3892
Average # Messages per second:  2270.924009
Total Time (s):                 44.03493892

ActiveMQ 发送.java

public class Send implements Runnable {

private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

public static void main(String[] argv) throws java.io.IOException {
    (new Thread(new Send())).start();
}

public void run() {
    try {
        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageProducer from the Session to the Topic or Queue
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i=0; i <= NUMBER_OF_MESSAGES; i++){
            startTime = System.nanoTime();

            // 3kb
            String text = "AMFu8UlKW2zJBxUQbxNfU3HneB11uEOeC..."

            TextMessage message = session.createTextMessage(text);

// Tell the producer to send the message
            //System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
            producer.send(message);
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;
            System.out.println(i + "," + Long.toString(stopTime-startTime));

        }

        // Clean up
        session.close();
        connection.close();

        //System.out.println("");
        //System.out.println("Total Time: " + totalTime + "ns");
        //System.out.println("Avg. Time: " + totalTime/NUMBER_OF_MESSAGES + "ns");
        //System.out.println("");

    }
    catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}
}

ActiveMQ Recv.java

public class Recv implements Runnable {

private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv)
    throws java.io.IOException {

    (new Thread(new Recv())).start();

}

public void run() {
    try {

        // Create a ConnectionFactory
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://x.x.x.x:61616");

        // Create a Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // Create a Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the destination (Topic or Queue)
        Destination destination = session.createQueue("TEST.FOO");

        // Create a MessageConsumer from the Session to the Topic or Queue
        MessageConsumer consumer = session.createConsumer(destination);

        // Message Listener
        MyListener listener = new MyListener();
        consumer.setMessageListener(listener);

        // Wait for a message
        //Message message = consumer.receive(1000);

       // consumer.close();
       // session.close();

// connection.close();
    } catch (Exception e) {
        System.out.println("Caught: " + e);
        e.printStackTrace();
    }
}

public class MyListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            startTime = System.nanoTime();
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String text = textMessage.getText();
                stopTime = System.nanoTime();
                totalTime = totalTime + stopTime-startTime;

                System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

                numMessages++;

            } else {
                System.out.println("Received: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
}

RabbitMQ 发送.java

public class Send implements Runnable {

private final static String QUEUE_NAME = "hello";
private final static int NUMBER_OF_MESSAGES = 100000;
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;

// 3kb
private static final String message = "AMFu8UlKW2zJB..."

public static void main(String[] argv)
 throws java.io.IOException {

 (new Thread(new Send())).start();

}

public void run() {

try {
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("localhost");
     Connection connection = factory.newConnection();
     Channel channel = connection.createChannel();

     for (int i=1; i <= NUMBER_OF_MESSAGES; i++){
         startTime = System.nanoTime();

         // No Persistence
         // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

         stopTime = System.nanoTime();
         totalTime = totalTime + stopTime-startTime;
         System.out.println(i + "," + Long.toString(stopTime-startTime));
     }

     channel.close();
     connection.close();

 } catch (Exception e) {
     e.printStackTrace();
 }
}
}

RabbitMQ Recv.java

private final static String QUEUE_NAME = "hello";
private static long startTime = 0;
private static long stopTime = 0;
private static long totalTime = 0;
private static int numMessages = 0;

public static void main(String[] argv) {
    (new Thread(new Recv())).start();
}

public void run(){
    try {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("x.x.x.x");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // No Persistence
        // channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true) {
            startTime = System.nanoTime();
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            stopTime = System.nanoTime();
            totalTime = totalTime + stopTime-startTime;

            System.out.println(numMessages + "," + Long.toString(stopTime-startTime));

            numMessages++;

        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
4

1 回答 1

2

好吧,我看了你的代码和你的基准标记,但只是在 Recv 方式。我看到 RabbitMq 的数字是 ActiveMq 的两倍。然后我看到了两者的源代码,并警告我..

在 Rabbitqm Recv 源代码中,您总是为每条消息做一个 queuDeclare,如果通信时间是当前的主要延迟,请确保 ActiveMq 的两倍时间比 Rabbitmq 来自这里。

于 2012-10-03T12:54:34.260 回答