一个问题
我知道有一个类似的问题,但在 SO 中不一样。
我试图了解JMS 中MessageProducer和MessageConsumer的幕后情况。使用ActiveMQ的实现,我编写了一个简单的MessageProducer示例来将消息发送到队列,并编写了一个MessageConsumer示例来使用队列中的消息,同时在本地运行ActiveMQ 。
向队列发送消息需要Connection#start方法。确切的调试点如下。Connection#start触发ActiveMQSession#start方法。此方法在调用Connection#start时触发。请参阅以下调试点org.apache.activemq.ActiveMQSession#start
;
问题是,Connection#start不是MessageProducer明确需要的,而是MessageConsumer需要的。但是,对于这两个示例,我们都需要清除资源(会话和连接)。我意识到的是,如果我在生产者上删除Connection#start方法,代码将执行,调试点不会被触发(甚至在引擎盖下也不会),我会在队列中看到消息。但是,如果我在消费者上删除 Connection#start 方法,代码将不会执行,这就是问题,为什么在MessageProducer中不需要它并且代码成功执行但在MessageConsumer上需要?还有为什么我们甚至不使用MessageProducer的Connection#start甚至我们需要关闭连接以刷新资源。好像代码有异味。
我看到那个字段开始是一个AtomicBoolean
. 我不是并发和多线程方面的专家,所以,可能有人可以解释为什么对于 MessageProducer,Connection#start 不是强制性的;
B - 带有 ActiveMQ 的 JMS MessageProducer 的示例代码
package com.bzdgn.jms.stackoverflow;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSSendMessageToQueue {
private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
String queueName = "test_queue";
String messageContent = "Hello StackOverflow!";
// Connection Factory from ActiveMQ Implementation
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
// Get connection from Connection Factory
Connection connection = connectionFactory.createConnection();
// Create session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Send Message to Queue
Queue queue = session.createQueue(queueName);
TextMessage msg = session.createTextMessage(messageContent);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.send(msg);
// Clear resources
session.close();
connection.close();
}
}
C - 带有 ActiveMQ 的 JMS MessageConsumer 示例代码
package com.bzdgn.jms.stackoverflow;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSConsumeMessageFromQueue {
private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
String queueName = "test_queue";
// Connection Factory from ActiveMQ Implementation
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
// Get connection from Connection Factory
Connection connection = connectionFactory.createConnection();
// Create session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Consume Message from the Queue
Queue queue = session.createQueue(queueName);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
Message message = messageConsumer.receive(500);
if ( message != null ) {
if ( message instanceof TextMessage ) {
TextMessage textMessage = (TextMessage) message;
String messageContent = textMessage.getText();
System.out.println("Message Content: " + messageContent);
}
} else {
System.out.println("No message in the queue: " + queueName);
}
// Clear resources
session.close();
connection.close();
}
}
D - 配置和 Maven 依赖
JDK版本是1.8
,我正在运行ActiveMQ 5.15.12
,并且客户端依赖项也使用相同的版本;
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.12</version>
</dependency>