2

我有一个 Spring Batch 应用程序,它从 CSV 文件中读取数据,把所有行交给处理环节处理,再把所有处理后的行写入数据库。非常经典。现在我的问题是 CSV 文件太大,导致出现了 Java heap space(堆内存溢出)错误,所以我想也许可以改为每 x 行处理一次来优化,比如每 10000 行处理一次(每处理完 10000 行就释放内存,而不是把所有行都加载到内存中)。

有没有办法告诉 spring-batch 以递归方式处理一个步骤?或者有没有其他方法可以解决我的问题?

任何建议将不胜感激。谢谢

4

1 回答 1

6

这是将以下csv文件处理成bean的示例

headerA,headerB,headerC
col1,col2,col3

第一行(标题行)会被忽略,其余各行的列会直接映射到属性名与之匹配的对象上(这样做只是为了简洁)。

这是使用 Spring Batch 开箱即用(out of the box)组件的作业配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Job configuration: defines the "fileJob" job, which reads a delimited file
    named by the "fileName" job parameter and inserts each mapped row into the
    simple_entity table. The "dataSource" bean referenced here is not defined
    in this file and must be supplied by another context file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Single-step job: read with fileReader, write with databaseWriter (no processor is configured). -->
    <batch:job id="fileJob">
        <batch:step id="fileJob.step1">
            <batch:tasklet>
                <!-- commit-interval is the chunk size: items are handed to the writer and committed
                     10000 at a time, so the whole file is never held in memory at once. -->
                <batch:chunk reader="fileReader" writer="databaseWriter" commit-interval="10000"/>
            </batch:tasklet>
        </batch:step>
        <!-- Rejects a launch whose job parameters do not contain the "fileName" key. -->
        <batch:validator>
            <bean class="org.springframework.batch.core.job.DefaultJobParametersValidator">
                <property name="requiredKeys" value="fileName"/>
            </bean>
        </batch:validator>
    </batch:job>

    <!-- Step-scoped so that the jobParameters expression below is resolved when the step starts,
         not when the application context is created. -->
    <bean id="fileReader"
        class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
        <property name="lineMapper" ref="lineMapper"/>
        <property name="resource" value="file:#{jobParameters['fileName']}"/>
        <!-- Skip the first line of the file (the header row). -->
        <property name="linesToSkip" value="1"/>
    </bean>

    <!-- Turns one line into one item: tokenize the line, then map the resulting fields onto a bean. -->
    <bean id="lineMapper"
        class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
        <property name="lineTokenizer" ref="lineTokenizer"/>
    </bean>


    <!-- Splits each line on commas and names the three resulting fields col1, col2 and col3. -->
    <bean id="lineTokenizer"
        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="col1,col2,col3"/>
    </bean>

    <!-- Populates a SimpleEntity from the named fields; presumably SimpleEntity exposes
         col1/col2/col3 properties matching the tokenizer names (class not shown here, confirm). -->
    <bean id="fieldSetMapper"
        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="de.incompleteco.spring.batch.domain.SimpleEntity"/>
    </bean>

    <!-- Writes each chunk through JDBC; the :col1/:col2/:col3 named parameters in the SQL are
         filled from the item's bean properties by the parameter source provider. -->
    <bean id="databaseWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
        <property name="sql" value="insert into simple_entity (col1,col2,col3) values (:col1,:col2,:col3)"/>
    </bean>
</beans>

有几点需要注意:

  1. 这个作业(job)需要一个名为 'fileName' 的参数,用来告诉 fileReader 到哪里去找文件。
  2. 配置了一个 jobParametersValidator,用于确保该参数已经提供。

