我们最近从 Artemis 迁移到 ActiveMQ,现在我们的消息驱动 Bean 面临一些奇怪的行为。
基本上,当某些应用程序失败时,我们会意识到某些消息丢失了。我们进行了一些测试,以确定是否存在 ack 类型、消息持久性等配置问题。但最后,我们得出的结论和我们得到的测试场景是,当我们有一个 MDB 处理消息时,那里没有等待提交的事务,当应用程序死亡时,消息丢失,代理将其从挂起的消息中删除。
但是,如果我们确实有一个带有待提交提交的事务,代理会保留它,并且很快应用程序启动该消息就可以再次处理。
我们将 XA 和资源适配器与 WildFly Swarm 应用程序一起使用。看起来只要一个事务从 MDB 开始并且有一个提交挂起,即使消费者死亡,应用程序也会通知代理保留消息。如果没有事务,一旦代理识别出消费者失去连接,消息就会从队列中删除。
一些信息:
- 即使定义了所需的事务和容器管理,也会发生同样的情况
- 使用 auto 或 dups ack,也会发生同样的情况
- 消息是持久的
我们缺少一些带有 activeMQ 的配置?我们没有找到任何其他属性说明这种 MDB 和事务的行为。什么告诉经纪人他应该持有正在进行交易的消息?有没有办法在使用 MDB 和 XA 失败时保留所有消息?
这是一个带有测试场景的简单 MDB 代码:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = "task-processor"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "maxSessions", propertyValue = "20")
})
@ResourceAdapter("activemq-rar.rar")
public class MyMDB implements MessageListener {
@Inject
private CartaoService service;
@Override
public void onMessage(Message message) {
try {
// 0 - No transaction
// 1 - Will be a transaction with a pending commit
int t = new Random().nextInt(2);
// Just a entity to persist and have a pending commit
Cartao c = new Cartao();
c.setIdBoard(1l);
c.setNome(((TextMessage) message).getText());
c.setStatus("ON");
c.setTempo(10l);
// Prints the message id and if there is a transaction
System.out.println(((TextMessage) message).getJMSMessageID() + " - " + t);
// Sometimes we will have a commit pending
if (t == 1)
service.insert(c);
TimeUnit.SECONDS.sleep(30);
System.out.println("END");
} catch (Exception e) {
e.printStackTrace();
}
}
}
insert 方法只是一种持久化实体的方法,并且是事务所需的。