5

我正在使用 ActiveMQ 5.8.0 和 Camel 2.10.4。我正在从 JMS 队列中读取 ExchangePattern.InOnly 消息,并希望将那些在给定时间内未处理的消息显式过期到命名的死信队列。

我有以下路线:

public class FulfillmentRequestRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=10000&transacted=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}

以及以下 ActiveMQ 配置:

<!-- Configure the ActiveMQ JMS broker server to listen on TCP port 61610 -->
<broker:broker useJmx="true" persistent="true" brokerName="myBroker">
    <broker:transportConnectors>
        <!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
        <broker:transportConnector name="vm" uri="vm://myBroker" />
        <!-- expose a TCP transport for clients to use -->
        <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
    </broker:transportConnectors>
    <broker:persistenceAdapter>
        <broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
    </broker:persistenceAdapter>
    <broker:destinationPolicy>
        <broker:policyMap>
          <broker:policyEntries>
            <!-- Set the following policy on all queues using the '>' wildcard -->
            <broker:policyEntry queue=">">
              <broker:deadLetterStrategy>
                <broker:sharedDeadLetterStrategy processExpired="true"
                                                 processNonPersistent="true" />
              </broker:deadLetterStrategy>
            </broker:policyEntry>
          </broker:policyEntries>
        </broker:policyMap>
    </broker:destinationPolicy>
</broker:broker>

<!-- Configure Camel ActiveMQ to use the embedded ActiveMQ broker declared above -->
<!-- Using the ActiveMQComponent gives us connection pooling for free -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="vm://myBroker" />
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="acceptMessagesWhileStopping" value="false"/>
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://myBroker" />
</bean>

最后我有一个单元测试,它创建了两条消息,一条将被处理,另一条应该超时。

@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {

        // Given
        consumer = context.createConsumerTemplate();

        int expectedValidMessageCount = 2;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);        

        // When 
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);

        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        mockEndpoint.assertIsSatisfied();                         // This should not pass with "2" set above

        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);

        assertEquals(xmlBody1, notTimedOutMessageBody);

        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody  = consumer.receiveBodyNoWait("jms:queue:dead");
        assertNotNull("Should not lose the message", dlqBody);          // This fails
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {

        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}

尽管考虑了下面@Petter 的提示(谢谢),但第二条消息根本没有过期。

我有这个单元测试模式在测试中的其他地方工作,这些测试显式地从 Camel 中的事务中抛出异常,但是当这一切似乎已经被处理时,我宁愿不必自己手动开始查看标头。

4

1 回答 1

9

在队列中过期一次或在 ActiveMQ 中命中队列时的消息将采取以下任一操作:

  1. 如果消息是持久的,那么它将被放置在队列ActiveMQ.DLQ中。
  2. 如果消息是非持久的,它将立即被丢弃,恕不另行通知。

在您的 ActiveMQ 配置中,您已禁用持久性,因此快速猜测您的消息将被删除,而不会通知您的应用程序。

在这里阅读更多

于 2013-05-17T20:05:33.433 回答