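I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading ExchangePattern.InOnly messages from a JMS queue, and I want messages that are not processed within a given time to be explicitly expired to a named dead letter queue.
I have the following route: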
public class FulfillmentRequestRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=10000&transacted=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}
And the following ActiveMQ configuration:
<!-- Configure the ActiveMQ JMS broker server to listen on TCP port 61610 -->
<broker:broker useJmx="true" persistent="true" brokerName="myBroker">
    <broker:transportConnectors>
        <!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
        <broker:transportConnector name="vm" uri="vm://myBroker" />
        <!-- expose a TCP transport for clients to use -->
        <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
    </broker:transportConnectors>
    <broker:persistenceAdapter>
        <broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
    </broker:persistenceAdapter>
    <broker:destinationPolicy>
        <broker:policyMap>
            <broker:policyEntries>
                <!-- Set the following policy on all queues using the '>' wildcard -->
                <broker:policyEntry queue=">">
                    <broker:deadLetterStrategy>
                        <broker:sharedDeadLetterStrategy processExpired="true"
                                                         processNonPersistent="true" />
                    </broker:deadLetterStrategy>
                </broker:policyEntry>
            </broker:policyEntries>
        </broker:policyMap>
    </broker:destinationPolicy>
</broker:broker>
<!-- Configure Camel ActiveMQ to use the embedded ActiveMQ broker declared above -->
<!-- Using the ActiveMQComponent gives us connection pooling for free -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="brokerURL" value="vm://myBroker" />
    <property name="transacted" value="true"/>
    <property name="transactionManager" ref="jmsTransactionManager"/>
    <property name="acceptMessagesWhileStopping" value="false"/>
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://myBroker" />
</bean>
Finally, I have a unit test that creates two messages: one that should be processed and one that should time out.
@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {
    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;
    @Produce
    protected ProducerTemplate template;
    protected ConsumerTemplate consumer;
    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;
    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {
        // Given
        consumer = context.createConsumerTemplate();
        int expectedValidMessageCount = 2;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);
        // When
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);
        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);
        // Then
        // The second message is not processed
        mockEndpoint.assertIsSatisfied(); // This should not pass with "2" set above
        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);
        assertEquals(xmlBody1, notTimedOutMessageBody);
        Thread.sleep(5000);
        // And is instead routed to the timedOut JMS queue
        Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead");
        assertNotNull("Should not lose the message", dlqBody); // This fails
        assertEquals(xmlBody2, dlqBody);
    }
    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {
        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}
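Even after taking into account the hints from @Petter below (thank you), the second message does not expire at all.
I have this unit-test pattern working elsewhere, in tests that explicitly throw exceptions from within a transaction in Camel, but I would rather not have to start inspecting the headers manually myself when all of this seems to be handled already.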
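For reference, here is a minimal plain-Java sketch of how I understand the JMS specification to define expiration: the provider stamps JMSExpiration at send time as the send timestamp plus the time-to-live, and a time-to-live of zero means the message never expires. If that is right, a JMSExpiration header set by the client before sending may simply be overwritten, and the time-to-live would have to be supplied on the sending side. The class and method names (ExpirationSketch, expirationFor, isExpired) are hypothetical and not part of Camel or ActiveMQ; this only models the arithmetic, not the broker.

```java
/** Plain-Java model of JMS expiration semantics; all names here are hypothetical. */
class ExpirationSketch {

    /** Expiration a provider would stamp at send time; 0 means "never expires". */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0L : sendTimeMillis + timeToLiveMillis;
    }

    /** A message counts as expired once "now" has passed a non-zero expiration. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sent = System.currentTimeMillis();
        // Same 10 second time-to-live as in the route's endpoint URI.
        long expiration = expirationFor(sent, 10_000L);

        System.out.println(isExpired(expiration, sent + 5_000L));              // false
        System.out.println(isExpired(expiration, sent + 12_000L));             // true
        System.out.println(isExpired(expirationFor(sent, 0L), sent + 12_000L)); // false
    }
}
```

Under this model, the second test message would only be treated as expired if the provider itself was given a non-zero time-to-live when the message was sent.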