0

批处理有 4 个步骤 1. 执行一些基本任务 2. 从输入表中提取记录 -> 处理 -> 输出表 3. 验证错误计数,检查输入和输出表中记录的计数。4.从表中拉取记录,更新状态为业务表。

我有一个问题,即输出表中的记录数正确,但是当第 4 步完整记录的记录较少时。当我使用 Oracle 时,我觉得它是由于在提取记录时未完成提交。

请建议是否有人面临类似问题。

配置如下:

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:util="http://www.springframework.org/schema/util" xmlns:orcl="http://www.springframework.org/schema/data/orcl"
xmlns:batch="http://www.springframework.org/schema/batch"
xsi:schemaLocation="http://www.springframework.org/schema/beans    
                       http://www.springframework.org/schema/beans/spring-beans.xsd 
                       http://www.springframework.org/schema/context  
                       http://www.springframework.org/schema/context/spring-context-2.5.xsd 
                       http://www.springframework.org/schema/tx       
                       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
                       http://www.springframework.org/schema/aop 
                       http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
                       http://www.springframework.org/schema/util 
                       http://www.springframework.org/schema/util/spring-util.xsd
                       http://www.springframework.org/schema/data/orcl
                       http://www.springframework.org/schema/data/orcl/spring-data-orcl-1.0.xsd
                       http://www.springframework.org/schema/batch
                        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd ">

<context:component-scan base-package="com.test." />

<import resource="classpath:context-datasource.xml" />

<bean id="parameterSetter" class="com.test.helper.QueryParamSetter"
    scope="step">
    <constructor-arg value="#{jobParameters['fileName']}" />
</bean>

<bean id="pagingItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="SELECT *" />
            <property name="fromClause" value="from INPUT_TABLE" />
            <property name="whereClause"
                value="FILE_NAME= :fileName AND rownum &lt; 3001" /> 
            <property name="sortKey" value="INPIUT_STG_ID" />

        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="fileName" value="#{jobParameters['fileName']}" />
        </map>
    </property>
    <property name="pageSize" value="100" />
    <property name="rowMapper">
        <bean class="com.test.InputVOMapper" />
    </property>
</bean>

<bean id="postItemReader"
    class="org.springframework.batch.item.database.JdbcPagingItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="select *" />
            <property name="fromClause" value="from OUTPUT_TABLE" />
            <property name="whereClause"
                value="FILE_NAME= :fileName AND TXN_ID IS NOT NULL AND FILE_DATA IS NOT NULL" />
            <property name="sortKey" value="CREATE_DT" />

        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="fileName" value="#{jobParameters['fileName']}" />
        </map>
    </property>
    <property name="pageSize" value="100" />
    <property name="rowMapper">
        <bean class="com.test.mapper.OutputVOMapper" />
    </property>
</bean>

<!-- JobExecutionListener to perform business logic before and after the 
    job -->
<bean id="jobListener" class="com.test.SettlementJobListener">

    <property name="templateDir" value="templates" />
</bean>

<bean id="processEventHelper" class="com.wdpr.payment.helper.ProcessEventHelper" />

<!-- ItemWriter which writes data to database -->

<bean id="errorWriter"
    class="org.springframework.batch.item.database.JdbcBatchItemWriter">
    <property name="dataSource" ref="dataSource" />
    <property name="sql"
        value="UPDATE INPUT_TABLE SET ERR_CD=:batchErrorCode, ERR_DS=:batchErrorDesc WHERE PROC_ID=:rrn" />
    <property name="itemSqlParameterSourceProvider">
        <bean
            class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>
<bean id="recordWriter"
    class="org.springframework.batch.item.database.JdbcBatchItemWriter">
    <property name="dataSource" ref="dataSource" />
    <property name="sql"
        value="INSERT INTO OUTPUT_TABLE (CARD_SETTL_ID,TXN_ID,CREATE_DT,CREATE_PROC_ID,FILE_DATA,CLIENT_ID,FILE_NAME) values (:a,:b,:c,:d,:e,:f,:g)" />
    <property name="itemSqlParameterSourceProvider">
        <bean
            class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
    </property>
</bean>

<bean id="databaseItemWriter" class="com.test.SettlementDataWriter"
    scope="step">
    <property name="errorWriter" ref="errorWriter" />
    <property name="recordWriter" ref="recordWriter" />
</bean>

<bean id="itemProcessor" class="com.test.SettlementDataProcessor"
    scope="step" />

