我使用多个同步线程来处理多个事务,这些事务通过使用休眠框架和使用锁定概念从数据库中获取。问题是,每个线程第一次正确处理,但下一次它抛出 SQL 01002 和 SQL 72000 异常。请查看代码-
long _StartTimeForMe = System.currentTimeMillis();
boolean _contextPushed;
txContext =
GlobalFunctions.generateId(
true,
DataBrokerConstants.CONTEXT_PREFIX,
null,
null);
txContext = this.getName() + txContext;
logger_U.pushContext(txContext);
_contextPushed = true;
try
{
coreSession = openTxCoreSession();
}
catch (Exception e1)
{
logger_U.info("exception while creating Tx_Core session");
e1.printStackTrace();
return;
}
try
{
stagSession = openStagSession();
}
catch (RuntimeException e2)
{
logger_U.info("exception while creating stag session");
e2.printStackTrace();
return;
}
String _txContextBackup = txContext;
int i = 1;
long _timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe;
try
{
while (doIhaveMoreTime(_timeElapsedByMe))
{
txContext = _txContextBackup + "-" + (i);
logger_U.removeContext();
logger_U.pushContext(txContext);
coreTransaction = null;
stagTransaction = null;
InnerClassUpdate _rec = null;
UpdateRecord _r = null;
if (null != coreSession)
{
coreTransaction = coreSession.beginTransaction();
_rec = getURForUpdates(coreSession);
}
else
{
logger_U.error("coreSession is not created.");
break;
}
if (null != _rec)
{
_r = _rec.record;
logger_U.info(
"record found. Going to process Record: ID = "
+ _r.getId()
+ " TX_IDENTIFIER = "
+ _r.getTxIdentifier()
+ "UPDATE_TYPE = "
+ _r.getUpdateType());
//If a record of same transaction is processing or has been processed before in same run, it will not run again in this run
if (isTxProcessing(_r.getTxIdentifier()))
{
if (coreTransaction != null
&& !coreTransaction.wasCommitted()
&& !coreTransaction.wasRolledBack())
coreTransaction.rollback();
if (stagTransaction != null
&& !stagTransaction.wasCommitted()
&& !stagTransaction.wasRolledBack())
stagTransaction.rollback();
logger_U.debug(
"A record of Tx_Identifier : "
+ _r.getTxIdentifier()
+ " is either processing or has been processed in this run.");
}
else
{
try
{
stagTransaction = stagSession.beginTransaction();
if (processUpdates(_rec))
{
try
{
if (stagTransaction != null
&& !stagTransaction.wasCommitted()
&& !stagTransaction.wasRolledBack())
{
stagTransaction.commit();
logger_U.debug("stag commit success");
}
if (coreTransaction != null
&& !coreTransaction.wasCommitted()
&& !coreTransaction.wasRolledBack())
{
coreTransaction.commit();
logger_U.debug("core commit success");
}
}
catch (HibernateException e)
{
logger_U.debug("error while commit");
e.printStackTrace();
}
}
else
{
if (stagTransaction != null
&& !stagTransaction.wasCommitted()
&& !stagTransaction.wasRolledBack())
stagTransaction.rollback();
logger_U.debug("stagTransaction rollback...");
if (coreTransaction != null
&& !coreTransaction.wasCommitted()
&& !coreTransaction.wasRolledBack())
coreTransaction.rollback();
logger_U.debug("coreTransaction rolback...");
}
}
catch (Exception e1)
{
if (stagTransaction != null
&& !stagTransaction.wasCommitted()
&& !stagTransaction.wasRolledBack())
stagTransaction.rollback();
if (coreTransaction != null
&& !coreTransaction.wasCommitted()
&& !coreTransaction.wasRolledBack())
coreTransaction.rollback();
logger_U.error(
"Exception while processing UpdateRecord.");
e1.printStackTrace();
}
}
}
else
{
if (stagTransaction != null
&& !stagTransaction.wasCommitted()
&& !stagTransaction.wasRolledBack())
stagTransaction.rollback();
if (coreTransaction != null
&& !coreTransaction.wasCommitted()
&& !coreTransaction.wasRolledBack())
coreTransaction.rollback();
logger_U.info(
"No record found. wait for "
+ convertMilliesToSec(
DataBrokerConstants.SMALL_SLEEP)
+ "Sec.");
Thread.sleep(DataBrokerConstants.SMALL_SLEEP);
i++;
_timeElapsedByMe =
System.currentTimeMillis() - _StartTimeForMe;
continue;
}
//No matter record is processed successfully or not, it will not be processed again in this run
if (_rec != null && vectForUpdate.contains(_rec))
{
logger_U.debug(
"UpdateRecord of ID "
+ _r.getId()
+ " and TX_IDENTIFIER "
+ _r.getTxIdentifier()
+ " is removing from vectForUpdate.");
removeObjectFromVect(_rec);
}
Thread.sleep(1000);
_timeElapsedByMe = System.currentTimeMillis() - _StartTimeForMe;
i++;
} //END while loop
}
catch (Exception e)
{
try
{
if (stagTransaction != null
&& !stagTransaction.wasCommitted()
&& !stagTransaction.wasRolledBack())
stagTransaction.rollback();
if (coreTransaction != null
&& !coreTransaction.wasCommitted()
&& !coreTransaction.wasRolledBack())
coreTransaction.rollback();
}
catch (HibernateException e3)
{
// TODO Auto-generated catch block
e3.printStackTrace();
}
logger_U.error("Unknown Exception occured while processing.");
e.printStackTrace();
}
finally
{
txContext = _txContextBackup;
logger_U.removeContext();
logger_U.pushContext(txContext);
closeSession(coreSession);
closeSession(stagSession);
}