1

我正在尝试示例项目来实现 Spring 3 + Active MQ 请求/响应同步。我创建了一个 spring 配置文件,一个将消息放入队列的消息生产者和一个消费消息并返回响应的消息消费者......

我收到了回复...但是我的示例程序似乎没有结束...当我检查 Apache Active MQ 管理控制台时,我发现每次运行测试类时消费者数量都会不断增加...我必须在 Eclipse 中手动终止它,以便在管理控制台中减少计数...

我在这里提到了这个线程 -堆栈溢出线程- 遇到同样问题的人.. 但看起来我已经有了这个解决方案,但仍然没有看到我的问题得到解决

我也提到了这里这里来创建我的解决方案

所以这是我的代码

    <!-- creates an activemq connection factory using the amq namespace -->

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<!-- 
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
    init-method="start" destroy-method="stop">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="maxConnections" value="100" />
</bean>
 -->

<!-- CachingConnectionFactory Definition, sessionCacheSize property is the 
    number of sessions to cache -->

<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="jmsConnectionFactory" />
    <property name="exceptionListener" ref="jmsExceptionListener" />
    <property name="sessionCacheSize" value="1" />
    <property name="cacheConsumers" value="false" />
    <property name="cacheProducers" value="false" />
</bean>

<!-- JmsTemplate Definition -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <constructor-arg ref="connectionFactory" />

</bean>


<jms:listener-container connection-factory="connectionFactory">
    <jms:listener id="request.queue.listener" destination="test.request"
        ref="testMessageListener" />
</jms:listener-container>

<bean id="WorkerClient" class="com.vzwcorp.legal.eplm.active.mq.framework.WorkerClient" />

请求者类

@Component

公共类请求者{

private static final class ProducerConsumer implements
        SessionCallback<Message> {

    private static final int TIMEOUT = 5000;

    private final String msg;

    private final DestinationResolver destinationResolver;

    private final String queue;

    public ProducerConsumer(final String msg, String queue,
            final DestinationResolver destinationResolver) {
        this.msg = msg;
        this.queue = queue;
        this.destinationResolver = destinationResolver;
    }

    public Message doInJms(final Session session) throws JMSException {
        MessageConsumer consumer = null;
        MessageProducer producer = null;
        try {
            final String correlationId = UUID.randomUUID().toString();
            final Destination requestQueue = destinationResolver
                    .resolveDestinationName(session, queue + ".request",
                            false);
            final Destination replyQueue = destinationResolver
                    .resolveDestinationName(session, queue + ".response",
                            false);
            // Create the consumer first!
            consumer = session.createConsumer(replyQueue,
                    "JMSCorrelationID = '" + correlationId + "'");
            final TextMessage textMessage = session.createTextMessage(msg);
            textMessage.setJMSCorrelationID(correlationId);
            textMessage.setJMSReplyTo(replyQueue);
            // Send the request second!
            producer = session.createProducer(requestQueue);
            producer.send(requestQueue, textMessage);
            // Block on receiving the response with a timeout
            return consumer.receive(TIMEOUT);
        } finally {
            // Don't forget to close your resources
            JmsUtils.closeMessageConsumer(consumer);
            JmsUtils.closeMessageProducer(producer);
        }
    }
}

private final JmsTemplate jmsTemplate;

@Autowired
public Requestor(final JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
}

public String request(final String request, String queue) {
    // Must pass true as the second param to start the connection
    TextMessage message = (TextMessage) jmsTemplate.execute(
            new ProducerConsumer(request, queue, jmsTemplate
                    .getDestinationResolver()), true);
    try {
        return message.getText();
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return "exception in requestor";
    }
}

}

消息监听类

@Component

公共类 TestMessageListener 实现 MessageListener {

@Autowired
private WorkerClient WorkerClient;

@Override
public void onMessage(Message arg0) {
    WorkerClient.delegateToClient(arg0);

}

}

工人客户端类

@Component

公共类 WorkerClient 实现 ApplicationContextAware {

private ApplicationContext ctx;

private JmsTemplate jmsTemplate;

public void delegateToClient(Message arg0) {
    MessageProducer producer = null;
    if (arg0 instanceof TextMessage) {
        try {
            final TextMessage message = (TextMessage) arg0;

            System.out.println("Message received by Listener: "
                    + message.getJMSCorrelationID() + " - "
                    + message.getText());

            jmsTemplate.setDefaultDestination(message.getJMSReplyTo());

            Session session = jmsTemplate.getConnectionFactory()
                    .createConnection()
                    .createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(message.getJMSReplyTo());
            final TextMessage textMessage = session
                    .createTextMessage("I did it at last");
            textMessage.setJMSCorrelationID(message.getJMSCorrelationID());
            textMessage.setJMSReplyTo(message.getJMSReplyTo());
            producer.send(message.getJMSReplyTo(), textMessage);

        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            JmsUtils.closeMessageProducer(producer);
        }
    }
}

@Override
public void setApplicationContext(ApplicationContext arg0)
        throws BeansException {
    ctx = arg0;
    this.jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
}

终于到了测试班

public class TestSync {

public static void main(String[] args) {

    ApplicationContext ctx = new ClassPathXmlApplicationContext(
            "activeMQConfiguration.xml");
    OrderService orderService = (OrderService) ctx.getBean("orderService");
    JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
     Requestor req = new Requestor(jmsTemplate);
    //CopyOfRequestor req = new CopyOfRequestor(jmsTemplate);
    String response = req.request("Hello World", "test");
    System.out.println(response);

}

}

那么如何修复消息消费者返回和我的测试类结束?请帮忙....

4

1 回答 1

0

问题基本上是应用程序上下文保持活动状态可能是因为 JMS 侦听器容器传播的线程。解决方法是在 main 方法的末尾显式调用 ApplicationContext.close(),这应该会导致所有 bean(包括 JMS 侦听器)的有序关闭。

更好的解决方法是使用 Spring Test Support,它将负责初始化和关闭应用程序上下文,而不是您需要显式地执行它。

@RunWith(SpringJUnit4Runner.class)
@ContextConfiguration(...)
public class TestSync{

    @Autowired OrderService orderService;

    @Test
    public void testJMS(){
        ...
    }

}
于 2012-11-15T01:45:39.593 回答