这是批处理资源(基础设施)配置:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Batch infrastructure configuration: job repository, job explorer and job launcher,
    plus a "junit" profile that supplies the data source, task executor and
    transaction manager used when running tests.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- No data-source or transaction-manager attribute is given; presumably the namespace
         defaults pick up the beans named "dataSource" and "transactionManager" (confirm
         against the Spring Batch namespace documentation). -->
    <batch:job-repository id="jobRepository"/>

    <!-- Read-only access to job execution metadata stored in the same data source. -->
    <bean id="jobExplorer"
        class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
    </bean>
    <!-- The launcher is given a task executor, so a launched job may still be running when
         the launch call returns; callers have to poll for completion. -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>

    <!-- Beans below exist only when the "junit" profile is active. dataSource, taskExecutor and
         transactionManager are not defined anywhere else in this file. -->
    <beans profile="junit">
        <!-- Embedded H2 database initialised with two scripts: the schema script shipped on the
             Spring Batch classpath, then a project script (presumably creating simple_entity;
             that script is not shown here). -->
        <jdbc:embedded-database id="dataSource" type="H2">
            <jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/>
            <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
        </jdbc:embedded-database>

        <task:executor id="taskExecutor"/>

        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    </beans>
</beans>

下面是与之对应的单元测试:

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test for the file-to-database job.
 *
 * <p>Generates a CSV file with a header line followed by {@code recordCount}
 * data rows, launches the job with that file as the {@code fileName} job
 * parameter, waits for the execution to finish, and verifies that the job
 * completed and that {@code simple_entity} holds exactly {@code recordCount}
 * rows.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
@ActiveProfiles("junit")
public class FileJobIntegrationTest {

    /** Header row of the generated file; the reader is configured to skip one line. */
    private static final String HEADER = "col1,col2,col3";

    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private DataSource dataSource;

    /** Number of data rows written to the file and expected in the table. */
    private final int recordCount = 1000000;

    /** Absolute path of the generated CSV file, located in the JVM temp directory. */
    private final String fileName = System.getProperty("java.io.tmpdir") + File.separator + "test.csv";

    /**
     * Removes a CSV file left over from a previous run.
     *
     * <p>Deletion is best-effort: the file is opened without append mode when it
     * is regenerated, so a leftover file would be truncated anyway.
     */
    @Before
    public void before() throws Exception {
        File leftover = new File(fileName);
        if (leftover.exists()) {
            leftover.delete();
        }
    }

    /**
     * Runs the job end to end against a freshly generated file.
     *
     * @throws Exception if the file cannot be written, the job cannot be
     *         launched, or the waiting thread is interrupted
     */
    @Test
    public void test() throws Exception {
        writeCsvFile();

        // Report the size of the generated file in whole megabytes.
        long length = new File(fileName).length();
        System.out.println("file size: " + ((length / 1024) / 1024));

        // Launch the job, pointing the reader at the generated file.
        JobParameters jobParameters = new JobParametersBuilder().addString("fileName", fileName).toJobParameters();
        JobExecution execution = jobLauncher.run(job, jobParameters);

        // The launcher is wired with a task executor, so run(...) may return
        // before the job has finished; poll the explorer until it is done.
        while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
            Thread.sleep(1000);
        }

        // Reload the execution to pick up its final status.
        execution = jobExplorer.getJobExecution(execution.getId());
        assertEquals(ExitStatus.COMPLETED.getExitCode(), execution.getExitStatus().getExitCode());

        // Every generated data row must have been inserted exactly once.
        int count = new JdbcTemplate(dataSource).queryForObject("select count(*) from simple_entity", Integer.class);
        assertEquals(recordCount, count);
    }

    /**
     * Writes the header line followed by exactly {@code recordCount} rows of the
     * form {@code i,i+1,i+2}, each terminated by a newline.
     *
     * <p>The header gets its own line terminator so that skipping one line drops
     * only the header and not the first data row. Output is buffered and the
     * stream is closed even when a write fails.
     */
    private void writeCsvFile() throws IOException {
        try (Writer out = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(fileName), StandardCharsets.UTF_8))) {
            out.write(HEADER);
            out.write('\n');
            for (int i = 0; i < recordCount; i++) {
                out.write(i + "," + (i + 1) + "," + (i + 2) + "\n");
            }
        }
    }

}
于 2013-05-30T01:31:00.470 回答