1

一个问题

我知道有一个类似的问题,但在 SO 中不一样。

我试图了解JMS 中MessageProducerMessageConsumer的幕后情况。使用ActiveMQ的实现,我编写了一个简单的MessageProducer示例来将消息发送到队列,并编写了一个MessageConsumer示例来使用队列中的消息,同时在本地运行ActiveMQ 。

向队列发送消息需要Connection#start方法。确切的调试点如下。Connection#start触发ActiveMQSession#start方法。此方法在调用Connection#start时触发。请参阅以下调试点org.apache.activemq.ActiveMQSession#start

ActiveMQ 调试点

问题是,Connection#start不是MessageProducer明确需要的,而是MessageConsumer需要的。但是,对于这两个示例,我们都需要清除资源(会话连接)。我意识到的是,如果我在生产者上删除Connection#start方法,代码将执行,调试点不会被触发(甚至在引擎盖下也不会),我会在队列中看到消息。但是,如果我在消费者上删除 Connection#start 方法,代码将不会执行,这就是问题,为什么在MessageProducer中不需要它并且代码成功执行但在MessageConsumer上需要?还有为什么我们甚至不使用MessageProducerConnection#start甚至我们需要关闭连接以刷新资源。好像代码有异味。

我看到那个字段开始是一个AtomicBoolean. 我不是并发和多线程方面的专家,所以,可能有人可以解释为什么对于 MessageProducer,Connection#start 不是强制性的;

org.apache.activemq.ActiveMQSession - 开始字段

B - 带有 ActiveMQ 的 JMS MessageProducer 的示例代码

package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSSendMessageToQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        String messageContent = "Hello StackOverflow!";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // Send Message to Queue
        Queue queue = session.createQueue(queueName);
        TextMessage msg = session.createTextMessage(messageContent);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.send(msg);
        
        // Clear resources
        session.close();
        connection.close();
    }

}

C - 带有 ActiveMQ 的 JMS MessageConsumer 示例代码

package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumeMessageFromQueue {
    
    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        
        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
        
        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();
        
        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // Consume Message from the Queue
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        
        connection.start();
        
        Message message = messageConsumer.receive(500);
        
        if ( message != null ) {
            if ( message instanceof TextMessage ) {
                TextMessage textMessage = (TextMessage) message;
                String messageContent = textMessage.getText();
                System.out.println("Message Content: " + messageContent);
            }
        } else {
            System.out.println("No message in the queue: " + queueName);
        }
        
        // Clear resources
        session.close();
        connection.close();
    }
    
}

D - 配置和 Maven 依赖

JDK版本是1.8,我正在运行ActiveMQ 5.15.12,并且客户端依赖项也使用相同的版本;

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.12</version>
</dependency>
4

1 回答 1

2

此处的行为由 JMS 规范规定。简单地说,javax.jms.Connection.start()适用于消费者而非生产者。它告诉代理开始向与连接关联的消费者传递消息。JavaDoc 是Connection这样说的:

通常将连接保持在停止模式,直到设置完成(即,直到所有消息使用者都已创建)。此时,客户端调用连接的 start 方法,消息开始到达连接的消费者。此设置约定最大限度地减少了客户端仍在设置自身时异步消息传递可能导致的任何客户端混淆。

可以立即开始连接,之后可以进行设置。执行此操作的客户端必须准备好在仍处于设置过程中时处理异步消息传递。

start()方法对生产者没有影响。您正在看到预期的行为。

值得注意的是,如果您使用的是 JMS 2 中的简化 API,则此行为会有所不同。如果您使用 aJMSContext创建 a,JMSConsumer则消息传递会自动开始。需要明确的是,ActiveMQ 5.x 没有实现 JMS 2,但ActiveMQ Artemis实现了。

于 2020-10-02T12:39:45.880 回答