我们使用 JMS 在 Java 1.8 SE 环境中处理消息。消息来自 Oracle (12) 高级队列。
我们想从 JMS 队列中读取一条消息,根据它做一些工作,并将结果保存在数据库中。我们不想丢失任何消息,也不想重复处理任何消息。换句话说,我们希望 JMS 消息和相关数据库活动的处理成为单个事务。
我们已经阅读了有关如何执行此操作的各种文章(包括JMS 中的事务和重新传递、JMS 消息传递可靠性和确认模式、具有事务的可靠 JMS)。共识似乎是使用 JTA/XA,但我们希望使用更简单的东西。
我们使用 Oracle 的 Advanced Queuing 作为我们的 JMS 提供程序,因此我们决定看看我们是否可以为 JMS 和数据库活动使用相同的数据库连接,以便单个提交对 JMS 和数据库活动都有效。它似乎奏效了。
在下面的代码中,我们在初始化 JMS 队列时使用现有的 SQL 连接创建了一个 QueueConnection。处理完消息后,提交 JMS 会话也会提交数据库更改。我们还没有看到其他地方讨论过这种方法,所以我们想知道是否
- 我们有一个适用于 Oracle 高级队列的可靠解决方案,
- 我们有一个解决方案碰巧在这个版本的 Oracle Advanced Queueing 中工作,
- 我们的测试用例真的非常非常幸运,而这种方法充满了危险
请评论我们的方法是否应该可靠或者我们是否应该使用 JTA/XA。
public class OracleJmsQueue {
private DataSource dataSource;
protected Queue queue;
protected QueueConnection queueConnection;
protected QueueReceiver queueReceiver;
protected QueueSession queueSession;
private java.sql.Connection dbConnection = null;
protected void initQueueSession()
throws JMSException, SQLException {
// Connect to the database source of messages
DataSource dataSource = getDataSource();
dbConnection = dataSource.getConnection();
dbConnection.setAutoCommit(false);
queueConnection = AQjmsQueueConnectionFactory.createQueueConnection(
dbConnection);
queueSession =
queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
queue = ((AQjmsSession)queueSession).getQueue(queueUser, queueName);
queueReceiver = queueSession.createReceiver(queue);
}
public void run() {
initQueueSession();
// code omitted
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(200);
final Message message = queueReceiver.receiveNoWait();
if (message != null) {
processMessage(message); // alters DB tables
commitSession();
}
}
// catches omitted
}
}
protected void commitSession() throws JMSException {
logger.info("Committing " + queueName + " queue session");
queueSession.commit();
}
} // class OracleJmsQueue