I am pretty sure that idle connections are not being reused, or that I am leaking connections. I have a simple route that starts with a file consumer. The file consumer consumes text files. After picking up a file, I check a table to make sure the file is not a duplicate.
I then convert the message body from a file to a String, split the file, and run the individual parts through one of several routes depending on the record type. Each of these routes eventually inserts the record into a staging table on a server running MySQL.
Here is a simplified version of the route:
<bean id="myPool" class="java.util.concurrent.Executors" factory-method="newFixedThreadPool">
<argument index="0" value="8"/>
</bean>
<camelContext trace="false" handleFault="true" errorHandlerRef="redeliveryErrorHandler" id="FileETLProcess" xmlns="http://camel.apache.org/schema/blueprint">
<errorHandler type="DefaultErrorHandler" useOriginalMessage="true" id="redeliveryErrorHandler">
<redeliveryPolicy maximumRedeliveries="3" redeliveryDelay="25" backOffMultiplier="2" useExponentialBackOff="false" retryAttemptedLogLevel="INFO"/>
</errorHandler>
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<log message="STARTING ERROR GENERAL HANDLER"/>
<bean ref="GeneralError"/>
<to uri="smtp://mailsrv?to=x.x@yadda.com&from=Error@Camel.com.au&subject=GENERAL ERROR: A File Could Not Be Imported&contentType=text/html"/>
<to uri="file:d:/Inbox/.badFile?fileName=${file:onlyname.noext}_GENERALERROR_${date:now:yyyyMMddHHmmss}.${file:name.ext}"/>
</onException>
<route id="ExtractFileRoute">
<from uri="file:d:/Inbox?delay=10000&move=.donebackup/${date:now:yyyyMMdd}/${file:onlyname.noext}_DONE_${date:now:yyyyMMddHHmmss}.${file:name.ext}&readLock=changed&include=.*.dl&maxMessagesPerPoll=0&sortBy=${file:length}"/>
<bean ref="FileCheck"/>
<choice>
<when>
<simple>${header.ACCEPTEDFILE} == 'YES'</simple>
<log message="File Import Route Started At:${date:now:yyyy-MM-dd HH:mm:ss}"/>
<convertBodyTo type="java.lang.String"/>
<log message="Converted File To String:${date:now:yyyy-MM-dd HH:mm:ss} handing data to File To DB route."/>
<split parallelProcessing="true" executorServiceRef="myPool" streaming="true" shareUnitOfWork="true">
<tokenize token="\n"></tokenize>
<setHeader headerName="SPLITFINISHED">
<simple>${property.CamelSplitComplete}</simple>
</setHeader>
<setHeader headerName="SPLITNUMBER">
<simple>${property.CamelSplitIndex}</simple>
</setHeader>
<bean ref="EnrichHeader"/>
<choice>
<when>
<simple>${header.RECORDTYPE} == 'HEADER'</simple>
<doTry>
<unmarshal ref="bindyHeader"/>
<bean ref="HeaderPersist"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
<doCatch>
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<bean ref="RecordErrorReport"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
</doCatch>
</doTry>
</when>
<when>
<simple>${header.RECORDTYPE} == 'A'</simple>
<doTry>
<unmarshal ref="bindyAccount"/>
<bean ref="AccountPersist"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
<doCatch>
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<bean ref="RecordErrorReport"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
</doCatch>
</doTry>
</when>
<when>
<simple>${header.RECORDTYPE} == 'C'</simple>
<doTry>
<unmarshal ref="bindyComaker"/>
<bean ref="CoMakerPersist"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
<doCatch>
<exception>java.lang.Exception</exception>
<handled>
<constant>true</constant>
</handled>
<bean ref="RecordErrorReport"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
</doCatch>
</doTry>
</when>
Some other beans here........
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
<otherwise>
<to uri="smtp://ims-mail?to=ops@ipanic&from=Error@yadda.com&subject=URGENT:UNKOWN RECORD TYPE FOUND IN FILE"/>
<choice>
<when>
<simple>${property.CamelSplitComplete} == true</simple>
<to uri="direct:auxrecordsmove"/>
</when>
</choice>
</otherwise>
</choice>
</split>
</when>
<otherwise>
<to uri="file:d:/RMSInbox/.badFile?fileName=${file:onlyname.noext}_POSSIBLE_DUPLICATE_ERROR_${date:now:yyyyMMddHHmmss}.${file:name.ext}"/>
<bean ref="FileErrorReport"/>
<to uri="smtp://ims-mail?to=ops@panic.com&from=Error@yadda.com&subject=ERROR: A File Could Not Be Imported&contentType=text/html"/>
</otherwise>
</choice>
</route>
So every message on this route eventually hits a bean that inserts it into the database. I therefore added DBCP to the dependencies and declared it in my OSGi blueprint XML like this:
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://Canttouchthis:3306/ETLDB"/>
<property name="username" value="ETLUser"/>
<property name="password" value="password"/>
<property name="initialSize" value="2"/>
<property name="maxActive" value="16"/>
<property name="maxIdle" value="16"/>
<property name="minIdle" value="2"/>
<property name="timeBetweenEvictionRunsMillis" value="180000"/>
<property name="minEvictableIdleTimeMillis" value="180000"/>
<property name="testOnBorrow" value="true"/>
<property name="testWhileIdle" value="true"/>
<property name="testOnReturn" value="true"/>
<property name="validationQuery" value="SELECT 1"/>
<property name="maxWait" value="10000"/>
<property name="removeAbandoned" value="true"/>
<property name="logAbandoned" value="false"/>
<property name="removeAbandonedTimeout" value="300"/>
</bean>
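One thing I am considering for leak hunting: the pool above already has removeAbandoned turned on, and flipping logAbandoned to true makes DBCP print the stack trace of the code that borrowed each connection it reclaims. A minimal programmatic sketch of the same pool with that one change (values copied from the blueprint bean above):

import org.apache.commons.dbcp.BasicDataSource;

public final class DebugDataSourceFactory {

    // Same settings as the blueprint bean, but with logAbandoned=true so
    // DBCP logs the stack trace of whoever borrowed a reclaimed connection.
    public static BasicDataSource create() {
        BasicDataSource ds = new BasicDataSource();
        ds.setDriverClassName("com.mysql.jdbc.Driver");
        ds.setUrl("jdbc:mysql://Canttouchthis:3306/ETLDB");
        ds.setUsername("ETLUser");
        ds.setPassword("password");
        ds.setMaxActive(16);
        ds.setMaxIdle(16);
        ds.setMinIdle(2);
        ds.setValidationQuery("SELECT 1");
        ds.setTestOnBorrow(true);
        ds.setRemoveAbandoned(true);
        ds.setRemoveAbandonedTimeout(300);
        ds.setLogAbandoned(true); // the one change: log who never returned a connection
        return ds;
    }
}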
I also declared the beans that do the processing, like so:
<bean id="AccountPersist" class="com.foo.NewAccount.AccountInformationToDatabase">
<property name="dataSource" ref="dataSource"/>
</bean>
Now, once the file split has finished, I want to make sure the records match up. Basically, the file contains account records plus some auxiliary information. So the route checks when the split is complete, and once the file is fully in the staging tables I run some additional sanity checks in MySQL.
The second route looks like this:
<route trace="false" id="MoveMatchingAuxRecordsFromStage">
<from uri="direct:auxrecordsmove"/>
<log message="File Import Route Ended At:${date:now:yyyy-MM-dd HH:mm:ss}"/>
<log message="ETL Route Start AT: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<log message="Moving Matching Comaker records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<bean ref="CoMakerETL"/>
<log message="Matching Comaker records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<log message="Moving Matching Credit History records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<bean ref="CreditHistoryETL"/>
<log message="Matching Credit History records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<log message="Moving Matching Extra Detail records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<bean ref="ExtraDetailInformationETL"/>
<log message="Matching Extra Detail records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<log message="Moving Legal Information records at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<bean ref="LegalInformationETL"/>
<log message="Matching Legal Information records move finised at: ${date:now:yyyy-MM-dd HH:mm:ss}"/>
<log message="ETL Route Finished At ${date:now:yyyy-MM-dd HH:mm:ss}"/>
</route>
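The ETL beans all follow the same JDBC structure as the persistence beans (see the code further down). A representative sketch of their shape; the class name and SQL here are illustrative, not the real statements:

import java.sql.Connection;
import java.sql.PreparedStatement;
import javax.sql.DataSource;

// Illustrative shape of the ETL beans (CoMakerETL etc.): borrow one pooled
// connection, run a set-based move out of the staging table, close both.
public class StageMoveSketch {

    private DataSource dataSource;

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void moveMatchingRecords() throws Exception {
        Connection conn = null;
        PreparedStatement stmt = null;
        try {
            conn = dataSource.getConnection();
            // hypothetical statement; the real one matches staged rows to accounts
            stmt = conn.prepareStatement(
                "INSERT INTO target SELECT * FROM stage WHERE matched = 1");
            stmt.executeUpdate();
        } finally {
            if (stmt != null) stmt.close();
            if (conn != null) conn.close();
        }
    }
}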
In my testing everything went smoothly and I could import files effectively this way. My problems started when I put more than 5 files in the folder. Basically, I watch MySQL grow the connection pool up to its maximum size and then never reuse the connections.
So we hit 16 concurrent connections; they go to sleep after a couple of files have loaded, and then somewhere around file 4, 5 or 6 the following error suddenly appears:
Cannot get a connection, pool error Timeout waiting for idle object
or, as it appears in the log:
[ pool-3-thread-35] oveMatchingAuxRecordsFromStage INFO Matching Extra Detail records move finished at: 2013-07-26 17:41:59
[ pool-3-thread-35] oveMatchingAuxRecordsFromStage INFO Moving Legal Information records at: 2013-07-26 17:41:59
[ pool-3-thread-35] DefaultErrorHandler INFO Failed delivery for (MessageId: ID-IMS-WS2013-001-52799-1374824474993-0-2693 on ExchangeId: ID-IMS-WS2013-001-52799-1374824474993-0-3230). On delivery attempt: 0 caught: java.lang.Exception: Cannot get a connection, pool error Timeout waiting for idle object
[thread #0 - file://d:/RMSInbox] ExtractRMSNewAccountFileRoute INFO STARTING ERROR GENERAL HANDLER
MySQL's maximum connections has been pushed up to 512. I have tried various pool sizes, threading options, and so on.
Interestingly, all of my JDBC code follows this structure. It is not complicated SQL, just insert statements:
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

import org.apache.camel.Body;
import org.apache.camel.Exchange;
import org.apache.camel.Headers;
import org.apache.commons.dbcp.BasicDataSource;

public class RecordParserErrorReporter {

    private static final String SQL_INSERT =
        "INSERT INTO `ETL`.`ETLLog` " +
        " ( " +
        "   `ETL_log_text`, " +
        "   `filename`, " +
        "   `record_number`, " +
        "   `error_message`) " +
        " VALUES " +
        " ( ?, ?, ?, ? ); ";

    private BasicDataSource dataSource;

    public BasicDataSource getDataSource() {
        return dataSource;
    }

    public void setDataSource(BasicDataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void HandleError(
            @Body String msgBody,
            @Headers Map<String, Object> hdr,
            Exchange exch) {
        Connection conn = null;
        PreparedStatement stmt = null;
        try {
            // The exception that put this exchange into the error handler.
            Exception e = exch.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);

            // Borrow a connection from the DBCP pool and log the failed record.
            conn = dataSource.getConnection();
            stmt = conn.prepareStatement(SQL_INSERT);
            stmt.setString(1, msgBody);
            stmt.setString(2, (String) hdr.get("CamelFileName"));
            stmt.setInt(3, (Integer) hdr.get("SPLITNUMBER"));
            stmt.setString(4, e.getMessage());
            stmt.executeUpdate();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            // Close the statement and return the connection to the pool.
            try {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                    conn = null;
                }
            } catch (SQLException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
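For what it's worth, here is the same handler rewritten with try-with-resources (assuming Java 7+), which should rule this method out as a leak source, since both the statement and the connection are closed even if one of the setters throws:

public void HandleError(@Body String msgBody, @Headers Map<String, Object> hdr, Exchange exch) {
    Exception cause = exch.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
    // Both resources are closed automatically, in reverse order,
    // even if a setter or executeUpdate throws.
    try (Connection conn = dataSource.getConnection();
         PreparedStatement stmt = conn.prepareStatement(SQL_INSERT)) {
        stmt.setString(1, msgBody);
        stmt.setString(2, (String) hdr.get("CamelFileName"));
        stmt.setInt(3, (Integer) hdr.get("SPLITNUMBER"));
        stmt.setString(4, cause == null ? "unknown" : cause.getMessage());
        stmt.executeUpdate();
    } catch (SQLException e) {
        System.out.println(e.getMessage());
    }
}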
How do I track down why my connections are not being reused? Or, if I am leaking connections, how can I trace that?
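To watch whether borrowed connections ever come back, I can dump the DBCP counters periodically; BasicDataSource exposes getNumActive() and getNumIdle(). A quick sketch (the five-second interval is arbitrary):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.dbcp.BasicDataSource;

public class PoolMonitor {

    // If numActive climbs to maxActive and never drops back, connections are
    // being held (or leaked); if numIdle stays high while callers time out,
    // the idle connections are not being reused.
    public static void watch(final BasicDataSource ds) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("pool active=" + ds.getNumActive()
                        + " idle=" + ds.getNumIdle());
            }
        }, 0, 5, TimeUnit.SECONDS);
    }
}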
Update:
I set the file consumer to consume 1 file per poll, with a one-second delay between polls. I can see it creating a new connection pool each time the route starts, and then not reusing the previously created pool. It seems that for every file I queue, a new connection pool is created. Not exactly what I want. Is this right?
I have included a screenshot below:
Update 2:
This is not running under Spring DM; it runs under an OSGi blueprint. I am fairly convinced that the datasource is being instantiated more than once.
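A quick way to verify that (a throwaway sketch, dropped into one of the persistence beans): log the identity hash of the injected datasource. If blueprint were building a new pool per file, the hash would change between files.

// Throwaway check added to a persistence bean's setter:
public void setDataSource(BasicDataSource dataSource) {
    this.dataSource = dataSource;
    // Same number on every file = one shared pool instance.
    System.out.println("dataSource instance = " + System.identityHashCode(dataSource));
}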
Update 3:
Okay, so the datasource is not being instantiated more than once. The idle connections are simply not being used. I did find something interesting, though, and I suspect it may be related to what is described in this thread, where there is more information: http://fusesource.com/forums/thread.jspa?threadID=4659&tstart=15
Judging by that issue, I am pretty much stuck.