<bean id="preProcessor" class="com.test.PreProcessor" />
<!-- Step will need a transaction manager -->
<bean id="transactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" /> 

<bean id="statusItemWriter" class="com.test.StatusItemWriter" scope="step">
    <property name="processorHelper" ref="processorHelper" />
</bean>

<bean id="errorCheck" class="com.test.ErrorCheckTasklet"
    scope="step">
    <property name="jdbcTemplate" ref="jdbcTemplate" />
    <property name="sql"
        value="SELECT count(*) from INPUT_TABLE where FILE_NAME= ? AND ERR_CD IS NOT NULL" />
    <property name="footerCount"
        value="SELECT count(*) from OUTPUT_TABLE where FILE_NAME= ? AND FILE_DATA IS NOT NULL" />
    <property name="countCheck"
        value="SELECT count(*) from OUTPUT_TABLE where FILE_NAME= ? AND TXN_ID IS NOT NULL AND FILE_DATA IS NOT NULL" />    
    <!-- <property name="preparedStatementSetter" ref="parameterSetter" /> -->
    <property name="recordWriter" ref="recordWriter" />
</bean>

<bean id="preProcessReader" class="com.test.PreProcessReader"
    scope="step">
    <property name="jdbcTemplate" ref="jdbcTemplate" />
    <property name="sql1"
        value="select count(PROC_ID) from INPUT_TABLE where FILE_NAME=? AND rownum &lt; 101" />
    <property name="sql2"
        value="select count(distinct(PROC_ID)) from INPUT_TABLE where FILE_NAME=? AND rownum &lt; 101" />
    <property name="preparedStatementSetter" ref="parameterSetter" />
</bean>

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" >
    <property name="concurrencyLimit" value="100" />
</bean>

<!-- Actual Job -->
<batch:job id="settelmentJob">

    <batch:step id="step1" next="step2">
        <batch:tasklet transaction-manager="transactionManager">
            <batch:chunk reader="preProcessReader" processor="preProcessor"
                writer="recordWriter" commit-interval="1" />
        </batch:tasklet>
    </batch:step>

    <batch:step id="step2" next="step3" >
        <batch:tasklet transaction-manager="transactionManager"
            task-executor="taskExecutor" throttle-limit="100"  ><!-- throttle-limit="100" -->
            <batch:chunk reader="pagingItemReader" writer="databaseItemWriter"
                processor="itemProcessor"  commit-interval="100"  /> <!-- commit-interval="100"  -->
        </batch:tasklet>
    </batch:step>

    <batch:step id="step3" >
        <batch:tasklet transaction-manager="transactionManager" 
            ref="errorCheck" >
        </batch:tasklet>
    </batch:step>

    <batch:step id="step4">
        <batch:tasklet transaction-manager="transactionManager"
            task-executor="taskExecutor" throttle-limit="100" >
            <batch:chunk reader="postItemReader" writer="statusItemWriter"
                commit-interval="100" />

        </batch:tasklet>

        </batch:step>

    <batch:listeners>
        <batch:listener ref="jobListener" />
    </batch:listeners>
</batch:job>

<!-- JobRepository and JobLauncher are configuration/setup classes -->

<bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
</bean>

<bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

数据源

bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${driverClassName}" />
<property name="url" value="${url}" />
<property name="username" value="${username}" />
<property name="password" value="${password}" />
<property name="initialSize" value="30" />
<property name="maxIdle" value="10" />
<property name="validationQuery" value="select 1 from dual" />
<property name="maxTotal" value="50"></property>
<property name="testWhileIdle" value="true"/>
<property name="timeBetweenEvictionRunsMillis" value="10000"/>
<property name="removeAbandonedOnBorrow" value="true"/>
4

2 回答 2

0

我相信问题在于您使用的是ResourcelessTransactionManager. 这有两个问题:

  1. 这意味着您并没有真正使用事务(因为它不适用于任何资源)。
  2. 这意味着 Spring Batch 在继续之前不会等待提交完成,因为没有调用真正的提交。

您已经在使用基于 Map 的 JobRepository。您应该为正在使用 Oracle 的 DataSource 使用 TransactionManager。这将解决您的问题。

于 2015-06-25T14:40:22.897 回答
0

我发现问题不在于提交,而是 jdbcpagination 配置返回的结果与查询不同。我创建了以下问题。

Spring批处理中的JdbcPagingItemReader没有给出正确的结果

于 2015-06-25T19:43:09.967 回答