这是嵌套问题,请帮忙分析原因
整体代码结构说明:</p>
TransactionA 在一些DB 操作中,然后打开transactionB。TransactionA 提交时间触发自定义触发器,该触发器在开启事务C(PROPAGATION_REQUIRES_NEW)
报错过程是这样的:第一次运行是正确的没有错误,第二次运行时得到一个关闭的连接,测试代码是这样的:</p>
@Test
public void testNotify() {
ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
while (true) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
//The following mainCode...
}
});
}
}
错误堆栈:</p>
Caused by: java.sql.SQLException: connection holder is null
at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1122) ~[druid-1.0.24.jar:1.0.24]
at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1113) ~[druid-1.0.24.jar:1.0.24]
at com.alibaba.druid.pool.DruidPooledConnection.prepareStatement(DruidPooledConnection.java:318) ~[druid-1.0.24.jar:1.0.24]
at org.apache.ibatis.executor.statement.PreparedStatementHandler.instantiateStatement(PreparedStatementHandler.java:75) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.statement.BaseStatementHandler.prepare(BaseStatementHandler.java:85) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.statement.RoutingStatementHandler.prepare(RoutingStatementHandler.java:57) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:73) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:59) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:267) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:137) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:96) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:77) ~[mybatis-3.2.8.jar:3.2.8]
at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
at org.apache.ibatis.plugin.Invocation.proceed(Invocation.java:49) ~[mybatis-3.2.8.jar:3.2.8]
at com.youzan.pay.assetcenter.dal.monitor.SqlMonitorManager.intercept(SqlMonitorManager.java:53) ~[assetcenter.dal-0.0.1-SNAPSHOT.jar:na]
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:60) ~[mybatis-3.2.8.jar:3.2.8]
at com.sun.proxy.$Proxy60.query(Unknown Source) ~[na:na]
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:108) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:102) ~[mybatis-3.2.8.jar:3.2.8]
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:66) ~[mybatis-3.2.8.jar:3.2.8]
at sun.reflect.GeneratedMethodAccessor140.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:358) ~[mybatis-spring-1.2.2.jar:1.2.2]
主要代码:</p>
//Open a local transaction PROPAGATION_REQUIRED
transactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
AcquireOrder acquireOrder = acquireOrderRepository.active(payResult.getAcquireNo());
// Do some set...
acquireOrderRepository.reStore(acquireOrder);
}
//1. Look at the following
transactionActivityService.start("assetcenter", "", new HashMap<>());
//... Some of the code to construct command
//2.Look at the following
transactionActivityService.enrollAction(SETTLEMENT_TOPIC,
JSONObject.toJSONString(settleCommand));
//3.Look at the following
transactionActivityService.enrollAction(CHARGE_TOPIC, JSONObject.toJSONString(payCommand));
}
return Boolean.TRUE;
}
});
1 详细代码:</p>
try {
TransactionActivityRecord activity = requiredTransactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
transactionActivityStore.addTransactionActivity(activityRecord);
}
});
TransactionActivityContextHolder.setCurrent(activity);
} catch (RuntimeException e) {
TransactionActivityContextHolder.clear();
throw e;
} finally {
if (TransactionActivityContextHolder.isActive()) {
//The registration of synchronizer
TransactionSynchronization synchronization = new FinalizeTransactionSynchronization(transactionActivityManager);
TransactionSynchronizationManager.registerSynchronization(synchronization);
}
}
2、3 详细代码:</p>
requiredTransactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
transactionActivityStore.addTransactionAction(actionRecord);
}
});
3 同步码(问题码):</p>
public void afterCompletion(int status) {
boolean actionRes = doSubmitActions(actions);
//This code raises questions.
if(actionRes && activity.getState() == TransactionState.INIT) {
transactionActivityStore.updateTransactionActivityState(activity.getTxId(),TransactionState.PREPARE);
}
}
/**
* 执行事务参与者的提交
* @param actions
* @return
*/
private boolean doSubmitActions(List<TransactionActionRecord> actions) {
AtomicBoolean result = new AtomicBoolean(true);
if(actions != null && !actions.isEmpty()) {
actions.stream().filter(action -> action.getState() == TransactionState.INIT).forEach(action -> {
boolean res = dtsTransactionActionProducer
.push(action.getActionName(), JSON.toJSONString(action.getContext()));
if (res) {
//Open a local transaction PROPAGATION_REQUIRES_NEW
requiresNewTransactionTemplate.execute(new TransactionCallbackWithoutResult() {
@Override protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
transactionActivityStore
.updateTransactionActionState(action.getActionId(), TransactionState.PREPARE);
}
});
} else {
log.warn("submit transaction-action failure topic:{},context:{}", action.getActionName(),
action.getContext());
result.compareAndSet(true,false);
}
});
}
return result.get();
}
3 同步码(正确码):</p>
public void afterCompletion(int status) {
boolean actionRes = doSubmitActions(actions);
//Put the following code in the doSubmitActions method in the requiresNewTransactionTemplate execution
if(actionRes && activity.getState() == TransactionState.INIT) {
transactionActivityStore.updateTransactionActivityState(activity.getTxId(),TransactionState.PREPARE);
}
}