我有一个带有 3 个分布式数据源(com.atomikos.jdbc.AtomikosDataSourceBean)的应用程序。我正在使用 Atomikos 事务管理器作为 JTA 实现。每个数据源都适用于 PostgreSQL 数据库。现在,我依次对每个数据源调用我的查询,一切正常。
我想知道,是否可以使用 JTA 并行调用我的查询(多线程,并发)?
我尝试使用 jdbcTemplate (Spring) 在新创建的线程中简单地调用查询。首先,我遇到了一个春天的问题。Spring 将事务上下文存储在 ThreadLocal 字段中,因此在我的新线程(Spring 事务管理器和多线程)中没有正确解析)。我通过将相同的事务上下文设置到新创建的线程的 ThreadLocal 中解决了这个问题。但是我在 Atomikos 代码中遇到的同样的问题。它们还将 CompositeTransactionImp 存储在线程范围映射 (BaseTrancationManager#getCurrentTx) 中。但在 Atomikos 的情况下,不可能为新线程设置值。所以我不能同时执行我的查询,因为 Atomikos 似乎不支持这种方法。但我也查看了 JTA 规范,发现它们如下:“多个线程可能同时与同一个全局事务相关联。” (“3.2 TransactionManager 接口”,http://download.oracle.com/otndocs/jcp/jta-1.1-spec-oth-JSpec/?submit=Download)
问题:如何在一个全局事务的范围内使用 JTA(2 阶段提交)同时调用两个或多个对不同数据源的查询?
Tomcat 上下文中的 DataSources 配置:
<Resource name="jdbc/db1" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db1"
uniqueResourceName="jdbc/db1"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
<Resource name="jdbc/db2" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db2"
uniqueResourceName="jdbc/db2"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
<Resource name="jdbc/db3" auth="Container" type="com.atomikos.jdbc.AtomikosDataSourceBean"
factory="com.company.package.AtomikosDataSourceBeanFactory"
xaDataSourceClassName="org.postgresql.xa.PGXADataSource"
xaProperties.serverName="localhost"
xaProperties.portNumber="5451"
xaProperties.databaseName="db3"
uniqueResourceName="jdbc/db3"
xaProperties.user="secretpassword"
xaProperties.password="secretpassword"
minPoolSize="5"
maxPoolSize="10"
testQuery="SELECT 1" />
Spring上下文中的事务管理器配置:
<bean id="transactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close" lazy-init="true">
<property name="forceShutdown" value="false" />
</bean>
代码:
final SqlParameterSource parameters = getSqlParameterSourceCreator().convert(entity);
// Solving Spring's ThreadLocal issue: saving thread local params
final Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
final List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
final boolean actualTransactionActive = TransactionSynchronizationManager.isActualTransactionActive();
final String currentTransactionName = TransactionSynchronizationManager.getCurrentTransactionName();
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>();
// Running query in a separate thread.
final Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
// Solving Spring's ThreadLocal issue: setting thread local values to newly created thread.
for (Map.Entry<Object, Object> entry : resourceMap.entrySet()) {
TransactionSynchronizationManager.bindResource(entry.getKey(), entry.getValue());
}
if (synchronizations != null && !synchronizations.isEmpty()) {
TransactionSynchronizationManager.initSynchronization();
for (TransactionSynchronization synchronization : synchronizations) {
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
// Executing query.
final String query = "insert into ...";
NamedParameterJdbcTemplate template = new NamedParameterJdbcTemplate(dataSourceOne);
template.update(query, parameters);
} catch (final Throwable ex) {
exceptionHolder.set(ex);
}
}
});
thread.start();
// ... same code as above for other dataSources.
// allThreds.join(); - joining to all threads.