1

我使用多个同步线程来处理多个事务,这些事务是借助 Hibernate 框架并使用锁定机制从数据库中获取的。问题是,每个线程第一次都能正确处理,但下一次就会抛出 SQL 01002 和 SQL 72000 异常。请查看代码:

    // Wall-clock start of this run; elapsed time is later measured against it.
    long _StartTimeForMe = System.currentTimeMillis();

    boolean _contextPushed;
    // Build the logging context for this worker: getName() (presumably the
    // thread name - confirm) followed by a generated id.
    txContext =
        GlobalFunctions.generateId(
            true,
            DataBrokerConstants.CONTEXT_PREFIX,
            null,
            null);
    txContext = this.getName() + txContext;
    logger_U.pushContext(txContext);
    _contextPushed = true;

    // Open the core session first; nothing else has been acquired yet, so a
    // failure here can simply return.
    try
    {
        coreSession = openTxCoreSession();
    }
    catch (Exception e1)
    {
        logger_U.info("exception while creating Tx_Core session");
        e1.printStackTrace();
        return;
    }

    try
    {
        stagSession = openStagSession();
    }
    catch (RuntimeException e2)
    {
        logger_U.info("exception while creating stag session");
        e2.printStackTrace();
        // The core session was opened successfully above; close it before
        // bailing out, otherwise it (and its connection) is leaked.
        closeSession(coreSession);
        return;
    }

    // Base logging context; every loop iteration logs under "<base>-<i>".
    String _txContextBackup = txContext;

    int i = 1;
    long _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe;

    // Set when a sleep is interrupted, so the interrupt status can be restored
    // once the transactions are rolled back and the sessions are closed.
    boolean _interrupted = false;

    try
    {
        // Keep picking up records until doIhaveMoreTime() says the time budget
        // for this run is used up.
        while (doIhaveMoreTime(_timeElapsedByMe))
        {
            // Switch the logging context to the one for this iteration.
            txContext = _txContextBackup + "-" + (i);
            logger_U.removeContext();
            logger_U.pushContext(txContext);

            coreTransaction = null;
            stagTransaction = null;

            InnerClassUpdate _rec = null;
            UpdateRecord _r = null;
            if (null != coreSession)
            {
                // The record is fetched inside the core transaction.
                coreTransaction = coreSession.beginTransaction();
                _rec = getURForUpdates(coreSession);
            }
            else
            {
                logger_U.error("coreSession is not created.");
                break;
            }

            if (null != _rec)
            {
                _r = _rec.record;
                logger_U.info(
                    "record found. Going to process Record: ID = "
                        + _r.getId()
                        + " TX_IDENTIFIER = "
                        + _r.getTxIdentifier()
                        + "UPDATE_TYPE = "
                        + _r.getUpdateType());

                //If a record of same transaction is processing or has been processed before in same run, it will not run again in this run
                if (isTxProcessing(_r.getTxIdentifier()))
                {
                    if (coreTransaction != null
                        && !coreTransaction.wasCommitted()
                        && !coreTransaction.wasRolledBack())
                        coreTransaction.rollback();

                    if (stagTransaction != null
                        && !stagTransaction.wasCommitted()
                        && !stagTransaction.wasRolledBack())
                        stagTransaction.rollback();

                    logger_U.debug(
                        "A record of Tx_Identifier : "
                            + _r.getTxIdentifier()
                            + " is either processing or has been processed in this run.");
                }
                else
                {
                    try
                    {
                        stagTransaction = stagSession.beginTransaction();
                        if (processUpdates(_rec))
                        {
                            // Processing succeeded: commit staging first, then core.
                            try
                            {
                                if (stagTransaction != null
                                    && !stagTransaction.wasCommitted()
                                    && !stagTransaction.wasRolledBack())
                                {
                                    stagTransaction.commit();

                                    logger_U.debug("stag commit success");
                                }
                                if (coreTransaction != null
                                    && !coreTransaction.wasCommitted()
                                    && !coreTransaction.wasRolledBack())
                                {
                                    coreTransaction.commit();

                                    logger_U.debug("core commit success");
                                }

                            }
                            catch (HibernateException e)
                            {
                                logger_U.debug("error while commit");
                                e.printStackTrace();

                                // A failed commit must not leave a transaction
                                // open: the next iteration begins a new
                                // transaction on the same session. Roll back
                                // whatever did not get committed.
                                if (stagTransaction != null
                                    && !stagTransaction.wasCommitted()
                                    && !stagTransaction.wasRolledBack())
                                    stagTransaction.rollback();

                                if (coreTransaction != null
                                    && !coreTransaction.wasCommitted()
                                    && !coreTransaction.wasRolledBack())
                                    coreTransaction.rollback();
                            }

                        }
                        else
                        {
                            // Processing reported failure: undo both transactions.
                            if (stagTransaction != null
                                && !stagTransaction.wasCommitted()
                                && !stagTransaction.wasRolledBack())
                                stagTransaction.rollback();

                            logger_U.debug("stagTransaction rollback...");
                            if (coreTransaction != null
                                && !coreTransaction.wasCommitted()
                                && !coreTransaction.wasRolledBack())
                                coreTransaction.rollback();

                            logger_U.debug("coreTransaction rolback...");
                        }
                    }
                    catch (Exception e1)
                    {
                        if (stagTransaction != null
                            && !stagTransaction.wasCommitted()
                            && !stagTransaction.wasRolledBack())
                            stagTransaction.rollback();

                        if (coreTransaction != null
                            && !coreTransaction.wasCommitted()
                            && !coreTransaction.wasRolledBack())
                            coreTransaction.rollback();

                        logger_U.error(
                            "Exception while processing UpdateRecord.");
                        e1.printStackTrace();
                    }
                }
            }
            else
            {
                // Nothing to process: end the transaction(s), wait, and poll again.
                if (stagTransaction != null
                    && !stagTransaction.wasCommitted()
                    && !stagTransaction.wasRolledBack())
                    stagTransaction.rollback();

                if (coreTransaction != null
                    && !coreTransaction.wasCommitted()
                    && !coreTransaction.wasRolledBack())
                    coreTransaction.rollback();

                logger_U.info(
                    "No record found. wait for "
                        + convertMilliesToSec(
                            DataBrokerConstants.SMALL_SLEEP)
                        + "Sec.");
                Thread.sleep(DataBrokerConstants.SMALL_SLEEP);
                i++;
                _timeElapsedByMe =
                    System.currentTimeMillis() - _StartTimeForMe;

                continue;
            }

            //No matter record is processed successfully or not, it will not be processed again in this run
            if (_rec != null && vectForUpdate.contains(_rec))
            {
                logger_U.debug(
                    "UpdateRecord of ID "
                        + _r.getId()
                        + " and TX_IDENTIFIER "
                        + _r.getTxIdentifier()
                        + " is removing from vectForUpdate.");
                removeObjectFromVect(_rec);
            }
            Thread.sleep(1000);
            _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe;
            i++;
        } //END while loop
    }
    catch (Exception e)
    {
        // Thread.sleep() clears the interrupt status when it throws; remember
        // the interruption so it can be re-asserted after cleanup instead of
        // being silently lost.
        if (e instanceof InterruptedException)
            _interrupted = true;

        try
        {
            if (stagTransaction != null
                && !stagTransaction.wasCommitted()
                && !stagTransaction.wasRolledBack())
                stagTransaction.rollback();

            if (coreTransaction != null
                && !coreTransaction.wasCommitted()
                && !coreTransaction.wasRolledBack())
                coreTransaction.rollback();

        }
        catch (HibernateException e3)
        {
            // Rollback itself failed; nothing more can be done here besides reporting it.
            e3.printStackTrace();
        }
        logger_U.error("Unknown Exception occured while processing.");
        e.printStackTrace();
    }
    finally
    {
        // Restore the base logging context and release both sessions, whatever
        // way the loop ended.
        txContext = _txContextBackup;
        logger_U.removeContext();
        logger_U.pushContext(txContext);
        closeSession(coreSession);
        closeSession(stagSession);

        // Re-assert the interrupt only after the sessions are closed, so the
        // cleanup above does not run on an interrupted thread.
        if (_interrupted)
            Thread.currentThread().interrupt();
    }
4

1 回答 1

2

在使用 Hibernate 时,在多线程环境中应该非常小心。

  • 您首先应该验证事务已正确提交,并且 SQL 已实际执行。如果无法通过阅读代码来确认这一点,可以提高 Hibernate 的日志级别来进行验证。

  • Hibernate Session 不是线程安全的,线程之间只能共享 SessionFactory。您还必须确保每次打开会话后都将其关闭,因为会话不会自动关闭,而未关闭的会话会阻止新线程打开新会话。

于 2012-07-10T11:47:29.517 回答