我有一个使用 RabbitMQ 的项目。在最好的情况下,它每秒可以接收 3000 条消息。这是我的消费者代码:
package com.mdnaRabbit.worker;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;
public class App {
private static final String TASK_QUEUE_NAME = "task_queue";
private static int i = 0;
private static long timeStart;
private static long timeFinish;
private static long messPerSec;
public static void main (String[] argv) throws IOException,InterruptedException{
ExecutorService threader = Executors.newFixedThreadPool(20);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection(threader);
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(50);
final QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
timeStart = System.currentTimeMillis();
try {
while (i<100000) {
try {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Data mess = Data.fromBytes(delivery.getBody());
System.out.println(" [" + (i++) +"] Received " + mess.getHeader());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}catch (Exception e){
}
}
} catch (Exception e){
e.printStackTrace();
}
timeFinish = System.currentTimeMillis();
messPerSec = Math.round ((i*1000)/(timeFinish - timeStart));
System.out.println( "receives " + messPerSec + " per second");
channel.close();
connection.close();
}
}
如您所见,我正在使用 ExecutorService 来提高速度和 channel.basicQos(),但这对我没有多大帮助。有没有办法提高接收/发送速度(发送速度增加我认为与接收速度相同)