2

我有一个案例,我想在同一个“主”线程中运行 DefaultMessageListenerContainer。现在它使用 SimpleAsyncTaskExecutor 每次收到消息时都会产生新线程。

我们有一个测试用例,它连接到不同的分布式系统并进行处理,最后它断言了一些事情。由于 DefaultMessageListenerContainer 在单独的线程中运行,主线程在 DefaultMessageListenerContainer 完成之前返回并开始执行断言。这会导致测试用例的失败。作为解决方法,我们让主线程休眠了几秒钟。

示例配置

  <int-jms:message-driven-channel-adapter
        id="mq.txbus.publisher.channel.adapter"
        container="defaultMessageListenerContainer"
        channel="inbound.endpoint.publisher"
        acknowledge="transacted"
        extract-payload="true" />

  <beans:bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <beans:property name="connectionFactory" ref="mockConnectionFactory"/>
    <beans:property name="destination" ref="publisherToTxmQueue"/>
    <beans:property name="taskExecutor" ref="taskExecutor"/>
    <beans:property name="maxMessagesPerTask" value="10"/>
    <beans:property name="sessionTransacted" value="true"/>
</beans:bean>

<beans:bean id="taskExecutor" class="org.springframework.scheduling.timer.TimerTaskExecutor" />

我在这里尝试使用 TimerTaskExecutor,因为它创建了单个线程,但该线程与主线程分开,因此问题未解决。我尝试使用 SyncTaskExecutor 但这也不起作用(或者我可能会提供正确的属性值?)。

答:
我们通过使用 SimpleMessageListenerContainer 解决了这个问题。这是新配置

     <int-jms:message-driven-channel-adapter
        id="mq.txbus.publisher.channel.adapter"
        container="messageListenerContainer"
        channel="inbound.endpoint.publisher"
        acknowledge="transacted"
        extract-payload="true" />

<beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">     
    <beans:property name="connectionFactory" ref="mockConnectionFactory"/>     
    <beans:property name="destination" ref="publisherToTxmQueue"/>
    <beans:property name="sessionTransacted" value="true"/>
    <beans:property name="exposeListenerSession" value="false"/>
</beans:bean> 
4

2 回答 2

1

首先你必须明白本质上是异步的,不会阻塞。这意味着一旦您将消息发送到队列,它将由另一个线程处理,可能在另一台机器上,如果消费者关闭,可能会在几分钟或几小时后处理。

阅读测试用例的描述,您似乎正在做一些系统/集成测试。不幸的是,除了等待,您无能为力,但是您不应盲目等待,因为这会使您的测试变慢但也不是很稳定 - 无论您等待多长时间,在繁忙的系统或一些冗长的 GC 过程中,您的测试可能仍然需要时间即使没有错误。

因此,与其休眠固定的秒数,不如休眠约 100 毫秒并检查仅在消息处理完成时才满足的某些条件。例如,如果处理消息将一些记录插入数据库,请定期检查数据库。

更优雅的方式(无需忙于等待)是实现请求/回复模式,请参阅如何使用 JMS 实现请求响应?实施细节。基本上,在发送消息时,您定义一个回复队列并阻止等待该队列中的消息。处理完原始消息后,消费者应将回复消息发送到定义的队列。当您收到该消息时 - 执行所有断言。

于 2012-05-07T16:26:26.613 回答
0

如果它用于测试目的,那么为什么不使用 CyclicBarrier 在 jms 活动完成后运行断言。

于 2012-05-08T19:58:34.747 回答