我第一次使用 MQ,并尝试使用 RabbitMQ 实现日志系统。我的实施涉及“发件人”
/*
* This class sends messages over MQ
*/
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
}
channel.close();
connection.close();
}
}
它将我的每种颜色的一条消息发送到交换,颜色将用作绑定。它涉及一个“接收者”
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
receiveMessagesFromQueue(2);
}
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
它作为参数给出一个数字,表示我希望它从交换中提供哪些颜色绑定。
在我的实现中,通常在 RabbitMQ 中,似乎消息存储在交换器中,直到Consumer
请求它们,此时它们被分发到各自的队列,然后一次发送一个到客户端(或 MQ 中的消费者行话)。我的问题是,当我在运行MQSender
课程之前运行MQReceiver
课程时,消息永远不会被传递。但是,当我MQReceiver
首先运行课程时,会收到消息。根据我对 MQ 的理解,我认为消息应该存储在服务器上,直到MQReceiver
类运行,然后消息应该被传递给他们的消费者,但这不是正在发生的事情。我的主要问题是这些消息是否可以存储在交换中,如果不能,它们应该存储在哪里,以便在调用消费者(即我的MQReceiver
类)时将它们传递?
谢谢你的帮助!