我使用 ActiveMQ 作为代理来传递消息。这些消息旨在以数据库形式编写。有时,数据库无法访问或关闭。在这种情况下,我想回滚我的消息以稍后重试这条消息,并且我想继续阅读其他消息。
这段代码工作正常,除了一点:回滚的消息阻止我阅读其他消息:
private Connection getConnection() throws JMSException {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3); // will retry 3 times to dequeue rollbacked messages
redeliveryPolicy.setInitialRedeliveryDelay(5 *1000); // will wait 5s to read that message
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
((ActiveMQConnection)connection).setUseAsyncSend(true);
((ActiveMQConnection)connection).setDispatchAsync(true);
((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
((ActiveMQConnection)connection).setStatsEnabled(true);
connection.setClientID("myClientID");
return connection;
}
我以这种方式创建我的会话:
session = connection.createSession(true, Session.SESSION_TRANSACTED);
回滚很容易问:
session.rollback();
假设我的队列中有 3 条消息:
1: ok
2: KO (will need to be treated again : the message I want to rollback)
3: ok
4: ok
我的消费者会做(线性序列):
commit 1
rollback 2
wait 5s
rollback 2
wait 5s
rollback 2
put 2 in dead letter queue (ActiveMQ.DLQ)
commit 3
commit 4
但我想要 :
commit 1
rollback 2
commit 3
commit 4
wait 5s
rollback 2
wait 5s
rollback 2
wait 5s
put 2 in dead letter queue (ActiveMQ.DLQ)
那么,如何配置我的消费者以延迟我的回滚消息?