我正在尝试使用 spring + atomikos 设计一个应用程序,该应用程序读取一批 N 消息并将它们保存到单个事务中的数据库中。它必须是一批消息,因为数据只有在成批时才是一致的,即单个消息不足以进行一致的事务。此外,每条消息都有一个事务绝对会影响我的表现。这不是典型的 JMS + DB 应用程序,因此我很难在网上找到示例(我尝试使用 atomikos 网站上建议的 MessageListener ,但每条消息创建一个事务)。使用 Spring 实现这一目标的最佳方法是什么?谢谢
乔瓦尼
我正在尝试使用 spring + atomikos 设计一个应用程序,该应用程序读取一批 N 消息并将它们保存到单个事务中的数据库中。它必须是一批消息,因为数据只有在成批时才是一致的,即单个消息不足以进行一致的事务。此外,每条消息都有一个事务绝对会影响我的表现。这不是典型的 JMS + DB 应用程序,因此我很难在网上找到示例(我尝试使用 atomikos 网站上建议的 MessageListener ,但每条消息创建一个事务)。使用 Spring 实现这一目标的最佳方法是什么?谢谢
乔瓦尼
我认为您的问题与 Spring 或 Atomikos 无关。这更像是一个设计问题。
如果您需要在单个事务中提交所有消息,请在将完整消息发送到持久层之前将它们全部合并到一个更大的结构中。
弄清楚我需要做什么。在我的spring配置文件中,我需要配置以下内容:
org.springframework.transaction.jta.JtaTransactionManager
(例如配置为使用atomikos),例如,带有类的实例com.atomikos.icatch.jta.UserTransactionManager
和com.atomikos.icatch.jta.UserTransactionImp
;<tx:annotation-driven transaction-manager="txManager">
客户端代码中使用注释驱动事务的标签;com.atomikos.jms.AtomikosConnectionFactoryBean
;com.tibco.tibjms.TibjmsQueue
或类似的实例;com.atomikos.jdbc.AtomikosDataSourceBean
(或com.atomikos.jdbc.nonxa.AtomikosNonXADataSourceBean
为最后一个资源策略场景);一个自定义 DAO 类的实例,其中注入了先前定义的 JmsTemplate 和 SqlUpdate 对象:此类将具有一个或多个带注释@Transactional
的方法,这些方法将使用 JmsTemplate 执行事务以接收消息,并使用 SqlUpdate(s) 将它们持久保存在数据库中,例如,它可能是一个如下所示的类:
public class MyDao{
JmsTemplate jmsTemplate; // getter/setter omitted for clarity
BatchSqlUpdate sqlUpdate; // getter/setter omitted for clarity
@Transactional
public void persistMessages(int n){
// map used for the sqlUpdate object
Map<String, Object> params = new HashMap<>();
// need to reset this since it's being reused
sqlUpdate.reset();
for(int i=0;i<n;i++){
// retrieve a message synchronously
Message msg = jmsTemplate.receive();
// transform the message
doSomeMagic(msg,params);
// set parameters in the SQL
sqlUpdate.updateByNamedParam(params);
}
// now the batch can be flushed
sqlUpdate.flush();
}
private void doSomeMagic(Message msg, Map<String,Object> params){
// implementation is application-dependent!
// the only assumption is that somehow the
// message can be used to set the named
// parameters in the sqlUpdate object
}
}
唯一值得注意的是DAO必须是spring管理的,因为使用注解需要spring创建代理。