我有一个案例,我想在同一个“主”线程中运行 DefaultMessageListenerContainer。现在它使用 SimpleAsyncTaskExecutor 每次收到消息时都会产生新线程。
我们有一个测试用例,它连接到不同的分布式系统并进行处理,最后它断言了一些事情。由于 DefaultMessageListenerContainer 在单独的线程中运行,主线程在 DefaultMessageListenerContainer 完成之前返回并开始执行断言。这会导致测试用例的失败。作为解决方法,我们让主线程休眠了几秒钟。
示例配置
<int-jms:message-driven-channel-adapter
id="mq.txbus.publisher.channel.adapter"
container="defaultMessageListenerContainer"
channel="inbound.endpoint.publisher"
acknowledge="transacted"
extract-payload="true" />
<beans:bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<beans:property name="connectionFactory" ref="mockConnectionFactory"/>
<beans:property name="destination" ref="publisherToTxmQueue"/>
<beans:property name="taskExecutor" ref="taskExecutor"/>
<beans:property name="maxMessagesPerTask" value="10"/>
<beans:property name="sessionTransacted" value="true"/>
</beans:bean>
<beans:bean id="taskExecutor" class="org.springframework.scheduling.timer.TimerTaskExecutor" />
我在这里尝试使用 TimerTaskExecutor,因为它创建了单个线程,但该线程与主线程分开,因此问题未解决。我尝试使用 SyncTaskExecutor 但这也不起作用(或者我可能会提供正确的属性值?)。
答:
我们通过使用 SimpleMessageListenerContainer 解决了这个问题。这是新配置
<int-jms:message-driven-channel-adapter
id="mq.txbus.publisher.channel.adapter"
container="messageListenerContainer"
channel="inbound.endpoint.publisher"
acknowledge="transacted"
extract-payload="true" />
<beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<beans:property name="connectionFactory" ref="mockConnectionFactory"/>
<beans:property name="destination" ref="publisherToTxmQueue"/>
<beans:property name="sessionTransacted" value="true"/>
<beans:property name="exposeListenerSession" value="false"/>
</beans:bean>