我们正在尝试将 ActiveMQ 5.9.0 设置为使用 JMS 主题的消息代理,但我们在使用消息时遇到了一些问题。
出于测试目的,我们有 1 个主题、1 个事件生产者和 1 个消费者的简单配置。我们一个接一个地发送 10 条消息,但是每次运行应用程序时,其中 1-3 条消息没有被消费!其他消息被消费并处理得很好。我们可以在 ActiveMQ 管理控制台中看到我们发布到主题的所有消息,但它们永远不会到达消费者,即使我们重新启动应用程序(我们可以看到“入队”和“出队”列中的数字是不同的)。
编辑:我还应该提到,当使用队列而不是主题时,不会出现这个问题。
为什么会这样?它可能与atomikos(即事务管理器)有关吗?或者也许配置中的其他东西?欢迎任何想法/建议。:)
这是 ActiveMQ/JMS 弹簧配置:
<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
init-method="init" destroy-method="close">
<property name="uniqueResourceName" value="amq" />
<property name="xaConnectionFactory">
<bean class="org.apache.activemq.spring.ActiveMQXAConnectionFactory"
p:brokerURL="${activemq_url}" />
</property>
<property name="maxPoolSize" value="10" />
<property name="localTransactionMode" value="false" />
</bean>
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="connectionFactory" />
</bean>
<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="sessionTransacted" value="true" />
<property name="pubSubDomain" value="true"/>
</bean>
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="test.topic" />
</bean>
<!-- The Spring message listener container configuration -->
<jms:listener-container destination-type="topic"
connection-factory="connectionFactory" transaction-manager="transactionManager"
acknowledge="transacted" concurrency="1">
<jms:listener destination="test.topic" ref="testReceiver"
method="receive" />
</jms:listener-container>
制作人:
@Component("producer")
public class EventProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Transactional
public void produceEvent(String message) {
this.jmsTemplate.convertAndSend("test.topic", message);
}
}
消费者:
@Component("testReceiver")
public class EventListener {
@Transactional
public void receive(String message) {
System.out.println(message);
}
}
考试:
@Autowired
private EventProducer eventProducer;
public void testMessages() {
for (int i = 1; i <= 10; i++) {
this.eventProducer.produceEvent("message" + i);
}