This is a nested transaction problem; please help me analyze the cause.

Overall code structure:

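TransactionA performs some DB operations and then opens TransactionB. When TransactionA commits, it fires a custom trigger (a transaction synchronization callback), and that trigger opens transaction C (PROPAGATION_REQUIRES_NEW).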
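
To make the nesting concrete, here is a minimal, self-contained sketch of how the two propagation levels choose a connection. It is a toy model and not Spring's implementation: the class PropagationSketch, its execute method, and the integer connection ids are all made up for illustration, and transaction C is nested inside A's body for simplicity instead of being fired from a commit-time trigger.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of transaction propagation; NOT Spring's implementation. */
class PropagationSketch {
    enum Propagation { REQUIRED, REQUIRES_NEW }

    // Stack of connection ids; the top is the transaction currently bound to the thread.
    private final Deque<Integer> bound = new ArrayDeque<>();
    private int nextConnectionId = 1;
    final List<String> log = new ArrayList<>();

    void execute(String name, Propagation propagation, Runnable body) {
        // REQUIRED joins an existing transaction; otherwise a new one is started.
        boolean join = propagation == Propagation.REQUIRED && !bound.isEmpty();
        if (!join) {
            bound.push(nextConnectionId++); // outer transaction is suspended, new connection taken
        }
        log.add(name + " uses connection " + bound.peek());
        try {
            body.run();
        } finally {
            if (!join) {
                bound.pop(); // release this connection and resume the outer transaction
            }
        }
    }

    static List<String> demo() {
        PropagationSketch tx = new PropagationSketch();
        tx.execute("A", Propagation.REQUIRED, () -> {
            tx.execute("B", Propagation.REQUIRED, () -> { });
            tx.execute("C", Propagation.REQUIRES_NEW, () -> { });
        });
        return tx.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model A and B end up on connection 1, while C gets connection 2 and hands control back to A's connection when it finishes.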

The failure unfolds like this: the first run completes without errors, but the second run gets a closed connection. The test code is:

@Test
public void testNotify() {

    ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();

    while (true) {

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {

              // The main code shown below runs here...

            }
        });
    }

}
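
One detail of this test that may matter: Executors.newSingleThreadExecutor() runs every task on the same worker thread, so anything one iteration leaves in a ThreadLocal is still there in the next iteration. The sketch below shows only that JDK behavior. The class name ThreadBoundStateSketch and the string value are hypothetical, and whether this is what happens to the connection in the real application is an assumption, not something the stack trace proves.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadBoundStateSketch {
    // Stands in for any thread-bound resource, e.g. a holder kept in a ThreadLocal.
    private static final ThreadLocal<String> BOUND = new ThreadLocal<>();

    static String[] runTwice() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // First task binds a value and never removes it.
            Callable<String> firstTask = () -> {
                String seen = String.valueOf(BOUND.get());
                BOUND.set("holder-from-run-1");
                return seen;
            };
            // Second task runs on the same worker thread and reads the leftover value.
            Callable<String> secondTask = () -> String.valueOf(BOUND.get());

            Future<String> first = executor.submit(firstTask);
            Future<String> second = executor.submit(secondTask);
            return new String[] { first.get(), second.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = runTwice();
        System.out.println("run 1 saw: " + seen[0]);
        System.out.println("run 2 saw: " + seen[1]);
    }
}
```

The first task sees no value, and the second task sees the value the first one left behind, which is the kind of carry-over that would make a failure show up only from the second run onward.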

Error stack trace:

Caused by: java.sql.SQLException: connection holder is null
    at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1122) ~[druid-1.0.24.jar:1.0.24]
    at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1113) ~[druid-1.0.24.jar:1.0.24]
    at com.alibaba.druid.pool.DruidPooledConnection.prepareStatement(DruidPooledConnection.java:318) ~[druid-1.0.24.jar:1.0.24]
    at org.apache.ibatis.executor.statement.PreparedStatementHandler.instantiateStatement(PreparedStatementHandler.java:75) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.statement.BaseStatementHandler.prepare(BaseStatementHandler.java:85) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.statement.RoutingStatementHandler.prepare(RoutingStatementHandler.java:57) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.SimpleExecutor.prepareStatement(SimpleExecutor.java:73) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:59) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:267) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:137) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:96) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:77) ~[mybatis-3.2.8.jar:3.2.8]
    at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
    at org.apache.ibatis.plugin.Invocation.proceed(Invocation.java:49) ~[mybatis-3.2.8.jar:3.2.8]
    at com.youzan.pay.assetcenter.dal.monitor.SqlMonitorManager.intercept(SqlMonitorManager.java:53) ~[assetcenter.dal-0.0.1-SNAPSHOT.jar:na]
    at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:60) ~[mybatis-3.2.8.jar:3.2.8]
    at com.sun.proxy.$Proxy60.query(Unknown Source) ~[na:na]
    at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:108) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:102) ~[mybatis-3.2.8.jar:3.2.8]
    at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:66) ~[mybatis-3.2.8.jar:3.2.8]
    at sun.reflect.GeneratedMethodAccessor140.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_45]
    at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_45]
    at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:358) ~[mybatis-spring-1.2.2.jar:1.2.2]

Main code:

    // Open a local transaction (PROPAGATION_REQUIRED)
    transactionTemplate.execute(new TransactionCallback<Boolean>() {

        @Override
        public Boolean doInTransaction(TransactionStatus status) {

            AcquireOrder acquireOrder = acquireOrderRepository.active(payResult.getAcquireNo());

            // Do some set...
            acquireOrderRepository.reStore(acquireOrder);

            // 1. Detailed further down
            transactionActivityService.start("assetcenter", "", new HashMap<>());

            // ... Some of the code to construct command

            // 2. Detailed further down
            transactionActivityService.enrollAction(SETTLEMENT_TOPIC,
                    JSONObject.toJSONString(settleCommand));

            // 3. Detailed further down
            transactionActivityService.enrollAction(CHARGE_TOPIC, JSONObject.toJSONString(payCommand));

            return Boolean.TRUE;
        }
    });

Detailed code for step 1:

try {
    TransactionActivityRecord activity = requiredTransactionTemplate.execute(
            new TransactionCallback<TransactionActivityRecord>() {
        @Override
        public TransactionActivityRecord doInTransaction(TransactionStatus transactionStatus) {
            transactionActivityStore.addTransactionActivity(activityRecord);
            // Return the stored record so that execute() hands it back to the caller
            return activityRecord;
        }
    });
    TransactionActivityContextHolder.setCurrent(activity);
} catch (RuntimeException e) {
    TransactionActivityContextHolder.clear();
    throw e;
} finally {
    if (TransactionActivityContextHolder.isActive()) {

        // Register the synchronization with the current transaction
        TransactionSynchronization synchronization = new FinalizeTransactionSynchronization(transactionActivityManager);
        TransactionSynchronizationManager.registerSynchronization(synchronization);
    }
}

Detailed code for steps 2 and 3:

requiredTransactionTemplate.execute(new TransactionCallbackWithoutResult() {
    @Override
    protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
        transactionActivityStore.addTransactionAction(actionRecord);
    }
});

Synchronization code (problematic version):

public void afterCompletion(int status) {

    boolean actionRes = doSubmitActions(actions);

    // This is the code that causes the problem.
    if (actionRes && activity.getState() == TransactionState.INIT) {
        transactionActivityStore.updateTransactionActivityState(activity.getTxId(), TransactionState.PREPARE);
    }

}


/**
 * Submits the actions of the transaction participants.
 * @param actions
 * @return
 */
private boolean doSubmitActions(List<TransactionActionRecord> actions) {
    AtomicBoolean result = new AtomicBoolean(true);
    if (actions != null && !actions.isEmpty()) {
        actions.stream().filter(action -> action.getState() == TransactionState.INIT).forEach(action -> {
            boolean res = dtsTransactionActionProducer
                    .push(action.getActionName(), JSON.toJSONString(action.getContext()));
            if (res) {
                // Open a local transaction (PROPAGATION_REQUIRES_NEW)
                requiresNewTransactionTemplate.execute(new TransactionCallbackWithoutResult() {

                    @Override
                    protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                        transactionActivityStore
                                .updateTransactionActionState(action.getActionId(), TransactionState.PREPARE);
                    }
                });
            } else {
                log.warn("submit transaction-action failure topic:{},context:{}", action.getActionName(),
                        action.getContext());
                result.compareAndSet(true, false);
            }
        });
    }
    return result.get();
}

Synchronization code (working version):

public void afterCompletion(int status) {

    boolean actionRes = doSubmitActions(actions);

    // As in doSubmitActions, run the following update inside requiresNewTransactionTemplate
    if (actionRes && activity.getState() == TransactionState.INIT) {
        requiresNewTransactionTemplate.execute(new TransactionCallbackWithoutResult() {

            @Override
            protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                transactionActivityStore.updateTransactionActivityState(activity.getTxId(), TransactionState.PREPARE);
            }
        });
    }

}

1 Answer

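If you do not use PROPAGATION_REQUIRES_NEW, the previous connection cannot be released properly; it is only closed, which leads to the problem. The Javadoc of Spring's TransactionSynchronization.afterCommit() gives the same advice: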

/**
     * Invoked after transaction commit. Can perform further operations right
     * <i>after</i> the main transaction has <i>successfully</i> committed.
     * <p>Can e.g. commit further operations that are supposed to follow on a successful
     * commit of the main transaction, like confirmation messages or emails.
     * <p><b>NOTE:</b> The transaction will have been committed already, but the
     * transactional resources might still be active and accessible. As a consequence,
     * any data access code triggered at this point will still "participate" in the
     * original transaction, allowing to perform some cleanup (with no commit following
     * anymore!), unless it explicitly declares that it needs to run in a separate
     * transaction. Hence: <b>Use {@code PROPAGATION_REQUIRES_NEW} for any
     * transactional operation that is called from here.</b>
     * @throws RuntimeException in case of errors; will be <b>propagated to the caller</b>
     * (note: do not throw TransactionException subclasses here!)
     */
    void afterCommit();
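
One way to picture the failure is the toy model below. It is a sketch under stated assumptions, not Spring's or MyBatis's actual code: the class CompletionCallbackSketch, the Conn type, and the two ThreadLocal fields are hypothetical, and the model assumes the data-access layer remembers the connection it joined and forgets it only when the surrounding transaction completes. A query that "participates" in the already finished transaction from the callback then leaves a remembered connection behind, while a callback that runs in its own REQUIRES_NEW scope cleans up after itself.

```java
import java.util.function.BooleanSupplier;

/** Toy model of a completion callback; NOT Spring's or MyBatis's actual code. */
class CompletionCallbackSketch {
    static final class Conn {
        boolean closed;
    }

    // Connection that the (toy) transaction manager binds to the current thread.
    private static final ThreadLocal<Conn> BOUND = new ThreadLocal<>();
    // Assumption: the data-access layer remembers the connection it joined
    // and only forgets it when the surrounding transaction completes.
    private static final ThreadLocal<Conn> CACHED = new ThreadLocal<>();

    /** Data access: reuse the remembered connection, else join the bound one. */
    static boolean query() {
        Conn conn = CACHED.get();
        if (conn == null) {
            conn = BOUND.get();
            CACHED.set(conn);
        }
        return !conn.closed; // false stands in for "connection holder is null"
    }

    /** Toy REQUIRES_NEW: suspend outer state, use a fresh connection, clean up fully. */
    static boolean requiresNew(BooleanSupplier body) {
        Conn outerBound = BOUND.get();
        Conn outerCached = CACHED.get();
        Conn fresh = new Conn();
        BOUND.set(fresh);
        CACHED.remove();
        try {
            return body.getAsBoolean();
        } finally {
            fresh.closed = true;     // inner transaction releases its own connection
            BOUND.set(outerBound);   // outer state is resumed unchanged
            CACHED.set(outerCached);
        }
    }

    /** One outer transaction whose completion callback performs a query. */
    static boolean run(boolean callbackUsesRequiresNew) {
        Conn conn = new Conn();
        BOUND.set(conn);
        boolean bodyOk = query();    // regular work inside the transaction
        CACHED.remove();             // data-access layer cleans up at completion
        // Completion callback: the outer connection is still bound at this point.
        boolean callbackOk = callbackUsesRequiresNew
                ? requiresNew(CompletionCallbackSketch::query)
                : query();           // joins the finished transaction and re-caches conn
        conn.closed = true;          // final cleanup closes the connection ...
        BOUND.remove();              // ... and unbinds it, but CACHED may remain
        return bodyOk && callbackOk;
    }

    /** Two consecutive runs on the same thread, like the single-thread test. */
    static boolean[] runTwice(boolean callbackUsesRequiresNew) {
        BOUND.remove();
        CACHED.remove();
        return new boolean[] { run(callbackUsesRequiresNew), run(callbackUsesRequiresNew) };
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(runTwice(false)));
        System.out.println(java.util.Arrays.toString(runTwice(true)));
    }
}
```

In this model, runTwice(false) yields [true, false] (the first run succeeds, the second hits the closed connection), and runTwice(true) yields [true, true].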
Answered 2016-10-09T01:07:33.043